2 Answers

为啥要去掉column呢?

可以在sink端手动通过 header 指定column
image.png

1. 我的目的是:业务服务把设备地址信息推送到 Kafka,Flink 读取后更新 Doris。但推送过来的地址信息可能只包含主键和其中一部分列(即每条消息要更新的列是动态的),所以需要用到 Doris 3.1.0 的灵活部分列更新功能。
2. 附件是你们官方文档的相关截图。
3. 还有一个问题:我目前用下面的代码做更新,Kafka 消息中不存在的列会被置为空。不太确定灵活列更新在这种写法下应该怎么用,有没有 demo?

tableEnv.getConfig().getConfiguration().setString("table.exec.state.ttl", "5 min");

    tableEnv.executeSql("CREATE TABLE gpsLocation(\n" +
            "imei bigint,\n" +
            "imeiStr string,\n" +
            "acc int,\n" +
            "cellID bigint,\n" +
            "course int,\n" +
            "gpsSatellites int,\n" +
            "lac bigint,\n" +
            "latitude bigint,\n" +
            "longitude bigint,\n" +
            "mcc int,\n" +
            "mnc int,\n" +
            "reportMode int,\n" +
            "speed int,\n" +
            "supplement int,\n" +
            "totalDistanceMeter bigint,\n" +
            "updateTime bigint,\n" +
            "reportTime bigint,\n" +
            "     et as TO_TIMESTAMP_LTZ(updateTime * 1000, 3),\n" +
            "     WATERMARK FOR et AS et - INTERVAL '5' SECOND,\n" +
            "  proc_time as PROCTIME()\n" +
            ") WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = 'gps-location',\n" +
            "  'properties.bootstrap.servers' = '" + Constant.KAFKA_BROKERS + "', " +
            "  'properties.group.id' = 'gps-location',\n" +
            "  'scan.startup.mode' = 'latest-offset',\n" +
            "  'format' = 'json'\n" +
            ")");

    // Projection over the Kafka source: keeps the device fields and derives two extra columns,
    //   `time` = updateTime * 1000
    //   dt     = FROM_UNIXTIME(updateTime, 'yyyy-MM-dd')
    // The column order here matches the column order declared by the Doris sink table.
    String projectionSql =
            "select imei, updateTime * 1000 AS `time`, "
                    + "FROM_UNIXTIME(updateTime, 'yyyy-MM-dd') AS dt, "
                    + "imeiStr, acc, cellID, course, gpsSatellites, lac, latitude, longitude, "
                    + "mcc, mnc, reportMode, speed, supplement, totalDistanceMeter, "
                    + "updateTime, reportTime  "
                    + "from gpsLocation ";
    Table gpsLocation1 = tableEnv.sqlQuery(projectionSql);

    // Expose the projection under the name "gpsLocation1" so SQL statements can reference it.
    tableEnv.createTemporaryView("gpsLocation1", gpsLocation1);


    // Doris sink table mapped to gps.ods_gps_location_info, written in batch mode
    // (flush every 5000 rows or 5s) with unique_key_update_mode = UPDATE_FLEXIBLE_COLUMNS.
    //
    // Changes relative to the previous definition:
    //   * 'sink.properties.format' = 'json' and 'sink.properties.read_json_by_line' = 'true'
    //     are set explicitly. Doris documents flexible partial-column update as JSON-only,
    //     and no format was configured before.
    //   * 'sink.enable-delete' is turned off. Doris lists delete / hidden-column related load
    //     options as incompatible with flexible partial-column update, and the upstream
    //     Kafka JSON source defined in this job only produces inserts.
    //
    // NOTE(review): the target Doris table must have been created with the flexible-update
    // prerequisites described in the Doris docs (merge-on-write unique key table with the
    // skip-bitmap column enabled) - confirm against the table DDL.
    // NOTE(review): a fixed-schema SQL sink presumably still serializes every declared column,
    // writing fields that were missing in the Kafka message as explicit nulls, and may attach
    // a `columns` header itself. If absent fields must keep their existing values, verify the
    // connector version's behavior; forwarding the raw JSON string through the DataStream
    // DorisSink is the likely alternative - confirm with the connector documentation.
    tableEnv.executeSql("CREATE TABLE ods_gps_location_info(\n" +
            "imei bigint,\n" +
            "`time` bigint,\n" +
            "dt string,\n" +
            "imeiStr string,\n" +
            "acc int,\n" +
            "cellID bigint,\n" +
            "course int,\n" +
            "gpsSatellites int,\n" +
            "lac bigint,\n" +
            "latitude bigint,\n" +
            "longitude bigint,\n" +
            "mcc int,\n" +
            "mnc int,\n" +
            "reportMode int,\n" +
            "speed int,\n" +
            "supplement int,\n" +
            "totalDistanceMeter bigint,\n" +
            "updateTime bigint,\n" +
            "reportTime bigint\n" +
            ") WITH (\n" +
            "  'connector' = 'doris',\n" +
            "  'fenodes' = '" + Constant.DORIS_FE_NODES + "',\n" +
            "  'table.identifier' = 'gps.ods_gps_location_info',\n" +
            "  'username' = '" + Constant.DORIS_USER_NAME + "',\n" +
            "  'password' = '" + Constant.DORIS_PASSWORD + "',\n" +
            "  'sink.enable.batch-mode' = 'true',\n" +
            "  'sink.buffer-flush.max-rows' = '5000',\n" +
            "  'sink.buffer-flush.interval' = '5s',\n" +
            "  'sink.label-prefix' = 'ods_gps_location_info',\n" +
            // Stream-load properties: JSON, one object per line, flexible partial update.
            "  'sink.properties.format' = 'json',\n" +
            "  'sink.properties.read_json_by_line' = 'true',\n" +
            "  'sink.properties.unique_key_update_mode' = 'UPDATE_FLEXIBLE_COLUMNS',\n" +
            "  'sink.enable-delete' = 'false'\n" +
            ")");

    // Submit the job: insert the rows of the gpsLocation1 table into the
    // sink table registered under the name "ods_gps_location_info".
    gpsLocation1.executeInsert("ods_gps_location_info");

image.png

image.png

image.png