A Small-Footprint Solution for Real-Time Synchronization of RDS Data into StarRocks
1. Background

We need to synchronize data from Alibaba Cloud RDS databases into a self-hosted StarRocks cluster. Previously, DolphinScheduler ran scheduled DataX jobs that loaded the data into StarRocks in batches, but as the business grew this approach ran into the following problems:
1. To meet the requirements of Level 3 of the Multi-Level Protection Scheme, the Alibaba Cloud RDS instances can no longer be reached over the public internet and are only accessible from within the Alibaba Cloud internal network.
2. As the business grew, batch synchronization could no longer meet the required data freshness.
The following synchronization architecture was built to address these problems.
2. Data Synchronization Architecture
To solve the problems above, the following architecture was designed for real-time synchronization:
1. Use one 4C8G Alibaba Cloud server that can reach the RDS instances over the internal network.
2. Expose the Kafka cluster for public network access.
3. Deploy the real-time synchronization script on this Alibaba Cloud server; it reads the RDS binlog in real time, parses and encrypts the events, and sends them to Kafka.
4. Run a Kafka Connect cluster inside the company network; its connectors decrypt the Kafka data and write it into the self-hosted StarRocks cluster.
3. Survey of Existing Solutions
During the initial design several mainstream solutions were evaluated, but each had problems, which led to the current approach. The solutions considered were:
[*]Flink-CDC
Flink-CDC is currently the most popular real-time synchronization solution, but a Flink cluster needs far more resources than the single 4C8G Alibaba Cloud server available, which is already the highest specification we can get.
In addition, the business uses many different sharding schemes (by range, by organization, by year, by month, by organization plus month, and so on), and for historical reasons the column names are inconsistent (all uppercase, all lowercase, camelCase, snake_case, etc.). With Flink-CDC, creating one task per shard would exceed the available resources, while coupling everything into one job would require a lot of coding and be hard to change later, so this option was dropped.
[*]Apache SeaTunnel
Apache SeaTunnel is another popular real-time synchronization tool, but it does not support fuzzy table matching. The business systems use several sharding schemes, some tables have hundreds or thousands of shards, and tables sharded by organization can be added at any time. SeaTunnel would therefore require configuring thousands of tables individually, which is not practical, so this option was also dropped.
4. Technical Design of the Core Steps
[*]Real-time binlog consumption
Real-time binlog consumption is implemented with the open-source project python-mysql-replication, a pure-Python implementation of the MySQL replication protocol built on top of PyMySQL, which makes it easy to consume the RDS binlog in real time; a minimal example follows the quoted description below.
Pure Python Implementation of MySQL replication protocol build on top of PyMYSQL. This allows you to receive event like insert, update, delete with their datas and raw SQL queries.
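A minimal sketch of how binlog events are consumed with python-mysql-replication (host, credentials, and server_id below are placeholders, not the production values used in this project):

import pprint
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent,
)

# Placeholder connection settings; server_id must be unique among replication clients.
stream = BinLogStreamReader(
    connection_settings={"host": "rds-host", "port": 3306,
                         "user": "repl_user", "passwd": "***"},
    server_id=100,
    resume_stream=True,
    blocking=True,
)

for event in stream:
    if isinstance(event, (WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent)):
        for row in event.rows:
            # Inserts and deletes carry "values"; updates carry "before_values"/"after_values".
            values = row.get("after_values", row.get("values"))
            pprint.pprint((event.schema, event.table, values))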
[*]Data encryption and decryption
To keep the data safe while it travels between the two private networks, it must be encrypted in transit. After some research, AES-GCM symmetric encryption was chosen: it is efficient, supports hardware acceleration, and is well suited to encrypting large volumes of data, files, and streams. A minimal round-trip sketch is shown below.
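A minimal AES-GCM round trip with pycryptodome, assuming a randomly generated key for illustration (the project reads a fixed hex key from configuration). The ciphertext and tag are concatenated, the same convention the Java side later relies on:

import json
from Crypto.Cipher import AES            # pycryptodome
from Crypto.Random import get_random_bytes

key = get_random_bytes(32)               # placeholder 256-bit key

def encrypt(plaintext: str) -> str:
    cipher = AES.new(key, AES.MODE_GCM)
    ciphertext, tag = cipher.encrypt_and_digest(plaintext.encode("utf-8"))
    # ciphertext and tag are concatenated so the receiver can verify in one call
    return json.dumps({"nonce": cipher.nonce.hex(),
                       "ciphertext": (ciphertext + tag).hex()})

def decrypt(message: str) -> str:
    payload = json.loads(message)
    nonce = bytes.fromhex(payload["nonce"])
    data = bytes.fromhex(payload["ciphertext"])
    cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
    # last 16 bytes are the GCM authentication tag
    return cipher.decrypt_and_verify(data[:-16], data[-16:]).decode("utf-8")

assert decrypt(encrypt("hello")) == "hello"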
[*]Loading the data into StarRocks
Data is consumed from Kafka into StarRocks with the officially supported starrocks-connector-for-kafka. Because our data is encrypted, the connector has to be extended with a decryption step.
[*]Kafka internal/external network mapping
RDS and StarRocks live in two different private networks, so Kafka is used as the relay between them, which requires Kafka to be reachable from the public internet. This is achieved by configuring separate advertised.listeners.
[*]Bulk synchronization of historical data
When synchronizing a table that already exists, the historical data must be loaded first, after which new data is synchronized from the binlog. The historical data is loaded with DataX.
[*]Writing to Kafka from DataX
DataX is a batch synchronization tool while Kafka is a streaming system, so their roles differ and DataX does not ship with an official Kafka writer. We therefore wrote our own KafkaWriter plugin to support this.
[*]Inserts, updates, and deletes on StarRocks tables
StarRocks provides a primary key table model that supports inserts, updates, and deletes. starrocks-connector-for-kafka is built on Stream Load, and Stream Load lets you add an __op field to each record to control whether the row is upserted or deleted; a small sketch follows.
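A sketch of what the records written to Kafka look like with the __op marker, using hypothetical column names; the binlog script later in this post sets __op to 0 for inserts/updates and 1 for deletes:

import json

# 0 = upsert (insert or update), 1 = delete, for a StarRocks primary key table
upsert_row = {"id": 1, "company_name": "acme", "__op": 0}
delete_row = {"id": 1, "__op": 1}

print(json.dumps(upsert_row))
print(json.dumps(delete_row))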
[*]Support for sharded databases and tables
The system shards databases and tables in several ways, and primary keys may collide across shards. During synchronization the sharding scheme is therefore analyzed and the corresponding StarRocks table uses (table name, original primary key) or (database name, table name, original primary key) as its primary key, as sketched below.
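A small illustration of the idea (column names are hypothetical): the physical table name travels with each row so that (table_name, id) stays unique across shards, which is what the need_table option in the configuration files later in this post enables.

def to_starrocks_row(row: dict, table_name: str) -> dict:
    """Add the physical table name so that (table_name, id) is unique across shards."""
    out = dict(row)
    out["table_name"] = table_name   # extra key column in the StarRocks table
    return out

# Rows with the same id from two shards no longer collide:
print(to_starrocks_row({"id": 1, "amount": 10}, "t_order_2023"))
print(to_starrocks_row({"id": 1, "amount": 20}, "t_order_2024"))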
5. Walkthrough of the Synchronization Process
Using an existing table as an example, the full synchronization process is:
[*]Create the corresponding table in StarRocks based on the structure of the RDS table to be synchronized.
[*]Create the Kafka topics: one for the historical bulk load and one for the incremental binlog sync.
[*]Add the table's binlog configuration to the incremental sync script so that its binlog data is written into the incremental topic.
[*]Use DataX to load the full historical data into the bulk-load topic.
[*]Create the connector that consumes the bulk-load topic and writes the historical data into the StarRocks table.
[*]Compare the row count reported by DataX with the number of rows received by StarRocks; if they match, the historical load is complete and the bulk-load topic and connector can be deleted, or simply left in place. A sketch of this check follows the list.
[*]Create the incremental connector that consumes the binlog data and writes it into StarRocks in real time.
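A minimal sketch of the row-count check mentioned above, assuming placeholder connection details and table names; StarRocks speaks the MySQL protocol, so pymysql works for the query, and the expected total is taken from the DataX job summary:

import pymysql

datax_total = 123456   # total rows read, from the DataX job summary
conn = pymysql.connect(host="starrocks-fe", port=9030,
                       user="user", password="***", db="ods_cjm")

with conn.cursor() as cursor:
    cursor.execute("SELECT COUNT(*) FROM ods_test_1")
    starrocks_total = cursor.fetchone()[0]

print("historical load complete" if starrocks_total == datax_total
      else f"still catching up: {starrocks_total}/{datax_total}")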
6. Implementation Details
[*]The DataX KafkaWriter plugin
public class KafkaWriter extends Writer {

    public static class Job extends Writer.Job {
        private static final Logger logger = LoggerFactory.getLogger(Job.class);
        private Configuration conf = null;

        @Override
        public List<Configuration> split(int mandatoryNumber) {
            List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);
            for (int i = 0; i < mandatoryNumber; i++) {
                configurations.add(conf);
            }
            return configurations;
        }

        private void validateParameter() {
            this.conf.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaWriterErrorCode.REQUIRED_VALUE);
            this.conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
        }

        @Override
        public void init() {
            this.conf = super.getPluginJobConf();
            logger.info("kafka writer params:{}", conf.toJSON());
            this.validateParameter();
        }

        @Override
        public void destroy() {
        }
    }

    public static class Task extends Writer.Task {
        private static final Logger logger = LoggerFactory.getLogger(Task.class);
        private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n");

        private Producer<String, String> producer;
        private String fieldDelimiter;
        private Configuration conf;
        private Properties props;
        private AesEncryption aesEncryption;
        private List<String> columns;

        @Override
        public void init() {
            this.conf = super.getPluginJobConf();
            fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);
            columns = conf.getList(Key.COLUMN_LIST, new ArrayList<>(), String.class);
            props = new Properties();
            props.put("bootstrap.servers", conf.getString(Key.BOOTSTRAP_SERVERS));
            // acks defaults to "0"; setting it to "all" would make the leader wait for every replica
            props.put("acks", conf.getUnnecessaryValue(Key.ACK, "0", null));
            props.put("retries", conf.getUnnecessaryValue(Key.RETRIES, "5", null));
            props.put("retry.backoff.ms", "1000");
            props.put("batch.size", conf.getUnnecessaryValue(Key.BATCH_SIZE, "16384", null));
            props.put("linger.ms", 100);
            props.put("connections.max.idle.ms", 300000);
            props.put("max.in.flight.requests.per.connection", 5);
            props.put("socket.keepalive.enable", true);
            props.put("key.serializer", conf.getUnnecessaryValue(Key.KEYSERIALIZER,
                    "org.apache.kafka.common.serialization.StringSerializer", null));
            props.put("value.serializer", conf.getUnnecessaryValue(Key.VALUESERIALIZER,
                    "org.apache.kafka.common.serialization.StringSerializer", null));
            producer = new KafkaProducer<String, String>(props);

            String encryptKey = conf.getUnnecessaryValue(Key.ENCRYPT_KEY, null, null);
            if (encryptKey != null) {
                aesEncryption = new AesEncryption(encryptKey);
            }
        }

        @Override
        public void prepare() {
            AdminClient adminClient = AdminClient.create(props);
            ListTopicsResult topicsResult = adminClient.listTopics();
            String topic = conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
            try {
                if (!topicsResult.names().get().contains(topic)) {
                    NewTopic newTopic = new NewTopic(
                            topic,
                            Integer.parseInt(conf.getUnnecessaryValue(Key.TOPIC_NUM_PARTITION, "1", null)),
                            Short.parseShort(conf.getUnnecessaryValue(Key.TOPIC_REPLICATION_FACTOR, "1", null))
                    );
                    List<NewTopic> newTopics = new ArrayList<NewTopic>();
                    newTopics.add(newTopic);
                    adminClient.createTopics(newTopics);
                }
                adminClient.close();
            } catch (Exception e) {
                throw new DataXException(KafkaWriterErrorCode.CREATE_TOPIC,
                        KafkaWriterErrorCode.REQUIRED_VALUE.getDescription());
            }
        }

        @Override
        public void startWrite(RecordReceiver lineReceiver) {
            logger.info("start to write kafka");
            Record record = null;
            while ((record = lineReceiver.getFromReader()) != null) {
                if (conf.getUnnecessaryValue(Key.WRITE_TYPE, WriteType.TEXT.name(), null)
                        .equalsIgnoreCase(WriteType.TEXT.name())) {
                    producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
                            Md5Encrypt.md5Hexdigest(recordToString(record)),
                            aesEncryption == null ? recordToString(record)
                                    : JSONObject.toJSONString(aesEncryption.encrypt(recordToString(record))))
                    );
                } else if (conf.getUnnecessaryValue(Key.WRITE_TYPE, WriteType.TEXT.name(), null)
                        .equalsIgnoreCase(WriteType.JSON.name())) {
                    producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
                            Md5Encrypt.md5Hexdigest(recordToString(record)),
                            aesEncryption == null ? recordToJsonString(record)
                                    : JSONObject.toJSONString(aesEncryption.encrypt(recordToJsonString(record))))
                    );
                }
                producer.flush();
            }
        }

        @Override
        public void destroy() {
            if (producer != null) {
                producer.close();
            }
        }

        /**
         * Format a record as a delimited text line.
         */
        private String recordToString(Record record) {
            int recordLength = record.getColumnNumber();
            if (0 == recordLength) {
                return NEWLINE_FLAG;
            }
            Column column;
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < recordLength; i++) {
                column = record.getColumn(i);
                sb.append(column.asString()).append(fieldDelimiter);
            }
            sb.setLength(sb.length() - 1);
            sb.append(NEWLINE_FLAG);
            return sb.toString();
        }

        /**
         * Format a record as a JSON object keyed by the configured column names.
         */
        private String recordToJsonString(Record record) {
            int recordLength = record.getColumnNumber();
            if (0 == recordLength) {
                return "{}";
            }
            Map<String, Object> map = new HashMap<>();
            for (int i = 0; i < recordLength; i++) {
                String key = columns.get(i);
                Column column = record.getColumn(i);
                map.put(key, column.getRawData());
            }
            return JSONObject.toJSONString(map);
        }
    }
}

The AES encryption helper used by the writer:
public class AesEncryption {
    private SecretKey secretKey;
    private static final int GCM_TAG_LENGTH = 16; // 16 bytes (128 bits)

    public AesEncryption(String secretKey) {
        byte[] keyBytes = hexStringToByteArray(secretKey);
        this.secretKey = new SecretKeySpec(keyBytes, 0, keyBytes.length, "AES");
    }

    public ResultModel encrypt(String data) {
        try {
            byte[] nonce = new byte[12]; // 12-byte GCM nonce
            new SecureRandom().nextBytes(nonce);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            GCMParameterSpec gcmSpec = new GCMParameterSpec(GCM_TAG_LENGTH * 8, nonce);
            cipher.init(Cipher.ENCRYPT_MODE, secretKey, gcmSpec);
            byte[] encryptedBytes = cipher.doFinal(data.getBytes());
            return new ResultModel(bytesToHex(nonce), bytesToHex(encryptedBytes));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Convert a hex string into a byte array.
     */
    private byte[] hexStringToByteArray(String s) {
        int len = s.length();
        byte[] data = new byte[len / 2];
        for (int i = 0; i < len; i += 2) {
            data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4)
                    + Character.digit(s.charAt(i + 1), 16));
        }
        return data;
    }

    // Convert a byte array into a hex string.
    public static String bytesToHex(byte[] bytes) {
        StringBuilder hexString = new StringBuilder();
        for (byte b : bytes) {
            String hex = Integer.toHexString(0xff & b);
            if (hex.length() == 1) hexString.append('0');
            hexString.append(hex);
        }
        return hexString.toString();
    }
}
[*]The starrocks-connector-for-kafka extension
package com.starrocks.connector.kafka.transforms;

public class DecryptJsonTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
    private static final Logger LOG = LoggerFactory.getLogger(DecryptJsonTransformation.class);
    private AesEncryption aesEncryption;

    private interface ConfigName {
        String SECRET_KEY = "secret.key";
    }

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(ConfigName.SECRET_KEY, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "secret key");

    @Override
    public R apply(R record) {
        if (record.value() == null) {
            return record;
        }
        String value = (String) record.value();
        try {
            String newValue = aesEncryption.decrypt(value);
            JSONObject jsonObject = JSON.parseObject(newValue, JSONReader.Feature.UseBigDecimalForDoubles);
            return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
                    null, jsonObject, record.timestamp());
        } catch (Exception e) {
            return record;
        }
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, map);
        String secretKey = config.getString(ConfigName.SECRET_KEY);
        aesEncryption = new AesEncryption(secretKey);
    }
}

The decryption helper used on the connector side:

public class AesEncryption {
    private SecretKeySpec secretKey;

    public AesEncryption(String secretKey) {
        byte[] keyBytes = hexStringToByteArray(secretKey);
        this.secretKey = new SecretKeySpec(keyBytes, "AES");
    }

    public String encrypt(String data) {
        try {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, secretKey);
            byte[] encryptedBytes = cipher.doFinal(data.getBytes());
            return Base64.getEncoder().encodeToString(encryptedBytes);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public String decrypt(String encryptedData) throws Exception {
        JSONObject jsonMessage = JSONObject.parseObject(encryptedData);
        byte[] ciphertext = hexStringToByteArray(jsonMessage.getString("ciphertext"));
        byte[] nonce = hexStringToByteArray(jsonMessage.getString("nonce"));
        return decryptData(ciphertext, nonce);
    }

    /**
     * Decrypt data with AES-GCM.
     *
     * @param ciphertext ciphertext with the GCM tag appended by the Python side
     * @param nonce      random IV (nonce)
     * @return decrypted plaintext
     */
    private String decryptData(byte[] ciphertext, byte[] nonce) throws Exception {
        // GCMParameterSpec carries the authentication tag length
        GCMParameterSpec gcmSpec = new GCMParameterSpec(128, nonce); // 128-bit tag
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.DECRYPT_MODE, secretKey, gcmSpec);
        // doFinal verifies the appended tag and returns the plaintext
        byte[] decryptedData = cipher.doFinal(ciphertext);
        return new String(decryptedData);
    }

    /**
     * Convert a hex string into a byte array.
     */
    private byte[] hexStringToByteArray(String s) {
        int len = s.length();
        byte[] data = new byte[len / 2];
        for (int i = 0; i < len; i += 2) {
            data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4)
                    + Character.digit(s.charAt(i + 1), 16));
        }
        return data;
    }
}
[*]Kafka public network configuration
To expose Kafka both internally and externally, only the following settings in the server.properties file under kafka/config need to change.
# Listeners: 9093 for internal traffic, 9092 for external traffic
listeners=INTERNAL://<node-3 internal IP>:9093,EXTERNAL://<node-3 internal IP>:9092
# Advertised addresses: internal clients use 9093, external clients use the public IP on 19092
advertised.listeners=INTERNAL://<node-3 internal IP>:9093,EXTERNAL://<public IP>:19092
# Security protocol for each listener
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
# Listener used for inter-broker communication
inter.broker.listener.name=INTERNAL
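A quick connectivity check from the office network against the advertised public endpoint can be done with kafka-python; the address, port, and topic name below are placeholders and must match the EXTERNAL advertised.listeners entry:

from kafka import KafkaConsumer, KafkaProducer

# Placeholder public endpoint of one broker
producer = KafkaProducer(bootstrap_servers="public-ip:19092")
producer.send("connectivity-test", b"hello from the office network")
producer.flush()

consumer = KafkaConsumer("connectivity-test",
                         bootstrap_servers="public-ip:19092",
                         auto_offset_reset="earliest",
                         consumer_timeout_ms=5000)
print([m.value for m in consumer])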
[*]Kafka Connect deployment
a. Create a directory to hold the connector plugin:

# create the directory
mkdir /opt/kafka-connect
# unpack the connector files into this directory

b. Edit Kafka's config/connect-distributed.properties (one node shown as an example):

# Kafka bootstrap servers
bootstrap.servers=192.168.20.41:9093,192.168.20.42:9093,192.168.20.43:9093
# connector plugin path
plugin.path=/vmimg/opt/kafka-connect/starrocks-kafka-connector

c. Start Connect (one node shown as an example):
nohup bin/connect-distributed.sh config/connect-distributed.properties > start_connect.log 2>&1 &
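Once the Connect workers are running, connectors are registered through the REST API. A hedged sketch with Python requests (the worker address and configuration file name are placeholders; a full helper script for these APIs appears later in this post):

import json
import requests

connect_url = "http://IP:8083/connectors"   # any Connect worker in the cluster

# for example, the generated incremental connector configuration file
with open("config/mzt_ods_cjm_stream.ods_device-connect.json", encoding="utf-8") as f:
    connector_config = json.load(f)

resp = requests.post(connect_url,
                     headers={"Content-Type": "application/json"},
                     data=json.dumps(connector_config))
print(resp.status_code, resp.json())        # 201 = created, 409 = already exists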
[*]Listening to the database binlog and sending encrypted data to Kafka
import os
import json
import binascii
import logging
import re
from typing import List, Dict
from datetime import datetime, date, timedelta
from decimal import Decimal

from Crypto.Cipher import AES
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import BINLOG
from kafka import KafkaProducer

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(lineno)d - %(message)s',
                    filename='mzt-stream-rds1.log',
                    filemode='w')


class TableConfig:
    """Per-table synchronization configuration."""

    def __init__(self, key: str, topic: str, need_table: bool, complete_regex: bool,
                 regex: str, column_mapping: dict):
        """
        :param key: unique id of the table ("database.table")
        :param topic: target Kafka topic
        :param need_table: whether to add the table name as a column (for sharded tables)
        :param complete_regex: exact match (True) or regex match (False)
        :param regex: regular expression used to match table names
        :param column_mapping: source column -> target column mapping
        """
        self.key = key
        self.topic = topic
        self.need_table = need_table
        self.complete_regex = complete_regex
        self.regex = regex
        self.column_mapping = column_mapping


class TableConfigReader:
    """Reads all table configuration files from a directory."""

    def __init__(self, directory_path: str):
        """
        :param directory_path: directory holding the configuration files
        """
        self.directory_path = directory_path
        # list of parsed configurations
        self.table_config_list: List = []

    def read(self):
        """Read every configuration file and build the config list."""
        entries = os.listdir(self.directory_path)
        # keep regular files only
        files = [entry for entry in entries
                 if os.path.isfile(os.path.join(self.directory_path, entry))]
        logging.info(f"number of config files: {len(files)}")
        for file in files:
            file_path = os.path.join(self.directory_path, file)
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
                json_data = json.loads(content)
                self.table_config_list.append(TableConfig(
                    json_data['key'], json_data['topic'], json_data['need_table'],
                    json_data['complete_regex'], json_data['regex'],
                    json_data['column_mapping']))


class PrefixTrie:
    """Matches table names against the configured rules."""

    def __init__(self, complete_regex_map: Dict, not_complete_list: List):
        """
        :param complete_regex_map: configurations matched by exact name
        :param not_complete_list: configurations matched by regular expression
        """
        self.complete_regex_map = complete_regex_map
        self.not_complete_list = not_complete_list

    def search(self, text):
        if text in self.complete_regex_map.keys():
            # exact match
            return self.complete_regex_map[text]
        for data in self.not_complete_list:
            # regex match
            match = re.match(data.regex, text)
            if match:
                return data
        return None


class MysqlConfig:
    """MySQL connection settings."""

    def __init__(self, host: str, port: int, user: str, password: str, service_id: int):
        """
        :param host: database host
        :param port: database port
        :param user: database user
        :param password: database password
        :param service_id: server_id used for binlog replication
        """
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.service_id = service_id


class KafkaBinlogStreamer:
    """Consumes the binlog and forwards encrypted rows to Kafka."""

    def __init__(self, kafka_server: str, mysql_config: MysqlConfig,
                 trie_tree: PrefixTrie, aes_key):
        """
        :param kafka_server: Kafka bootstrap servers
        :param mysql_config: MySQL connection settings
        :param trie_tree: table-name matcher
        :param aes_key: AES key used to encrypt the payload
        """
        self.producer = self._init_kafka_producer(kafka_server)
        self.mysql_config = mysql_config
        self.stream = None
        self.trie_tree = trie_tree
        self.aes_key = aes_key

    def _init_kafka_producer(self, kafka_server):
        """Initialise the Kafka producer."""
        producer = KafkaProducer(
            bootstrap_servers=kafka_server,
            batch_size=16384,            # batch size in bytes
            linger_ms=100,               # wait up to 100 ms for more messages to fill a batch
            max_request_size=10485760,   # maximum request size in bytes
            acks=1,                      # leader acknowledgement only
            retries=3,                   # retry count
            value_serializer=lambda v: json.dumps(v, default=self.json_serial,
                                                  ensure_ascii=False).encode('utf-8')
        )
        logging.info("Kafka producer initialized.")
        return producer

    @staticmethod
    def json_serial(obj):
        """JSON serializer for objects not serializable by the default json encoder."""
        if isinstance(obj, (datetime, date)):
            return obj.strftime('%Y-%m-%d %H:%M:%S')
        if isinstance(obj, Decimal):
            return float(obj)
        if isinstance(obj, bytes):
            return obj.decode('utf-8')
        if isinstance(obj, timedelta):
            # render timedelta values as strings
            return str(obj)
        if isinstance(obj, dict):
            return {KafkaBinlogStreamer.json_serial(k): KafkaBinlogStreamer.json_serial(v)
                    for k, v in obj.items()}
        # handle list values
        if isinstance(obj, list):
            return [KafkaBinlogStreamer.json_serial(item) for item in obj]
        logging.warning(f"Type '{obj.__class__}' for '{obj}' not serializable")
        return None

    @staticmethod
    def convert_bytes_keys_to_str(data):
        """Recursively convert bytes keys and values into str."""
        if isinstance(data, dict):
            # convert bytes keys in the dict to str
            return {
                (k.decode('utf-8') if isinstance(k, bytes) else k):
                    KafkaBinlogStreamer.convert_bytes_keys_to_str(v)
                for k, v in data.items()
            }
        elif isinstance(data, list):
            # recurse into every element of the list
            return [KafkaBinlogStreamer.convert_bytes_keys_to_str(item) for item in data]
        elif isinstance(data, bytes):
            # decode bytes values
            return data.decode('utf-8')
        elif isinstance(data, datetime):
            # format datetime values as strings
            return data.strftime('%Y-%m-%d %H:%M:%S')
        else:
            return data

    def build_message(self, binlog_evt, trie_tree: PrefixTrie):
        """
        Build the Kafka message for a binlog event.
        :param binlog_evt: binlog event
        :param trie_tree: table-name matcher
        """
        schema = str(f"{getattr(binlog_evt, 'schema', '')}.{getattr(binlog_evt, 'table', '')}")
        table_name = str(f"{getattr(binlog_evt, 'table', '')}")
        # look up the table configuration
        table_config = trie_tree.search(schema)
        if table_config is None:
            return None
        topic = table_config.topic
        if binlog_evt.event_type == BINLOG.WRITE_ROWS_EVENT_V1:      # insert
            data_rows = binlog_evt.rows
            data_list = []
            for data_row in data_rows:
                data_list.append(self._map_columns(
                    self.convert_bytes_keys_to_str(data_row['values']),
                    table_name, table_config, 0))
            return {'event': 'INSERT', 'headers': {'topic': topic}, 'data_list': data_list}
        elif binlog_evt.event_type == BINLOG.UPDATE_ROWS_EVENT_V1:   # update, treated as upsert
            data_rows = binlog_evt.rows
            data_list = []
            for data_row in data_rows:
                data_list.append(self._map_columns(
                    self.convert_bytes_keys_to_str(data_row['after_values']),
                    table_name, table_config, 0))
            return {'event': 'INSERT', 'headers': {'topic': topic}, 'data_list': data_list}
        elif binlog_evt.event_type == BINLOG.DELETE_ROWS_EVENT_V1:   # delete
            data_rows = binlog_evt.rows
            data_list = []
            for data_row in data_rows:
                data_list.append(self._map_columns(
                    self.convert_bytes_keys_to_str(data_row['values']),
                    table_name, table_config, 1))
            return {'event': 'DELETE', 'headers': {'topic': topic}, 'data_list': data_list}
        return None

    @staticmethod
    def _map_columns(values, table_name: str, table_config: TableConfig, op_data: int):
        """
        Map source column names to target column names and add __op/table_name.
        :param values: row values
        :param table_name: source table name
        :param table_config: table configuration
        :param op_data: __op value (0 = upsert, 1 = delete)
        """
        column_mapping = table_config.column_mapping
        need_table = table_config.need_table
        mapped_values = {}
        for column, value in values.items():
            # rename the column if a mapping exists
            if column in column_mapping:
                mapped_column = column_mapping.get(column)
                mapped_values[mapped_column] = value
        if need_table:
            mapped_values['table_name'] = table_name
        mapped_values['__op'] = op_data
        return mapped_values

    def encrypt_data(self, data):
        """
        Encrypt a payload with AES-GCM.
        :param data: plaintext to encrypt
        """
        cipher = AES.new(self.aes_key, AES.MODE_GCM)
        ciphertext, tag = cipher.encrypt_and_digest(data.encode('utf-8'))
        return {
            'nonce': cipher.nonce.hex(),
            'ciphertext': ciphertext.hex() + tag.hex()
        }

    def start_stream(self):
        """Start listening to the MySQL binlog stream."""
        logging.info("Starting binlog stream...")
        mysql_settings = {
            'host': self.mysql_config.host,
            'port': self.mysql_config.port,
            'user': self.mysql_config.user,
            'password': self.mysql_config.password,
        }
        self.stream = BinLogStreamReader(
            connection_settings=mysql_settings,
            server_id=self.mysql_config.service_id,
            resume_stream=True,
            blocking=True,
            # only_events=
        )
        try:
            for evt in self.stream:
                msg = self.build_message(evt, self.trie_tree)
                if msg:
                    topic = msg['headers']['topic']
                    data_list = msg['data_list']
                    for data_row in data_list:
                        try:
                            self.producer.send(topic, value=self.encrypt_data(
                                json.dumps(data_row, default=self.json_serial,
                                           ensure_ascii=False)))
                        except Exception as e:
                            logging.info(e)
                            logging.info(topic)
                            logging.info(data_row)
                            raise e
        except KeyboardInterrupt:
            logging.info("Binlog stream interrupted by user.")
        finally:
            self.close()

    def close(self):
        """Release resources."""
        if self.stream:
            self.stream.close()
            logging.info("Binlog stream closed.")
        self.producer.flush()
        self.producer.close()
        logging.info("Kafka producer closed.")


if __name__ == "__main__":
    # read the table configurations
    BASE_PATH = "/opt/py38/data-job-stream"
    CONFIG_PATH = BASE_PATH + "/" + "config/rds1"
    table_config_data_list = TableConfigReader(CONFIG_PATH)
    table_config_data_list.read()
    complete_regex_table_dict = {}
    not_complete_regex_table_dict_list = []
    for table_config in table_config_data_list.table_config_list:
        if table_config.complete_regex:
            complete_regex_table_dict[table_config.key] = table_config
        else:
            not_complete_regex_table_dict_list.append(table_config)
    # build the table-name matcher
    trie = PrefixTrie(complete_regex_table_dict, not_complete_regex_table_dict_list)
    # connection parameters
    KAFKA_SERVER = "ip:19092,ip2:19093,ip3:19094"
    mysql_config = MysqlConfig("*.mysql.rds.aliyuncs.com", 3306, "username", "password", 100)
    # symmetric encryption key
    hex_key = "6253c3d*************8b5deba549"
    key_bytes = binascii.unhexlify(hex_key)
    # create and start the Kafka binlog streamer
    streamer = KafkaBinlogStreamer(KAFKA_SERVER, mysql_config, trie, key_bytes)
    streamer.start_stream()

Dependency versions used by the script:
kafka_python==2.0.2
mysql_replication==0.45.1
pycryptodome==3.21.0

7. Supporting Scripts
[*]Generating the bulk and incremental configuration files
When a new table needs to be synchronized, edit the RDS connection details in this script and run it; it automatically generates the configuration files and scripts for every synchronization step.
For example, if the database is test, the table is test_1, and the StarRocks table is ods_test_1, running the script produces the following files:
[*]test.test_1.json: the configuration file for the binlog sync.
[*]mzt_ods_cjm_all.test_1_connect.json: the Kafka connector configuration for the historical bulk load.
[*]mzt_ods_cjm_all.test_1_datax_config.json: the DataX configuration for the historical bulk load.
[*]mzt_ods_cjm_all.test_1_datax_shell.sh: the startup script for the DataX job.
[*]mzt_ods_cjm_stream.ods_test_1-connect.json: the Kafka connector configuration for the incremental sync.
[*]ods_test_1_create_table.sql: the SQL script that creates the table in StarRocks.
import os
import shutil
import pymysql
import re
import json


class MySQLMATEDATA():
    """Column metadata read from MySQL."""

    def __init__(self, column_name: str, is_nullable: str, data_type: str,
                 character_maximum_length: int, column_key: int,
                 numeric_precision: int, comment: str):
        """
        :param column_name: column name
        :param is_nullable: whether the column is nullable
        :param data_type: column type
        :param character_maximum_length: maximum character length
        :param column_key: key type
        :param numeric_precision: numeric precision
        :param comment: column comment
        """
        self.column_name = column_name
        self.new_column_name = MySQLMATEDATA.camel_to_snake(self.column_name)
        self.is_nullable = True if is_nullable == 'NO' else False
        self.data_type = data_type
        self.character_maximum_length = character_maximum_length
        self.column_key = column_key
        self.numeric_precision = numeric_precision
        self.is_primary_key = True if column_key == 'PRI' else False
        self.comment = comment

    def transform_to_starrcocks(self) -> str:
        """Build the column definition for the StarRocks CREATE TABLE statement."""
        column_str = self.new_column_name
        if self.data_type == 'timestamp':
            column_str += ' DATETIME '
        else:
            if self.character_maximum_length is not None:
                column_str += ' ' + self.data_type + '( ' + str(self.character_maximum_length * 3) + ") "
            elif self.numeric_precision is not None:
                column_str += ' ' + self.data_type + '( ' + str(self.numeric_precision + 1) + ') '
            else:
                column_str += ' ' + self.data_type + ' '
        if self.is_primary_key:
            column_str += ' NOT NULL '
        if self.comment is not None:
            column_str += ' COMMENT "' + self.comment + '"'
        return column_str + ','

    def transform_to_datax(self) -> str:
        """Build the column expression used by the DataX reader."""
        type = str(self.data_type).lstrip().lower()
        if type == 'datetime' or type == 'timestamp':
            return 'DATE_FORMAT(' + self.column_name + ', \'%Y-%m-%d %H:%i:%s\') AS ' + self.new_column_name
        elif type == 'date':
            return 'DATE_FORMAT(' + self.column_name + ', \'%Y-%m-%d\') AS ' + self.new_column_name
        return self.column_name

    @staticmethod
    def camel_to_snake(name):
        """Insert an underscore before each uppercase letter and lowercase the result."""
        snake_case = re.sub(r'(?<!^)(?=[A-Z])', '_', name).lower()
        return snake_case


class ConfigGeneral(object):
    """Generates the synchronization configuration files for one table."""

    def __init__(self, database: str, table: str, topic: str, n_table: str, s_topic: str,
                 mysql_host: str, mysql_port: int, mysql_user: str, mysql_passwd: str):
        """
        :param database: MySQL database name
        :param table: MySQL table name
        :param topic: topic used for the historical bulk load
        :param n_table: StarRocks table name
        :param s_topic: topic used for the incremental (streaming) sync
        :param mysql_host: MySQL host
        :param mysql_port: MySQL port
        :param mysql_user: MySQL user
        :param mysql_passwd: MySQL password
        """
        self.database = database
        self.table = table
        self.topic = topic
        self.n_table = n_table
        self.s_topic = s_topic
        self.mysql_host = mysql_host
        self.mysql_port = mysql_port
        self.mysql_user = mysql_user
        self.mysql_passwd = mysql_passwd
        self.column_list = []
        self.jdbc_url = 'jdbc:mysql://{}:{}/{}?characterEncoding=utf-8&useSSL=false&tinyInt1isBit=false'.format(
            self.mysql_host, self.mysql_port, self.database)

    def search_column_name(self):
        """Read the table's column metadata from MySQL."""
        sql = """SELECT COLUMN_NAME, IS_NULLABLE, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH, COLUMN_KEY,
                        NUMERIC_PRECISION, COLUMN_COMMENT
                 FROM information_schema.`COLUMNS`
                 WHERE table_schema='{}' AND table_name = '{}'
                 ORDER BY ORDINAL_POSITION ASC""".format(self.database, self.table)
        table_name_list = []
        conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user,
                               passwd=self.mysql_passwd, db=self.database, charset='utf8',
                               connect_timeout=200, autocommit=True, read_timeout=2000)
        with conn.cursor() as cursor:
            cursor.execute(query=sql)
            while 1:
                res = cursor.fetchone()
                if res is None:
                    break
                table_name_list.append(MySQLMATEDATA(res[0], res[1], res[2], res[3], res[4], res[5], res[6]))
        self.column_list = table_name_list
        conn.close()

    def create_all_datax_config(self) -> str:
        """Generate the DataX job configuration. Returns the config file name."""
        datax_config = {
            "job": {
                "setting": {
                    "speed": {
                        "channel": 1
                    }
                },
                "content": [
                    {
                        "reader": {
                            "name": "mysqlreader",
                            "parameter": {
                                "username": "",
                                "password": "",
                                "column": [],
                                "connection": [
                                    {
                                        "table": [],
                                        "jdbcUrl": []
                                    }
                                ],
                            }
                        },
                        "writer": {
                            "name": "kafkawriter",
                            "parameter": {
                                "bootstrapServers": "IP1:19092,IP2:19093,IP3:19094",
                                "topic": "",
                                "ack": "all",
                                "batchSize": 1000,
                                "retries": 3,
                                "keySerializer": "org.apache.kafka.common.serialization.StringSerializer",
                                "valueSerializer": "org.apache.kafka.common.serialization.StringSerializer",
                                "fieldDelimiter": ",",
                                "writeType": "json",
                                "topicNumPartition": 1,
                                "topicReplicationFactor": 1,
                                "encryptionKey": "6253c3************a549",
                                "column": []
                            }
                        }
                    }
                ]
            }
        }
        new_column_name_list = []
        new_column_name_list1 = []
        for column in self.column_list:
            new_column_name = column.new_column_name
            new_column_name_list.append(column.transform_to_datax())
            new_column_name_list1.append(new_column_name)
        datax_config['job']['content'][0]['reader']['parameter']['column'] = new_column_name_list
        datax_config['job']['content'][0]['writer']['parameter']['column'] = new_column_name_list1
        datax_config['job']['content'][0]['reader']['parameter']['connection'][0]['table'] = [self.table]
        datax_config['job']['content'][0]['reader']['parameter']['connection'][0]['jdbcUrl'] = [self.jdbc_url]
        datax_config['job']['content'][0]['writer']['parameter']['topic'] = self.topic
        with open('config/' + self.topic + '_datax_config.json', 'w', encoding='utf-8') as f:
            f.write(json.dumps(datax_config, ensure_ascii=False, indent=2))
        return self.topic + '_datax_config.json'

    def create_all_datax_shell(self, config_path: str):
        """Generate the shell script that runs the DataX job."""
        text = """python3 /opt/datax-k/bin/datax.py {} """.format(config_path)
        with open('config/' + self.topic + '_datax_shell.sh', 'w', encoding='utf-8') as f:
            f.write(text)
        return self.topic + '_datax_shell.sh'

    def create_all_connect(self) -> str:
        """Generate the Kafka Connect configuration for the historical bulk load."""
        connect_config = {
            "name": "",
            "config": {
                "connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector",
                "topics": "",
                "key.converter": "org.apache.kafka.connect.storage.StringConverter",
                "value.converter": "org.apache.kafka.connect.storage.StringConverter",
                "key.converter.schemas.enable": "true",
                "value.converter.schemas.enable": "false",
                "starrocks.http.url": "IP1:8050,IP2:8050,IP3:8050",
                "starrocks.topic2table.map": "",
                "starrocks.username": "",
                "starrocks.password": "",
                "starrocks.database.name": "ods_cjm",
                "sink.properties.strip_outer_array": "true",
                "sink.properties.columns": "",
                "sink.properties.jsonpaths": "",
                "transforms": "decrypt",
                "transforms.decrypt.type": "com.starrocks.connector.kafka.transforms.DecryptJsonTransformation",
                "transforms.decrypt.secret.key": "6253****************a549"
            }
        }
        connect_config['name'] = self.topic + "-connect"
        connect_config['config']['topics'] = self.topic
        connect_config['config']['starrocks.topic2table.map'] = self.topic + ":" + self.n_table
        connect_config['config']['sink.properties.columns'] = ",".join(
            list(map(lambda x: x.new_column_name, self.column_list)))
        connect_config['config']['sink.properties.jsonpaths'] = '[' + (",".join(
            list(map(lambda x: ("\"$." + x.column_name + "\""), self.column_list)))) + "]"
        with open('config/' + self.topic + '_connect.json', 'w', encoding='utf-8') as f:
            f.write(json.dumps(connect_config, ensure_ascii=False, indent=2))
        return self.topic + '_connect.json'

    def search_table_create_sql(self):
        """Generate the StarRocks CREATE TABLE statement."""
        table_sql_list = " CREATE TABLE " + self.n_table + " (" + "\n"
        primary_key = ''
        for column in self.column_list:
            table_sql_list = table_sql_list + column.transform_to_starrcocks() + "\n"
            if column.is_primary_key:
                primary_key = column.column_name
        table_sql_list = table_sql_list + ")\n"
        table_sql_list = table_sql_list + "PRIMARY KEY (" + primary_key + ")\n"
        table_sql_list = table_sql_list + "DISTRIBUTED BY HASH (" + primary_key + ");\n"
        with open('config/' + self.n_table + '_create_table.sql', 'w', encoding='utf-8') as f:
            f.write(table_sql_list)
        return self.n_table + '_create_table.sql'

    def create_stream_config(self):
        """Generate the binlog synchronization configuration for this table."""
        config = {
            "key": "",
            "topic": "",
            "need_table": False,
            "complete_regex": True,
            "regex": "",
            "column_mapping": {}
        }
        column_mapping = {}
        for column in self.column_list:
            column_mapping[column.column_name] = column.new_column_name
        config['column_mapping'] = column_mapping
        config['key'] = self.database + '.' + self.table
        config['topic'] = self.s_topic
        with open('config/' + self.database + '.' + self.table + '.json', 'w', encoding='utf-8') as f:
            f.write(json.dumps(config, ensure_ascii=False, indent=2))

    def create_stream_connect(self):
        """Generate the Kafka Connect configuration for the incremental sync."""
        connect_config = {
            "name": "",
            "config": {
                "connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector",
                "topics": "",
                "key.converter": "org.apache.kafka.connect.storage.StringConverter",
                "value.converter": "org.apache.kafka.connect.storage.StringConverter",
                "key.converter.schemas.enable": "true",
                "value.converter.schemas.enable": "false",
                "starrocks.http.url": "IP1:8050,IP2:8050,IP3:8050",
                "starrocks.topic2table.map": "",
                "starrocks.username": "",
                "starrocks.password": "",
                "starrocks.database.name": "ods_cjm",
                "sink.properties.strip_outer_array": "true",
                "sink.properties.columns": "",
                "sink.properties.jsonpaths": "",
                "transforms": "decrypt",
                "transforms.decrypt.type": "com.starrocks.connector.kafka.transforms.DecryptJsonTransformation",
                "transforms.decrypt.secret.key": "6253********549"
            }
        }
        connect_config['name'] = self.s_topic + "-connect"
        connect_config['config']['topics'] = self.s_topic
        connect_config['config']['starrocks.topic2table.map'] = self.s_topic + ":" + self.n_table
        connect_config["config"]["sink.properties.columns"] = ",".join(
            list(map(lambda x: x.new_column_name, self.column_list))) + ",__op"
        connect_config["config"]["sink.properties.jsonpaths"] = '[' + (",".join(
            list(map(lambda x: ("\"$." + x.new_column_name + "\""), self.column_list)))) + ",\"$.__op\"]"
        with open('config/' + self.s_topic + '-connect.json', 'w', encoding='utf-8') as f:
            f.write(json.dumps(connect_config, ensure_ascii=False, indent=2))
        return self.s_topic + '_connect.json'


def delete_all_files_in_folder(folder_path):
    """Delete every file inside a folder."""
    # make sure the folder exists
    if not os.path.exists(folder_path):
        print("folder does not exist")
        return
    # walk every file and sub-folder
    for filename in os.listdir(folder_path):
        file_path = os.path.join(folder_path, filename)
        try:
            # delete regular files and symlinks
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.remove(file_path)
                print(f"deleted file: {file_path}")
            # delete sub-folders together with their contents
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
                print(f"deleted folder and contents: {file_path}")
        except Exception as e:
            print(f"error while deleting {file_path}: {e}")


if __name__ == '__main__':
    delete_all_files_in_folder("config")
    DATABASE = 'hydra_marketing'
    TABLE = 't_data_overview'
    TOPIC = 'mzt_ods_cjm_all.' + TABLE
    NEW_TABLE = 'ods_marketing_t_data_overview'
    STREAM_TOPIC = "mzt_ods_cjm_stream." + NEW_TABLE
    MYSQL_HOST = ""
    MYSQL_PORT = 3306
    USER_NAME = ""
    PASSWD = ""
    config = ConfigGeneral(DATABASE, TABLE, TOPIC, NEW_TABLE, STREAM_TOPIC,
                           MYSQL_HOST, MYSQL_PORT, USER_NAME, PASSWD)
    config.search_column_name()
    config.search_table_create_sql()
    config_path = config.create_all_datax_config()
    config.create_all_datax_shell(config_path)
    config.create_all_connect()
    config.create_stream_config()
    config.create_stream_connect()
[*]Kafka connector operation script
This script wraps the Kafka Connect REST APIs, making it easy to operate connectors and query the status of their tasks.
import json
import requests
from typing import List, Mapping


class KafkaConnectAll:
    """Wrappers around the Kafka Connect REST API."""

    def __init__(self, base_url: str):
        """
        :param base_url: address of the Kafka Connect cluster
        """
        self.base_url = base_url

    def query_all(self) -> List:
        """List all connectors."""
        url = self.base_url + '/connectors'
        data_json = requests.get(url).json()
        for data in data_json:
            print(data)
        return data_json

    def delete_connector(self, connector_name: str):
        """Delete the given connector."""
        url = self.base_url + '/connectors/' + connector_name
        requests.delete(url)

    def query_status(self, connector_name: str) -> Mapping:
        """Query the status of the given connector."""
        url = self.base_url + '/connectors/' + connector_name + '/status'
        result = requests.get(url)
        connect_state = result.json()['connector']['state']
        task_states = []
        for task in result.json()['tasks']:
            task_states.append({
                'id': task['id'],
                'state': task['state'],
            })
        print("connector state", connect_state)
        print("task states", task_states)
        return {
            "connector_status": connect_state,
            "task_states": task_states
        }

    def create_connector(self, connector_config: json):
        """Create a connector from the given configuration."""
        url = self.base_url + '/connectors'
        headers = {"Content-Type": "application/json"}
        try:
            # POST the connector configuration
            response = requests.post(url, headers=headers, data=json.dumps(connector_config))
            if response.status_code == 201:
                print("connector created")
                print(response.json())
            elif response.status_code == 409:
                print("connector already exists")
            else:
                print(f"connector creation failed, status code: {response.status_code}")
                print(response.json())
        except Exception as e:
            print(f"request failed: {e}")

    def query_connector(self, connector_name: str) -> json:
        """Get the given connector."""
        url = self.base_url + '/connectors/' + connector_name
        result = requests.get(url).json()
        print(json.dumps(result, indent=4))
        return result

    def query_connector_config(self, connector_name: str) -> json:
        """Get the configuration of the given connector."""
        url = self.base_url + '/connectors/' + connector_name + "/config"
        result = requests.get(url).json()
        print(json.dumps(result, indent=4))
        return result

    def update_connector_config(self, connector_name: str, connector_config: json):
        """Update the configuration of the given connector."""
        url = self.base_url + '/connectors/' + connector_name + "/config"
        headers = {"Content-Type": "application/json"}
        try:
            # PUT the new configuration
            response = requests.put(url, headers=headers, data=json.dumps(connector_config))
            if response.status_code == 201:
                print("connector updated")
                print(response.json())
            elif response.status_code == 409:
                print("connector already exists")
            else:
                print(f"connector update failed, status code: {response.status_code}")
                print(response.json())
        except Exception as e:
            print(f"request failed: {e}")

    def query_connectors_task(self, connector_name: str) -> List:
        """List the task ids of the given connector."""
        url = self.base_url + '/connectors/' + connector_name + '/tasks'
        result = requests.get(url).json()
        print(json.dumps(result, indent=4))
        task_id = []
        for task in result:
            task_id.append(task['id']['task'])
        return task_id

    def query_connectors_tasks_status(self, connector_name: str, task_id: int) -> json:
        """Get the status of a single task."""
        url = self.base_url + '/connectors/' + connector_name + '/tasks/' + str(task_id) + '/status'
        result = requests.get(url).json()
        print(json.dumps(result, indent=4))
        return result

    def pause_connector(self, connector_name: str):
        """Pause the given connector."""
        url = self.base_url + '/connectors/' + connector_name + '/pause'
        requests.put(url)

    def resume_connector(self, connector_name: str):
        """Resume the given connector."""
        url = self.base_url + '/connectors/' + connector_name + '/resume'
        requests.put(url)

    def restart_connector(self, connector_name: str):
        """Restart the given connector."""
        url = self.base_url + '/connectors/' + connector_name + '/restart'
        requests.post(url)

    def restart_connector_task(self, connector_name: str, task_id: int):
        """Restart one task of the given connector."""
        url = self.base_url + '/connectors/' + connector_name + '/tasks/' + str(task_id) + '/restart'
        requests.post(url)


if __name__ == '__main__':
    base_url = 'http://IP:8083'
    kafka_connector_all = KafkaConnectAll(base_url)
    kafka_connector_all.query_connectors_tasks_status('user_sys_org-connect', 0)
[*]StarRocks latest-timestamp check script
This script reports the timestamp of the most recent data in each StarRocks table, which helps verify that synchronization is running normally.
import pymysql
from typing import Tuple
import json


class StarRocksTableCheck:
    """Checks the latest data timestamp of StarRocks tables."""

    def __init__(self, host: str, port: int, user: str, password: str, database: str):
        """
        :param host: database host
        :param port: database port
        :param user: database user
        :param password: database password
        :param database: database name
        """
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database
        # database connection
        self.conn = None

    def connect(self) -> None:
        """Open the connection."""
        self.conn = pymysql.connect(host=self.host, port=self.port, user=self.user,
                                    passwd=self.password, db=self.database, charset='utf8',
                                    connect_timeout=200, autocommit=True, read_timeout=2000)

    def close(self) -> None:
        """Close the connection."""
        self.conn.close()

    def query(self, table: str, time_filed: str) -> Tuple:
        """
        Query the latest timestamp of a table.
        :param table: table name
        :param time_filed: name of the time column
        :return: (table name, latest timestamp)
        """
        sql = "SELECT MAX({}) AS MAX_TIME FROM {}".format(time_filed, table)
        with self.conn.cursor() as cursor:
            cursor.execute(query=sql)
            while 1:
                res = cursor.fetchone()
                if res is None:
                    break
                print('{} {}'.format(table, res[0]))
                return table, str(res[0])


class ReaderFile:
    """Reads a file."""

    def __init__(self, file_path: str):
        """
        :param file_path: path of the file
        """
        self.file_path = file_path

    def read_file_content(self) -> str:
        """Return the file content."""
        with open(self.file_path, 'r', encoding='utf-8') as file:
            content = file.read()
        return content


if __name__ == '__main__':
    starRocksTableCheck = StarRocksTableCheck("ip", 9030, "username", "password", "ods_cjm")
    starRocksTableCheck.connect()
    try:
        content = ReaderFile(r'E:\pycode\python_scripts_new\超级码\stream\config\starrocks_table.json').read_file_content()
        for table in json.loads(content):
            table_name = table['TABLE_NAME']
            column_name = table['COLUMN_NAME']
            starRocksTableCheck.query(table_name, column_name)
    finally:
        starRocksTableCheck.close()

The table-info configuration file lists the tables to check and their time columns:
[{ "TABLE_NAME": "ods_codemanager_codeapply", "COLUMN_NAME": "create_date"},{ "TABLE_NAME": "ods_t_integral_account", "COLUMN_NAME": "update_time"}]
[*]Incremental sync watchdog script
This script checks whether the synchronization scripts are running and starts them if they are not. It can be scheduled with crontab so that a crashed process is restarted automatically, and alerting can be added on top.
#!/bin/bash

# Python scripts to check
PROCESS_LIST=("mzt-transform-stream-kafka-rds1.py" "mzt-transform-stream-kafka-rds2.py")

# start commands, one per script
START_CMD_LIST=(
  "nohup python3 mzt-transform-stream-kafka-rds1.py > /opt/py38/data-job-stream/nohup1.out 2>&1 &"
  "nohup python3 mzt-transform-stream-kafka-rds2.py > /opt/py38/data-job-stream/nohup2.out 2>&1 &"
)

# working directory
BASE_PATH="/opt/py38/data-job-stream"
# virtual environment path
VENV_PATH="/opt/py38/bin/activate"

# check whether a process is running
check_process() {
    local process_name="$1"
    if pgrep -f "$process_name" > /dev/null; then
        echo "$(date) - process '$process_name' is running."
        return 0
    else
        echo "$(date) - process '$process_name' is not running."
        return 1
    fi
}

# start a process
start_process() {
    local start_cmd="$1"
    echo "$(date) - starting: $start_cmd"
    # change to the working directory
    cd "$BASE_PATH" || exit 1
    # activate the virtual environment
    source "$VENV_PATH"
    # run the start command
    eval "$start_cmd"
    if [ $? -eq 0 ]; then
        echo "$(date) - started successfully."
    else
        echo "$(date) - start failed!"
    fi
}

# main loop
for index in "${!PROCESS_LIST[@]}"; do
    process_name="${PROCESS_LIST[$index]}"
    start_cmd="${START_CMD_LIST[$index]}"
    echo "----- checking process: $process_name -----"
    if ! check_process "$process_name"; then
        start_process "$start_cmd"
    fi
done
[*]Example incremental sync configuration file
This configuration file defines the incremental synchronization rules for one specific table.
{"key": "app.device","topic": "mzt_ods_cjm_stream.ods_device","need_table": false,"complete_regex": true,"regex": "","column_mapping": { "Id": "id", "CompanyName": "company_name", "CompanyId": "company_id", "SecretKey": "secret_key", "Brand": "brand", "ModelType": "model_type", "Enable": "enable", "CreateTime": "create_time", "UpdateTime": "update_time"}}KEY说明key表的唯一id,用于匹配binlog日志,组成为库.表 或者 模糊匹配开头topic数据写入的kafka的topic名称need_table列中是否需要加入表明,如何为true,则列中会加入一个字段table_name,值为当前RDS中表的名称,用于解决分库分表的问题complete_regex是否完整匹配表名称,如果为true,则根据key完整匹配表名称,用于解决分库分表的问题, 如果为false,则根据regex的值进行正常匹配regex匹配binlog表的正则表达式column_mapping表的列字段的映射,key为RDS中表字段名称。value为StarRocks中表字段的名称
[*]Example Kafka connector configuration
{ "name": "mzt_ods_cjm_stream.ods_device-connect", "config": { "connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector", "topics": "mzt_ods_cjm_stream.ods_device", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.storage.StringConverter", "key.converter.schemas.enable": "true", "value.converter.schemas.enable": "false", "starrocks.http.url": "IP1:8050,IP2:8050,IP3:8050", "starrocks.topic2table.map": "mzt_ods_cjm_stream.ods_device:ods_device", "starrocks.username": "", "starrocks.password": "", "starrocks.database.name": "ods_cjm", "sink.properties.strip_outer_array": "true", "sink.properties.columns": "id,company_name,company_id,secret_key,brand,model_type,enable,create_time,update_time,__op", "sink.properties.jsonpaths": "[\"$.id\",\"$.company_name\",\"$.company_id\",\"$.secret_key\",\"$.brand\",\"$.model_type\",\"$.enable\",\"$.create_time\",\"$.update_time\",\"$.__op\"]", "transforms": "decrypt", "transforms.decrypt.type": "com.starrocks.connector.kafka.transforms.DecryptJsonTransformation", "transforms.decrypt.secret.key": "6253******a549" }KEY说明namekafka connector的唯一名称config.connector.class连接器 默认值config.topics要消费的topic列表,多个使用,分隔config.key.converterkey的转换器,保持默认config.value.convertervalue的转换器,保持默认config.key.converter.schemas.enable是否需要转换keyconfig.value.converter.schemas.enable是否需要转换valueconfig.starrocks.http.urlStarRocks用于streamLoad的地址config.starrocks.topic2table.maptopic与表的映射, 格式为 topic名称:表名, 多个直接使用,分隔config.starrocks.usernameStarRocks的用户名config.starrocks.passwordStarRocks的密码config.starrocks.database.nameStarRocks数据库的名称config.sink.properties.strip_outer_array是否展开JSON数组config.sink.properties.columns列字段的列表config.sink.properties.jsonpathsJSON字段的列表,可列字段的列表一一对应config.transforms数据的转换器config.transforms.decrypt.type转换器的实现类config.transforms.decrypt.secret.key数据解密的秘钥八、备注
[*]python-mysql-replication: a Python library for consuming the MySQL binlog.
[*]starrocks-connector-for-kafka: the StarRocks connector for Kafka Connect.
[*]DataX: a batch data synchronization tool.
[*]kafka-console-ui: a Kafka web console.
[*]StarRocks-kafka-Connector: loading data into StarRocks through the Kafka connector.
[*]StreamLoad: performing inserts, updates, and deletes via Stream Load.
[*]Kafka Connect REST API reference
GET /connectors: list the active connectors
POST /connectors: create a new connector; the request body is a JSON object with a string "name" field and a "config" object holding the connector's configuration parameters
GET /connectors/{name}: get information about a specific connector
GET /connectors/{name}/config: get the configuration parameters of a connector
PUT /connectors/{name}/config: update the configuration parameters of a connector
GET /connectors/{name}/status: get the current status of a connector, including whether it is running, failed or paused, which worker it is assigned to, the error message if it failed, and the status of all its tasks
GET /connectors/{name}/tasks: list the tasks currently running for a connector
GET /connectors/{name}/tasks/{taskid}/status: get the current status of a task, including whether it is running, failed or paused, which worker it is assigned to, and the error message if it failed
PUT /connectors/{name}/pause: pause a connector and its tasks; message processing stops until the connector is resumed
PUT /connectors/{name}/resume: resume a paused connector (or do nothing if the connector is not paused)
POST /connectors/{name}/restart: restart a connector (typically after a failure)
POST /connectors/{name}/tasks/{taskId}/restart: restart an individual task (typically after a failure)
DELETE /connectors/{name}: delete a connector, stopping all its tasks and removing its configuration
[*]Encryption algorithm reference
Symmetric encryption (recommended: AES-GCM, ChaCha20): efficient and hardware-accelerated, but the key must be managed securely; suited to bulk data, file, and stream encryption.
Asymmetric encryption (recommended: RSA, ECDSA): no shared key and high security, but slow and unsuitable for large payloads; suited to key exchange and digital signatures.
Stream ciphers (recommended: ChaCha20): efficient with low latency, but not suitable for file encryption; suited to real-time communication and video streams.
Hash algorithms (recommended: SHA-256, BLAKE3): irreversible and fast, but cannot be used for encryption or decryption; suited to data integrity checks and digital signatures.

9. Pitfalls
[*]pip in the Python virtual environment fails because the SSL module is missing.
Fix: install from a pip mirror that supports plain HTTP.
pip3 install pymysql -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com
[*]When DataX syncs a timestamp column, the value arrives in Kafka as a number and cannot be written into a StarRocks datetime column.
Fix: in the DataX column mapping, use DATE_FORMAT to convert the value to a string.
"column": [ "id", "name", "DATE_FORMAT(timestamp_column, '%Y-%m-%d %H:%i:%s') AS timestamp_column"],
[*]AES-GCM interoperability between the Python and Java sides.
Fix: Python encryption produces a (nonce, ciphertext, tag) triple, which Java could not decrypt directly; concatenating ciphertext and tag on the Python side lets Java decrypt it in a single call.
Python side:

def encrypt_data(self, data):
    """Encrypt data with AES-GCM."""
    cipher = AES.new(self.aes_key, AES.MODE_GCM)
    ciphertext, tag = cipher.encrypt_and_digest(data.encode('utf-8'))
    return {
        'nonce': cipher.nonce.hex(),
        'ciphertext': ciphertext.hex() + tag.hex()
    }

Java side:

public String decrypt(String encryptedData) throws Exception {
    JSONObject jsonMessage = JSONObject.parseObject(encryptedData);
    // parse the ciphertext (with appended tag) and the IV
    byte[] ciphertext = hexStringToByteArray(jsonMessage.getString("ciphertext"));
    byte[] nonce = hexStringToByteArray(jsonMessage.getString("nonce"));
    return decryptData(ciphertext, nonce);
}
[*]starrocks-connector-for-kafka fails to parse BigDecimal column values.
Fix: when decrypting inside starrocks-connector-for-kafka, configure FastJSON to convert BigDecimal values to Double.
@Override
public R apply(R record) {
    if (record.value() == null) {
        return record;
    }
    String value = (String) record.value();
    try {
        String newValue = aesEncryption.decrypt(value);
        // convert BigDecimal to Double when parsing
        JSONObject jsonObject = JSON.parseObject(newValue, JSONReader.Feature.UseBigDecimalForDoubles);
        return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
                null, jsonObject, record.timestamp());
    } catch (Exception e) {
        return record;
    }
}
[*]The custom-built starrocks-connector-for-kafka jar could not be loaded by Kafka Connect.
Fix: package it with Java 8; it had been built with Java 21, which made it unloadable.
[*]The latest version of python-mysql-replication could not resolve the table columns when reading the binlog.
Fix: do not use the latest version; use 0.45.1 instead. See issue#612.
[*]python-mysql-replication does not run on Python 3.6.
Fix: it requires at least Python 3.7; this project uses 3.8.4.
[*]Python's JSON encoder does not support bytes or date/time values.
Fix: provide a custom JSON conversion for those types.
def convert_bytes_keys_to_str(data):
    if isinstance(data, dict):
        # convert bytes keys in the dict to str
        return {
            (k.decode('utf-8') if isinstance(k, bytes) else k):
                KafkaBinlogStreamer.convert_bytes_keys_to_str(v)
            for k, v in data.items()
        }
    elif isinstance(data, list):
        # recurse into every element of the list
        return [KafkaBinlogStreamer.convert_bytes_keys_to_str(item) for item in data]
    elif isinstance(data, bytes):
        # decode bytes values
        return data.decode('utf-8')
    elif isinstance(data, datetime):
        # format datetime values as strings
        return data.strftime('%Y-%m-%d %H:%M:%S')
    else:
        return data
[*]Setting only_events for python-mysql-replication caused no binlog events to be consumed at all.
Fix: after testing, this parameter was dropped.
[*]The script's memory usage kept growing until the system killed it.
Fix: trigger garbage collection manually to release memory.
gc.collect()

10. Current Synchronization Status
RDS instances: 2
Logical tables synchronized: 56
Physical tables synchronized: 5274
Data latency: under 1 minute