Flink 版本:
1.14
Doris 版本:
5.7.99(腾讯云产品化 Doris,此为 SELECT version() 的查询结果,实际内核版本应为 2.1)
Flink SQL 写入 Doris 的作业如下:
-- Session options for this job.
-- Required: per the original author's note, lossless table addition is only
-- available when table.optimizer.deterministic-operator-uid-for-cdas is true.
SET table.optimizer.deterministic-operator-uid-for-cdas = true;

-- Optional: per the original author's note, enables reuse of multiple sources
-- to lower resource usage and improve stability.
SET table.optimizer.mysql-cdc-source.merge.enabled = true;

SET pipeline.task-name-length = 80;

-- Database that holds the table definitions used by this job.
CREATE DATABASE IF NOT EXISTS `default_catalog`.`xxx-service`;
-- Source table: read through the 'mysql-cdc' connector.
-- Connection values appear as 'xxx' placeholders in this listing.
CREATE TABLE IF NOT EXISTS `default_catalog`.`xxx-service`.`xxx_src` (
    `id`                  INT       NOT NULL,
    `ticket_id`           INT       NOT NULL,
    `attribute`           STRING    NOT NULL,
    `previous`            STRING    NULL,
    `current`             STRING    NULL,
    `second_previous`     STRING    NULL,
    `second_current`      STRING    NULL,
    `third_previous`      STRING    NULL,
    `third_current`       STRING    NULL,
    `trigger_method`      STRING    NULL,
    `trigger_user_id`     INT       NULL,
    `trigger_customer_id` INT       NULL,
    `is_latest`           TINYINT   NULL,
    `desc`                STRING    NULL,
    `customer_id`         INT       NULL,
    `remark`              STRING    NULL,
    `created_at`          TIMESTAMP NULL,
    `updated_at`          TIMESTAMP NULL,
    `deleted`             TINYINT   NULL,
    -- Declared for the planner only; NOT ENFORCED means Flink does not check it.
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector'     = 'mysql-cdc',
    'hostname'      = 'xxxx',
    'port'          = 'xxx',
    'username'      = 'xxx',
    'password'      = 'xxx',
    'database-name' = 'xxx',
    'table-name'    = 'xxx'
);
-- Sink table: written through the 'doris' connector.
-- Compared with the source table it adds `id_to_time` and declares the
-- primary key as (`id`, `id_to_time`).
CREATE TABLE IF NOT EXISTS `default_catalog`.`xxx-service`.`xxx_sink` (
    `id`                  INT       NOT NULL,
    `id_to_time`          TIMESTAMP NOT NULL,
    `created_at`          TIMESTAMP NULL,
    `ticket_id`           INT       NOT NULL,
    `attribute`           STRING    NOT NULL,
    `previous`            STRING    NULL,
    `current`             STRING    NULL,
    `second_previous`     STRING    NULL,
    `second_current`      STRING    NULL,
    `third_previous`      STRING    NULL,
    `third_current`       STRING    NULL,
    `trigger_method`      STRING    NULL,
    `trigger_user_id`     INT       NULL,
    `trigger_customer_id` INT       NULL,
    `is_latest`           TINYINT   NULL,
    `desc`                STRING    NULL,
    `customer_id`         INT       NULL,
    `remark`              STRING    NULL,
    `updated_at`          TIMESTAMP NULL,
    `deleted`             TINYINT   NULL,
    PRIMARY KEY (`id`, `id_to_time`) NOT ENFORCED
) WITH (
    'connector'                         = 'doris',
    'fenodes'                           = 'xxx:8030',
    'table.identifier'                  = 'xxx.xxx',
    'username'                          = 'xxx',
    'password'                          = 'xx-xx',
    'sink.properties.format'            = 'json',
    'sink.properties.read_json_by_line' = 'true',
    'sink.enable-2pc'                   = 'true',
    -- Also propagate delete events to the target table.
    'sink.enable-delete'                = 'true',
    'sink.label-prefix'                 = 'xxx'
);
-- Copy rows from the CDC source table into the Doris sink table.
-- The sink's column order differs from the source's (`id_to_time` is derived
-- and `created_at` sits third), so the target columns are listed explicitly
-- instead of relying on positional matching against the sink schema.
INSERT INTO `default_catalog`.`xxx-service`.`xxx_sink` (
    `id`,
    `id_to_time`,
    `created_at`,
    `ticket_id`,
    `attribute`,
    `previous`,
    `current`,
    `second_previous`,
    `second_current`,
    `third_previous`,
    `third_current`,
    `trigger_method`,
    `trigger_user_id`,
    `trigger_customer_id`,
    `is_latest`,
    `desc`,
    `customer_id`,
    `remark`,
    `updated_at`,
    `deleted`
)
SELECT
    `id`,
    -- Derived key column: `id` is passed to FROM_UNIXTIME and formatted with a
    -- literal 00:00:00 time part, then parsed back into a timestamp.
    -- NOTE(review): FROM_UNIXTIME presumably formats in the session time zone,
    -- so the id -> day mapping would shift if that zone changes; confirm the
    -- zone is pinned, because this value is part of the sink primary key.
    TO_TIMESTAMP(FROM_UNIXTIME(`id`, 'yyyy-MM-dd 00:00:00')) AS `id_to_time`,
    `created_at`,
    `ticket_id`,
    `attribute`,
    `previous`,
    `current`,
    `second_previous`,
    `second_current`,
    `third_previous`,
    `third_current`,
    `trigger_method`,
    `trigger_user_id`,
    `trigger_customer_id`,
    `is_latest`,
    `desc`,
    `customer_id`,
    `remark`,
    `updated_at`,
    `deleted`
