同步PostgreSQL数据到MySQL

约 579 字大约 2 分钟

背景

由于之前的分析系统和大屏展示都是基于 MySQL 开发和配置的，不支持其他数据库，导致最新的 Zabbix 数据无法进入系统，因此需要定时把数据同步到 MySQL 中。

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import requests
import logging
import pymysql
from threading import Thread

# Connection settings for the target MySQL server and the source Prometheus
# server.
# NOTE(review): addresses and credentials are hard-coded here. "DBUSRE" looks
# like a typo of "DBUSER", but the rest of this file refers to it by this
# exact name, so it must not be renamed in isolation.
DBHOST = "192.168.1.32"
DBUSRE = "root"
DBPASSWORD = "xxxxxx"
DBPORT = 3306
# Base URL of the Prometheus server whose alerts are read.
PROMETHEUS = "http://192.168.3.130:9090/"

# Target database: jkprometheus
# Target table: alert_history_data

# Configure the root logger: INFO and above, each record prefixed with the
# timestamp, the source file path and line number, and the level name.
logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
                    level=logging.INFO)

class DBConnect(object):
    """Small PyMySQL helper for the ``jkprometheus`` database.

    Each public method opens its own short-lived connection with the
    settings given to the constructor and always closes it before
    returning, whether or not the statement succeeded.
    """

    def __init__(self, host, user, password, port):
        """Remember the MySQL connection settings.

        Args:
            host: MySQL server host name or address.
            user: MySQL user name.
            password: Password for ``user``.
            port: MySQL server TCP port.
        """
        self.host = host
        self.user = user
        self.password = password
        self.port = port

    def _connect(self):
        """Open a new connection to the server without selecting a database."""
        return pymysql.connect(host=self.host, user=self.user,
                               password=self.password, port=self.port)

    def db_init(self):
        """Create the ``jkprometheus`` database if it does not exist yet."""
        create_db = "CREATE DATABASE " \
                    "IF NOT EXISTS jkprometheus  " \
                    "DEFAULT CHARSET utf8 " \
                    "COLLATE utf8_general_ci"

        conn = self._connect()
        try:
            conn.cursor().execute(create_db)
        finally:
            # Close even when execute() raises so the connection is not leaked.
            conn.close()

    def table_init(self):
        """Create the ``alert_history_data`` table if it does not exist yet."""
        use_db = "USE jkprometheus"
        create_table = "CREATE TABLE " \
                       "IF NOT EXISTS alert_history_data" \
                       "( " \
                       "id int primary key auto_increment, " \
                       "project varchar(30) not null default '河南县域', " \
                       "instance varchar(30), " \
                       "job varchar(100), " \
                       "created datetime not null default CURRENT_TIMESTAMP, " \
                       "description varchar(2222)," \
                       "summary varchar(2222)," \
                       "alert_state varchar(2222)," \
                       "severity varchar(11)," \
                       "active_at varchar(100)" \
                       ")"

        conn = self._connect()
        try:
            cursor = conn.cursor()
            cursor.execute(use_db)
            cursor.execute(create_table)
        finally:
            conn.close()

    def run_sql(self, sql, args=None):
        """Execute one statement against ``jkprometheus`` and commit it.

        Args:
            sql: The SQL statement. May contain ``%s`` placeholders when
                ``args`` is given.
            args: Optional parameters bound to the placeholders in ``sql``
                by the driver. Prefer this over formatting values into the
                statement text.

        Returns:
            ``(True, count)`` on success, where ``count`` is the value
            returned by ``cursor.execute`` (the number of affected or
            selected rows). ``(False, message)`` when the statement fails,
            where ``message`` is the ``repr`` of the exception.

        Raises:
            Exception: Errors from connecting or from selecting the database
                are not caught here and propagate to the caller.
        """
        use_db = "USE jkprometheus"
        conn = self._connect()
        try:
            cursor = conn.cursor()
            cursor.execute(use_db)
            try:
                result = cursor.execute(sql, args)
                conn.commit()
                return True, result
            except Exception as e:
                logging.error(repr(e))
                # Report the failure; the flag used to be True on both paths.
                return False, repr(e)
        finally:
            # Runs on every path. The previous close() sat after the returns
            # and was never reached, leaking one connection per call.
            conn.close()


