doris表是明细或主键表都不行,cdc能op=d:
protected static DorisSink<String> getDorisSink() {
DorisOptions dorisOptions = DorisOptions.builder()
.setFenodes("192.168.100.87:8030")
.setTableIdentifier("emap_ods_ams.org")
.setUsername("emap")
.setPassword("MMmm@@2025")
.setAutoRedirect(true)
// StreamLoad batch 参数
// .setBatchRows(5000)
// .setBatchSize(5 * 1024 * 1024)
// .setEnable2PC(true) // 推荐:两阶段提交
.build();
DorisReadOptions dorisReadOptions = DorisReadOptions.builder().setRequestBatchSize(5 * 1024 * 1024).build();
// 配置 Doris Sink 使用 JSON 格式
Properties dorisSinkConfig = new Properties();
dorisSinkConfig.put("format", "json");
dorisSinkConfig.put("read_json_by_line", "true");
dorisSinkConfig.put("strip_outer_array", "false");
DorisExecutionOptions dorisExecutionOptions = DorisExecutionOptions.builder()
.setBufferCount(1)
.setStreamLoadProp(dorisSinkConfig)
.setBufferFlushMaxRows(10000)
.setBufferFlushMaxBytes(10 * 1024 * 1024)
.setBufferFlushIntervalMs(5000)
.setDeletable(true)
.enable2PC()
.setBatchMode(true)
.build();
return DorisSink.<String>builder()
.setDorisOptions(dorisOptions)
.setDorisReadOptions(dorisReadOptions)
.setDorisExecutionOptions(dorisExecutionOptions)
.setSerializer(JsonDebeziumSchemaSerializer.builder()
.setDorisOptions(dorisOptions)
.build())
.build();
}