Does Flink batch processing support partial column updates?


After the job ran, the age and op_time columns were both overwritten with NULL.
Doris table DDL

CREATE TABLE IF NOT EXISTS flinkImportUnique_partial (
    user_id BIGINT,
    name VARCHAR(20),
    age INT,
    op_time DATETIME
) UNIQUE KEY(`user_id`) DISTRIBUTED BY HASH(`user_id`) BUCKETS 1 PROPERTIES (
    "replication_allocation" = "tag.location.default: 1",
    "enable_unique_key_merge_on_write" = "true"
);

Flink statements
MySQL JDBC source

CREATE TABLE MysqlFlinkExportToDorisUnique (
    user_id BIGINT,
    name VARCHAR(20),
    age INT,
    op_time TIMESTAMP,
    PRIMARY KEY(user_id) NOT ENFORCED
) WITH (
    'password' = '********',
    'connector' = 'jdbc',
    'driver' = 'com.mysql.jdbc.Driver',
    'table-name' = 'flinkexportdorisuniq',
    'url' = 'jdbc:mysql://***.***.***.***:3306/flink?useOldAliasMetadataBehavior=true&useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8',
    'username' = 'root'
);

Doris sink

CREATE TABLE flinkImportToDorisUnique_partial (
    user_id BIGINT,
    name STRING,
    age INT,
    op_time TIMESTAMP,
    PRIMARY KEY(user_id) NOT ENFORCED
) WITH (
    'password' = '********',
    'connector' = 'doris',
    'fenodes' = '***.***.***.***:8030',
    'table.identifier' = 'demo.flinkImportUnique_partial',
    'sink.properties.partial_columns' = 'true',
    'sink.properties.column' = 'user_id,name,__DORIS_DELETE_SIGN__',
    'username' = 'root'
);

Processing statement

INSERT INTO flinkImportToDorisUnique_partial(user_id,name)
SELECT user_id,name
FROM MysqlFlinkExportToDorisUnique;

1 Answer

Method 1:
Set the following variable in Doris, then try again:

SET GLOBAL enable_unique_key_partial_update = true;
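
To check that partial updates work on the Doris side at all, the variable can also be tried at session level with a plain INSERT. This is only a sketch: it assumes a row with user_id = 1 already exists in demo.flinkImportUnique_partial, and the value 'new_name' is made up for illustration. Whether this variable also affects loads written by the Flink connector is not confirmed here, since it is documented for INSERT statements.

```sql
-- Sketch: run in a MySQL client connected to Doris.
-- Enable partial column updates for INSERT statements in this session only.
SET enable_unique_key_partial_update = true;

-- Assumption: a row with user_id = 1 already exists; only `name` is written.
INSERT INTO demo.flinkImportUnique_partial (user_id, name)
VALUES (1, 'new_name');

-- If the partial update took effect, age and op_time keep their old values.
SELECT user_id, name, age, op_time
FROM demo.flinkImportUnique_partial
WHERE user_id = 1;
```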

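Method 2:
Use flink-doris-connector.
Add the property 'sink.properties.partial_columns' = 'true', and list the columns to import in 'sink.properties.columns' (the list must include all key columns, otherwise the rows cannot be updated). Note that the sink DDL in the question sets 'sink.properties.column' (singular), whereas the Stream Load header that carries the column list is named columns, so that setting is probably not taking effect.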
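
A minimal sketch of what the sink might look like with these properties, reusing the connection settings from the question. The table name flinkImportToDorisUnique_partial_cols is hypothetical. Declaring only the updated columns in the Flink table is an assumption about how the connector lines up the rows it emits with the column list, so that Flink does not pad age and op_time with NULL. It has not been verified against a specific connector version.

```sql
-- Hypothetical sink table: declares only the columns being updated.
CREATE TABLE flinkImportToDorisUnique_partial_cols (
    user_id BIGINT,
    name STRING,
    PRIMARY KEY(user_id) NOT ENFORCED
) WITH (
    'connector' = 'doris',
    'fenodes' = '***.***.***.***:8030',
    'table.identifier' = 'demo.flinkImportUnique_partial',
    'username' = 'root',
    'password' = '********',
    -- Ask Doris to treat the load as a partial column update.
    'sink.properties.partial_columns' = 'true',
    -- Plural "columns"; the list must contain every key column.
    'sink.properties.columns' = 'user_id,name,__DORIS_DELETE_SIGN__'
);

-- Write only the key and the column to be updated.
INSERT INTO flinkImportToDorisUnique_partial_cols
SELECT user_id, name
FROM MysqlFlinkExportToDorisUnique;
```

After the job finishes, querying the Doris table for a few user_id values should show whether age and op_time kept their previous values.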