同步 PostgreSQL 数据到 MySQL
大约 2 分钟
由于之前的分析系统和大屏的展示数据是基于 MySQL 进行开发和设置的，不支持其他数据库，这就导致最新的 Zabbix 数据无法进入到系统中，所以需要定时把数据刷新到 MySQL 中。
这里主要用 pymysql 和 threading。
- pymysql 用来做CRUD
- threading 用来处理并发
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import requests
import logging
import pymysql
from threading import Thread
#
# --- MySQL connection settings used by every DBConnect(...) call below ---
DBHOST = "192.168.1.32"
DBPORT = 3306
# NOTE: the name is spelled "DBUSRE" and is referenced with that spelling
# elsewhere in this file, so it must not be renamed in isolation.
DBUSRE = "root"
DBPASSWORD = "xxxxxx"

# Base URL handed to JkPrometheus(server=...) when the script runs.
PROMETHEUS = "http://192.168.3.130:9090/"

# db: jkprometheus
# table: alert_history_data

# Root logger: INFO and above, with timestamp, source path and line number.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
)
class DBConnect(object):
    """Small pymysql helper for the ``jkprometheus`` database.

    An instance only stores connection settings. Every method opens its own
    short-lived connection and always closes it before returning, including
    when a statement fails.
    """

    def __init__(self, host, user, password, port):
        """Remember the MySQL connection settings; no connection is opened here."""
        self.host = host
        self.user = user
        self.password = password
        self.port = port

    def _connect(self):
        """Open and return a new pymysql connection with no database selected."""
        return pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port)

    def db_init(self):
        """Create the ``jkprometheus`` database if it does not exist yet.

        Raises:
            Whatever pymysql raises when connecting or executing fails.
        """
        create_db = ("CREATE DATABASE "
                     "IF NOT EXISTS jkprometheus "
                     "DEFAULT CHARSET utf8 "
                     "COLLATE utf8_general_ci")
        conn = self._connect()
        try:
            cursor = conn.cursor()
            cursor.execute(create_db)
        finally:
            # Close even when execute() raises so the connection is not leaked.
            conn.close()

    def table_init(self):
        """Create the ``alert_history_data`` table if it does not exist yet.

        Raises:
            Whatever pymysql raises when connecting or executing fails.
        """
        use_db = "USE jkprometheus"
        alert_history_data = ("CREATE TABLE "
                              "IF NOT EXISTS alert_history_data"
                              "( "
                              "id int primary key auto_increment, "
                              "project varchar(30) not null default '河南县域', "
                              "instance varchar(30), "
                              "job varchar(100), "
                              "created datetime not null default CURRENT_TIMESTAMP, "
                              "description varchar(2222),"
                              "summary varchar(2222),"
                              "alert_state varchar(2222),"
                              "severity varchar(11),"
                              "active_at varchar(100)"
                              ")")
        conn = self._connect()
        try:
            cursor = conn.cursor()
            cursor.execute(use_db)
            cursor.execute(alert_history_data)
        finally:
            conn.close()

    def run_sql(self, sql, args=None):
        """Execute one statement against ``jkprometheus`` and commit it.

        Args:
            sql: The SQL statement to run.
            args: Optional parameters forwarded to ``cursor.execute`` so the
                driver can quote them. When omitted, ``sql`` is executed
                verbatim, exactly as before this parameter existed.

        Returns:
            ``(True, result)`` on success, where ``result`` is the value
            returned by ``cursor.execute`` (pymysql reports the number of
            affected rows), or ``(False, repr(error))`` when executing or
            committing the statement fails.

        Raises:
            Whatever pymysql raises when connecting or selecting the database
            fails; only errors from the statement itself are turned into a
            ``(False, ...)`` result.
        """
        use_db = "USE jkprometheus"
        conn = self._connect()
        try:
            cursor = conn.cursor()
            cursor.execute(use_db)
            try:
                result = cursor.execute(sql, args)
                conn.commit()
            except Exception as e:
                logging.error(repr(e))
                # Report the failure through the flag instead of claiming success.
                return False, repr(e)
            return True, result
        finally:
            # Runs on every exit path, so the connection is never leaked.
            conn.close()
