flink-doris-connector的25.1.0版本使用CdcMysqlSyncDatabaseCase当任务启动后MySQL后新增的表无法同步,还需要其它什么设置

Viewed 15

flink-doris-connector的25.1.0版本使用CdcMysqlSyncDatabaseCase当任务启动后MySQL后新增的表无法同步,还需要其它什么设置吗

public static void main(String[] args) throws Exception {

        Map<String, String> flinkMap = new HashMap<>();
        flinkMap.put("execution.checkpointing.interval", "10s");
        flinkMap.put("pipeline.operator-chaining.enabled", "false");
        flinkMap.put("parallelism.default", "1");

        Configuration configuration = Configuration.fromMap(flinkMap);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        String database = "stg_cdc";

        Map<String, String> mysqlConfig = new HashMap<>();
        mysqlConfig.put(MySqlSourceOptions.DATABASE_NAME.key(), "stg_cdc");
        mysqlConfig.put(MySqlSourceOptions.HOSTNAME.key(), "192.168.6.90");
        mysqlConfig.put(MySqlSourceOptions.PORT.key(), "63306");
        mysqlConfig.put(MySqlSourceOptions.USERNAME.key(), "root");
        mysqlConfig.put(MySqlSourceOptions.PASSWORD.key(), "");
        // add jdbc properties for MySQL
        mysqlConfig.put("jdbc.properties.use_ssl", "false");
        Configuration config = Configuration.fromMap(mysqlConfig);

        Map<String, String> sinkConfig = new HashMap<>();
        sinkConfig.put(DorisConfigOptions.FENODES.key(), "192.168.6.90:8030");
        sinkConfig.put(DorisConfigOptions.BENODES.key(), "192.168.6.90:8040");
        sinkConfig.put(DorisConfigOptions.USERNAME.key(), "root");
        sinkConfig.put(DorisConfigOptions.PASSWORD.key(), "");
        sinkConfig.put(DorisConfigOptions.JDBC_URL.key(), "jdbc:mysql://192.168.6.90:9030");
        sinkConfig.put(DorisConfigOptions.SINK_LABEL_PREFIX.key(), UUID.randomUUID().toString());
        sinkConfig.put("sink.enable-delete", "false");
        Configuration sinkConf = Configuration.fromMap(sinkConfig);


        Map<String, String> tableConfig = new HashMap<>();
        tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1");

        DorisTableConfig dorisTableConfig = new DorisTableConfig(tableConfig);

        // String includingTables = "tbl1|tbl2|tbl3";



        boolean ignoreDefaultValue = false;
        boolean useNewSchemaChange = true;
        String schemaChangeMode = SchemaChangeMode.DEBEZIUM_STRUCTURE.getName();
        boolean singleSink = false;
        boolean ignoreIncompatible = false;
        DatabaseSync databaseSync = new MysqlDatabaseSync();
        databaseSync
                .setEnv(env)
                .setDatabase(database)
                .setConfig(config)
                .setIgnoreDefaultValue(ignoreDefaultValue)
                .setSinkConfig(sinkConf)
                .setTableConfig(dorisTableConfig)
                .setCreateTableOnly(false)
                .setNewSchemaChange(useNewSchemaChange)
                .setSchemaChangeMode(schemaChangeMode)
                .setSingleSink(singleSink)
                .setIgnoreIncompatible(ignoreIncompatible)
                .create();
        databaseSync.build();
        env.execute(String.format("MySQL-Doris Database Sync: %s", database));
    }
0 Answers