FROM `default_catalog`.`xxx-service`.`xxx_src`;
Doris 建表语句:
-- Doris target table.
-- Unique-key table on (`id`, `id_to_time`) with merge-on-write enabled,
-- auto range-partitioned by day of `id_to_time`, hash-distributed on `id`
-- into a single bucket. All string literals (column/table comments, partition
-- bounds, property values) are kept exactly as in the original statement.
-- The `id_to_time` column comment reads "primary key converted to time"; the
-- partitions listed here all fall in 1970-1971.
CREATE TABLE `ods_consult_tk_ticket_event` (
    `id`                  INT            NOT NULL,
    `id_to_time`          DATETIME       NOT NULL COMMENT "主键转时间",
    `created_at`          DATETIME       NULL     COMMENT "创建时间",
    `ticket_id`           INT            NOT NULL COMMENT "id",
    `attribute`           VARCHAR(65533) NOT NULL COMMENT "",
    `previous`            VARCHAR(65533) NULL     COMMENT "",
    `current`             VARCHAR(65533) NULL     COMMENT "",
    `second_previous`     VARCHAR(65533) NULL     COMMENT "",
    `second_current`      VARCHAR(65533) NULL     COMMENT "",
    `third_previous`      VARCHAR(65533) NULL     COMMENT "",
    `third_current`       VARCHAR(65533) NULL     COMMENT "",
    `trigger_method`      VARCHAR(65533) NULL     COMMENT " ",
    `trigger_user_id`     INT            NULL     COMMENT "",
    `trigger_customer_id` INT            NULL     COMMENT "",
    `is_latest`           TINYINT        NULL     COMMENT ":0-否,1-是",
    `desc`                VARCHAR(65533) NULL     COMMENT "描述,",
    `customer_id`         INT            NULL     COMMENT "",
    `remark`              VARCHAR(65533) NULL     COMMENT "备注,",
    `updated_at`          DATETIME       NULL,
    `deleted`             TINYINT        NULL     DEFAULT "0"
) ENGINE=OLAP
UNIQUE KEY(`id`, `id_to_time`)
COMMENT '工单状态变更记录表'
AUTO PARTITION BY RANGE (date_trunc(`id_to_time`, 'day'))
(
    PARTITION p000000001 VALUES [('1970-01-01 00:00:00'), ('1970-11-16 00:00:00')),
    PARTITION p000000002 VALUES [('1970-11-16 00:00:00'), ('1971-05-21 00:00:00')),
    PARTITION p000000003 VALUES [('1971-05-21 00:00:00'), ('1971-09-21 00:00:00')),
    PARTITION p000000004 VALUES [('1971-09-21 00:00:00'), ('1971-10-08 00:00:00')),
    PARTITION p000000005 VALUES [('1971-10-08 00:00:00'), ('1971-10-09 00:00:00')),
    PARTITION p19711009000000 VALUES [('1971-10-09 00:00:00'), ('1971-10-10 00:00:00')),
    PARTITION p19711010000000 VALUES [('1971-10-10 00:00:00'), ('1971-10-11 00:00:00')),
    PARTITION p19711011000000 VALUES [('1971-10-11 00:00:00'), ('1971-10-12 00:00:00')),
    PARTITION p19711012000000 VALUES [('1971-10-12 00:00:00'), ('1971-10-13 00:00:00')),
    PARTITION p19711013000000 VALUES [('1971-10-13 00:00:00'), ('1971-10-14 00:00:00')),
    PARTITION p19711014000000 VALUES [('1971-10-14 00:00:00'), ('1971-10-15 00:00:00')),
    PARTITION p19711015000000 VALUES [('1971-10-15 00:00:00'), ('1971-10-16 00:00:00')),
    PARTITION p19711016000000 VALUES [('1971-10-16 00:00:00'), ('1971-10-17 00:00:00')),
    PARTITION p19711017000000 VALUES [('1971-10-17 00:00:00'), ('1971-10-18 00:00:00')),
    PARTITION p19711018000000 VALUES [('1971-10-18 00:00:00'), ('1971-10-19 00:00:00')),
    PARTITION p19711019000000 VALUES [('1971-10-19 00:00:00'), ('1971-10-20 00:00:00')),
    PARTITION p19711020000000 VALUES [('1971-10-20 00:00:00'), ('1971-10-21 00:00:00'))
)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
    "replication_allocation"           = "tag.location.default: 3",
    "min_load_replica_num"             = "-1",
    "is_being_synced"                  = "false",
    "storage_medium"                   = "hdd",
    "storage_format"                   = "V2",
    "inverted_index_storage_format"    = "V1",
    "enable_unique_key_merge_on_write" = "true",
    "light_schema_change"              = "true",
    "disable_auto_compaction"          = "false",
    "enable_single_replica_compaction" = "false",
    "group_commit_interval_ms"         = "10000",
    "group_commit_data_bytes"          = "134217728",
    "enable_mow_light_delete"          = "false"
);
单个 Flink 任务大约写入 26 张表,三个 Flink 任务同时写入,共涉及 78 张表。
doris配置:
内核版本
2.1(tencent-cdw-doris-2.1.10-a836df5-2506121)
高可用
读高可用
FE节点配置
标准型,4核16G / 3个节点 / 增强型SSD云硬盘200GB
BE节点配置
标准型,4核16G / 3个节点 / 增强型SSD云硬盘500GB
目前仅写入数据就几乎耗尽了所有 CPU 资源,应该如何优化?