age 和 op_time列都被空值覆盖了
Doris建表语句
CREATE TABLE IF NOT EXISTS flinkImportUnique_partial (
user_id BIGINT,
name VARCHAR(20),
age INT,
op_time DATETIME
) UNIQUE KEY(`user_id`) DISTRIBUTED BY HASH(`user_id`) BUCKETS 1 PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"enable_unique_key_merge_on_write" = "true"
);
Flink 语句
Mysql jdbc Source
CREATE TABLE MysqlFlinkExportToDorisUnique (
user_id BIGINT,
name VARCHAR(20),
age INT,
op_time TIMESTAMP,
PRIMARY KEY(user_id) NOT ENFORCED
) WITH (
'password' = '********',
'connector' = 'jdbc',
'driver' = 'com.mysql.jdbc.Driver',
'table-name' = 'flinkexportdorisuniq',
'url' = 'jdbc:mysql://***.***.***.***:3306/flink?useOldAliasMetadataBehavior=true&useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8',
'username' = 'root'
);
Doris Sink
CREATE TABLE flinkImportToDorisUnique_partial (
user_id BIGINT,
name STRING,
age INT,
op_time TIMESTAMP,
PRIMARY KEY(user_id) NOT ENFORCED
) WITH (
'password' = '********',
'connector' = 'doris',
'fenodes' = '***.***.***.***:8030',
'table.identifier' = 'demo.flinkImportUnique_partial',
'sink.properties.partial_columns' = 'true',
'sink.properties.column' = 'user_id,name,__DORIS_DELETE_SIGN__',
'username' = 'root'
);
加工语句
INSERT INTO flinkImportToDorisUnique_partial(user_id,name)
SELECT user_id,name
from MysqlFlinkExportToDorisUnique;