使用 flink-doris-connector-1.17(版本 25.1.0)同步数据时,新增和更新操作能成功同步,但删除操作不生效

Viewed 15

Doris 表无论是明细模型(Duplicate Key)还是主键模型(Unique Key),删除都不生效;CDC 端可以正常捕获到 op=d 的删除事件。Sink 的配置代码如下:

/**
 * Builds the Doris sink that writes Debezium-style JSON change records to
 * the {@code emap_ods_ams.org} table.
 *
 * <p>The same {@link DorisExecutionOptions} instance is handed to both the
 * sink and the {@link JsonDebeziumSchemaSerializer}, so the serializer is
 * built with the same write settings (delete flag, stream-load properties)
 * that the sink itself uses.
 *
 * <p>NOTE(review): deletes are expected to be applied only on Unique Key
 * tables with batch delete enabled; a Duplicate Key (detail) table cannot
 * honour them. Confirm the target table model.
 *
 * @return a configured {@code DorisSink} for JSON string records
 */
protected static DorisSink<String> getDorisSink() {
        // NOTE(review): FE address and credentials are hard-coded here;
        // they should come from external configuration, not source control.
        DorisOptions dorisOptions = DorisOptions.builder()
                .setFenodes("192.168.100.87:8030")
                .setTableIdentifier("emap_ods_ams.org")
                .setUsername("emap")
                .setPassword("MMmm@@2025")
                .setAutoRedirect(true)
                .build();

        DorisReadOptions dorisReadOptions = DorisReadOptions.builder().setRequestBatchSize(5 * 1024 * 1024).build();

        // Stream Load properties: send records as line-delimited JSON.
        Properties dorisSinkConfig = new Properties();
        dorisSinkConfig.put("format", "json");
        dorisSinkConfig.put("read_json_by_line", "true");
        dorisSinkConfig.put("strip_outer_array", "false");

        // Buffering/flush thresholds and delete support for the writer.
        // NOTE(review): 2PC and batch mode are both enabled; batch mode is
        // documented as at-least-once, so the 2PC flag presumably has no
        // effect here - confirm which delivery mode is intended.
        DorisExecutionOptions dorisExecutionOptions = DorisExecutionOptions.builder()
                .setBufferCount(1)
                .setStreamLoadProp(dorisSinkConfig)
                .setBufferFlushMaxRows(10000)
                .setBufferFlushMaxBytes(10 * 1024 * 1024)
                .setBufferFlushIntervalMs(5000)
                .setDeletable(true)
                .enable2PC()
                .setBatchMode(true)
                .build();

        // Pass the execution options to the serializer as well; without
        // them the serializer falls back to its own defaults instead of
        // the settings configured above (presumably including delete
        // handling - verify against the connector version in use).
        return DorisSink.<String>builder()
                .setDorisOptions(dorisOptions)
                .setDorisReadOptions(dorisReadOptions)
                .setDorisExecutionOptions(dorisExecutionOptions)
                .setSerializer(JsonDebeziumSchemaSerializer.builder()
                        .setDorisOptions(dorisOptions)
                        .setExecutionOptions(dorisExecutionOptions)
                        .build())
                .build();
    }
0 Answers