人工智能机械臂 发表于 2025-2-7 02:00:05

离线数据同步变迁

第一代-基于Hadoop体系的离线数据同步

一、背景

随着业务的发展,系统进行了微服务的差分,导致数据越来越分散,很难进行一个完整的生命周期的数据查询,对于某些业务的需求支持变得越来越难,越来越复杂,也越来越难以进行职责划分。对着业务的发展,数据量越来越大之后,为了良好的业务支持,进行了分库分表,分库分表规则五花八门,一旦脱离了业务逻辑,很难确定某一条数据在哪个库哪个表。
基于这样的问题和情况,为了满足业务需求,很自然的就想到了使用大数据服务,将业务数据归集到一起,建立完整的数据仓库,便于数据的查询。
二、数据同步架构

为了追求简单和通用,由于自身的认识现在,选择了最标准的大数据架构,即基于Hadoop的大数据体现。整个集群采用三节点,通过CDH进行集群的部署和维护。

整个数据链路为:
通过Azkaban调用Spark应用,将数据从RDS同步到Hive,运营平台和报表系统采用Presto加速访问Hive的数据。
三、数据同步详细过程

数据同步采用Spark任务来进行,将任务打包之后,上传到Azkaban调度平台,使用Azkaban进行定时调度,完成T+1级别的数据同步工作。
数据同步代码示例:
object MarketMysqlToHiveEtl extends SparkHivePartitionOverwriteApplication{/**   * 删除已存在的分区   *   * @param spark SparkSessions实例   * @param date 日期   * @param properties 数据库配置   */def delete_partition(spark: SparkSession, properties:Properties, date: String):Unit={    val odsDatabaseName = properties.getProperty("hive.datasource.ods")    DropPartitionTools   .dropPartitionIfExists(spark,odsDatabaseName,"ods_t_money_record","ds",date)    DropPartitionTools   .dropPartitionIfExists(spark,odsDatabaseName,"ods_t_account","ds",date)}/**   * 抽取数据   * @param spark SparkSession实例   * @param properties 数据库配置   * @param date 日期   */def loadData(spark: SparkSession, properties:Properties, date: String): Unit ={    // 删除历史数据,解决重复同步问题    delete_partition(spark,properties,date)    // 获取数据源配置    val odsDatabaseName = properties.get("hive.datasource.ods")    val dataSource = DataSourceUtils.getDataSourceProperties(FinalCode.MARKET_MYSQL_FILENAME,properties)    var sql = s"select id,account_id,type,original_id,original_code,money,reason,user_type,user_id,organization_id," +    s"create_time,update_time,detail,deleted,parent_id,counts,'${date}' AS ds from TABLENAME where date(update_time) ='${date}'"    // 同步数据    MysqlToHiveTools.readFromMysqlIncrement(spark,dataSource,sql.replace("TABLENAME","t_money_record"),                                          s"${odsDatabaseName}.ods_t_money_record",SaveMode.Append,"ds")    sql = s"select id,code,customer_code,name,mobile,type,organization_id,organization_name,create_time,update_time,deleted,status,customer_name," +    s"customer_id,channel_type,nike_name,version,register_Time,'${date}' AS ds from TABLENAME where date(update_time) ='${date}'"    MysqlToHiveTools.readFromMysqlIncrement(spark,dataSource,sql.replace("TABLENAME","t_account"),                                          s"${odsDatabaseName}.ods_t_account",SaveMode.Append,"ds")}/**   * 数据etl   * @param spark SparkSession实例   * @param SparkSession 数据库配置   */def etl(spark: SparkSession, properties:Properties): Unit = {    val sparkConf = spark.sparkContext.getConf    // 获取同步的日期    var lastDate = sparkConf.get("spark.etl.last.day", DateUtils.getLastDayString)    val dateList = newListBuffer()    if(lastDate.isEmpty){      // 未配置,设置为前一天      lastDate = DateUtils.getLastDayString    }    if(lastDate.contains("~")){      // 如果是时间段,获取时间段中的每一天,解析为时间list      val dateArray = lastDate.split("~")      DateUtils.findBetweenDates(dateArray(0), dateArray(1)).foreach(it => dateList.append(it))    }else if(lastDate.contains(",")){      // 如果是使用,分隔的多个日期,解析为时间list      lastDate.split(",").foreach(it => dateList.append(it))    }else{      // 添加进时间列表      dateList.append(lastDate)    }    // 循环同步每天的数据    dateList.foreach(it =>loadData(spark, properties, it))}def main(args: Array): Unit = {    job() {      val sparkAndProperties = SparkUtils.get()      val spark = sparkAndProperties.spark      val properties = sparkAndProperties.properties      // 调度任务      etl(spark,properties)    }}}删除Partition的代码示例:
object DropPartitionTools {/**   * 删除指定的Partition   * @param SparkSession实例   * @param database数据库名称   * @param table表名称   * @param partitionKey 分区字段的名称   * @param partitionValue 具体的分区值   */def dropPartitionIfExists(spark: SparkSession, database: String, table: String, partitionKey: String, partitionValue:String): Unit ={   val df = spark.sql(       s"""         | show tables in ${database} like '${table}'         |""".stripMargin)    if(df.count() > 0 ){      // 表存在,删除分区      spark.sql(      s"""         |ALTER TABLE${database}.${table} DROPIF EXISTSPARTITION (${partitionKey}='${partitionValue}')         |""".stripMargin)    }}/**   * 删除Partition   * @param SparkSession实例   * @param database数据库名称   * @param table表名称   * @param partitionKey 分区字段的名称   */def dropHistoryPartitionIfExists(spark: SparkSession, database: String, table: String, partitionKey: String): Unit ={    val df = spark.sql(      s"""         | show tables in ${database} like '${table}'         |""".stripMargin)    if(df.count() > 0 ){      // 表存在,删除历史分区,获取8天前的日期      val sevenDay = DateUtils.getSomeLastDayString(8);      spark.sql(      s"""         |ALTER TABLE${database}.${table} DROPIF EXISTSPARTITION (${partitionKey} ='${sevenDay}')         |""".stripMargin)    }}}从RDS同步数据到HIVE的代码示例:
object MysqlToHiveTools {/**   * 从mysql抽取数据到hive -- 全量   * @param spark spark实例   * @param dataSource 数据库配置信息   * @param tableName 抽取的数据库表名   * @param destTableName 目标表名   * @param mode 抽取的模式   */def mysqlToHiveTotal(spark: SparkSession, dataSource: JSONObject,tableName: String, destTableName:String,mode: SaveMode, partition: String): Unit = {   val sql = "(select * from " + tableName + ") as t"   mysqlToHive(spark, dataSource, sql, destTableName, mode, partition)}/**   * 从mysql抽取数据到hive -- 增量量   * @param spark spark实例   * @param dataSource 数据库配置信息   * @param sql 抽取数据的SQL   * @param destTableName 目标表名   * @param mode 抽取的模式   */def readFromMysqlIncrement(spark: SparkSession, dataSource: JSONObject,sql: String, destTableName:String,mode: SaveMode, partition: String): Unit = {    mysqlToHive(spark, dataSource, sql, destTableName, mode, partition)}/**   * 真正的抽取数据   * @param spark spark实例   * @param properties 数据库配置信息   * @param sql 抽取数据的SQL   * @param destTableName 目标表名   * @param mode 抽取的模式   */def mysqlToHive(spark: SparkSession, dataSource: JSONObject,sql: String, destTableName:String, mode: SaveMode, partition: String):Unit={    val df = spark.read.format("jdbc")      .option("url",dataSource.getString("url"))      .option("driver",dataSource.getString("driver"))      .option("fetchSize", 10000)      .option("numPartitions",2)      .option("dbtable",s"(${sql}) AS t")      .option("user",dataSource.getString("user"))      .option("password",dataSource.getString("password"))      .load()    if(partition == null || partition.isEmpty){      df.write.format("parquet").mode(mode).saveAsTable(destTableName)    }else{      df.write.format("parquet").mode(mode).partitionBy("ds").saveAsTable(destTableName)    }}}Spark Application代码示例
trait SparkHivePartitionOverwriteApplication extends Logging{def getProperties(): Properties ={    val prop:Properties = new Properties()    val inputStream = this.getClass.getClassLoader.getResourceAsStream("config.properties")    prop.load(inputStream);    prop}def job(appName: String = null,          master: String = null)(biz: => Unit): Unit = {    var spark: SparkSession = null    System.setProperty("HADOOP_USER_NAME", "mapred")    val prop:Properties = getProperties()    if (null == appName) {      spark = SparkSession.builder      .config("spark.sql.parquet.writeLegacyFormat", true)      .config("spark.sql.sources.partitionOverwriteMode","dynamic")      .config("hive.exec.dynamic.partition.mode","nonstrict")      .config("spark.sql.hive.convertMetastoreParquet",false)      .enableHiveSupport      .getOrCreate      var sparkAndProperties = SparkAndProperties(spark, prop)      SparkUtils.set(sparkAndProperties)    } else {      spark = SparkSession.builder.master(master).appName(appName)      .config("spark.sql.parquet.writeLegacyFormat", true)      .config("spark.sql.sources.partitionOverwriteMode","dynamic")      .config("hive.exec.dynamic.partition.mode","nonstrict")      .config("spark.sql.hive.convertMetastoreParquet",false)      .config("spark.testing.memory","2147480000")      .config("spark.driver.memory","2147480000")      .enableHiveSupport.getOrCreate      var sparkAndProperties = SparkAndProperties(spark, prop)      SparkUtils.set(sparkAndProperties)      SparkUtils.set(sparkAndProperties)    }    biz    spark.stop()    SparkUtils.remove()}}case class SparkAndProperties(spark: SparkSession,                              properties: Properties)四、配套生态


[*]自定义UDF函数
在使用的过程中,需要将表中的IP地址,解析为所在地的名称,这需要调用第三方的一个服务接口来完成,为了完成这个任务,定义了一个自定义UDF函数,进行解析。
a. 自定义UDF函数
object ParseIp{    def evaluate(ip: String):String= {      // 具体的IP解析服务      SplitAddress.getPlaceFromIp(ip)   }}b. 使用自定义UDF函数
object TraceTmpEtl extends SparkHivePartitionOverwriteApplication{/**   * 数据同步任务   * @param spark sparkSession实例   * @param properties 数据库配置   * @param date 日期   */def tmp_t_trace_user_visit_real_time_statistic(spark: SparkSession,properties:Properties,date: String):Unit ={    // 获取数据库配置的数据库名称    val odsDatabaseName = properties.get("hive.datasource.ods")    val tmpDatabaseName = properties.get("hive.datasource.tmp")    // 注册自定义的UDF函数    spark.udf.register("parseIP", (ip: String) => SplitAddress.getPlaceFromIp(ip))    // 在Spark SQL中使用UDF函数    spark.sql(      s"""         |INSERT OVERWRITE TABLE ${tmpDatabaseName}.tmp_t_statistic partition(ds='${date}')         |select         |          `id` ,         |          `create_time` ,         |          `update_time` ,         |          `ip` ,         |      replace( replace( replace(replace( case when parseIP(ip) rlike '^中国' then replace(parseIP(ip),'中国','')         |          when parseIP(ip) rlike '^内蒙古' then replace(parseIP(ip),'内蒙古','内蒙古自治区')         |          when parseIP(ip) rlike '^广西' then replace(parseIP(ip),'广西','广西壮族自治区')         |          when parseIP(ip) rlike '^西藏' then replace(parseIP(ip),'西藏','西藏自治区')         |          when parseIP(ip) rlike '^宁夏' then replace(parseIP(ip),'宁夏','宁夏回族自治区')         |          when parseIP(ip) rlike '^新疆' then replace(parseIP(ip),'新疆','新疆维吾尔自治区')         |          when parseIP(ip) rlike '^香港' then replace(parseIP(ip),'香港','香港特别行政区')         |          when parseIP(ip) rlike '^澳门' then replace(parseIP(ip),'澳门','澳门特别行政区')         |   else parseIP(ip) end, "省", "省."),"市", "市."),"县", "县."),"区", "区.") as ip_place,         |          `page_view`          |from ${odsDatabaseName}.ods_t_statistic where ds ='${date}'         |""".stripMargin)}/**   * 数据etl   * @param spark SparkSession实例   * @param properties 数据库配置   */def etl(spark: SparkSession, properties:Properties): Unit = {    val lastDate = DateUtils.getLastDayString    tmp_t_trace_user_visit_real_time_statistic(spark,properties, lastDate)}    def main(args: Array): Unit = {    job() {      val sparkAndProperties = SparkUtils.get()      val spark = sparkAndProperties.spark      val properties = sparkAndProperties.properties      etl(spark,properties)    }}}
[*]数据库的配置安全性问题
刚开始数据库配置同步配置文件直接写死,但是后续发现这样存在一些安全性的问题,后来采用将数据库相关的配置组合为一个JSON字符串,将其加密之后保存到MongoDB中,在使用时进行查询解密。
public class DataSourceUtils {    privatestatic Logger logger = LoggerFactory.getLogger(DataSourceUtils.class);    public static JSONObject getDataSourceProperties(String dataSourceKey,Properties properties){      List<ServerAddress> adds = new ArrayList<>();      try {            String filePath = properties.getProperty("spark.mongo.properties.file.url");            properties = new Properties();            File file = new File(filePath);            FileInputStream inputStream = null;             inputStream = new FileInputStream(file);            properties.load(inputStream);      }catch (Exception e){            logger.info("not load file, reason:" + e.getMessage());            e.printStackTrace();      }      String mongoUrl = properties.getProperty("mongo_url");      String mongoPort = properties.getProperty("mongo_port");      String mongoDbName = properties.getProperty("mongo_dbName");      String mongoCollect = properties.getProperty("mongo_collect");      String mongoUser = properties.getProperty("mongo_user");      String mongoPassword = properties.getProperty("mongo_password");      String desKey = properties.getProperty("data_des_key");      ServerAddress serverAddress = new ServerAddress(mongoUrl, Integer.parseInt(mongoPort));      adds.add(serverAddress);      List<MongoCredential> credentials = new ArrayList<>();      MongoCredential mongoCredential = MongoCredential.createScramSha1Credential(mongoUser, mongoDbName, mongoPassword.toCharArray());      credentials.add(mongoCredential);      MongoClient mongoClient = new MongoClient(adds, credentials);      MongoDatabase mongoDatabase = mongoClient.getDatabase(mongoDbName);      MongoCollection<Document> collection = mongoDatabase.getCollection(mongoCollect);      //指定查询过滤器      Bson filter = Filters.eq("key", dataSourceKey);      //指定查询过滤器查询      FindIterable findIterable = collection.find(filter);      //取出查询到的第一个文档      Document document = (Document) findIterable.first();      //打印输出      String content = DESUtil.decrypt(desKey, document.getString("content"));      return JSON.parseObject(content);    }    public staticProperties json2Properties(JSONObject jsonObject){      String tmpKey = "";      String tmpKeyPre = "";      Properties properties = new Properties();      j2p(jsonObject, tmpKey, tmpKeyPre, properties);      return properties;    }    private static void j2p(JSONObject jsonObject, String tmpKey, String tmpKeyPre, Properties properties){      for (String key : jsonObject.keySet()) {            // 获得key            String value = jsonObject.getString(key);            try {                JSONObject jsonStr = JSONObject.parseObject(value);                tmpKeyPre = tmpKey;                tmpKey += key + ".";                j2p(jsonStr, tmpKey, tmpKeyPre, properties);                tmpKey = tmpKeyPre;            } catch (Exception e) {                properties.put(tmpKey + key, value);                System.out.println(tmpKey + key + "=" + value);            }      }    }    public static void main(String[] args) {    }}
[*]Spark任务脚本示例
#!/bin/sh##### env ###########export JAVA_HOME=/usr/java/jdk1.8.0_151export SPARK_HOME=/opt/cloudera/parcels/CDH/lib/sparkexport PATH=${JAVA_HOME}/bin:${SPARK_HOME}/bin:${PATH}export SPARK_USER=hadoopexport HADOOP_USER_NAME=hadoopLAST_DAY="$1"echo LAST_DAYspark-submit \--class net.app315.bigdata.operatereport.ods.MarketMysqlToHiveEtl \--conf spark.sql.hive.metastore.version=2.1.1 \--conf spark.sql.hive.metastore.jars=/opt/cloudera/parcels/CDH/lib/hive/lib/* \--jars /opt/cloudera/parcels/CDH/lib/spark/jars/mysql-connector-java-5.1.48.jar,/opt/cloudera/parcels/CDH/lib/spark/jars/druid-1.1.10.jar \--master yarn \--deploy-mode cluster \--executor-memory 4G \--driver-memory 2G \--num-executors 4 \--executor-cores 2 \--conf spark.dynamicAllocation.minExecutors=1 \--conf spark.dynamicAllocation.maxExecutors=8 \--conf spark.yarn.am.attemptFailuresValidityInterval=1h \--conf spark.yarn.max.executor.failures=128 \--conf spark.yarn.executor.failuresValidityInterval=1h \--conf spark.task.maxFailures=4 \--conf spark.yarn.maxAppAttempts=2 \--conf spark.scheduler.mode=FIFO \--conf spark.network.timeout=420000 \--conf spark.dynamicAllocation.enabled=true \--conf spark.executor.heartbeatInterval=360000 \--conf spark.sql.crossJoin.enabled=true \--conf spark.mongo.properties.file.url=/opt/conf/mongo.properties \--conf spark.etl.last.day="${LAST_DAY}" \./target/spark-operate-report-project-1.0.jar
[*]Job任务脚本实例
nodes:- name: bigdata_market_ods_etl    type: command    config:      command: sh -x ./script/bigdata_market_ods_etl.sh "${spark.etl.last.day}"      failure.emails: mxx@xxx.com- name: bigdata_market_dim_etl    type: command    config:      command: sh -x ./script/bigdata_market_dim_etl.sh "${spark.etl.last.day}"      failure.emails: mxx@xxx.com    dependsOn:          - bigdata_market_ods_etl            - name: bigdata_market_dw_etl    type: command    config:      command: sh -x ./script/bigdata_market_dw_etl.sh "${spark.etl.last.day}"      failure.emails: mxx@xxx.com    dependsOn:          - bigdata_market_dim_etl          - bigdata_user_dw_etl五、备注


[*]Davinci报表   一个开源的报表平台
第二代-基于DolphinScheduler的离线数据同步

一、背景

自从上次开始使用基于Hadoop的大数据体现方案之后,业务平稳发展,但是随着时间的推移,新的问题开始出现,主要出现的问题为两个:

[*]数据的变更越来越频繁,基于之前SparkSQL任务的方式,只要需要对表结构进行变更,就需要重新修改Scala代码,然后重新进行任务的打包,这对于一些不熟悉代码的人来说,不太友好,而且成本也很高。
[*]虽然使用了Presto对HIVE的数据查询进行了加速,但是所在数据量越来越大,分析要求越来越复杂,即席查询越来越多,由于集群本身资源有限,查询能力出现了显著瓶颈。
二、数据同步架构

随着技术的发展已经对大数据的认识,接触到了更多的大数据相关的知识与组件,基于此,通过认真分析与思考之后,对数据的同步方案进行了如下的重新设计。

[*]数据存储与查询放弃了HDFS+HIVE+Presto的组合,转而采用现代化的MPP数据库StarRocks,StarRocks在数据查询的效率层面非常优秀,在相同资源的情况下,可以解决目前遇到的数据查询瓶颈。
[*]数据同步放弃了SparkSQL,转而采用更加轻量级的DATAX来进行,其只需要通过简单的配置,即可完成数据的同步,同时其也支持StarRocks Writer,开发人员只需要具备简单的SQL知识,就可以完成整个数据同步任务的配置,难度大大降低,效率大大提升,友好度大大提升。
[*]定时任务调度放弃Azkaban,采用现代化的任务调度工作Apache DolphinScheduler,通过可视化的页面进行调度任务工作流的配置,更加友好。

三、数据同步的详细流程

数据同步在这种方式下变动非常简单,只需要可视化的配置DataX任务,即可自动调度。下面的一个任务的配置示例
{"job": {    "setting": {      "speed": {      "channel":1      }    },    "content": [      {      "reader": {          "name": "mysqlreader",          "parameter": {            "username": "",            "password": "",            "connection": [            {                "querySql": [                  "SELECT CustomerId AS customer_id FROM base_info.base_customer where date(UpdateTime) > '${sdt}' and date(UpdateTime) < '${edt}'"                ],                "jdbcUrl": [                  "jdbc:mysql://IP:3306/base_info?characterEncoding=utf-8&useSSL=false&tinyInt1isBit=false"                ]            }            ]          }      },      "writer": {          "name": "starrockswriter",          "parameter": {            "username": "xxx",            "password": "xxx",            "database": "ods_cjm_test",            "table": "ods_base_customer",            "column": ["id"],            "preSql": [],            "postSql": [],             "jdbcUrl": "jdbc:mysql://IP:9030/",            "loadUrl": ["IP:8050", "IP:8050", "IP:8050"],            "loadProps": {            "format": "json",            "strip_outer_array": true            }          }      }      }      ]    }}数据同步过程中,遇到了另外一个问题,即业务存在大量的分库分表的,这些分库分表的逻辑五花八门,60张左右的逻辑板,经过分库分表之后达到了惊人的5000多张,为每张表配置任务很显然不太正常,这就需要能够在进行数据同步的时候动态生成需要的表列表,把表列表配置到DataX的配置文件中去。
经过技术的调用,Apache DolphinScheduler的Python任务类型很适合做这个事情,由于公司本身使用了Apache DolphinScheduler3.0的版本,其Python任务还不支持返回数据到下游节点,但是社区最新版本已经支持该能力,因为按照已实现版本对其进行改造。
改造之后,Python节点能够将数据传递给他的下游节点,因此使用Python脚本查询获取需要进行同步的表列表,将其传递给DataX节点,完成动态表的数据同步
import pymysqlimport datetimedef select_all_table(date: str):    result_list = []    sql = """    SELECT concat('"', table_name, '"')   FROM information_schema.`TABLES`   WHERE table_schema='hydra_production_flow'         and table_name like 't_package_flow_log_%'      and table_name like '%_{}'    """.format(date)    conn = pymysql.connect(host='', port=3306, user='', passwd='',                           db='information_schema')    cur = conn.cursor()    cur.execute(query=sql)    while 1:      res = cur.fetchone()      if res is None:            break      result_list.append(res)    cur.close()    conn.close()    return result_listif __name__ == '__main__':    # 获取当前年月    # 获取当前日期    today = datetime.date.today()    # 计算前一天的日期    yesterday = today - datetime.timedelta(days=1)    current_date = yesterday.strftime("%Y_%m")    table_list = select_all_table(current_date)    table_str = ",".join(table_list)    # 设置变量,传递给下游节点    print('${setValue(table_list=%s)}' % table_str){"job": {    "setting": {      "speed": {      "channel":1      }    },    "content": [      {      "reader": {          "name": "mysqlreader",          "parameter": {            "username": "xxx",            "password": "xxxx",            "column": [            "id",            "concat('t_package_flow_log_',DATE_FORMAT(create_time,'%Y_%m'))",            "operation_type"            ],            "where": "date(create_time) ${operator_symbol} '${dt}'",            "connection": [            {                "table": [                  ${table_list}                ],                "jdbcUrl": [                  "jdbc:mysql://xx:3306/hydra_production_flow?characterEncoding=utf-8&useSSL=false&tinyInt1isBit=false"                ]            }            ]          }      },      "writer": {                  "name": "starrockswriter",                  "parameter": {                        "username": "xxxxxx",                        "password": "xxxxxxx",                        "database": "ods_cjm",                        "table": "ods_t_package_flow_log",                        "column": ["id", "table_name","operation_type"],                        "preSql": [],                        "postSql": [],                         "jdbcUrl": "jdbc:mysql://IP:9030/",                        "loadUrl": ["IP:8050", "IP:8050", "IP:8050"],                        "loadProps": {                            "format": "json",                            "strip_outer_array": true                        }                  }                }            }      ]    }}四、踩坑记录


[*]DATAX只支持python2.x
下载支持python3.x的相关文件,替换DataX中的相同文件,即可支持python3.x使用
五、备注


[*]StarRocks    高性能的MPP数据库
[*]DataX离线数据同步
[*]Apache DolphinScheduler任务调度工具
第三代-基于Python自定义的离线数据同步

一、背景

自从采用Apache DolphinScheduler + StarRocks数据方案以来,一切都很平稳发展;但是随着时间的推移,总会出现新的问题。
随着数据量的增多,使用方需求的增长,已经一些其他因素的影响,对目前的数据同步架构带来了一些不小的挑战,这些问题导致任务的维护和更新越来越麻烦,需要耗费大量的时间来进行,急需一种新的方式来处理。

[*]由于等保的要求,线上RDS数据库不再支持通过公网访问,又因为StarRocks也在内网,这就导致了之前的数据同步链路彻底断裂,需要新的方案。
[*]由于数据结构的频繁变更、服务器资源导致的任务调度异常等等原因,需要重跑数据的需求越来越多,这就导致需要不断的修改任务的调度参数(如日期),目前已经上线了10个业务的调度任务,也就是重新同步一次,就需要依次修改调度这10个任务,这期间还需要专人进行状态的跟踪,即使修改调度,压力很大。
二、数据同步架构

鉴于数据链路变更,导致原本数据链路断裂的问题,通过调研之后,决定采用KAFKA进行数据的中转,在内网部署KAFKA集群,同时该集群提供公网访问地址;在RDS所在的内网机器上使用DataX将RDS数据通过公网地址写入KAFKA,在内网中通过KafkaConnector消费数据写入StarRocks。
鉴于新的资源有限,原本内网提供了4台8C32G的服务器,但是新的RDS所在内网只能提供一台最大4C8G的服务器。因此放弃了使用Apache DolphinScheduler来进行调度,直接使用crontab调用对应的Python脚本进行DataX任务调度。

三、具体的数据同步

新的方案,主要解决的问题有两个,一是DataX如何将数据写入KAFKA,二是Python脚本怎么解决前面遇到的修改复杂的问题。

[*]DataX写KAFKA
DataX本身并没有kafkawriter实现,这就需要我们自己实现一个KafkaWriter来支持我们的需求,同时为了数据安全,希望能够对数据进行加密。
DataX的KafkaWriter实现
public class KafkaWriter extends Writer {    public static class Job extends Writer.Job {      private static final Logger logger = LoggerFactory.getLogger(Job.class);      private Configuration conf = null;      @Override      public List<Configuration> split(int mandatoryNumber) {            List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);            for (int i = 0; i < mandatoryNumber; i++) {                configurations.add(conf);            }            return configurations;      }      private void validateParameter() {            this.conf.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaWriterErrorCode.REQUIRED_VALUE);            this.conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);      }      @Override      public void init() {            this.conf = super.getPluginJobConf();            logger.info("kafka writer params:{}", conf.toJSON());            this.validateParameter();      }      @Override      public void destroy() {      }    }    public static class Task extends Writer.Task {      private static final Logger logger = LoggerFactory.getLogger(Task.class);      private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n");      private Producer<String, String> producer;      private String fieldDelimiter;      private Configuration conf;      private Properties props;      private AesEncryption aesEncryption;      private List<String> columns;      @Override      public void init() {            this.conf = super.getPluginJobConf();            fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);            columns = conf.getList(Key.COLUMN_LIST, new ArrayList<>(), String.class);            props = new Properties();            props.put("bootstrap.servers", conf.getString(Key.BOOTSTRAP_SERVERS));            props.put("acks", conf.getUnnecessaryValue(Key.ACK, "0", null));//这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。            props.put("retries", conf.getUnnecessaryValue(Key.RETRIES, "5", null));            props.put("retry.backoff.ms", "1000");            props.put("batch.size", conf.getUnnecessaryValue(Key.BATCH_SIZE, "16384", null));            props.put("linger.ms", 100);            props.put("connections.max.idle.ms", 300000);            props.put("max.in.flight.requests.per.connection", 5);            props.put("socket.keepalive.enable", true);            props.put("key.serializer", conf.getUnnecessaryValue(Key.KEYSERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));            props.put("value.serializer", conf.getUnnecessaryValue(Key.VALUESERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));            producer = new KafkaProducer<String, String>(props);            String encryptKey = conf.getUnnecessaryValue(Key.ENCRYPT_KEY, null, null);            if(encryptKey != null){                aesEncryption = new AesEncryption(encryptKey);            }      }      @Override      public void prepare() {            AdminClient adminClient = AdminClient.create(props);            ListTopicsResult topicsResult = adminClient.listTopics();            String topic = conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);            try {                if (!topicsResult.names().get().contains(topic)) {                  new NewTopic(                            topic,                            Integer.parseInt(conf.getUnnecessaryValue(Key.TOPIC_NUM_PARTITION, "1", null)),                            Short.parseShort(conf.getUnnecessaryValue(Key.TOPIC_REPLICATION_FACTOR, "1", null))                  );                  List<NewTopic> newTopics = new ArrayList<NewTopic>();                  adminClient.createTopics(newTopics);                }                adminClient.close();            } catch (Exception e) {                throw new DataXException(KafkaWriterErrorCode.CREATE_TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE.getDescription());            }      }      @Override      public void startWrite(RecordReceiver lineReceiver) {            logger.info("start to writer kafka");            Record record = null;            while ((record = lineReceiver.getFromReader()) != null) {                if (conf.getUnnecessaryValue(Key.WRITE_TYPE, WriteType.TEXT.name(), null)                        .equalsIgnoreCase(WriteType.TEXT.name())) {                  producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),                            Md5Encrypt.md5Hexdigest(recordToString(record)),                            aesEncryption ==null ? recordToString(record): JSONObject.toJSONString(aesEncryption.encrypt(recordToString(record))))                  );                } else if (conf.getUnnecessaryValue(Key.WRITE_TYPE, WriteType.TEXT.name(), null)                        .equalsIgnoreCase(WriteType.JSON.name())) {                  producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),                            Md5Encrypt.md5Hexdigest(recordToString(record)),                            aesEncryption ==null ? recordToJsonString(record) : JSONObject.toJSONString(aesEncryption.encrypt(recordToJsonString(record))))                  );                }                producer.flush();            }      }      @Override      public void destroy() {            if (producer != null) {                producer.close();            }      }      /**         * 数据格式化         *         * @param record         * @return         */      private String recordToString(Record record) {            int recordLength = record.getColumnNumber();            if (0 == recordLength) {                return NEWLINE_FLAG;            }            Column column;            StringBuilder sb = new StringBuilder();            for (int i = 0; i < recordLength; i++) {                column = record.getColumn(i);                sb.append(column.asString()).append(fieldDelimiter);            }            sb.setLength(sb.length() - 1);            sb.append(NEWLINE_FLAG);            return sb.toString();      }      /**         * 数据格式化         *         * @param record 数据         *         */      private String recordToJsonString(Record record) {            int recordLength = record.getColumnNumber();            if (0 == recordLength) {                return "{}";            }            Map<String, Object> map = new HashMap<>();            for (int i = 0; i < recordLength; i++) {                String key = columns.get(i);                Column column = record.getColumn(i);                map.put(key, column.getRawData());            }            return JSONObject.toJSONString(map);      }    }}进行数据加密的实现:
public class AesEncryption {    private SecretKey secretKey;    public AesEncryption(String secretKey) {      byte[] keyBytes = Base64.getDecoder().decode(secretKey);      this.secretKey = new SecretKeySpec(keyBytes, 0, keyBytes.length, "AES");    }    public String encrypt(String data) {      try {            Cipher cipher = Cipher.getInstance("AES");            cipher.init(Cipher.ENCRYPT_MODE, secretKey);            byte[] encryptedBytes = cipher.doFinal(data.getBytes());            return Base64.getEncoder().encodeToString(encryptedBytes);      } catch (Exception e) {            throw new RuntimeException(e);      }    }    public String decrypt(String encryptedData) throws Exception {      Cipher cipher = Cipher.getInstance("AES");      cipher.init(Cipher.DECRYPT_MODE, secretKey);      byte[] decodedBytes = Base64.getDecoder().decode(encryptedData);      byte[] decryptedBytes = cipher.doFinal(decodedBytes);      return new String(decryptedBytes);    }}Kafka的公网配置
Kafka的内外网配置,只需要修改kafka/config下面的server.properties文件中的如下配置即可。
# 配置kafka的监听端口,同时监听9093和9092listeners=INTERNAL://kafka节点3内网IP:9093,EXTERNAL://kafka节点3内网IP:9092# 配置kafka的对外广播地址, 同时配置内网的9093和外网的19092advertised.listeners=INTERNAL://kafka节点3内网IP:9093,EXTERNAL://公网IP:19092# 配置地址协议listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT# 指定broker内部通信的地址inter.broker.listener.name=INTERNAL
[*]自定义的配置文件
Python脚本需要能够自动生成对应的DataX调度的配置文件和shell脚本,自动调度DataX进行任务的执行。因此经过调研,采用自定义配置文件,通过读取配置文件,动态生成对应的DataX任务脚本和调度脚本,调度任务执行。
自定义的配置文件示例1:
{"datasource": {    "host": "xxxxxx",    "port": "3306",    "username": "xxxxx",    "password": "xxxxxxx",    "properties": {      "characterEncoding": "utf-8",      "useSSL": "false",      "tinyInt1isBit": "false"    }},"table": {    "database": "app",    "table": "device",    "column": [      "Id AS id",      "CompanyName AS company_name",      "CompanyId AS company_id",      "SecretKey AS secret_key",      "Brand AS brand",      "ModelType AS model_type",      "Enable AS enable",      "CAST(CreateTime as CHAR) AS create_time",      "CAST(UpdateTime as CHAR) AS update_time"    ],    "where": "date(UpdateTime) >= '$'",    "searchTableSql": []},"kafka": {    "topic": "mzt_ods_cjm.ods_device"}}支持分库分表的配置文件示例2
{"datasource": {    "host": "xxxxxxx",    "port": "3306",    "username": "xxxxxxx",    "password": "xxxxxxxx",    "properties": {      "characterEncoding": "utf-8",      "useSSL": "false",      "tinyInt1isBit": "false"    }},"table": {    "database": "hydra_logistics_flow",    "table": "",    "column": [      "id",      "concat('t_logistics_sweep_out_code_flow_',DATE_FORMAT(create_time,'%Y')) AS table_name",      "cus_org_id",      "CAST(create_time as CHAR) AS create_time",      "replace_product_id",      "replace_product_name",      "replace_product_code"    ],    "where": "date(create_time) >= '$'",    "searchTableSql": [      "SELECT concat('t_logistics_sweep_out_code_flow_',YEAR(SUBDATE(CURDATE(), 1))) AS TABLE_NAME",      "SELECT concat('t_logistics_sweep_out_code_flow_',YEAR(DATE_SUB(DATE_SUB(CURDATE(), INTERVAL 1 DAY), INTERVAL 1 YEAR))) AS TABLE_NAME"    ]},"kafka": {    "topic": "mzt_ods_cjm.ods_t_logistics_sweep_out_code_flow"}}如上的配置文件,解释如下:
KEY说明datasourceRDS数据源datasource.hostRDS数据库的hostdatasource.port>RDS数据库的端口datasource.usernameRDS数据库的用户名datasource.passwordRDS数据库的密码datasource.propertiesjdbc连接的参数,连接时拼接为?key=value&key=valuetable要同步的表信息table.databaseRDS数据库名称table.tableRDS中表的名称,分库分表的可以为空table.columnRDS表中要同步的字段列表,支持取别名和使用函数table.where同步数据的过滤条件table.searchTableSql查询表名称的SQL语句,用于动态分库分表kafkakafka相关的配置kafka.topic数据要写入的kafka topic的名称
[*]Python调度脚本
import jsonimport osimport pymysqlimport refrom datetime import datetimefrom dateutil.relativedelta import relativedeltaimport uuidimport subprocessimport loggingimport hmacimport hashlibimport base64import urllib.parseimport urllibimport requestsimport timefrom typing import List, Mappingdef list_files_in_directory(directory_path: str) -> List:    """    获取目录下的所有以.json结尾的文件    :param directory_path: 目录    :return: 文件列表    """    entries = os.listdir(directory_path)    # 过滤出所有文件    files =     logging.info(f"读取配置文件数量:{len(files)}")    return filesdef read_file_content(file_path: str) -> str:    """    读取文件内容    :param file_path: 文件路径    :return: 文件内容    """    with open(file_path, 'r', encoding='utf-8') as file:      content = file.read()    return contentdef read_all_files_in_directory(directory_path: str) -> Mapping:    """    读取文件夹下面的所有文件的内容    :param directory_path: 文件夹路径    :return: 内容map    """    logging.info(f"开始读取所有的配置文件信息")    files = list_files_in_directory(directory_path)    file_contents = {}    for file in files:      file_path = os.path.join(directory_path, file)      content = read_file_content(file_path)      file_contents = content    sorted_items = sorted(file_contents.items())    sorted_dict = dict(sorted_items)    return file_contentsdef search_table_list(datasource: json, search_table_sql_list: List) -> List:    """    执行语句获取表信息    :param datasource: 数据源信息    :param search_table_sql_list: 查询表的SQL语句    :return: 表列表    """    logging.info(f"开始查询需要同步的表")    host = datasource['host']    port = int(datasource['port'])    username = datasource['username']    password = datasource['password']    conn = pymysql.connect(host=host,                           port=port,                           user=username,                           passwd=password,                           db='',                           charset='utf8',                           connect_timeout=200,                           autocommit=True,                           read_timeout=2000                        )    table_name_list = []    for search_table_sql in search_table_sql_list:      search_table_sql = parse_where_sql(search_table_sql)      with conn.cursor() as cursor:            cursor.execute(query=search_table_sql)            while 1:                res = cursor.fetchone()                if res is None:                  break                table_name_list.append(res)    return table_name_listdef general_default_job_config() -> json:    """    生成默认的datax配置    :return: 默认的配置    """    default_job_json = """    {    "job": {      "setting": {            "speed": {               "channel":1            }      },      "content": [            {                "reader": {                  "name": "mysqlreader",                  "parameter": {                        "username": "test",                        "password": "test1234",                        "connection": [                            {                              "querySql": [                                    "SELECT id, code from test.t_open_api_classify"                              ],                              "jdbcUrl": [                                    "jdbc:mysql://IP:3306/test?characterEncoding=utf-8&useSSL=false&tinyInt1isBit=false"                              ]                            }                        ]                  }                },               "writer": {                  "name": "kafkawriter",                  "parameter": {                        "bootstrapServers": "IP:9092,IP:9092,IP:9092",                        "topic": "test-m-t-k",                        "ack": "all",                        "batchSize": 1000,                        "retries": 0,                        "keySerializer": "org.apache.kafka.common.serialization.StringSerializer",                        "valueSerializer": "org.apache.kafka.common.serialization.StringSerializer",                        "fieldDelimiter": ",",                        "writeType": "json",                        "topicNumPartition": 1,                        "topicReplicationFactor": 1,                        "encryptionKey": "5s8FGjerddfWkG/b64CGHHZYvQ=="                  }                }            }      ]    }}    """    return json.loads(default_job_json, encoding='utf-8')def general_jdbc_url(json_config: json) -> str:    """    根据数据源信息生成jdbc url    :param json_config: 配置    :return: jdbc url    """    logging.info(f"开始解析jdbc url")    host = json_config['datasource']['host']    port = int(json_config['datasource']['port'])    database = json_config['table']['database']    url = "jdbc:mysql://{}:{}/{}".format(host, port, database)    # 解下properties    properties = json_config['datasource']['properties']    properties_list = []    if properties is not None and len(properties) > 0:      for key, value in properties.items():            properties_list.append(key + "=" + str(value))      url = url + "?" + "&".join(properties_list)    logging.info(f"jdbc url: {url}")    return urldef parse_where_sql(where_sql: str) -> str:    """    解析where语句    :param where_sql: 原始where语句    :return: 转换之后的where语句    """    # 定义支持的类型 $$    # 正则表达式模式    logging.info(f"还是解析where语句:where_sql: {where_sql}")    pattern = r"\$\[.*?\]"    return re.sub(pattern, replacement_function, where_sql)def replacement_function(match):    """    替换函数    :param match: 匹配结果    :return: 替换之后的结果    """    matched_text = match.group(0)    return calc_datetime(matched_text)def calc_datetime(expression: str) -> str:    """    计算时间表达式    :param expression: 表达式    :return: 计算之后的值    """    logging.info(f"开始计算时间参数:expression: {expression}")    # 设置映射    format_units = {      "yyyy": "%Y",      "MM": "%m",      "dd": "%d",      "HH": "%H",      "mm": "%M",      "ss": "%S"    }    unit_map = {      "Y": "yyyy",      "M": "MM",      "d": "dd",      "H": "HH",      "m": "mm",      "s": "ss"    }    # 解析参数    expression = expression    # 判断其开头,截取尾部    min_unit = None    for key, value in format_units.items():      if key in expression:            min_unit = key            expression = expression.replace(key, value)    # 替换完毕,确定是否有数字    logging.info(f"转换为Python格式的表达式:expression: {expression}")    # 定义正则表达式模式    pattern = r'([^0-9]+)([-+]\d+(\*\d+)?)(?:_())?'    matches = re.match(pattern, expression)    # 输出拆分结果    if matches:      date_part = matches.group(1)      remainder = matches.group(2)      unit = matches.group(4)      if unit is not None and unit in unit_map.keys():            min_unit = unit_map      return calculate_expression(min_unit, date_part, remainder)    else:      return expressiondef calculate_expression(min_unit: str, date_part: str, remainder: str) -> str:    """    计算表达式    :param min_unit: 最小单位    :param date_part: 日期表达式部分    :param remainder: 偏移量部分    :return: 计算之后的结果    """    logging.info(f"开始计算表达式:min_unit: {min_unit}, date_part: {date_part}, remainder:{remainder}")    # 获取当前日期和时间    now = datetime.now()    # 计算时间的偏移量    if remainder is None:      # 格式化的日期      formatted_datetime = now.strftime(date_part)      logging.info(f"日期偏移量为空,返回值:{formatted_datetime}")      return formatted_datetime    else:      # 计算偏移量      plus_or_sub = remainder      offset = eval(remainder)      logging.info(f"计算偏移量,plus_or_sub:{plus_or_sub}, offset:{offset}")      if min_unit == 'yyyy':            if plus_or_sub == '-':                now = now - relativedelta(years=offset)            else:                now = now + relativedelta(years=offset)      elif min_unit == 'MM':            if plus_or_sub == '-':                now = now - relativedelta(months=offset)            else:                now = now + relativedelta(months=offset)      elif min_unit == 'dd':            if plus_or_sub == '-':                now = now - relativedelta(days=offset)            else:                now = now + relativedelta(days=offset)      elif min_unit == 'HH':            if plus_or_sub == '-':                now = now - relativedelta(hours=offset)            else:                now = now + relativedelta(hours=offset)      elif min_unit == 'mm':            if plus_or_sub == '-':                now = now - relativedelta(minutes=offset)            else:                now = now + relativedelta(minutes=offset)      elif min_unit == 'ss':            if plus_or_sub == '-':                now = now - relativedelta(seconds=offset)            else:                now = now + relativedelta(seconds=offset)      formatted_datetime = now.strftime(date_part)      logging.info(f"日期偏移量为空,返回值:{formatted_datetime}")      return formatted_datetimedef general_reader(json_config: json) -> json:    """    生成配置的reader部分    :param json_config: 配置    :return: JSON结果    """    logging.info(f"开始生成DataX的配置JSON文件的reader内容")    reader_json = json.loads("{}", encoding='utf-8')    reader_json['name'] = "mysqlreader"    reader_json['parameter'] = {}    reader_json['parameter']['username'] = json_config['datasource']['username']    reader_json['parameter']['password'] = json_config['datasource']['password']    reader_json['parameter']['column'] = json_config['table']['column']    reader_json['parameter']['connection'] = [{}]    reader_json['parameter']['connection']['table'] = json_config['table']['table']    reader_json['parameter']['connection']['jdbcUrl'] =     where_sql = json_config['table']['where']    if where_sql is not None and where_sql != '':      reader_json['parameter']['where'] = parse_where_sql(where_sql)    return reader_jsondef general_writer(json_config: json) -> json:    """    生成配置的Writer部分    :param json_config: 配置    :return: JSON结果    """    columns = json_config['table']['column']    new_columns = []    for column in columns:      column = str(column).replace("`", "")      if " AS " in str(column).upper():            new_columns.append(str(column).split(" AS ").strip())      else:            new_columns.append(str(column).strip())    logging.info(f"开始生成DataX的配置JSON文件的Writer内容")    writer_json = json.loads("{}", encoding='utf-8')    writer_json['name'] = "kafkawriter"    writer_json['parameter'] = {}    writer_json['parameter']['bootstrapServers'] = "IP:19092,IP:19093,IP:19094"    writer_json['parameter']['topic'] = json_config['kafka']['topic']    writer_json['parameter']['ack'] = "all"    writer_json['parameter']['batchSize'] = 1000    writer_json['parameter']['retries'] = 3    writer_json['parameter']['keySerializer'] = "org.apache.kafka.common.serialization.StringSerializer"    writer_json['parameter']['valueSerializer'] = "org.apache.kafka.common.serialization.StringSerializer"    writer_json['parameter']['fieldDelimiter'] = ","    writer_json['parameter']['writeType'] = "json"    writer_json['parameter']['topicNumPartition'] = 1    writer_json['parameter']['topicReplicationFactor'] = 1    writer_json['parameter']['encryptionKey'] = "5s8FGjerddfWkG/b64CGHHZYvQ=="    writer_json['parameter']['column'] = new_columns    return writer_jsondef general_datax_job_config(datax_config: str):    """    生成job的配置内容    :param datax_config: 配置    :return: 完整的JSON内容    """    logging.info(f"开始生成DataX的配置JSON文件内容, {datax_config}")    json_config = json.loads(datax_config, encoding='utf-8')    # 判定是否需要查询表    datasource = json_config['datasource']    table = json_config['table']['table']    search_table_sql_list = json_config['table']['searchTableSql']    if search_table_sql_list is not None and len(search_table_sql_list) > 0:      # 查询表列表,覆盖原来的配置信息      table_list = search_table_list(datasource, search_table_sql_list)    else:      table_list =     json_config['table']['table'] = table_list    # 开始生成配置文件    job_json = general_default_job_config()    job_json['job']['content']['reader'] = general_reader(json_config)    job_json['job']['content']['writer'] = general_writer(json_config)    return job_jsondef write_job_file(base_path: str, job_config: json) -> str:    """    生成job的JSON配置文件    :param base_path: 根路径    :param job_config: 配置信息    :return: 完整的JSON文件路径    """    # 生成一个脚本    logging.info(f"开始创建DataX的配置JSON文件")    date_day = datetime.now().strftime('%Y-%m-%d')    timestamp_milliseconds = int(datetime.now().timestamp() * 1000)    # 生成UUID    file_name = str(uuid.uuid4()).replace("-", "") + "_" + str(timestamp_milliseconds) + ".json"    # 完整文件路径    # 创建文件夹    mkdir_if_not_exist(base_path + "/task/datax/json/" + date_day)    complex_file_path = base_path + "/task/datax/json/" + date_day + "/" + file_name    logging.info(f"完整的DataX的配置JSON文件路径:{complex_file_path}")    with open(complex_file_path, 'w+', encoding='utf-8') as f:      f.write(json.dumps(job_config, ensure_ascii=False))    return complex_file_pathdef mkdir_if_not_exist(path):    """    创建目录    :param path: 目录路径    :return: None    """    os.makedirs(path, exist_ok=True)def write_task_file(base_path: str, python_path: str, datax_path: str, job_file_path: str) -> str:    """    写shell脚本文件    :param base_path: 跟路径    :param python_path: python执行文件路径    :param datax_path: datax执行文件路径    :param job_file_path: JSON配置文件路径    :return: shell脚本的完整路径    """    # 组合内容    logging.info(f"开始创建Shell脚本文件")    task_content = python_path + " " + datax_path + " " + job_file_path    # 生成一个脚本    date_day = datetime.now().strftime('%Y-%m-%d')    timestamp_milliseconds = int(datetime.now().timestamp() * 1000)    # 生成UUID    task_file_name = str(uuid.uuid4()).replace("-", "") + "_" + str(timestamp_milliseconds) + ".sh"    # 完整文件路径    # 创建文件夹    mkdir_if_not_exist(base_path + "/task/datax/shell/" + date_day)    complex_file_path = base_path + "/task/datax/shell/" + date_day + "/" + task_file_name    logging.info(f"完整的shell脚本路径: {complex_file_path}")    with open(complex_file_path, 'w+', encoding='utf-8') as f:      f.write(task_content)    # 添加执行权限    current_permissions = os.stat(complex_file_path).st_mode    # 添加执行权限 (权限值 0o111 表示用户、组和其他人的执行权限)    new_permissions = current_permissions | 0o111    # 使用 os.chmod 设置新的权限    os.chmod(complex_file_path, new_permissions)    return complex_file_pathdef signs(dd_secret: str, timestamp: str) -> str:    """    钉钉机器人签名    :param dd_secret: 秘钥    :param timestamp: 时间戳    :return: 签名    """    secret_enc = dd_secret.encode('utf-8')    string_to_sign = '{}\n{}'.format(timestamp, dd_secret)    string_to_sign_enc = string_to_sign.encode('utf-8')    hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()    sign = urllib.parse.quote(base64.b64encode(hmac_code))    return signdef real_send_msg(dd_secret: str, dd_access_token: str, text: json):    """    发送钉钉机器人消息    :param dd_secret: 秘钥    :param dd_access_token: token    :param text: 内容    :return: None    """    timestamp = str(round(time.time() * 1000))    sign = signs(dd_secret, timestamp)    headers = {'Content-Type': 'application/json'}    web_hook = f'https://oapi.dingtalk.com/robot/send?access_token={dd_access_token}&timestamp={timestamp}&sign={sign}'    # 定义要发送的数据    requests.post(web_hook, data=json.dumps(text), headers=headers)def send_msg(dd_secret: str, dd_access_token: str, job_start_time: str, total_count: int, success_count: int, fail_task_list: List):    """    组合钉钉消息    :param dd_secret: 秘钥    :param dd_access_token: token    :param job_start_time: 任务开始时间    :param total_count: 总任务数    :param success_count: 成功任务数    :return: NONE    """    title = '### <font color=#CCCC00>数据同步结果</font>'    if success_count == total_count:      title = '### <font color=#00FF00>数据同步结果</font>'    elif success_count == 0:      title = '### <font color=#FF0000>数据同步结果</font>'    end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')    result = {      "msgtype": "markdown",      "markdown": {            "title": "数据同步结果",            "text": title + ' \n\n\n\n- '                  + "总同步任务数:" + str(total_count) + "\n\n- "                  + "成功任务数:" + str(success_count) + "\n\n- "                  + "失败任务数" + str(total_count - success_count) + "\n\n- "                  + "开始时间:" + str(job_start_time) + "\n\n- "                  + "结束时间:" + str(end_time) + "\n\n- "                  + "失败列表:" + str(fail_task_list) + "\n\n "      }    }    if success_count < total_count:      result['markdown']['at'] = json.loads("{\"atMobiles\": [\"12345678997\"]}")    real_send_msg(dd_secret, dd_access_token, result)def run_job(dd_secret, dd_access_token, job_start_time, base_path: str, python_script_path: str, datax_json_path: str):    """    运行任务    :param dd_secret: 秘钥    :param dd_access_token: token    :param job_start_time: 任务开始时间    :param base_path: 根路径    :param python_script_path: Python执行路径    :param datax_json_path: datax执行路径    :return: NONE    """    task_content_list = read_all_files_in_directory(base_path + "/task/config/")    success_count = 0    total_count = len(task_content_list)    fail_task_list = []    for task_content in task_content_list:      try:            logging.info(f"开始生成,配置文件名称:{task_content}")            job_config = general_datax_job_config(task_content_list)            job_file_path = write_job_file(base_path, job_config)            shell_path = write_task_file(base_path, python_script_path, datax_json_path, job_file_path)            logging.info(f"shell脚本创建成功,路径为:{base_path}")            # 调用脚本            call_shell(shell_path)            success_count += 1      except Exception as e:            fail_task_list.append(task_content)            logging.error(f"配置文件:{task_content} 执行失败", e)    # 发送消息    send_msg(dd_secret, dd_access_token, job_start_time, total_count, success_count, fail_task_list)def call_shell(shell_path: str):    """    执行shell脚本    :param shell_path: shell脚本路径    :return: NONE    """    logging.info(f"调用shell脚本,路径为:{shell_path}")    result = subprocess.run(shell_path,                            check=True,                            shell=True,                            universal_newlines=True,                            stdout=subprocess.PIPE,                            stderr=subprocess.PIPE)    # 输出标准输出    logging.info(f"shell脚本{shell_path}标准输出:%s", result.stdout)    # # 输出标准错误输出    logging.info(f"shell脚本{shell_path}标准错误输出:%s", result.stderr)    # # 输出返回码    logging.info(f"shell脚本{shell_path}的返回码:%s", result.returncode)if __name__ == '__main__':    """    码中台数据同步任务脚本    使用前请修改如下配置信息:      - secret钉钉机器人的秘钥      - access_token钉钉机器人的token      - python_path   Python的安装路径      - datax_path   datax的执行文件路径    """    # 钉钉配置    start_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')    secret = ''    access_token = ''    python_path = "/usr/bin/python3"    datax_path = "/opt/datax-k/bin/datax.py"    # 当前脚本文件的目录路径    script_dir = '/opt/data-job'    curr_date_day = datetime.now().strftime('%Y-%m-%d')    # 创建文件夹    mkdir_if_not_exist(script_dir + "/logs/" + curr_date_day)    logging.basicConfig(level=logging.INFO,                        format='%(asctime)s - %(levelname)s - %(lineno)d - %(message)s',                        filename='logs/' + curr_date_day + '/app.log',                        filemode='w')    run_job(secret, access_token, start_time, script_dir, python_path, datax_path)    logging.shutdown()
[*]同步日期的控制
我们在之前的任务同步中,遇到的问题便是日期的修改很麻烦,因此我们需要一个更加简单的方式来进行日期的批量更新。在我们上面的调度脚本中,包含了对日期表达式的解析,我们自定义了一种时间的表达式$ 通过解析该表达式,我们可以生成需要的任意时间,该时间表达式的含义为:

[*]yyyy 表示年份
[*]MM 表示月份
[*]dd 表示日期
[*]HH表示24进制小时
[*]mm表示分钟
[*]ss表示秒
[*]

[*]表示当前时间加上N

[*]

[*]表示当前时间减去N

[*]_Y表示加减的单位,可以是YMdHms(年、月、日、时、分、秒)
通过对该表达式的解析,我们可以生成相对于当前之前或之后的任何格式的时间字符串,将其用于同步的where条件中,既可以完成针对时间的解析。

[*]如何更新日期
日期目前可以计算,但是我们需要能够批量修改配置文件中的WHERE条件中的时间表达式,如我们想同步8天前的数据,我们就需要将脚本中的表达式修改为$ ,即代表当前时间减去8天,这样我们就可以同步八天前那一天的数据,但是我们可能想同步从8天气到现在的所有数据,那么我们希望我们也能批量修改where表达式中的条件,如将=改为>=。
鉴于以上的需求,我们开发了一个新的Python脚本,通过简单的配置,即可一次修改所有脚本中的where条件中的表达式,这样,我们只需要执行两个脚本,就完成了一切,再也不需要依次修改执行10个工作流了。
import jsonimport osimport loggingfrom typing import List, Mappingimport refrom datetime import datetime, datedef list_files_in_directory(directory_path: str) -> List:    """    获取目录下的所有以.json结尾的文件    :param directory_path: 目录    :return: 文件列表    """    entries = os.listdir(directory_path)    # 过滤出所有文件    files =     logging.info(f"读取配置文件数量:{len(files)}")    return filesdef read_file_content(file_path: str) -> str:    """    读取文件内容    :param file_path: 文件路径    :return: 文件内容    """    with open(file_path, 'r', encoding='utf-8') as file:      content = file.read()    return contentdef read_all_files_in_directory(directory_path: str) -> Mapping:    """    读取文件夹下面的所有文件的内容    :param directory_path: 文件夹路径    :return: 内容map    """    logging.info(f"开始读取所有的配置文件信息")    files = list_files_in_directory(directory_path)    file_contents = {}    for file in files:      file_path = os.path.join(directory_path, file)      content = read_file_content(file_path)      file_contents = content    sorted_items = sorted(file_contents.items())    sorted_dict = dict(sorted_items)    return file_contentsdef parse_where_sql(where_sql: str, sub_day: int, comparator: str = None) -> str:    """    解析where语句    :param where_sql: 原始where语句    :param sub_day: 天数    :param comparator: 比较符包括 = != > < >=   <=    :return: 转换之后的where语句    """    # 定义支持的类型 $$    # 正则表达式模式    pattern = r'\$(\[.*?\])'    matches = re.finditer(pattern, where_sql)    for match in matches:      matched_text = match.group(1)      new_search = calc_datetime(matched_text, sub_day)      where_sql = where_sql.replace(matched_text, new_search)    legal_comparator_list = ['>==','<>', '!=', '>=', '<=', '=', '>','<']    legal_default = '@'    if comparator is not None:      for legal_comparator in legal_comparator_list:            if legal_comparator in where_sql:                where_sql = where_sql.replace(legal_comparator, legal_default)      where_sql = where_sql.replace(legal_default, comparator)    return where_sqldef calc_datetime(expression: str, sub_day: int) -> str:    """    计算时间表达式    :param expression: 表达式    :param sub_day: 天数    :return: 计算之后的值    """    # 替换完毕,确定是否有数字    # 定义正则表达式模式    pattern = r'([^0-9]+)([-+]\d+(\*\d+)?)(?:_())?'    matches = re.match(pattern, expression)    # 输出拆分结果    if matches:      date_part = matches.group(1)      remainder = matches.group(2)      unit = matches.group(4)      plus_or_sub = remainder      if unit is not None:            return date_part + plus_or_sub + str(sub_day) + '_' + unit + "]"      else:            return date_part + plus_or_sub + str(sub_day) + "]"    else:      return expressiondef check_parma(formatted_date: str, sub_day: int, comparator: str = None):    """    校验参数是否合法    :param formatted_date: 格式化日期    :param sub_day: 天数    :param comparator: 操作符    :return: NONE    """    legal_comparator = ['=', '<>', '!=', '>', '>=', '<', '<=']    if formatted_date is None and sub_day is None:      raise "formatted_date 和 sub_day不能同时为空"    if formatted_date is not None:      try:            datetime.strptime(formatted_date, "%Y-%m-%d")      except Exception as _:            raise "formatted_date 必须是一个完整的yyyy-MM-dd日期格式, 当前sub_day={}".format(sub_day)    if formatted_date is None and not isinstance(sub_day, int):      raise "sub_day 必须是一个整数, 当前sub_day={}".format(sub_day)    if comparator is not None and comparator not in legal_comparator:      raise "comparator 不合法,合法操作列表为:{} 当前comparator={}".format(legal_comparator, comparator)def update_file(base_path: str, sub_day: int, comparator: str = None):    """    更新配置文件    :param base_path 配置文件根目录    :param sub_day要减去的天数    :param comparator 比较符    """    file_dict = read_all_files_in_directory(base_path)    for key, value in file_dict.items():      json_data = json.loads(value, encoding='utf-8')      where_sql = json_data['table']['where']      if where_sql is not None:            new_where_sql = parse_where_sql(where_sql, sub_day, comparator)            json_data['table']['where'] = new_where_sql      search_tal_sql_list = json_data['table']['searchTableSql']      if search_tal_sql_list is not None:            new_search_table_sql_list = []            for search_tal_sql in search_tal_sql_list:                new_search_table_sql = parse_where_sql(search_tal_sql, sub_day)                new_search_table_sql_list.append(new_search_table_sql)            json_data['table']['searchTableSql'] = new_search_table_sql_list      with open(base_path + "/" + key, "w+", encoding='utf-8') as f:            f.write(json.dumps(json_data, ensure_ascii=False, indent=2))      print("{} 更新完成".format(key))if __name__ == '__main__':    """    更新数据同步配置文件的日期    """    dir_path = r'/opt/data-job/task/config'    # 多少天前    day = 6    # 要指定的日期    date_format = '2024-11-19'    # where表达式的条件    comparator_symbol = '>='    check_parma(date_format, day, comparator_symbol)    if date_format is not None:      # 使用date_format的值覆盖day      single_date = datetime.strptime(date_format, "%Y-%m-%d").date()      current_date = date.today()      day = (current_date - single_date).days    update_file(dir_path, day, comparator_symbol)
[*]通过KafkaConnector同步数据到StarRocks
[*]starrocks-connector-for-kafka的实现

StarRocks官方提供了starrocks-connector-for-kafka的实现,我们只需要在其中加入我们的数据解密逻辑即可直接使用。
package com.starrocks.connector.kafka.transforms;public class DecryptJsonTransformation <R extends ConnectRecord<R>> implements Transformation<R> {    private static final Logger LOG = LoggerFactory.getLogger(DecryptJsonTransformation.class);    private AesEncryption aesEncryption;    private interface ConfigName {      String SECRET_KEY = "secret.key";    }    public static final ConfigDef CONFIG_DEF = new ConfigDef()    .define(ConfigName.SECRET_KEY, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "secret key");    @Override    public R apply(R record) {      if (record.value() == null) {            return record;      }      String value = (String) record.value();      try {            String newValue = aesEncryption.decrypt(value);            JSONObject jsonObject = JSON.parseObject(newValue, JSONReader.Feature.UseBigDecimalForDoubles);            return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), null, jsonObject, record.timestamp());      } catch (Exception e) {            return record;      }    }    @Override    public ConfigDef config() {      return CONFIG_DEF;    }    @Override    public void close() {    }    @Override    public void configure(Map<String, ?> map) {      final SimpleConfig config = new SimpleConfig(CONFIG_DEF, map);      String secretKey = config.getString(ConfigName.SECRET_KEY);      aesEncryption = new AesEncryption(secretKey);    }}解密的逻辑
package com.starrocks.connector.kafka;import javax.crypto.Cipher;import javax.crypto.SecretKey;import javax.crypto.spec.SecretKeySpec;import java.util.Base64;public class AesEncryption {    private SecretKey secretKey;    public AesEncryption(String secretKey) {      byte[] keyBytes = Base64.getDecoder().decode(secretKey);      this.secretKey = new SecretKeySpec(keyBytes, 0, keyBytes.length, "AES");    }    public String encrypt(String data) {      try {            Cipher cipher = Cipher.getInstance("AES");            cipher.init(Cipher.ENCRYPT_MODE, secretKey);            byte[] encryptedBytes = cipher.doFinal(data.getBytes());            return Base64.getEncoder().encodeToString(encryptedBytes);      } catch (Exception e) {            throw new RuntimeException(e);      }    }    public String decrypt(String encryptedData) throws Exception {      Cipher cipher = Cipher.getInstance("AES");      cipher.init(Cipher.DECRYPT_MODE, secretKey);      byte[] decodedBytes = Base64.getDecoder().decode(encryptedData);      byte[] decryptedBytes = cipher.doFinal(decodedBytes);      return new String(decryptedBytes);    }}b. 配置KafkaConnector任务
{"name": "mzt_ods_cjm.ods_device-connect","config": {    "connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector",    "topics": "mzt_ods_cjm.ods_device",    "key.converter": "org.apache.kafka.connect.storage.StringConverter",    "value.converter": "org.apache.kafka.connect.storage.StringConverter",    "key.converter.schemas.enable": "true",    "value.converter.schemas.enable": "false",    "starrocks.http.url": "IP:8050,IP:8050,IP:8050",    "starrocks.topic2table.map": "mzt_ods_cjm.ods_device:ods_device",    "starrocks.username": "xxxxxxx",    "starrocks.password": "xxxxxx",    "starrocks.database.name": "ods_cjm",    "sink.properties.strip_outer_array": "true",    "sink.properties.columns": "id,company_name,company_id,secret_key,",    "sink.properties.jsonpaths": "[\"$.id\",\"$.company_name\",\"$.company_id\",\"$.secret_key\"]",    "transforms": "decrypt",    "transforms.decrypt.type": "com.starrocks.connector.kafka.transforms.DecryptJsonTransformation",    "transforms.decrypt.secret.key": "5s8ekjRWkG/b64CGHHZYvQ=="}}四、备注


[*]starrocks-connector-for-kafka Kafka Connector是StarRocks数据源连接器
[*]DataX 批量数据同步工具
[*]kafka-console-ui Kakfa可视化控制台
[*]StarRocks-kafka-Connector 通过kafkaConnector导入数据到StarRocks
[*]StreamLoad实现数据增删改
[*]Kafka Connector的API列表
方法路径说明GET/connectors返回活动连接器的列表POST/connectors创建一个新的连接器; 请求主体应该是包含字符串name字段和config带有连接器配置参数的对象字段的JSON对象GET/connectors/获取有关特定连接器的信息GET/connectors/{name}/config获取特定连接器的配置参数PUT/connectors/{name}/config更新特定连接器的配置参数GET/connectors/{name}/status获取连接器的当前状态,包括连接器是否正在运行,失败,已暂停等,分配给哪个工作者,失败时的错误信息以及所有任务的状态GET/connectors/{name}/tasks获取当前为连接器运行的任务列表GET/connectors/{name}/tasks/{taskid}/status获取任务的当前状态,包括如果正在运行,失败,暂停等,分配给哪个工作人员,如果失败,则返回错误信息PUT/connectors/{name}/pause暂停连接器及其任务,停止消息处理,直到连接器恢复PUT/connectors/{name}/resume恢复暂停的连接器(或者,如果连接器未暂停,则不执行任何操作)POST/connectors/{name}/restart重新启动连接器(通常是因为失败)POST/connectors/{name}/tasks/{taskId}/restart重启个别任务(通常是因为失败)DELETE/connectors/删除连接器,停止所有任务并删除其配置
页: [1]
查看完整版本: 离线数据同步变迁