class JkPrometheus(object):
    """Reads active alerts from a Prometheus server and records them in MySQL."""

    # Seconds to wait for the Prometheus HTTP API before giving up, so a
    # stalled server cannot hang the script forever.
    REQUEST_TIMEOUT = 10

    def __init__(self, username=None, password=None, server=None, verify=False):
        """Store the settings.

        Args:
            username: Stored on the instance; not used by the methods here.
            password: Stored on the instance; not used by the methods here.
            server: Base URL of the Prometheus server.
            verify: Accepted for interface compatibility; currently unused.
        """
        self.username = username
        self.password = password
        self.server = server
        self._login = False
        self._token = None
        self._header = None

    def set_header(self, k, v):
        """Set header ``k`` to ``v``, creating the header dict on first use."""
        if self._header is None:
            self._header = {}
        self._header[k] = v

    def login(self):
        """Prepare the JSON content-type header and report success."""
        self.set_header('Content-Type', 'application/json')
        return True

    def _is_login(self):
        """Refresh ``self._login`` from the result of ``login()``."""
        # get_token_url = '{}/oauth/token'.format(self._server)
        self._login = True if self.login() else False

    def alert_data(self):
        """Fetch the alert list from ``<server>/api/v1/alerts``.

        Returns:
            ``(True, alerts)`` when the response carries a non-empty alert
            list, otherwise ``(False, None)``.

        Raises:
            Whatever ``requests`` raises on connection errors, timeouts or a
            body that is not valid JSON.
        """
        # Strip a trailing slash so the URL does not contain "//api/...".
        url = "{}/api/v1/alerts".format(str(self.server).rstrip('/'))
        response = requests.get(url, timeout=self.REQUEST_TIMEOUT).json()
        # logging.info(response)
        # Tolerate a missing or null "data" member instead of raising KeyError.
        alerts = (response.get('data') or {}).get('alerts')
        if alerts:
            return True, alerts
        return False, None

    @staticmethod
    def _quote(value):
        """Return ``value`` as an escaped, double-quoted SQL string literal.

        Alert text comes from an external system and may contain quotes or
        backslashes; escaping keeps the statement well-formed and prevents
        SQL injection. ``None`` is rendered as the text ``None``, as the
        previous string formatting did.
        """
        return '"{}"'.format(pymysql.converters.escape_string(str(value)))

    def alert_history_insert(self, instance, job, description, summary, active_at, alert_state, severity):
        """Insert the alert, or update its severity if it is already stored.

        A row counts as already stored when one exists with the same
        ``active_at`` and ``alert_state``. If the lookup itself fails, nothing
        is written.
        """
        quote = self._quote
        db = DBConnect(user=DBUSRE, password=DBPASSWORD, host=DBHOST, port=DBPORT)

        sql = 'select id from alert_history_data where active_at={} and alert_state={}'.format(
            quote(active_at), quote(alert_state))
        success, data = db.run_sql(sql=sql)
        # run_sql puts an error repr (a string) in the result slot on failure;
        # only an integer row count tells us whether the row exists.
        if not success or not isinstance(data, int):
            logging.error('alert_history_insert - lookup failed: %s', data)
            return

        if not data:
            sql = ('INSERT INTO alert_history_data(instance, job, description, summary,active_at,alert_state,severity) '
                   'VALUES ({},{},{},{},{},{},{}'
                   ');').format(
                quote(instance),
                quote(job),
                quote(description),
                quote(summary),
                quote(active_at),
                quote(alert_state),
                quote(severity),
            )
        else:
            sql = 'UPDATE alert_history_data SET severity={} WHERE active_at={} and alert_state={}'.format(
                quote(severity), quote(active_at), quote(alert_state))
        db.run_sql(sql=sql)
if __name__ == '__main__':
    # Script entry point: make sure the schema exists, fetch the current
    # Prometheus alerts and write each one to MySQL on its own thread.
    logging.info('alert_data - start')
    DBConnect(user=DBUSRE, password=DBPASSWORD, host=DBHOST, port=DBPORT).db_init()
    DBConnect(user=DBUSRE, password=DBPASSWORD, host=DBHOST, port=DBPORT).table_init()

    prom = JkPrometheus(server=PROMETHEUS)
    success, alerts = prom.alert_data()
    if not success or not alerts:
        # alert_data() returns (False, None) when nothing is firing;
        # iterating None would raise TypeError, so fall back to no work.
        alerts = []

    threads = list()
    for alert in alerts:
        # Guard against alerts that lack a "labels" or "annotations" member.
        labels = alert.get('labels') or {}
        annotations = alert.get('annotations') or {}
        worker = Thread(
            target=prom.alert_history_insert,
            kwargs={
                'instance': labels.get('instance'),
                'job': labels.get('job'),
                'description': annotations.get('description'),
                'summary': annotations.get('summary'),
                'active_at': alert.get('activeAt'),
                'alert_state': alert.get('state'),
                'severity': labels.get('severity'),
            },
        )
        worker.start()
        threads.append(worker)

    # Wait for every insert/update to finish before reporting completion.
    for thread in threads:
        thread.join()
    logging.info('alert_data - done')