dingbot
Contents
钉钉机器人
一、shell脚本
1、基于关键字或者 ip 版
发送txt格式消息
# Message text passed to sendMsg by the demo call in this script.
info="hello"
# File that sendMsg appends one result line to per call.
logFile="/var/log/dingbot.log"
#######################################
# Send a plain-text message to the DingTalk robot webhook.
# Globals:
#   logFile (read)  - file the result line is appended to
#   token   (write) - robot access token used in the webhook URL
#   result  (write) - raw response body returned by curl
# Arguments:
#   $@ - words of the message; joined with single spaces
# Outputs:
#   Prints 'send succees!' when the response matches "errmsg.*ok".
# Returns:
#   Exit status of the final log append.
# Notes:
#   Double quotes and raw newline/CR/tab characters in the message are
#   escaped for JSON. Backslashes are passed through unchanged, so a
#   caller-supplied "\n" still reaches the API as a JSON line break.
#######################################
sendMsg() {
  token='1e18ffe069052b56f5a0f8fe9b6c058373e7df7ef4xxxxxxxxxxxxxxx'
  local content payload
  content="$*"
  content=${content//\"/\\\"}
  content=${content//$'\n'/\\n}
  content=${content//$'\r'/\\r}
  content=${content//$'\t'/\\t}
  # "\\n" in the printf format emits the two characters \n, i.e. a JSON
  # newline escape between the "msg:" prefix and the message itself.
  printf -v payload '{"msgtype": "text","text": {"content": "msg:\\n%s"}}' "$content"
  result=$(curl -s "https://oapi.dingtalk.com/robot/send?access_token=$token" \
    -H 'Content-Type: application/json' \
    -d "$payload")
  # Test grep's exit status instead of word-splitting its output inside [ ],
  # which failed whenever the response contained whitespace.
  if printf '%s\n' "$result" | grep -q 'errmsg.*ok'; then
    echo 'send succees!'
  fi
  echo "$(date +'%Y-%m-%d %H:%M.%S') state: $result msg: $*" >>"$logFile"
}
# Demo call: quoted so the text is not word-split or glob-expanded before it reaches sendMsg.
sendMsg "$info"
发送 markdown 格式消息
# File that sendMsg and SendMsgByMD append their result lines to.
logFile="/var/log/DingBot.log"
# Robot access token that both functions place in the webhook URL.
token="1e18ffe069052b56f5a0f8fe9b6c058373xxxxxxxxxxxxxx"
#######################################
# Send a plain-text message to the DingTalk robot webhook.
# Globals:
#   token   (read)  - robot access token used in the webhook URL
#   logFile (read)  - file the result line is appended to
#   result  (write) - raw response body returned by curl
# Arguments:
#   $@ - words of the message; joined with single spaces
# Outputs:
#   Prints 'send succees!' when the response matches "errmsg.*ok".
# Returns:
#   Exit status of the final log append.
# Notes:
#   Double quotes and raw newline/CR/tab characters in the message are
#   escaped for JSON. Backslashes are passed through unchanged, so a
#   caller-supplied "\n" still reaches the API as a JSON line break.
#######################################
sendMsg() {
  local info content payload
  info="$*"
  content=${info//\"/\\\"}
  content=${content//$'\n'/\\n}
  content=${content//$'\r'/\\r}
  content=${content//$'\t'/\\t}
  printf -v payload '{"msgtype": "text","text": {"content": "%s"}}' "$content"
  result=$(curl -s "https://oapi.dingtalk.com/robot/send?access_token=$token" \
    -H 'Content-Type: application/json' \
    -d "$payload")
  # Test grep's exit status instead of word-splitting its output inside [ ],
  # which failed whenever the response contained whitespace.
  if printf '%s\n' "$result" | grep -q 'errmsg.*ok'; then
    echo 'send succees!'
  fi
  echo "$(date +'%Y-%m-%d %H:%M.%S') state: $result MessagesType: text [ text: $* ]" >>"$logFile"
}
# Escape double quotes and raw newline/CR/tab characters in $1 for use inside
# a JSON string. Backslashes are left alone so caller-written "\n" keeps working.
_SendMsgByMD_json_escape() {
  local s=$1
  s=${s//\"/\\\"}
  s=${s//$'\n'/\\n}
  s=${s//$'\r'/\\r}
  s=${s//$'\t'/\\t}
  printf '%s' "$s"
}

#######################################
# Send a markdown message to the DingTalk robot webhook.
# The request @-mentions two fixed mobile numbers and sets isAtAll to true.
# Globals:
#   token   (read)  - robot access token used in the webhook URL
#   logFile (read)  - file the result line is appended to
#   result  (write) - raw response body returned by curl
# Arguments:
#   $1 - markdown title
#   $2 - markdown body text
# Outputs:
#   Prints 'send succees!' when the response matches "errmsg.*ok".
# Returns:
#   Exit status of the final log append.
#######################################
SendMsgByMD() {
  local info=$1    # markdown title
  local infoMsg=$2 # markdown body
  local title text payload
  # Disabled per-function token override kept from the original:
  # token='1e18ffe069052b56f5a0f8fe9b6c058373e7df7xxxxxxxxxxxxxx'
  title=$(_SendMsgByMD_json_escape "$info")
  text=$(_SendMsgByMD_json_escape "$infoMsg")
  printf -v payload '{"msgtype": "markdown","markdown": {"title": "%s","text": "%s"},"at": {"atMobiles": ["156xxxx8827","189xxxx8325"],"isAtAll": true}}' \
    "$title" "$text"
  result=$(curl -s "https://oapi.dingtalk.com/robot/send?access_token=$token" \
    -H 'Content-Type: application/json' \
    -d "$payload")
  # Test grep's exit status instead of word-splitting its output inside [ ],
  # which failed whenever the response contained whitespace.
  if printf '%s\n' "$result" | grep -q 'errmsg.*ok'; then
    echo 'send succees!'
  fi
  echo "$(date +'%Y-%m-%d %H:%M.%S') state: $result MessagesType: markdown [ title: $info text: $infoMsg ]" >>"$logFile"
}
# main: launch both notifications as background jobs so they run concurrently.
sendMsg 'zabbix' &
SendMsgByMD 'zabbix' '# send msg' &
2、签名计算版
计算签名时使用的 date 是 GNU(Linux)版的;macOS 自带的 date 是 BSD 版,不支持 %N(毫秒)。在 macOS 上需要换成 GNU 版的 date(例如用 Homebrew 安装 coreutils 后使用 gdate),才能正常计算签名。
## DingTalk robot configuration
dingbot_secret="SECa87axxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
dingbot_url="https://oapi.dingtalk.com/robot/send?access_token=cd3xxxxxxxxxxxxx"
## Security mode checked by dingbot: 'keywords' or 'sign'
ding_secret_type="sign"
## Mobile numbers of the people to @-mention, separated by whitespace
atMobiles=(
  13346123456
  13346123457
)
## Percent-encode a string for use in a URL.
#######################################
# Arguments:
#   $1 - hex digit case: 'x' for lowercase, 'X' for uppercase;
#        any other non-empty value falls back to 'x'
#   $2 - string to encode
# Outputs:
#   The encoded string on stdout, without a trailing newline. Bytes in
#   [0-9A-Za-z] and '-', '.', '_', '~' are kept; every other byte becomes %HH.
# Returns:
#   0 on success; 1 (with usage text on stderr) when $1 or $2 is empty.
#######################################
function url_encode() {
  local hex_case="${1}"
  if [[ -z "${1}" || -z "${2}" ]]; then
    printf '%s\n' \
      '$1 and $2 can not empty' \
      '$1 ==> x or X, x ==> lower, X ==> toupper.' \
      '$2 ==> Strings need to url encode' >&2
    return 1
  fi
  # Accept exactly x or X; a regex match let values such as '.' or 'xX' through.
  case "${hex_case}" in
    x | X) ;;
    *) hex_case='x' ;;
  esac
  # od flags: -An drops the offset column, -v stops od folding repeated
  # 16-byte lines into "*" (which silently dropped data), and u1 prints
  # unsigned bytes so values >= 128 are not negative.
  printf '%s' "${2}" | od -An -v -t u1 | awk -v fmt="%%%02${hex_case}" '
    {
      for (i = 1; i <= NF; i++) {
        c = $i + 0
        if ((c >= 48 && c <= 57) || (c >= 65 && c <= 90) || (c >= 97 && c <= 122) ||
            c == 45 || c == 46 || c == 95 || c == 126) {
          printf("%c", c)
        } else {
          printf(fmt, c)
        }
      }
    }'
}
## Dingbot
#######################################
# Send a text message through the DingTalk robot webhook.
# Globals (read):
#   dingbot_url      - webhook URL including access_token
#   dingbot_secret   - signing secret (used in 'sign' mode only)
#   ding_secret_type - 'keywords' or 'sign'
#   atMobiles        - array of mobile numbers to @-mention
# Arguments:
#   $1 - message text
# Outputs:
#   The webhook response body from curl on stdout.
# Returns:
#   curl's exit status; 1 when ding_secret_type is unknown or the
#   signature cannot be computed.
# Notes:
#   Double quotes and raw newline/CR/tab characters in the message are
#   escaped for JSON. Backslashes are passed through unchanged.
#   NOTE(review): the secret is passed to openssl on its command line and is
#   therefore visible in the process list while the command runs.
#######################################
function dingbot() {
  local content="${1}"
  local post_url="${dingbot_url}"
  local at_who='' mobile timestamp sign payload

  for mobile in "${atMobiles[@]}"; do
    at_who="${at_who:+${at_who},}\"${mobile}\""
  done

  case "${ding_secret_type}" in
    keywords) ;;
    sign)
      timestamp=$(date '+%s%3N')
      # BSD/macOS date has no %N and prints it literally; fall back to
      # whole seconds expressed in milliseconds.
      case "${timestamp}" in
        '' | *[!0-9]*) timestamp="$(date '+%s')000" ;;
      esac
      # printf '%s' keeps backslashes in the secret literal (echo -e did not).
      sign=$(printf '%s\n%s' "${timestamp}" "${dingbot_secret}" \
        | openssl dgst -sha256 -hmac "${dingbot_secret}" -binary | base64)
      if [[ -z "${sign}" ]]; then
        echo 'dingbot: failed to compute signature' >&2
        return 1
      fi
      sign=$(url_encode 'X' "${sign}") || return 1
      # The separator must be "&timestamp="; it had been corrupted to "×tamp=".
      post_url="${dingbot_url}&timestamp=${timestamp}&sign=${sign}"
      ;;
    *)
      echo "secret_type 未知,请检查配置" >&2
      return 1
      ;;
  esac

  content=${content//\"/\\\"}
  content=${content//$'\n'/\\n}
  content=${content//$'\r'/\\r}
  content=${content//$'\t'/\\t}
  printf -v payload '{"at":{"atMobiles":[%s]},"msgtype":"text","text":{"content":"%s"}}' \
    "${at_who}" "${content}"
  curl -s -X POST -H 'Content-Type: application/json' "${post_url}" -d "${payload}"
}
# Demo call: send a fixed test message through dingbot.
dingbot 'hello'
二、python版
1、基于关键字或者 ip
#!/usr/bin/python3
# encoding: utf-8
import requests
import json
def dingmsg(text="this is first python to test dingding", timeout=10):
    """Send a plain-text message to the DingTalk robot webhook.

    Args:
        text: Message content. Defaults to the original demo string.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The response object returned by ``requests.post``. Its body is also
        printed to stdout.
    """
    # Base URL of the robot webhook.
    ding_url = 'https://oapi.dingtalk.com/robot/send?access_token='
    # Access token of the robot.
    token = 'ba9b7c169caebb1048a66845fa8344348b3e8fdaefxxxxxxxxxxx'
    # Full webhook address to post to.
    webhook = ding_url + token
    # Request headers.
    header = {
        "Content-Type": "application/json",
        "Charset": "UTF-8"
    }
    # Request payload.
    msg = {
        "msgtype": "text",
        "text": {
            "content": text,
        }
    }
    # Serialize the payload to JSON.
    msg_json = json.dumps(msg)
    # Without a timeout, requests waits indefinitely on an unresponsive server.
    info = requests.post(url=webhook, data=msg_json, headers=header, timeout=timeout)
    # Print the response body.
    print(info.text)
    return info
# Script entry point: send the demo message when this file is run directly.
if __name__ == "__main__":
    dingmsg()
2、基于加签计算
#!/usr/bin/python3
# encoding: utf-8
import requests
import json
import time
import hmac
import hashlib
import base64
import urllib.parse
class dingRobot:
    """DingTalk robot client that signs each request with a timestamp and secret."""

    def __init__(self):
        # Base webhook URL; the access token is appended when sending.
        self.dingUrl = 'https://oapi.dingtalk.com/robot/send?access_token='
        self.__secret = 'SECc95942d1139feaefcdef6abf33e6497eb3d0120b4ca3dxxxxxxxxxx'
        self.__token = 'ba9b7c169caebb1048a66845fa8344348b3e8fdaef264e6exxxxxxxxxxxdc'
        # Both are filled in by createTimestampSign() before every send.
        self.__sign = ''
        self.__timestamp = ''

    def createTimestampSign(self):
        """Refresh the millisecond timestamp and the matching URL-encoded signature.

        The signature is the base64 HMAC-SHA256 of "<timestamp>\\n<secret>",
        keyed with the secret, then percent-encoded with ``quote_plus``.
        """
        self.__timestamp = str(round(time.time() * 1000))
        secret_enc = self.__secret.encode('utf-8')
        string_to_sign = '{}\n{}'.format(self.__timestamp, self.__secret)
        string_to_sign_enc = string_to_sign.encode('utf-8')
        hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
        self.__sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))

    def sendMsg(self, text, timeout=10):
        """Send a plain-text message and return the ``requests`` response.

        Args:
            text: Message content.
            timeout: Seconds to wait for the HTTP request before giving up.
        """
        self.createTimestampSign()
        # The separator must be "&timestamp="; it had been corrupted to "×tamp=",
        # which glued the timestamp onto the access token.
        webhook = (self.dingUrl + self.__token
                   + '&timestamp=' + self.__timestamp
                   + '&sign=' + self.__sign)
        # Request headers.
        header = {
            "Content-Type": "application/json",
            "Charset": "UTF-8"
        }
        # Request payload.
        msg = {
            "msgtype": "text",
            "text": {
                "content": text,
            }
        }
        # Serialize the payload to JSON.
        msg_json = json.dumps(msg)
        # Without a timeout, requests waits indefinitely on an unresponsive server.
        info = requests.post(url=webhook, data=msg_json, headers=header, timeout=timeout)
        return info
# Script entry point: send one test message and report whether the API accepted it.
if __name__ == "__main__":
    robot = dingRobot()
    result = robot.sendMsg('只是用于测试')
    # The response body is JSON; errcode 0 is treated as success.
    errcode = json.loads(result.text)['errcode']
    print("发送消息成功" if errcode == 0 else "发送消息失败")