
1. 我的目的是:业务服务把设备地址信息推送到 Kafka,再由 Flink 读取后更新 Doris。但推送过来的地址信息可能只包含主键和其中一部分列(即每条消息要更新的列是动态的),所以需要用到 Doris 3.1.0 的灵活部分列更新功能。
2. 附件是官方文档中相关内容的截图。
3. 还有一个问题:我目前用以下代码做更新,Kafka 消息中不存在的列在 Doris 中会被置为 NULL。不确定灵活部分列更新在这种 Flink SQL 写法下应该如何使用,是否有 demo 可以参考?
tableEnv.getConfig().getConfiguration().setString("table.exec.state.ttl", "5 min");
tableEnv.executeSql("CREATE TABLE gpsLocation(\n" +
"imei bigint,\n" +
"imeiStr string,\n" +
"acc int,\n" +
"cellID bigint,\n" +
"course int,\n" +
"gpsSatellites int,\n" +
"lac bigint,\n" +
"latitude bigint,\n" +
"longitude bigint,\n" +
"mcc int,\n" +
"mnc int,\n" +
"reportMode int,\n" +
"speed int,\n" +
"supplement int,\n" +
"totalDistanceMeter bigint,\n" +
"updateTime bigint,\n" +
"reportTime bigint,\n" +
" et as TO_TIMESTAMP_LTZ(updateTime * 1000, 3),\n" +
" WATERMARK FOR et AS et - INTERVAL '5' SECOND,\n" +
" proc_time as PROCTIME()\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'gps-location',\n" +
" 'properties.bootstrap.servers' = '" + Constant.KAFKA_BROKERS + "', " +
" 'properties.group.id' = 'gps-location',\n" +
" 'scan.startup.mode' = 'latest-offset',\n" +
" 'format' = 'json'\n" +
")");
// Select list for the sink: every source field plus two derived columns,
// `time` (updateTime * 1000) and dt (updateTime formatted as yyyy-MM-dd).
// NOTE(review): a message without updateTime presumably yields NULL for both
// `time` and dt -- verify whether either is part of the Doris unique key.
final String gpsLocationProjection = String.join(", ",
        "imei",
        "updateTime * 1000 AS `time`",
        "FROM_UNIXTIME(updateTime, 'yyyy-MM-dd') AS dt",
        "imeiStr",
        "acc",
        "cellID",
        "course",
        "gpsSatellites",
        "lac",
        "latitude",
        "longitude",
        "mcc",
        "mnc",
        "reportMode",
        "speed",
        "supplement",
        "totalDistanceMeter",
        "updateTime",
        "reportTime");

// Run the projection over gpsLocation and expose the result as the temporary view gpsLocation1.
Table gpsLocation1 = tableEnv.sqlQuery("select " + gpsLocationProjection + " from gpsLocation ");
tableEnv.createTemporaryView("gpsLocation1", gpsLocation1);
// Doris sink table mapped to gps.ods_gps_location_info; its columns mirror the
// select list of the gpsLocation1 view (imei, `time`, dt, then the raw GPS fields).
//
// Fix: Doris only accepts unique_key_update_mode = UPDATE_FLEXIBLE_COLUMNS for JSON
// Stream Load input, so the sink is switched to line-delimited JSON
// (sink.properties.format / sink.properties.read_json_by_line). Without these the
// connector uses its default load format (CSV per the connector docs -- confirm for
// the version in use).
//
// NOTE(review): the target Doris table presumably also needs the table property
// enable_unique_key_skip_bitmap_column = true for flexible updates -- confirm.
// NOTE(review): Doris documents that flexible updates reject a `columns` load header;
// verify that the connector version in use does not inject one for this mode.
// NOTE(review): rows produced by this Table API job have a fixed schema, so fields
// absent from the Kafka message arrive here as NULL and are presumably serialized as
// explicit JSON nulls, which would still overwrite existing values. To leave absent
// columns untouched, the original JSON likely has to be forwarded unchanged (e.g. a
// DataStream DorisSink with a plain string serializer) -- confirm against the docs.
tableEnv.executeSql("CREATE TABLE ods_gps_location_info(\n" +
"imei bigint,\n" +
"`time` bigint,\n" +
"dt string,\n" +
"imeiStr string,\n" +
"acc int,\n" +
"cellID bigint,\n" +
"course int,\n" +
"gpsSatellites int,\n" +
"lac bigint,\n" +
"latitude bigint,\n" +
"longitude bigint,\n" +
"mcc int,\n" +
"mnc int,\n" +
"reportMode int,\n" +
"speed int,\n" +
"supplement int,\n" +
"totalDistanceMeter bigint,\n" +
"updateTime bigint,\n" +
"reportTime bigint\n" +
") WITH (\n" +
" 'connector' = 'doris',\n" +
" 'fenodes' = '" + Constant.DORIS_FE_NODES + "',\n" +
" 'table.identifier' = 'gps.ods_gps_location_info',\n" +
" 'username' = '" + Constant.DORIS_USER_NAME + "',\n" +
" 'password' = '" + Constant.DORIS_PASSWORD + "',\n" +
// Batch mode: flush after 5000 buffered rows or every 5 seconds.
" 'sink.enable.batch-mode' = 'true',\n" +
" 'sink.buffer-flush.max-rows' = '5000',\n" +
" 'sink.buffer-flush.interval' = '5s',\n" +
" 'sink.label-prefix' = 'ods_gps_location_info',\n" +
// sink.properties.* entries are passed through as Stream Load parameters.
" 'sink.properties.format' = 'json',\n" +
" 'sink.properties.read_json_by_line' = 'true',\n" +
" 'sink.properties.unique_key_update_mode' = 'UPDATE_FLEXIBLE_COLUMNS',\n" +
" 'sink.enable-delete' = 'true'\n" +
")");
// Submit the streaming insert from the gpsLocation1 view into the Doris sink.
gpsLocation1.executeInsert("ods_gps_location_info");


