架构图
通过定时任务采集并解析 MQ 的 XML 数据,存储到 MySQL 数据库中;当所有队列的消息累计值超过阈值(下文示例代码中为 10)时,说明消费异常,通过企业微信报警;累计值恢复到阈值以内时会再发送一条恢复通知。通过 MQ.php 可查询历史记录。
预览
告警页面
查询页面,显示最后1000行数据
系统组成
MQchecktouch.py 初始化数据库
MQcheck.py 监控主程序
MQ.php 历史记录查询程序
初始化数据库
首先手动创建数据库 mq 和用户 mquser,然后运行 MQchecktouch.py 进行初始化,生成 mqdata 和 configkey 两张表
import mysql.connector
# One-off initialisation: create the two tables used by the monitor and seed
# the alert-state row. The database "mq" and the user must already exist.
mqdb = mysql.connector.connect(
    host="127.0.0.1",
    user="mquser",
    passwd="mqpasswd",
    database="mq"
)
try:
    mqcursor = mqdb.cursor()
    try:
        # IF NOT EXISTS keeps the script safe to re-run, e.g. after a partial
        # failure: DDL is committed implicitly by MySQL, so a crash between
        # the two CREATE statements would otherwise block every later run.
        mqcursor.execute("CREATE TABLE IF NOT EXISTS mqdata (id INT AUTO_INCREMENT PRIMARY KEY,time VARCHAR(255), name VARCHAR(255), number VARCHAR(255))")
        mqcursor.execute("CREATE TABLE IF NOT EXISTS configkey (name VARCHAR(255), config VARCHAR(255))")

        # configkey has no unique key, so guard the seed row explicitly to
        # avoid inserting a duplicate 'config' row on a second run.
        mqcursor.execute("SELECT COUNT(*) FROM configkey WHERE name = 'config'")
        if mqcursor.fetchone()[0] == 0:
            # NOTE(review): the monitor treats '1' as "alert already sent", so
            # the first healthy run will emit one recovery notification.
            insert_sql = "INSERT INTO configkey (name, config) VALUES ('config','1')"
            mqcursor.execute(insert_sql)
            mqdb.commit()
    finally:
        mqcursor.close()
finally:
    # Always release the connection, even when a statement fails.
    mqdb.close()
初始化后的数据库
监控主程序
通过定时任务运行主程序
import requests,time
import xml.etree.cElementTree as ET
import mysql.connector
def get_mqxml():
    """Fetch the broker's queue statistics and return them as parallel lists.

    Requests the admin XML queue listing and reads, for every ``<queue>``
    element, its ``name`` attribute and the ``size`` attribute of its
    ``<stats>`` child.

    Returns:
        A ``(names, sizes)`` tuple of two lists of equal length. Sizes are
        returned exactly as found in the XML attribute (strings).

    Raises:
        requests.RequestException: on connection failure, timeout, or a
            non-success HTTP status.
        xml.etree.ElementTree.ParseError: if the response is not valid XML.
    """
    mqreq = requests.post(
        url='http://你的MQ地址:8161/admin/xml/queues.jsp',
        auth=('admin', 'admin'),
        # Without a timeout a stalled broker would hang the cron job forever.
        timeout=10,
    )
    # Fail with a clear HTTP error (e.g. 401) instead of an XML parse error
    # caused by feeding an error page to the parser.
    mqreq.raise_for_status()

    # Parse the raw bytes so the parser honours the document's own encoding.
    mqxml = ET.fromstring(mqreq.content)

    mqname_list = []
    mqnumbers_list = []
    for queue in mqxml.iter('queue'):
        mqname_list.append(queue.get("name"))
        mqnumbers_list.append(queue.find('stats').get("size"))
    return mqname_list, mqnumbers_list
def post_weixin(num,mqnumber):
    """Send a notification card to the WeCom (企业微信) webhook robot.

    Args:
        num: ``0`` selects the "recovered" wording; any other value selects
            the "alert" wording.
        mqnumber: current accumulated message count, appended to the text.

    Raises:
        requests.RequestException: on connection failure or timeout.

    The response body and HTTP status code are printed for the cron log.
    """
    if num == 0:
        data = "6啊,MQ恢复了,当前累计值:"
    else:
        data = "G了,MQ当前累计值:"
    url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你自己的企业微信机器人'
    body = {
        "msgtype": "news",
        "news": {
            "articles": [
                {
                    "title": "MQ监控机器人",
                    "description": data+str(mqnumber),
                    "url": "90apt.com",
                    "picurl": "你自己的图片"
                }
            ]
        }
    }
    # json= serialises the body and sets the JSON Content-Type header itself,
    # so no explicit headers dict is needed. The timeout keeps a stalled
    # webhook from hanging the scheduled job.
    response = requests.post(url, json=body, timeout=10)
    print(response.text)
    print(response.status_code)
