From bf06b73afeb7e35d2094dedc1a9e411cd0ce18be Mon Sep 17 00:00:00 2001
From: xunhanliu <1638081534@qq.com>
Date: Thu, 30 Jun 2022 18:28:14 +0800
Subject: [PATCH 1/3] =?UTF-8?q?=E6=94=AF=E6=8C=81=E5=A4=9A=E7=B4=A2?=
 =?UTF-8?q?=E5=BC=95=E5=92=8C=20=E4=B8=AD=E9=97=B4=E4=BB=B6=E7=9A=84?=
 =?UTF-8?q?=E5=A4=84=E7=90=86?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 MANIFEST.in                        |   2 +-
 README.md                          |  18 +-
 README_CN.md                       |  37 +-
 es_sync/__init__.py                | 520 +----------------------------
 src/__init__.py => es_sync/mian.py | 256 ++++++++++----
 es_sync/sample.yaml                |  44 +--
 es_sync/utils.py                   |  29 ++
 middleware/keyword.py              |  17 +
 src/sample.yaml                    |  45 ---
 test.py                            |   6 +
 10 files changed, 298 insertions(+), 676 deletions(-)
 rename src/__init__.py => es_sync/mian.py (57%)
 create mode 100644 es_sync/utils.py
 create mode 100644 middleware/keyword.py
 delete mode 100644 src/sample.yaml
 create mode 100644 test.py

diff --git a/MANIFEST.in b/MANIFEST.in
index c625213..c7b6dee 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -1,2 +1,2 @@
 include README.md LICENSE
-recursive-include src *.yaml
\ No newline at end of file
+recursive-include es_sync *.yaml
\ No newline at end of file
diff --git a/README.md b/README.md
index 2abf719..a4200f0 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,9 @@
+Tip: original project: [original project](https://github.com/zhongbiaodev/py-mysql-elasticsearch-sync)
+
 # py-mysql-elasticsearch-sync
 Simple and fast MySQL to Elasticsearch sync tool, written in Python.

-[中文文档](https://github.com/zhongbiaodev/py-mysql-elasticsearch-sync/blob/master/README_CN.md)
+[中文文档](https://github.com/xunhanliu/py-mysql-elasticsearch-sync/blob/master/README_CN.md)

 ## Introduction
 This tool helps you to initialize MySQL dump table to Elasticsearch by parsing mysqldump, then incremental sync MySQL table to Elasticsearch by processing MySQL Binlog.
@@ -38,7 +40,7 @@ pip install py-mysql-elasticsearch-sync
 ```

 ## Configuration
-There is a [sample config](https://github.com/zhongbiaodev/py-mysql-elasticsearch-sync/blob/master/es_sync/sample.yaml) file in repo, you can start by editing it.
+There is a [sample config](https://github.com/xunhanliu/py-mysql-elasticsearch-sync/blob/master/es_sync/sample.yaml) file in repo, you can start by editing it.

 ## Running
 Simply run command
@@ -60,14 +62,8 @@ es-sync path/to/your/config.yaml --fromfile
 to start sync, when xml sync is over, it will also start binlog sync.

 ## Deployment
-We provide an [upstart script]((https://github.com/zhongbiaodev/py-mysql-elasticsearch-sync/blob/master/upstart.conf)) to help you deploy this tool, you can edit it for your own condition, besides, you can deploy it in your own way.
+We provide an [upstart script](https://github.com/xunhanliu/py-mysql-elasticsearch-sync/blob/master/upstart.conf) to help you deploy this tool; you can edit it for your own environment, or deploy it in your own way.

 ## MultiTable Supporting
-Now Multi-table is supported through setting tables in config file, the first table is master as default and the others are slave.
-
-Master table and slave tables must use the same primary key, which is defined via _id.
-
-Table has higher priority than tables.
-
-## TODO
-- [ ] MultiIndex Supporting
+Already supported.
+See the Chinese documentation for details: [中文文档](https://github.com/xunhanliu/py-mysql-elasticsearch-sync/blob/master/README_CN.md)
diff --git a/README_CN.md b/README_CN.md
index e2d05e4..6a35ac0 100644
--- a/README_CN.md
+++ b/README_CN.md
@@ -1,3 +1,5 @@
+tips: 原始项目为: [原始项目](https://github.com/zhongbiaodev/py-mysql-elasticsearch-sync)
+
 # py-mysql-elasticsearch-sync
 一个从MySQL向Elasticsearch同步数据的工具,使用Python实现。

@@ -35,7 +37,7 @@ pip install py-mysql-elasticsearch-sync
 ```

 ## 配置
-你可以通过修改[配置文件示例](https://github.com/zhongbiaodev/py-mysql-elasticsearch-sync/blob/master/es_sync/sample.yaml)来编写自己的配置文件
+你可以通过修改[配置文件示例](https://github.com/xunhanliu/py-mysql-elasticsearch-sync/blob/master/es_sync/sample.yaml)来编写自己的配置文件

 ## 运行
 运行命令
@@ -60,14 +62,25 @@ es-sync path/to/your/config.yaml --fromfile
 启动从xml导入,当从xml导入完毕后,它会开始同步binlog

 ## 服务管理
-我们写了一个[upstart脚本](https://github.com/zhongbiaodev/py-mysql-elasticsearch-sync/blob/master/upstart.conf)来管理本工具的运行,你也可以用你自己的方式进行部署运行
-
-## 多表支持
-你可以在config文件中配置tables以支持多表,默认tables中第一张表为主表,其余表为从表。
-
-主表和从表主键必须相同,均为_id字段。
-
-当同时设置table和tables时,table优先级较高。
-
-## TODO
-- [ ] 多索引支持
+我们写了一个[upstart脚本](https://github.com/xunhanliu/py-mysql-elasticsearch-sync/blob/master/upstart.conf)来管理本工具的运行,你也可以用你自己的方式进行部署运行
+
+## 对配置文件新增多索引和middleware的说明
+```yaml
+# 配置映射
+table_mappings:
+  - mysql_table_name: keyword
+    es_index: test
+    es_type: _doc
+    middlewares:
+      - "middleware.keyword:process_id"
+    mapping:
+      _id: id
+  - mysql_table_name: user
+    es_index: user
+    es_type: _doc
+    mapping:
+      _id: id
+    middlewares:
+```
+mysql_table_name: keyword 表示从 keyword 表中读取数据;
+在每行数据处理过程中,先经过 mapping 映射,再经过 middleware.keyword:process_id 中间件的处理,最后写入 ES 的 /test/_doc 中。
\ No newline at end of file
diff --git a/es_sync/__init__.py b/es_sync/__init__.py
index eb92e77..01c42a1 100644
--- a/es_sync/__init__.py
+++ b/es_sync/__init__.py
@@ -1,520 +1,4 @@
-from __future__ import print_function, unicode_literals
-from future.builtins import str, range
-import sys
-
-PY2 = sys.version_info[0] == 2
-
-if PY2:
-    import os
-    DEVNULL = open(os.devnull, 'wb')
-else:
-    from subprocess import DEVNULL
-
-
-def encode_in_py2(s):
-    if PY2:
-        return s.encode('utf-8')
-    return s
-
-import os.path
-import yaml
-import signal
-import requests
-import subprocess
-import json
-import logging
-import shlex
-import datetime
-import decimal
-from lxml.etree import iterparse
-from functools import reduce
-from pymysqlreplication import BinLogStreamReader
-from pymysqlreplication.row_event import DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent
-from pymysqlreplication.event import RotateEvent, XidEvent
-
-__version__ = '0.4.2'
-
-
-# The magic spell for removing invalid characters in xml stream.
-REMOVE_INVALID_PIPE = r'tr -d "\00\01\02\03\04\05\06\07\10\13\14\16\17\20\21\22\23\24\25\26\27\30\31\32\33\34\35\36\37"'
-
-DEFAULT_BULKSIZE = 100
-DEFAULT_BINLOG_BULKSIZE = 1
-
-
-class ElasticSync(object):
-    table_structure = {}
-    log_file = None
-    log_pos = None
-
-    @property
-    def is_binlog_sync(self):
-        rv = bool(self.log_file and self.log_pos)
-        return rv
-
-    def __init__(self):
-        try:
-            self.config = yaml.load(open(sys.argv[1]))
-        except IndexError:
-            print('Error: not specify config file')
-            exit(1)
-
-        mysql = self.config.get('mysql')
-        if mysql.get('table'):
-            self.tables = [mysql.get('table')]
-            self.dump_cmd = 'mysqldump -h {host} -P {port} -u {user} --password={password} {db} {table} ' \
-                            '--default-character-set=utf8 -X --opt --quick'.format(**mysql)
-        elif mysql.get('tables'):
-            self.tables = mysql.get('tables')
-            mysql.update({
-                'tables': ' '.join(mysql.get('tables'))
-            })
-            self.dump_cmd = 'mysqldump -h {host} -P {port} -u {user} --password={password} --database {db} --tables {tables} ' \
-                            '--default-character-set=utf8 -X --opt --quick'.format(**mysql)
-        else:
-            print('Error: must specify either table or tables')
-            exit(1)
-        self.master = self.tables[0]  # use the first table as master
-        self.current_table = None
-
-        self.binlog_conf = dict(
-            [(key, self.config['mysql'][key]) for key in ['host', 'port', 'user', 'password', 'db']]
-        )
-
-        self.endpoint = 'http://{host}:{port}/{index}/{type}/_bulk'.format(
-            host=self.config['elastic']['host'],
-            port=self.config['elastic']['port'],
-            index=self.config['elastic']['index'],
-            type=self.config['elastic']['type']
-        )  # todo: supporting multi-index
-
-        self.mapping = self.config.get('mapping') or {}
-        if self.mapping.get('_id'):
-            self.id_key = self.mapping.pop('_id')
-        else:
-            self.id_key = None
-
-        self.ignoring = self.config.get('ignoring') or []
-
-        record_path = self.config['binlog_sync']['record_file']
-        if os.path.isfile(record_path):
-            with open(record_path, 'r') as f:
-                record = yaml.load(f)
-                self.log_file = record.get('log_file')
-                self.log_pos = record.get('log_pos')
-
-        self.bulk_size = self.config.get('elastic').get('bulk_size') or DEFAULT_BULKSIZE
-        self.binlog_bulk_size = self.config.get('elastic').get('binlog_bulk_size') or DEFAULT_BINLOG_BULKSIZE
-
-        self._init_logging()
-        self._force_commit = False
-
-    def _init_logging(self):
-        logging.basicConfig(filename=self.config['logging']['file'],
-                            level=logging.INFO,
-                            format='[%(levelname)s] - %(filename)s[line:%(lineno)d] - %(asctime)s %(message)s')
-        self.logger = logging.getLogger(__name__)
-        logging.getLogger("requests").setLevel(logging.WARNING)  # disable requests info logging
-
-        def cleanup(*args):
-            self.logger.info('Received stop signal')
-            self.logger.info('Shutdown')
-            sys.exit(0)
-
-        signal.signal(signal.SIGINT, cleanup)
-        signal.signal(signal.SIGTERM, cleanup)
-
-    def _post_to_es(self, data):
-        """
-        send post requests to es restful api
-        """
-        resp = requests.post(self.endpoint, data=data)
-        if resp.json().get('errors'):  # a boolean to figure error occurs
-            for item in resp.json()['items']:
-                if list(item.values())[0].get('error'):
-                    logging.error(item)
-        else:
-            self._save_binlog_record()
-
-    def _bulker(self, bulk_size):
-        """
-        Example:
-            u = bulker()
-            u.send(None)  # for generator initialize
-            u.send(json_str)  # input json item
-            u.send(another_json_str)  # input json item
-            ...
-            u.send(None)  # force finish bulk and post
-        """
-        while True:
-            data = ""
-            for i in range(bulk_size):
-                item = yield
-                if item:
-                    data = data + item + "\n"
-                else:
-                    break
-                if self._force_commit:
-                    break
-            # print(data)
-            print('-'*10)
-            if data:
-                self._post_to_es(data)
-
-            self._force_commit = False
-
-    def _updater(self, data):
-        """
-        encapsulation of bulker
-        """
-        if self.is_binlog_sync:
-            u = self._bulker(bulk_size=self.binlog_bulk_size)
-        else:
-            u = self._bulker(bulk_size=self.bulk_size)
-
-        u.send(None)  # push the generator to first yield
-        for item in data:
-            u.send(item)
-        u.send(None)  # tell the generator it's the end
-
-    def _json_serializer(self, obj):
-        """
-        format the object which json not supported
-        """
-        if isinstance(obj, datetime.datetime) or isinstance(obj, datetime.date):
-            return obj.isoformat()
-        elif isinstance(obj, decimal.Decimal):
-            return str(obj)
-        raise TypeError('Type not serializable for obj {obj}'.format(obj=obj))
-
-    def _processor(self, data):
-        """
-        The action must be one of the following:
-        create
-            Create a document only if the document does not already exist.
-        index
-            Create a new document or replace an existing document.
-        update
-            Do a partial update on a document.
-        delete
-            Delete a document.
-        """
-        for item in data:
-            if self.id_key:
-                action_content = {'_id': item['doc'][self.id_key]}
-            else:
-                action_content = {}
-            for field in self.ignoring:
-                try:
-                    item['doc'].pop(field)
-                except KeyError:
-                    pass
-            meta = json.dumps({item['action']: action_content})
-            if item['action'] == 'index':
-                body = json.dumps(item['doc'], default=self._json_serializer)
-                rv = meta + '\n' + body
-            elif item['action'] == 'update':
-                body = json.dumps({'doc': item['doc']}, default=self._json_serializer)
-                rv = meta + '\n' + body
-            elif item['action'] == 'delete':
-                rv = meta + '\n'
-            elif item['action'] == 'create':
-                body = json.dumps(item['doc'], default=self._json_serializer)
-                rv = meta + '\n' + body
-            else:
-                logging.error('unknown action type in doc')
-                raise TypeError('unknown action type in doc')
-            yield rv
-
-    def _mapper(self, data):
-        """
-        mapping old key to new key
-        """
-        for item in data:
-            if self.mapping:
-                for k, v in self.mapping.items():
-                    try:
-                        item['doc'][k] = item['doc'][v]
-                        del item['doc'][v]
-                    except KeyError:
-                        continue
-            # print(doc)
-            yield item
-
-    def _formatter(self, data):
-        """
-        format every field from xml, according to parsed table structure
-        """
-        for item in data:
-            for field, serializer in self.table_structure.items():
-                if field in item['doc'] and item['doc'][field]:
-                    try:
-                        item['doc'][field] = serializer(item['doc'][field])
-                    except ValueError as e:
-                        self.logger.error(
-                            "Error occurred during format, ErrorMessage:{msg}, ErrorItem:{item}".format(
-                                msg=str(e),
-                                item=str(item)))
-                        item['doc'][field] = None
-                    except TypeError as e:
-                        item['doc'][field] = None
-            # print(item)
-            yield item
-
-    def _binlog_loader(self):
-        """
-        read row from binlog
-        """
-        if self.is_binlog_sync:
-            resume_stream = True
-            logging.info("Resume from binlog_file: {file}  binlog_pos: {pos}".format(file=self.log_file,
-                                                                                     pos=self.log_pos))
-        else:
-            resume_stream = False
-
-        stream = BinLogStreamReader(connection_settings=self.binlog_conf,
-                                    server_id=self.config['mysql']['server_id'],
-                                    only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent, RotateEvent, XidEvent],
-                                    only_tables=self.tables,
-                                    resume_stream=resume_stream,
-                                    blocking=True,
-                                    log_file=self.log_file,
-                                    log_pos=self.log_pos)
-        for binlogevent in stream:
-            self.log_file = stream.log_file
-            self.log_pos = stream.log_pos
-
-            # RotateEvent to update binlog record when no related table changed
-            if isinstance(binlogevent, RotateEvent):
-                self._save_binlog_record()
-                continue
-
-            if isinstance(binlogevent, XidEvent):  # event_type == 16
-                self._force_commit = True
-                continue
-
-            for row in binlogevent.rows:
-                if isinstance(binlogevent, DeleteRowsEvent):
-                    if binlogevent.table == self.master:
-                        rv = {
-                            'action': 'delete',
-                            'doc': row['values']
-                        }
-                    else:
-                        rv = {
-                            'action': 'update',
-                            'doc': {k: row['values'][k] if self.id_key and self.id_key == k else None for k in row['values']}
-                        }
-                elif isinstance(binlogevent, UpdateRowsEvent):
-                    rv = {
-                        'action': 'update',
-                        'doc': row['after_values']
-                    }
-                elif isinstance(binlogevent, WriteRowsEvent):
-                    if binlogevent.table == self.master:
-                        rv = {
-                            'action': 'create',
-                            'doc': row['values']
-                        }
-                    else:
-                        rv = {
-                            'action': 'update',
-                            'doc': row['values']
-                        }
-                else:
-                    logging.error('unknown action type in binlog')
-                    raise TypeError('unknown action type in binlog')
-                yield rv
-                # print(rv)
-        stream.close()
-        raise IOError('mysql connection closed')
-
-    def _parse_table_structure(self, data):
-        """
-        parse the table structure
-        """
-        for item in data.iter():
-            if item.tag == 'field':
-                field = item.attrib.get('Field')
-                type = item.attrib.get('Type')
-                if 'int' in type:
-                    serializer = int
-                elif 'float' in type:
-                    serializer = float
-                elif 'datetime' in type:
-                    if '(' in type:
-                        serializer = lambda x: datetime.datetime.strptime(x, '%Y-%m-%d %H:%M:%S.%f')
-                    else:
-                        serializer = lambda x: datetime.datetime.strptime(x, '%Y-%m-%d %H:%M:%S')
-                elif 'char' in type:
-                    serializer = str
-                elif 'text' in type:
-                    serializer = str
-                else:
-                    serializer = str
-                self.table_structure[field] = serializer
-
-    def _parse_and_remove(self, f, path):
-        """
-        snippet from python cookbook, for parsing large xml file
-        """
-        path_parts = path.split('/')
-        doc = iterparse(f, ('start', 'end'), recover=False, encoding='utf-8', huge_tree=True)
-        # Skip the root element
-        next(doc)
-        tag_stack = []
-        elem_stack = []
-        for event, elem in doc:
-            if event == 'start':
-                if elem.tag == 'table_data':
-                    self.current_table = elem.attrib['name']
-                tag_stack.append(elem.tag)
-                elem_stack.append(elem)
-            elif event == 'end':
-                if tag_stack == ['database', 'table_data']:
-                    self.current_table = None
-                if tag_stack == path_parts:
-                    yield elem
-                    elem_stack[-2].remove(elem)
-                if tag_stack == ['database', 'table_structure']:
-                    # dirty hack for getting the tables structure
-                    self._parse_table_structure(elem)
-                    elem_stack[-2].remove(elem)
-                try:
-                    tag_stack.pop()
-                    elem_stack.pop()
-                except IndexError:
-                    pass
-
-    def _xml_parser(self, f_obj):
-        """
-        parse mysqldump XML streaming, convert every item to dict object.
-        'database/table_data/row'
-        """
-        for row in self._parse_and_remove(f_obj, 'database/table_data/row'):
-            doc = {}
-            for field in row.iter(tag='field'):
-                k = field.attrib.get('name')
-                v = field.text
-                doc[k] = v
-            if not self.current_table or self.current_table == self.master:
-                yield {'action': 'create', 'doc': doc}
-            else:
-                yield {'action': 'update', 'doc': doc}
-
-    def _save_binlog_record(self):
-        if self.is_binlog_sync:
-            with open(self.config['binlog_sync']['record_file'], 'w') as f:
-                logging.info("Sync binlog_file: {file}  binlog_pos: {pos}".format(
-                    file=self.log_file,
-                    pos=self.log_pos)
-                )
-                yaml.safe_dump({"log_file": self.log_file,
-                                "log_pos": self.log_pos},
-                               f,
-                               default_flow_style=False)
-
-    def _xml_dump_loader(self):
-        mysqldump = subprocess.Popen(
-            shlex.split(encode_in_py2(self.dump_cmd)),
-            stdout=subprocess.PIPE,
-            stderr=DEVNULL,
-            close_fds=True)
-
-        remove_invalid_pipe = subprocess.Popen(
-            shlex.split(encode_in_py2(REMOVE_INVALID_PIPE)),
-            stdin=mysqldump.stdout,
-            stdout=subprocess.PIPE,
-            stderr=DEVNULL,
-            close_fds=True)
-
-        return remove_invalid_pipe.stdout
-
-    def _xml_file_loader(self, filename):
-        f = open(filename, 'rb')  # bytes required
-
-        remove_invalid_pipe = subprocess.Popen(
-            shlex.split(encode_in_py2(REMOVE_INVALID_PIPE)),
-            stdin=f,
-            stdout=subprocess.PIPE,
-            stderr=DEVNULL,
-            close_fds=True)
-        return remove_invalid_pipe.stdout
-
-    def _send_email(self, title, content):
-        """
-        send notification email
-        """
-        if not self.config.get('email'):
-            return
-
-        import smtplib
-        from email.mime.text import MIMEText
-
-        msg = MIMEText(content)
-        msg['Subject'] = title
-        msg['From'] = self.config['email']['from']['username']
-        msg['To'] = ', '.join(self.config['email']['to'])
-
-        # Send the message via our own SMTP server.
-        s = smtplib.SMTP()
-        s.connect(self.config['email']['from']['host'])
-        s.login(user=self.config['email']['from']['username'],
-                password=self.config['email']['from']['password'])
-        s.sendmail(msg['From'], msg['To'], msg=msg.as_string())
-        s.quit()
-
-    def _sync_from_stream(self):
-        logging.info("Start to dump from stream")
-        docs = reduce(lambda x, y: y(x), [self._xml_parser,
-                                          self._formatter,
-                                          self._mapper,
-                                          self._processor],
-                      self._xml_dump_loader())
-        self._updater(docs)
-        logging.info("Dump success")
-
-    def _sync_from_file(self):
-        logging.info("Start to dump from xml file")
-        logging.info("Filename: {}".format(self.config['xml_file']['filename']))
-        docs = reduce(lambda x, y: y(x), [self._xml_parser,
-                                          self._formatter,
-                                          self._mapper,
-                                          self._processor],
-                      self._xml_file_loader(self.config['xml_file']['filename']))
-        self._updater(docs)
-        logging.info("Dump success")
-
-    def _sync_from_binlog(self):
-        logging.info("Start to sync binlog")
-        docs = reduce(lambda x, y: y(x), [self._mapper,
-                                          self._processor],
-                      self._binlog_loader())
-        self._updater(docs)
-
-    def run(self):
-        """
-        workflow:
-        1. sync dump data
-        2. sync binlog
-        """
-        try:
-            if not self.is_binlog_sync:
-                if len(sys.argv) > 2 and sys.argv[2] == '--fromfile':
-                    self._sync_from_file()
-                else:
-                    self._sync_from_stream()
-            self._sync_from_binlog()
-        except Exception:
-            import traceback
-            logging.error(traceback.format_exc())
-            self._send_email('es sync error', traceback.format_exc())
-            raise
-
-
-def start():
-    instance = ElasticSync()
-    instance.run()
+from .mian import ElasticSync, start

 if __name__ == '__main__':
-    start()
+    start()
\ No newline at end of file
diff --git a/src/__init__.py b/es_sync/mian.py
similarity index 57%
rename from src/__init__.py
rename to es_sync/mian.py
index 0f59283..35cd6b5 100644
--- a/src/__init__.py
+++ b/es_sync/mian.py
@@ -1,6 +1,7 @@
 from __future__ import print_function, unicode_literals
 from future.builtins import str, range
 import sys
+
 PY2 = sys.version_info[0] == 2

 if PY2:
@@ -8,6 +9,8 @@
     DEVNULL = open(os.devnull, 'wb')
 else:
     from subprocess import DEVNULL
+
+
 def encode_in_py2(s):
     if PY2:
         return s.encode('utf-8')
@@ -21,13 +24,15 @@ def encode_in_py2(s):
 import json
 import logging
 import shlex
-from datetime import datetime
+import datetime
+import decimal
 from lxml.etree import iterparse
 from functools import reduce
 from pymysqlreplication import BinLogStreamReader
 from pymysqlreplication.row_event import DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent
+from pymysqlreplication.event import RotateEvent, XidEvent

-__version__ = '0.3.3.1'
+__version__ = '0.4.2'


 # The magic spell for removing invalid characters in xml stream.
@@ -49,30 +54,43 @@ def is_binlog_sync(self):

     def __init__(self):
         try:
-            self.config = yaml.load(open(sys.argv[1]))
+            with open(sys.argv[1], 'r', encoding='utf-8') as f:
+                self.config = yaml.load(f, Loader=yaml.FullLoader)
+
         except IndexError:
             print('Error: not specify config file')
             exit(1)

-        self.dump_cmd = 'mysqldump -h {host} -P {port} -u {user} --password={password} {db} {table} ' \
-                        '--default-character-set=utf8 -X'.format(**self.config['mysql'])
+        mysql = self.config.get('mysql')
+        self.table_mappings = self.config.get("table_mappings")
+        self.mysqltb2es = {item["mysql_table_name"]: item for item in self.table_mappings}
+        self.table_structure = {item["mysql_table_name"]: {} for item in self.table_mappings}
+        self.tables = [item.get("mysql_table_name") for item in self.table_mappings if item.get("mysql_table_name")]
+        if len(self.tables) == 1:
+            self.dump_cmd = 'mysqldump -h {host} -P {port} -u {user} --password={password} {db} {table} ' \
+                            '--default-character-set={character_set} -X --quick'.format(**mysql, table=self.tables[0])  # --opt
+        elif len(self.tables) > 1:
+            mysql.update({
+                'tables': ' '.join(self.tables)
+            })
+            self.dump_cmd = 'mysqldump -h {host} -P {port} -u {user} --password={password} --database {db} --tables {tables} ' \
+                            '--default-character-set={character_set} -X --quick'.format(**mysql)  # --opt
+        else:
+            print('Error: table_mappings must contain at least one mysql_table_name')
+            exit(1)
+        self.current_table = None

         self.binlog_conf = dict(
             [(key, self.config['mysql'][key]) for key in ['host', 'port', 'user', 'password', 'db']]
         )

-        self.endpoint = 'http://{host}:{port}/{index}/{type}/_bulk'.format(
+        self.endpoint = "http://{host}:{port}/_bulk".format(
             host=self.config['elastic']['host'],
             port=self.config['elastic']['port'],
-            index=self.config['elastic']['index'],
-            type=self.config['elastic']['type']
-        )  # todo: supporting multi-index
+        )
-
-        self.mapping = self.config.get('mapping') or {}
-        if self.mapping.get('_id'):
-            self.id_key = self.mapping.pop('_id')
-        else:
-            self.id_key = None
+
+        self.ignoring = self.config.get('ignoring') or []

         record_path = self.config['binlog_sync']['record_file']
         if os.path.isfile(record_path):
@@ -85,11 +103,12 @@ def __init__(self):
         self.binlog_bulk_size = self.config.get('elastic').get('binlog_bulk_size') or DEFAULT_BINLOG_BULKSIZE

         self._init_logging()
+        self._force_commit = False

     def _init_logging(self):
         logging.basicConfig(filename=self.config['logging']['file'],
                             level=logging.INFO,
-                            format='[%(levelname)s] %(asctime)s %(message)s')
+                            format='[%(levelname)s] - %(filename)s[line:%(lineno)d] - %(asctime)s %(message)s')
         self.logger = logging.getLogger(__name__)
         logging.getLogger("requests").setLevel(logging.WARNING)  # disable requests info logging

@@ -101,15 +120,23 @@ def cleanup(*args):
         signal.signal(signal.SIGINT, cleanup)
         signal.signal(signal.SIGTERM, cleanup)

-    def _post_to_es(self, data):
+    def _post_to_es(self, es_index, es_type, data):
         """
         send post requests to es restful api
         """
-        resp = requests.post(self.endpoint, data=data)
-        if resp.json().get('errors'):  # a boolean to figure error occurs
-            for item in resp.json()['items']:
+        headers = {
+            'Content-Type': 'application/json',
+        }
+        resp = requests.post(self.endpoint, data=data, headers=headers)
+        resp_json = resp.json()
+        if resp_json.get('errors'):  # a boolean to figure error occurs
+            for item in resp_json['items']:
                 if list(item.values())[0].get('error'):
                     logging.error(item)
+                    print("bulk item error:", item)
+        if resp_json.get('error'):
+            logging.error(resp_json.get('error'))
+            print("bulk request error:", resp_json.get('error'))
         else:
             self._save_binlog_record()

@@ -118,32 +145,40 @@ def _bulker(self, bulk_size):
         Example:
             u = bulker()
             u.send(None) #for generator initialize
-            u.send(json_str)  # input json item
-            u.send(another_json_str)  # input json item
+            u.send({"es_index": "", "es_type": "", "data": ""})  # input json item
+            u.send({"es_index": "", "es_type": "", "data": ""})  # input json item
             ...
             u.send(None) force finish bulk and post
         """
         while True:
-            data = ""
+            data_object = {}
             for i in range(bulk_size):
                 item = yield
                 if item:
-                    data = data + item + "\n"
+                    if (item["es_index"], item["es_type"]) in data_object:
+                        data_object[(item["es_index"], item["es_type"])] += item["data"] + "\n"
+                    else:
+                        data_object[(item["es_index"], item["es_type"])] = item["data"] + "\n"
                 else:
                     break
+                if self._force_commit:
+                    break
             # print(data)
             print('-'*10)
-            if data:
-                self._post_to_es(data)
+            if data_object:
+                for (es_index, es_type), data in data_object.items():
+                    self._post_to_es(es_index, es_type, data)  # route each batch to its own index/type
+
+            self._force_commit = False

     def _updater(self, data):
         """
         encapsulation of bulker
         """
         if self.is_binlog_sync:
-            u = self._bulker(bulk_size=self.binlog_bulk_size)
+            u = self._bulker(bulk_size=self.binlog_bulk_size)
         else:
-            u = self._bulker(bulk_size=self.bulk_size)
+            u = self._bulker(bulk_size=self.bulk_size)

         u.send(None)  # push the generator to first yield
         for item in data:
@@ -154,9 +189,11 @@ def _json_serializer(self, obj):
         """
         format the object which json not supported
         """
-        if isinstance(obj, datetime):
+        if isinstance(obj, datetime.datetime) or isinstance(obj, datetime.date):
             return obj.isoformat()
-        raise TypeError('Type not serializable')
+        elif isinstance(obj, decimal.Decimal):
+            return str(obj)
+        raise TypeError('Type not serializable for obj {obj}'.format(obj=obj))

     def _processor(self, data):
         """
@@ -169,55 +206,91 @@ def _processor(self, data):
             Do a partial update on a document.
         delete
             Delete a document.
+        yields items shaped like {"es_index": "", "es_type": "", "data": ""}
         """
         for item in data:
-            if self.id_key:
-                action_content = {'_id': item['doc'][self.id_key]}
+            if not item["doc"]:
+                print("empty doc, skipped")
+                continue
+            if "_id" in item['doc']:
+                action_content = {'_id': item['doc']["_id"], "_index": item["es_index"], "_type": item["es_type"]}
             else:
-                action_content = {}
+                action_content = {"_index": item["es_index"], "_type": item["es_type"]}
+            for field in self.ignoring:
+                try:
+                    item['doc'].pop(field)
+                except KeyError:
+                    pass
             meta = json.dumps({item['action']: action_content})
             if item['action'] == 'index':
                 body = json.dumps(item['doc'], default=self._json_serializer)
                 rv = meta + '\n' + body
             elif item['action'] == 'update':
+                if "_id" in item['doc']:
+                    if "id" not in item['doc']:
+                        item['doc']["id"] = item['doc']["_id"]
+                    del item['doc']["_id"]
                 body = json.dumps({'doc': item['doc']}, default=self._json_serializer)
                 rv = meta + '\n' + body
             elif item['action'] == 'delete':
                 rv = meta + '\n'
             elif item['action'] == 'create':
+                if "_id" in item['doc']:
+                    if "id" not in item['doc']:
+                        item['doc']["id"] = item['doc']["_id"]
+                    del item['doc']["_id"]
                 body = json.dumps(item['doc'], default=self._json_serializer)
                 rv = meta + '\n' + body
             else:
                 logging.error('unknown action type in doc')
                 raise TypeError('unknown action type in doc')
-            yield rv
+            # print("payload to index:", rv)
+            yield {"es_index": item["es_index"], "es_type": item["es_type"], "data": rv}

     def _mapper(self, data):
         """
-        mapping old key to new key
+        mapping old key to new key or process middleware
         """
+        from .utils import ref_to_obj
         for item in data:
-            if self.mapping:
-                for k, v in self.mapping.items():
-                    item['doc'][k] = item['doc'][v]
-                    del item['doc'][v]
-            # print(doc)
-            yield item
+            map_item = self.mysqltb2es[item["table"]]
+            item["es_index"] = map_item["es_index"]
+            item["es_type"] = map_item["es_type"]
+            if map_item["mapping"]:
+                for k, v in map_item["mapping"].items():
+                    try:
+                        item['doc'][k] = item['doc'][v]
+                        # del item['doc'][v]
+                    except KeyError:
+                        continue
+            # print("mapper result:", item)
+            middlewares = map_item.get("middlewares", [])
+            if middlewares:
+                # apply the configured middlewares in order
+                yield reduce(lambda x, y: y(x),
+                             [ref_to_obj(middleware_str) for middleware_str in middlewares],
+                             item)
+            else:
+                yield item

     def _formatter(self, data):
         """
         format every field from xml, according to parsed table structure
         """
         for item in data:
-            for field, serializer in self.table_structure.items():
-                if item['doc'][field]:
+            table_name = item["table"]
+            for field, serializer in self.table_structure[table_name].items():
+                if field in item['doc'] and item['doc'][field]:
                     try:
                         item['doc'][field] = serializer(item['doc'][field])
                     except ValueError as e:
-                        self.logger.error("Error occurred during format, ErrorMessage:{msg}, ErrorItem:{item}".format(
+                        self.logger.error(
+                            "Error occurred during format, ErrorMessage:{msg}, ErrorItem:{item}".format(
                                 msg=str(e),
                                 item=str(item)))
                         item['doc'][field] = None
+                    except TypeError as e:
+                        item['doc'][field] = None
             # print(item)
             yield item

@@ -234,8 +307,8 @@ def _binlog_loader(self):

         stream = BinLogStreamReader(connection_settings=self.binlog_conf,
                                     server_id=self.config['mysql']['server_id'],
-                                    only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
-                                    only_tables=[self.config['mysql']['table']],
+                                    only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent, RotateEvent, XidEvent],
+                                    only_tables=self.tables,
                                     resume_stream=resume_stream,
                                     blocking=True,
                                     log_file=self.log_file,
@@ -243,22 +316,38 @@ def _binlog_loader(self):
         for binlogevent in stream:
             self.log_file = stream.log_file
             self.log_pos = stream.log_pos
+
+            # RotateEvent to update binlog record when no related table changed
+            if isinstance(binlogevent, RotateEvent):
+                self._save_binlog_record()
+                continue
+
+            if isinstance(binlogevent, XidEvent):  # event_type == 16
+                self._force_commit = True
+                continue
+
             for row in binlogevent.rows:
                 if isinstance(binlogevent, DeleteRowsEvent):
                     rv = {
                         'action': 'delete',
-                        'doc': row['values']
+                        'doc': row['values'],
+                        "table": binlogevent.table
                     }
+
+
                 elif isinstance(binlogevent, UpdateRowsEvent):
                     rv = {
                         'action': 'update',
-                        'doc': row['after_values']
+                        'doc': row['after_values'],
+                        "table": binlogevent.table
                     }
                 elif isinstance(binlogevent, WriteRowsEvent):
                     rv = {
-                        'action': 'index',
-                        'doc': row['values']
-                    }
+                        'action': 'create',
+                        'doc': row['values'],
+                        "table": binlogevent.table
+                    }
+
                 else:
                     logging.error('unknown action type in binlog')
                     raise TypeError('unknown action type in binlog')
@@ -281,16 +370,22 @@ def _parse_table_structure(self, data):
                     serializer = float
                 elif 'datetime' in type:
                     if '(' in type:
-                        serializer = lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S.%f')
+                        serializer = lambda x: datetime.datetime.strptime(x, '%Y-%m-%d %H:%M:%S.%f')
                     else:
-                        serializer = lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S')
+                        serializer = lambda x: datetime.datetime.strptime(x, '%Y-%m-%d %H:%M:%S')
                 elif 'char' in type:
                     serializer = str
                 elif 'text' in type:
                     serializer = str
                 else:
                     serializer = str
-                self.table_structure[field] = serializer
+                if not self.current_table:
+                    print('Error: table structure encountered before any table name was parsed')
+                    exit(1)
+                if self.current_table not in self.table_structure:
+                    self.table_structure[self.current_table] = {field: serializer}
+                else:
+                    self.table_structure[self.current_table][field] = serializer

     def _parse_and_remove(self, f, path):
         """
@@ -304,13 +399,19 @@ def _parse_and_remove(self, f, path):
         elem_stack = []
         for event, elem in doc:
             if event == 'start':
+                if elem.tag == 'table_data':
+                    self.current_table = elem.attrib['name']
                 tag_stack.append(elem.tag)
                 elem_stack.append(elem)
             elif event == 'end':
+                if tag_stack == ['database', 'table_data']:
+                    self.current_table = None
                 if tag_stack == path_parts:
                     yield elem
                     elem_stack[-2].remove(elem)
-                if tag_stack == ['database', 'table_structure']:  # dirty hack for getting the tables structure
+                if tag_stack == ['database', 'table_structure']:
+                    # dirty hack for getting the tables structure
+                    self.current_table = elem_stack[-1].attrib['name']
                     self._parse_table_structure(elem)
                     elem_stack[-2].remove(elem)
                 try:
@@ -321,7 +422,8 @@ def _parse_and_remove(self, f, path):

     def _xml_parser(self, f_obj):
         """
-        parse mysqldump XML streaming, convert every item to dict object. 'database/table_data/row'
+        parse mysqldump XML streaming, convert every item to dict object.
+        'database/table_data/row'
         """
         for row in self._parse_and_remove(f_obj, 'database/table_data/row'):
             doc = {}
@@ -329,8 +431,11 @@ def _xml_parser(self, f_obj):
             k = field.attrib.get('name')
             v = field.text
             doc[k] = v
-            yield {'action': 'index', 'doc': doc}
-
+            if self.current_table:  # rows in the xml carry bare data only, so each one is a create
+                yield {'action': 'create', 'doc': doc, "table": self.current_table}
+            else:
+                print('Error: no table name found in xml')
+                exit(1)
     def _save_binlog_record(self):
         if self.is_binlog_sync:
             with open(self.config['binlog_sync']['record_file'], 'w') as f:
@@ -338,7 +443,10 @@ def _save_binlog_record(self):
                     file=self.log_file,
                     pos=self.log_pos)
                 )
-                yaml.safe_dump({"log_file": self.log_file, "log_pos": self.log_pos}, f, default_flow_style=False)
+                yaml.safe_dump({"log_file": self.log_file,
+                                "log_pos": self.log_pos},
+                               f,
+                               default_flow_style=False)

     def _xml_dump_loader(self):
         mysqldump = subprocess.Popen(
@@ -358,7 +466,14 @@ def _xml_dump_loader(self):

     def _xml_file_loader(self, filename):
         f = open(filename, 'rb')  # bytes required
-        return f
+
+        remove_invalid_pipe = subprocess.Popen(
+            shlex.split(encode_in_py2(REMOVE_INVALID_PIPE)),
+            stdin=f,
+            stdout=subprocess.PIPE,
+            stderr=DEVNULL,
+            close_fds=True)
+        return remove_invalid_pipe.stdout

     def _send_email(self, title, content):
         """
@@ -385,20 +500,20 @@ def _send_email(self, title, content):

     def _sync_from_stream(self):
         logging.info("Start to dump from stream")
-        docs = reduce(lambda x, y: y(x), [self._xml_parser,
-                                          self._formatter,
-                                          self._mapper,
-                                          self._processor],
+        docs = reduce(lambda x, y: y(x), [self._xml_parser,  # yields {'action': 'create', 'doc': doc, "table": "XXX"}
+                                          self._formatter,   # yields {'action': 'create', 'doc': doc, "table": "XXX"}
+                                          self._mapper,      # yields {'action': 'create', 'doc': doc, "table": "XXX", "es_index": "", "es_type": ""}
+                                          self._processor],  # yields {"es_index": "", "es_type": "", "data": ""}
                       self._xml_dump_loader())
-        self._updater(docs)
+        self._updater(docs)  # consumes {"es_index": "", "es_type": "", "data": ""}
         logging.info("Dump success")

     def _sync_from_file(self):
         logging.info("Start to dump from xml file")
         logging.info("Filename: {}".format(self.config['xml_file']['filename']))
-        docs = reduce(lambda x, y: y(x), [self._xml_parser,
-                                          self._formatter,
-                                          self._mapper,
+        docs = reduce(lambda x, y: y(x), [self._xml_parser,
+                                          self._formatter,
+                                          self._mapper,
                                           self._processor],
                       self._xml_file_loader(self.config['xml_file']['filename']))
         self._updater(docs)
@@ -406,7 +521,9 @@ def _sync_from_file(self):

     def _sync_from_binlog(self):
         logging.info("Start to sync binlog")
-        docs = reduce(lambda x, y: y(x), [self._mapper, self._processor], self._binlog_loader())
+        docs = reduce(lambda x, y: y(x), [self._mapper,
+                                          self._processor],
+                      self._binlog_loader())
         self._updater(docs)

     def run(self):
@@ -433,5 +550,4 @@ def start():
     instance = ElasticSync()
     instance.run()

-if __name__ == '__main__':
-    start()
+
diff --git a/es_sync/sample.yaml b/es_sync/sample.yaml
index 5a6a64f..992a689 100644
--- a/es_sync/sample.yaml
+++ b/es_sync/sample.yaml
@@ -1,36 +1,42 @@
 # The mysql database which you want to sync
 mysql:
-  host: 127.0.0.1
-  port: 3306
-  user: foo
-  password: bar
-  db: mydb
-  table: mytable
-  tables: # support multi-table here, you can set tables instead of table and the first one will be set to master as default
-    - table1
+  host: host
+  port: port
+  user: user
+  password: password
+  db: db
   server_id: 1 # this should be unique
+  character_set: utf8

 elastic:
-  host: 127.0.0.1
-  port: 9200
+  host: host
+  port: port
   bulk_size: 200 # the update bulk size when mysqldump, default is 100 if not specified
   binlog_bulk_size: 10 # the update bulk size when syncing binlog, default is 1 if not specified
-  index: article
-  type: article
+

 # path to your own xml file, if you want to initialize dump from xml file. run with argument --fromfile in command
 xml_file:
   filename: a.xml

+# table mappings configuration
+table_mappings:
+  - mysql_table_name: keyword
+    es_index: test
+    es_type: _doc
+    middlewares:
+      - "middleware.keyword:process_id"
+    mapping:
+      _id: id
+  - mysql_table_name: user
+    es_index: user
+    es_type: _doc
+    mapping:
+      _id: id
+    middlewares:
+
 # If you want to map your column, put the column name as the value, and es field name as the key,
 # Particularly , if you set _id as follows, it will use myid column as the index doc's id, or ES will generate an id as default
-mapping:
-  _id: myid
-  es_field_name: mysql_column_name
-
-# You can set ignoring fields here, and these fields will not be post to ES.
-ignoring:
-  - ignoring_field

 # The log file's path
 logging:
diff --git a/es_sync/utils.py b/es_sync/utils.py
new file mode 100644
index 0000000..df1536d
--- /dev/null
+++ b/es_sync/utils.py
@@ -0,0 +1,29 @@
+#!/usr/bin/env python
+# -*- encoding: utf-8 -*-
+def ref_to_obj(ref):
+    """
+    Returns the object pointed to by ``ref``.
+
+    :type ref: str
+
+    """
+    if not isinstance(ref, str):
+        raise TypeError('References must be strings')
+    if ':' not in ref:
+        raise ValueError('Invalid reference')
+
+    modulename, rest = ref.split(':', 1)
+    try:
+        obj = __import__(modulename, fromlist=[rest])
+    except ImportError:
+        # surface import failures as a LookupError carrying the full reference
+        raise LookupError('Error resolving reference %s: could not import module' % ref)
+    # A reference has the form "module.path:attr.path", e.g.
+    # "middleware.keyword:process_id" resolves to the process_id
+    # function defined in middleware/keyword.py.
+    try:
+        for name in rest.split('.'):
+            obj = getattr(obj, name)
+        return obj
+    except Exception:
+        raise LookupError('Error resolving reference %s: error looking up object' % ref)
diff --git a/middleware/keyword.py b/middleware/keyword.py
new file mode 100644
index 0000000..3ca433a
--- /dev/null
+++ b/middleware/keyword.py
@@ -0,0 +1,17 @@
+#!/usr/bin/env python
+# -*- encoding: utf-8 -*-
+'''
+@File    : keyword.py
+@License : (C)Copyright 2022 , 社交数字人
+'''
+
+
+def process_id(item):
+    """
+    Incoming items look like this (middlewares run after _mapper):
+    {'action': 'create', 'doc': doc, "table": "XXX", "es_index": "", "es_type": ""}
+    a middleware may rewrite the doc in place; it must return the item
+    """
+    # example transformation: force the storage_t field to a fixed value (100)
+    item["doc"]["storage_t"] = 100
+    return item
\ No newline at end of file
diff --git a/src/sample.yaml b/src/sample.yaml
deleted file mode 100644
index 8756f82..0000000
--- a/src/sample.yaml
+++ /dev/null
@@ -1,45 +0,0 @@
-# The mysql database which you want to sync
-mysql:
-  host: 127.0.0.1
-  port: 3306
-  user: foo
-  password: bar
-  db: mydb
-  table: mytable
-  server_id: 1 # this should be unique
-
-elastic:
-  host: 127.0.0.1
-  port: 9200
-  bulk_size: 200 # the update bulk size when mysqldump, default is 100 if not specified
-  binlog_bulk_size: 10 # the update bulk size when syncing binlog, default is 1 if not specified
-  index: article
-  type: article
-
-# path to your own xml file, if you want to initialize dump from xml file. run with argument --fromfile in command
-xml_file:
-  filename: a.xml
-
-# If you want to map your column, put the column name as the value, and es field name as the key,
-# Particularly , if you set _id as follows, it will use myid column as the index doc's id, or ES will generate an id as default
-mapping:
-  _id: myid
-  es_field_name: mysql_column_name
-
-# The log file's path
-logging:
-  file: mylog.log
-
-# The record file's path, which record the latest synced binlog file and position
-binlog_sync:
-  record_file: binlog.info
-
-# If you want to email notification when error occurs, fill this
-email:
-  from: # the sender's email, uses smtp protocol
-    host: smtp.example.com
-    username: sender@example.com
-    password: senderpassword
-  to: # a list of notification recipients
-    - first_recipient@example.com
-    - second_recipient@example.com
\ No newline at end of file
diff --git a/test.py b/test.py
new file mode 100644
index 0000000..a2999b6
--- /dev/null
+++ b/test.py
@@ -0,0 +1,6 @@
+#!/usr/bin/env python
+# -*- encoding: utf-8 -*-
+from es_sync import start
+start()
+
+# this launcher script expects a config yaml (e.g. sample.yaml) as its first argument
\ No newline at end of file

From 7de2561d9ea64a5fe0cb391a4a0593043e514b71 Mon Sep 17 00:00:00 2001
From: xunhanliu <1638081534@qq.com>
Date: Fri, 1 Jul 2022 14:06:10 +0800
Subject: [PATCH 2/3] 'dockerfile'

---
 Dockerfile        | 20 ++++++++++++++++++++
 README_CN.md      |  5 +++++
 binlog.info       |  2 ++
 es_sync/mian.py   |  2 +-
 requirements.txt  | 10 +++++-----
 test.py => run.py |  0
 setup.py          | 12 ++++++------
 7 files changed, 39 insertions(+), 12 deletions(-)
 create mode 100644 Dockerfile
 create mode 100644 binlog.info
 rename test.py => run.py (100%)

diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..2aa95a2
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,20 @@
+FROM rackspacedot/python37
+LABEL MAINTAINER="xunhanliu<1638081534@qq.com>"
+# set the timezone; with pip >= 10 the package index can be switched via pip config
+RUN echo "Asia/Shanghai" > /etc/timezone \
+    && pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple/ \
+    && python -m pip install --upgrade pip
+
+
+
+RUN mkdir -p /usr/src/middleware
+
+WORKDIR /usr/src
+
+COPY es_sync .
+COPY run.py .
+
+ADD requirements.txt /usr/src
+RUN pip install -r /usr/src/requirements.txt
+CMD ["python","run.py", "config.yaml"]
+# docker build example: docker build -t py-mysql-elasticsearch-sync:latest .
\ No newline at end of file
diff --git a/README_CN.md b/README_CN.md
index 6a35ac0..c76258d 100644
--- a/README_CN.md
+++ b/README_CN.md
@@ -1,3 +1,8 @@
+docker 部署需要挂载 middleware 目录、config.yaml 和 binlog.info 文件
+
+
+
+
 tips: 原始项目为: [原始项目](https://github.com/zhongbiaodev/py-mysql-elasticsearch-sync)

 # py-mysql-elasticsearch-sync
diff --git a/binlog.info b/binlog.info
new file mode 100644
index 0000000..a854154
--- /dev/null
+++ b/binlog.info
@@ -0,0 +1,2 @@
+log_file:
+log_pos:
diff --git a/es_sync/mian.py b/es_sync/mian.py
index 35cd6b5..d5a2e2c 100644
--- a/es_sync/mian.py
+++ b/es_sync/mian.py
@@ -32,7 +32,7 @@ def encode_in_py2(s):
 from pymysqlreplication.row_event import DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent
 from pymysqlreplication.event import RotateEvent, XidEvent

-__version__ = '0.4.2'
+__version__ = '0.5.0'


 # The magic spell for removing invalid characters in xml stream.
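The README_CN note in this patch says the Docker deployment needs the middleware directory, config.yaml, and binlog.info mounted into the container. A minimal sketch of such a run follows; the image tag is taken from the Dockerfile comment, while the host-side paths are assumptions (the container's WORKDIR is /usr/src, where run.py reads config.yaml and the relative binlog.info record file):

```sh
# A sketch, not part of the patch: host paths in the current directory are assumed.
docker run -d \
  -v "$(pwd)/middleware:/usr/src/middleware" \
  -v "$(pwd)/config.yaml:/usr/src/config.yaml" \
  -v "$(pwd)/binlog.info:/usr/src/binlog.info" \
  py-mysql-elasticsearch-sync:latest
```

Mounting binlog.info as a file keeps the synced binlog position on the host, so the container can resume from where it left off after a restart.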
diff --git a/requirements.txt b/requirements.txt
index 84be5a5..f8e1582 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,6 @@
-PyMySQL==0.6.7
-mysql-replication>=0.8
-requests>=2.9.1
-PyYAML>=3.11
-lxml>=3.5.0
+PyMySQL==1.0.2
+mysql-replication==0.30
+requests==2.26.0
+PyYAML==5.3
+lxml==4.5.0
 future>=0.15.2 #for py2 compat
diff --git a/test.py b/run.py
similarity index 100%
rename from test.py
rename to run.py
diff --git a/setup.py b/setup.py
index ded9589..c5c1205 100644
--- a/setup.py
+++ b/setup.py
@@ -11,12 +11,12 @@
     author_email='windfarer@gmail.com',
     description='MySQL to Elasticsearch sync tool',
     install_requires=[
-        'PyMySQL==0.6.7',
-        'mysql-replication==0.9',
-        'requests==2.9.1',
-        'PyYAML==3.11',
-        'lxml==3.5.0',
-        'future==0.15.2'
+        'PyMySQL==1.0.2',
+        'mysql-replication==0.30',
+        'requests==2.26.0',
+        'PyYAML==5.3',
+        'lxml==4.5.0',
+        'future==0.15.2',
     ],
     entry_points={
         'console_scripts': [

From abefeef6bec255f57528fec4a04e3520b2e81453 Mon Sep 17 00:00:00 2001
From: xunhanliu <1638081534@qq.com>
Date: Fri, 1 Jul 2022 14:08:39 +0800
Subject: [PATCH 3/3] dockerfile

---
 Dockerfile | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 2aa95a2..b77374b 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -5,16 +5,17 @@ RUN echo "Asia/Shanghai" > /etc/timezone \
     && pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple/ \
     && python -m pip install --upgrade pip

+RUN apt-get update && apt-get install mysql-client -y

 RUN mkdir -p /usr/src/middleware

 WORKDIR /usr/src

-COPY es_sync .
+COPY es_sync/* ./es_sync/
 COPY run.py .

 ADD requirements.txt /usr/src
 RUN pip install -r /usr/src/requirements.txt
 CMD ["python","run.py", "config.yaml"]
-# docker build example: docker build -t py-mysql-elasticsearch-sync:latest .
\ No newline at end of file
+# docker build example: docker build -t py-mysql-elasticsearch-sync:latest .
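A note on writing middlewares for this patch series: a middleware is any callable reachable by es_sync/utils.ref_to_obj via a "module.path:function" string, and _mapper chains them with functools.reduce, so each one receives the item dict and must return it. middleware/keyword.py above is the shipped example. A minimal sketch of a custom middleware follows; the module name (middleware/scrub.py), the function name, and the password column are illustrative assumptions, not part of the patch:

```python
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Hypothetical middleware/scrub.py -- the file name, function name and the
# "password" field below are assumptions for illustration only.


def drop_password(item):
    """
    Receives the shape _mapper yields:
    {'action': ..., 'doc': {...}, 'table': ..., 'es_index': ..., 'es_type': ...}
    and must return the (possibly modified) item so the next middleware
    in the reduce chain can consume it.
    """
    item["doc"].pop("password", None)  # never ship this column to ES
    return item
```

It would be referenced from a table_mappings entry as `middlewares: ["middleware.scrub:drop_password"]`, mirroring how "middleware.keyword:process_id" is wired up in sample.yaml.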