flink-doris-connector 25.1.0 版本使用 CdcMysqlSyncDatabaseCase 时,任务启动后 MySQL 中新增的表无法同步,还需要其他什么设置吗?
public static void main(String[] args) throws Exception {
Map<String, String> flinkMap = new HashMap<>();
flinkMap.put("execution.checkpointing.interval", "10s");
flinkMap.put("pipeline.operator-chaining.enabled", "false");
flinkMap.put("parallelism.default", "1");
Configuration configuration = Configuration.fromMap(flinkMap);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
String database = "stg_cdc";
Map<String, String> mysqlConfig = new HashMap<>();
mysqlConfig.put(MySqlSourceOptions.DATABASE_NAME.key(), "stg_cdc");
mysqlConfig.put(MySqlSourceOptions.HOSTNAME.key(), "192.168.6.90");
mysqlConfig.put(MySqlSourceOptions.PORT.key(), "63306");
mysqlConfig.put(MySqlSourceOptions.USERNAME.key(), "root");
mysqlConfig.put(MySqlSourceOptions.PASSWORD.key(), "");
// add jdbc properties for MySQL
mysqlConfig.put("jdbc.properties.use_ssl", "false");
Configuration config = Configuration.fromMap(mysqlConfig);
Map<String, String> sinkConfig = new HashMap<>();
sinkConfig.put(DorisConfigOptions.FENODES.key(), "192.168.6.90:8030");
sinkConfig.put(DorisConfigOptions.BENODES.key(), "192.168.6.90:8040");
sinkConfig.put(DorisConfigOptions.USERNAME.key(), "root");
sinkConfig.put(DorisConfigOptions.PASSWORD.key(), "");
sinkConfig.put(DorisConfigOptions.JDBC_URL.key(), "jdbc:mysql://192.168.6.90:9030");
sinkConfig.put(DorisConfigOptions.SINK_LABEL_PREFIX.key(), UUID.randomUUID().toString());
sinkConfig.put("sink.enable-delete", "false");
Configuration sinkConf = Configuration.fromMap(sinkConfig);
Map<String, String> tableConfig = new HashMap<>();
tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1");
DorisTableConfig dorisTableConfig = new DorisTableConfig(tableConfig);
// String includingTables = "tbl1|tbl2|tbl3";
boolean ignoreDefaultValue = false;
boolean useNewSchemaChange = true;
String schemaChangeMode = SchemaChangeMode.DEBEZIUM_STRUCTURE.getName();
boolean singleSink = false;
boolean ignoreIncompatible = false;
DatabaseSync databaseSync = new MysqlDatabaseSync();
databaseSync
.setEnv(env)
.setDatabase(database)
.setConfig(config)
.setIgnoreDefaultValue(ignoreDefaultValue)
.setSinkConfig(sinkConf)
.setTableConfig(dorisTableConfig)
.setCreateTableOnly(false)
.setNewSchemaChange(useNewSchemaChange)
.setSchemaChangeMode(schemaChangeMode)
.setSingleSink(singleSink)
.setIgnoreIncompatible(ignoreIncompatible)
.create();
databaseSync.build();
env.execute(String.format("MySQL-Doris Database Sync: %s", database));
}