def save_mysql(threshold=10):
    """Store the current queue sizes and notify on alert-state changes.

    Reads the module-level ``mqname_list`` and ``mqnumbers_list``, inserts one
    ``mqdata`` row per queue (all sharing the same timestamp string), and sums
    the sizes. The ``config`` row in ``configkey`` remembers whether an alert
    has already been sent ("1") or not ("0"), so that a notification goes out
    only when the state flips:

    * total above ``threshold`` while state is "0" -> set "1", send alert
    * total at or below ``threshold`` while state is "1" -> set "0", send
      recovery message

    Args:
        threshold: accumulated message count above which consumption is
            considered abnormal. Defaults to 10, the previously hard-coded
            limit.

    Raises:
        RuntimeError: if the ``config`` row is missing from ``configkey``.
    """
    mqdb = mysql.connector.connect(
        host="127.0.0.1",
        user="mquser",
        passwd="mqpasswd",
        database="mq"
    )
    try:
        mqcursor = mqdb.cursor()
        try:
            nowtime = time.asctime()
            total = 0
            insert_sql = "INSERT INTO mqdata (time ,name, number) VALUES (%s, %s, %s)"
            for queue_name, queue_size in zip(mqname_list, mqnumbers_list):
                mqcursor.execute(insert_sql, (nowtime, queue_name, queue_size))
                total += int(queue_size)
            mqdb.commit()
            print(total)

            # Select the column by name rather than relying on SELECT * order.
            # fetchall() drains the result set so the cursor can be reused
            # even if the table holds more than one matching row.
            mqcursor.execute("SELECT config FROM configkey WHERE name ='config'")
            rows = mqcursor.fetchall()
            if not rows:
                raise RuntimeError(
                    "configkey has no 'config' row; run MQchecktouch.py first"
                )
            mqconfig = rows[0][0]

            if total > threshold and mqconfig == "0":
                new_state, notify_kind = "1", 1
            elif total <= threshold and mqconfig == "1":
                new_state, notify_kind = "0", 0
            else:
                # State already matches the situation: nothing to announce.
                return

            mqcursor.execute(
                "UPDATE configkey SET config = %s WHERE name = 'config'",
                (new_state,),
            )
            # Notify before committing: if the webhook call raises, the state
            # change is rolled back and the next run retries, instead of the
            # notification being lost for good.
            post_weixin(notify_kind, total)
            mqdb.commit()
        finally:
            mqcursor.close()
    finally:
        # Release the connection on every path, including early return.
        mqdb.close()
# Fetch once: calling get_mqxml() twice costs a second HTTP request and can
# pair queue names from one snapshot with sizes from another.
mqname_list, mqnumbers_list = get_mqxml()
save_mysql()
历史记录查询程序
<?php
// History page: prints the most recent 1000 rows of mqdata, newest first.
$con = mysqli_connect("localhost", "mquser", "mqpasswd", "mq");
// Check the connection
if (mysqli_connect_errno())
{
    // Stop here: every later call needs a valid connection.
    exit("连接失败: " . mysqli_connect_error());
}
// Order by the auto-increment id, not by `time`: that column stores a text
// timestamp, which sorts alphabetically rather than chronologically.
// LIMIT 1000 (offset 0) also includes the newest row, which "limit 1,1000"
// skipped.
$result = mysqli_query($con, "SELECT time, name, number FROM mqdata ORDER BY id DESC LIMIT 1000");
if ($result === false)
{
    echo "查询失败: " . mysqli_error($con);
}
else
{
    while ($row = mysqli_fetch_assoc($result))
    {
        // Queue names come from the broker; escape them before emitting HTML.
        echo htmlspecialchars($row['time'] . " " . $row['name'] . " " . $row['number']);
        echo "<br>";
    }
    mysqli_free_result($result);
}
// Actually close the connection (the old "$conn = null" touched an unrelated variable).
mysqli_close($con);
?>
评论