【BUG】flink-doris-connector动态新增表后发生flink容错恢复导致新增表无法被监听

Viewed 22

现象

我们在使用该连接器的cdcTools工具进行mysql-Doris的cdc同步时,动态的新增了一张表,前期一切正常,数据也在正常同步,之后Doris侧该表数据不再增加,日志中有大量该表不被监听的警告信息:WARN org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumDataChange [] - filter table xx.xxx, because it is not listened, record detail is {xxx}

Bug发生条件

1、动态新增表
2、新增表之后flink自身发生过容错恢复,非人工从指定ck或sp重启

原理分析

该工具同步时有一张hash表维护着mysql表与Doris表的映射关系,该hash表在任务启动时读取mysql元数据创建,并在build DorisSink时将其同步给每个writer:
image.png

image.png

写入Doris时会从该hash表上取出映射关系:
image.png

同步过程中,新增表也会扩展该hash表:
image.png

即内存中的这张hash表会动态扩充,查看DorisWriter的状态快照源码发现其并没有将该hash表存入状态:
image.png
image.png

因此当整个flink任务由于某些原因(如网络抖动)触发了flink的容错恢复,恢复后每个writer拿到的hash表是一开始从mysql元数据生成的原始hash表,非同步过程中发生过扩展的hash表:
image.png

开始正常同步后,binlog中再出现新增表的插入或更新语句,会发现该hash表中没有其映射关系,就会忽略该表的同步:
image.png

修复方案

将该hash表存入状态保存?

1 Answers

如果新增表之后发生错误,理论上checkpoint不会成功,从上一次成功的ckpt回滚的时候,应该会去重放新增表那一条记录?