doris3.1.4使用Doris Kafka Connector消费kafka报错

Viewed 7

使用Distributed 模式启动,doris新建表有两个隐藏字段用于删除标记和多版本并发控制,但是doris kafka connector不会自动补多版本的,字段数量对不上会报错,Reason: There is no column matching jsonpaths in the json file, columns:[user_id, username, city, age, sex, phone, address, register_time, DORIS_DELETE_SIGN, ], please check columns and jsonpaths:. src line [{"op":"c","before":null,"after":{"address":"滕王阁","user_id":5,"city":"南昌","phone":345,"sex":1,"age":51,"register_time":"2026-04-22 08:45:11","username":"王五"},"source":{"db":"reemoon","table":"cs_a"}}]; 使用flink sql可以,但是doris kafka connector不行,想问下这个是正常还是我配置有问题

1 Answers

底层走的还是streamload,你尝试在 sink.properties. 中用jsonpath 取值,然后指定column,而且你这个取的是Debezium 格式 JSON 的嵌套字段映射。

  -H "format: json" \  
  -H "jsonpaths: [\"$.op\", \"$.after.user_id\", \"$.after.username\", \"$.after.city\", \"$.after.age\", \"$.after.sex\", \"$.after.phone\", \"$.after.address\", \"$.after.register_time\"]" \  
  -H "columns: op, user_id, username, city, age, sex, phone, address, register_time, __DORIS_DELETE_SIGN__ = if(op='d', 1, 0)" \