class JkPrometheus(object):
    """Reads active alerts from a Prometheus server and records them in MySQL.

    Alerts are fetched from the server's ``/api/v1/alerts`` endpoint and
    written to the ``alert_history_data`` table through ``DBConnect``.
    """

    def __init__(self, username=None, password=None, server=None, verify=False):
        """Store the client settings.

        Args:
            username: Stored on the instance; no request in this class uses it.
            password: Stored on the instance; no request in this class uses it.
            server: Base URL of the Prometheus server, for example
                ``http://host:9090``. A trailing slash is tolerated.
            verify: Accepted for interface compatibility; currently unused.
        """
        self.username = username
        self.password = password
        self.server = server

        self._login = False
        self._token = None
        # Must be a dict rather than None so set_header() can assign into it.
        self._header = {}

    def set_header(self, k, v):
        """Set header ``k`` to ``v`` in this client's header dict."""
        self._header[k] = v

    def login(self):
        """Prepare the JSON content-type header and report success.

        Returns:
            Always ``True``; no authentication request is made.
        """
        self.set_header('Content-Type', 'application/json')

        return True

    def _is_login(self):
        """Run ``login()`` and cache its outcome in ``self._login``."""
        self._login = bool(self.login())

    def alert_data(self, timeout=10):
        """Fetch the alerts currently reported by Prometheus.

        Args:
            timeout: Seconds to wait for the HTTP request before giving up,
                so an unreachable server cannot hang the job forever.

        Returns:
            ``(True, alerts)`` with the non-empty list of alert dicts, or
            ``(False, None)`` when the response carries no alerts.

        Raises:
            requests.RequestException: The HTTP request failed or timed out.
            ValueError: The response body is not valid JSON.
        """
        # Strip a trailing slash so the URL does not contain "//api/...".
        base = (self.server or '').rstrip('/')
        url = "{}/api/v1/alerts".format(base)
        response = requests.get(url, timeout=timeout).json()

        # An error response has no "data" member; treat it as "no alerts".
        alerts = (response.get('data') or {}).get('alerts')
        if alerts:
            return True, alerts
        return False, None

    @staticmethod
    def _sql_escape(value):
        """Return ``value`` as text that is safe inside a quoted SQL literal.

        ``None`` becomes the text ``None``, matching how values were
        previously formatted into the statements.
        """
        return pymysql.converters.escape_string(str(value))

    def alert_history_insert(self, instance, job, description, summary,active_at, alert_state, severity):
        """Insert one alert, or refresh its severity if it is already stored.

        A row is considered to exist when one matches both ``active_at`` and
        ``alert_state``; in that case only ``severity`` is updated.
        NOTE(review): different alerts sharing the same ``active_at`` and
        ``alert_state`` map to the same row - confirm this key is intended.

        Args:
            instance: Value for the ``instance`` column.
            job: Value for the ``job`` column.
            description: Value for the ``description`` column.
            summary: Value for the ``summary`` column.
            active_at: Value for the ``active_at`` column; part of the key.
            alert_state: Value for the ``alert_state`` column; part of the key.
            severity: Value for the ``severity`` column.
        """
        # Alert text comes from Prometheus and may contain quotes or
        # backslashes, so every value is escaped before being embedded.
        esc = self._sql_escape
        key = (esc(active_at), esc(alert_state))
        db = DBConnect(user=DBUSRE, password=DBPASSWORD, host=DBHOST, port=DBPORT)

        sql = 'select id from alert_history_data where active_at="{}" and alert_state="{}"'.format(*key)
        success, data = db.run_sql(sql=sql)

        # A failed statement comes back from run_sql as the exception's repr
        # (a str) in place of the row count. Do not mistake that for
        # "row exists" and issue a blind UPDATE.
        if not success or isinstance(data, str):
            logging.error('alert lookup failed, alert not stored: %s', data)
            return

        if not data:
            sql = 'INSERT INTO alert_history_data(instance, job, description, summary,active_at,alert_state,severity) ' \
                  'VALUES ("{}","{}","{}","{}","{}","{}","{}"' \
                  ');'.format(
                      esc(instance),
                      esc(job),
                      esc(description),
                      esc(summary),
                      key[0],
                      key[1],
                      esc(severity),
                  )
        else:
            sql = 'UPDATE alert_history_data SET  severity="{}" WHERE active_at="{}" and alert_state="{}"'.format(
                esc(severity), key[0], key[1])

        db.run_sql(sql=sql)

if __name__ == '__main__':
    # Script entry point: make sure the database and table exist, fetch the
    # current Prometheus alerts, and store each one on its own thread.

    logging.info('alert_data - start')

    threads = list()

    DBConnect(user=DBUSRE, password=DBPASSWORD, host=DBHOST, port=DBPORT).db_init()
    DBConnect(user=DBUSRE, password=DBPASSWORD, host=DBHOST, port=DBPORT).table_init()

    prom = JkPrometheus(server=PROMETHEUS)
    success, alerts = prom.alert_data()

    # alert_data() returns (False, None) when there are no alerts; iterate an
    # empty list in that case rather than failing on None.
    for alert in (alerts if success else None) or []:
        # Either section may be missing from an alert; fall back to an empty
        # dict so the lookups below yield None rather than raising.
        labels = alert.get('labels') or {}
        annotations = alert.get('annotations') or {}

        worker = Thread(
            target=prom.alert_history_insert,
            kwargs={
                'instance': labels.get('instance'),
                'job': labels.get('job'),
                'description': annotations.get('description'),
                'summary': annotations.get('summary'),
                'active_at': alert.get('activeAt'),
                'alert_state': alert.get('state'),
                'severity': labels.get('severity'),
            },
        )
        threads.append(worker)
        worker.start()

    # Wait for every insert/update to finish before reporting completion.
    for thread in threads:
        thread.join()

    logging.info('alert_data - done')