flink 写入Doris cpu 持续上升异常

Viewed 19

flink 版本:
1.14
doris 版本
5.7.99(腾讯云产品化doris,SELECT version() 查询结果,真实版本应该是2.1)
flink sql 写入doris

-- Required: deterministic operator UIDs must be enabled for lossless
-- table addition (CDAS).
SET table.optimizer.deterministic-operator-uid-for-cdas = true;

-- Optional: merge multiple MySQL CDC sources to reduce resource usage
-- and improve stability.
SET table.optimizer.mysql-cdc-source.merge.enabled = true;

-- Allow longer task names so operators remain identifiable in the UI.
SET pipeline.task-name-length = 80;

CREATE DATABASE IF NOT EXISTS `default_catalog`.`xxx-service`;

 -- MySQL CDC source: change stream of the upstream ticket-event table.
 CREATE TABLE IF NOT EXISTS `default_catalog`.`xxx-service`.`xxx_src` (
     `id`                  INT       NOT NULL,
     `ticket_id`           INT       NOT NULL,
     `attribute`           STRING    NOT NULL,
     `previous`            STRING    NULL,
     `current`             STRING    NULL,
     `second_previous`     STRING    NULL,
     `second_current`      STRING    NULL,
     `third_previous`      STRING    NULL,
     `third_current`       STRING    NULL,
     `trigger_method`      STRING    NULL,
     `trigger_user_id`     INT       NULL,
     `trigger_customer_id` INT       NULL,
     `is_latest`           TINYINT   NULL,
     `desc`                STRING    NULL,
     `customer_id`         INT       NULL,
     `remark`              STRING    NULL,
     `created_at`          TIMESTAMP NULL,
     `updated_at`          TIMESTAMP NULL,
     `deleted`             TINYINT   NULL,
     PRIMARY KEY (`id`) NOT ENFORCED
 ) WITH (
     'connector'     = 'mysql-cdc',
     'hostname'      = 'xxxx',
     'port'          = 'xxx',
     'username'      = 'xxx',
     'password'      = 'xxx',
     'database-name' = 'xxx',
     'table-name'    = 'xxx'
 );

-- Doris sink: composite key (id, id_to_time); writes via JSON stream load
-- with two-phase commit enabled.
CREATE TABLE IF NOT EXISTS `default_catalog`.`xxx-service`.`xxx_sink` (
    `id`                  INT       NOT NULL,
    `id_to_time`          TIMESTAMP NOT NULL,
    `created_at`          TIMESTAMP NULL,
    `ticket_id`           INT       NOT NULL,
    `attribute`           STRING    NOT NULL,
    `previous`            STRING    NULL,
    `current`             STRING    NULL,
    `second_previous`     STRING    NULL,
    `second_current`      STRING    NULL,
    `third_previous`      STRING    NULL,
    `third_current`       STRING    NULL,
    `trigger_method`      STRING    NULL,
    `trigger_user_id`     INT       NULL,
    `trigger_customer_id` INT       NULL,
    `is_latest`           TINYINT   NULL,
    `desc`                STRING    NULL,
    `customer_id`         INT       NULL,
    `remark`              STRING    NULL,
    `updated_at`          TIMESTAMP NULL,
    `deleted`             TINYINT   NULL,
    PRIMARY KEY (`id`, `id_to_time`) NOT ENFORCED
) WITH (
    'connector'                        = 'doris',
    'fenodes'                          = 'xxx:8030',
    'table.identifier'                 = 'xxx.xxx',
    'username'                         = 'xxx',
    'password'                         = 'xx-xx',
    'sink.properties.format'           = 'json',
    'sink.properties.read_json_by_line' = 'true',
    'sink.enable-2pc'                  = 'true',
    'sink.enable-delete'               = 'true',  -- propagate upstream DELETE events
    'sink.label-prefix'                = 'xxx'
);




-- Mirror every CDC change into Doris. The derived column id_to_time feeds
-- the sink's second key column: FROM_UNIXTIME interprets the integer PK as
-- epoch seconds, and the literal 00:00:00 in the pattern pins the result to
-- midnight (day granularity) — presumably intentional id-based bucketing;
-- it explains the 1970s-era partition bounds on the Doris side.
INSERT INTO `default_catalog`.`xxx-service`.`xxx_sink`
SELECT
    `id`,
    TO_TIMESTAMP(FROM_UNIXTIME(id, 'yyyy-MM-dd 00:00:00')) AS id_to_time,
    `created_at`,
    `ticket_id`, `attribute`,
    `previous`, `current`,
    `second_previous`, `second_current`,
    `third_previous`, `third_current`,
    `trigger_method`, `trigger_user_id`, `trigger_customer_id`,
    `is_latest`, `desc`, `customer_id`, `remark`,
    `updated_at`, `deleted`
FROM `default_catalog`.`xxx-service`.`xxx_src`;

Doris 建表语句:

-- Doris target table: unique-key model with merge-on-write, auto-created
-- daily partitions on id_to_time (id interpreted as epoch seconds upstream,
-- hence the 1970-1971 partition bounds).
-- NOTE(review): BUCKETS 1 means a single tablet per partition, so all writes
-- for a given day funnel through one tablet — likely a write/compaction
-- hotspot under heavy load; confirm against actual write volume.
-- NOTE(review): group_commit_* properties are set here while the Flink sink
-- uses 'sink.enable-2pc' = 'true'; group commit and 2PC stream load are
-- generally mutually exclusive — verify which load path is actually in effect.
CREATE TABLE `ods_consult_tk_ticket_event` (
  `id` int NOT NULL,
  `id_to_time` datetime NOT NULL COMMENT "主键转时间",
  `created_at` datetime NULL COMMENT "创建时间",
  `ticket_id` int NOT NULL COMMENT "id",
  `attribute` varchar(65533) NOT NULL COMMENT "",
  `previous` varchar(65533) NULL COMMENT "",
  `current` varchar(65533) NULL COMMENT "",
  `second_previous` varchar(65533) NULL COMMENT "",
  `second_current` varchar(65533) NULL COMMENT "",
  `third_previous` varchar(65533) NULL COMMENT "",
  `third_current` varchar(65533) NULL COMMENT "",
  `trigger_method` varchar(65533) NULL COMMENT " ",
  `trigger_user_id` int NULL COMMENT "",
  `trigger_customer_id` int NULL COMMENT "",
  `is_latest` tinyint NULL COMMENT ":0-否,1-是",
  `desc` varchar(65533) NULL COMMENT "描述,",
  `customer_id` int NULL COMMENT "",
  `remark` varchar(65533) NULL COMMENT "备注,",
  `updated_at` datetime NULL,
  `deleted` tinyint NULL DEFAULT "0"
) ENGINE=OLAP
UNIQUE KEY(`id`, `id_to_time`)
COMMENT '工单状态变更记录表'
AUTO PARTITION BY RANGE (date_trunc(`id_to_time`, 'day'))
(PARTITION p000000001 VALUES [('1970-01-01 00:00:00'), ('1970-11-16 00:00:00')),
PARTITION p000000002 VALUES [('1970-11-16 00:00:00'), ('1971-05-21 00:00:00')),
PARTITION p000000003 VALUES [('1971-05-21 00:00:00'), ('1971-09-21 00:00:00')),
PARTITION p000000004 VALUES [('1971-09-21 00:00:00'), ('1971-10-08 00:00:00')),
PARTITION p000000005 VALUES [('1971-10-08 00:00:00'), ('1971-10-09 00:00:00')),
PARTITION p19711009000000 VALUES [('1971-10-09 00:00:00'), ('1971-10-10 00:00:00')),
PARTITION p19711010000000 VALUES [('1971-10-10 00:00:00'), ('1971-10-11 00:00:00')),
PARTITION p19711011000000 VALUES [('1971-10-11 00:00:00'), ('1971-10-12 00:00:00')),
PARTITION p19711012000000 VALUES [('1971-10-12 00:00:00'), ('1971-10-13 00:00:00')),
PARTITION p19711013000000 VALUES [('1971-10-13 00:00:00'), ('1971-10-14 00:00:00')),
PARTITION p19711014000000 VALUES [('1971-10-14 00:00:00'), ('1971-10-15 00:00:00')),
PARTITION p19711015000000 VALUES [('1971-10-15 00:00:00'), ('1971-10-16 00:00:00')),
PARTITION p19711016000000 VALUES [('1971-10-16 00:00:00'), ('1971-10-17 00:00:00')),
PARTITION p19711017000000 VALUES [('1971-10-17 00:00:00'), ('1971-10-18 00:00:00')),
PARTITION p19711018000000 VALUES [('1971-10-18 00:00:00'), ('1971-10-19 00:00:00')),
PARTITION p19711019000000 VALUES [('1971-10-19 00:00:00'), ('1971-10-20 00:00:00')),
PARTITION p19711020000000 VALUES [('1971-10-20 00:00:00'), ('1971-10-21 00:00:00')))
-- NOTE(review): consider more buckets here if write throughput is the bottleneck.
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"min_load_replica_num" = "-1",
"is_being_synced" = "false",
"storage_medium" = "hdd",
"storage_format" = "V2",
"inverted_index_storage_format" = "V1",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false",
"group_commit_interval_ms" = "10000",
"group_commit_data_bytes" = "134217728",
"enable_mow_light_delete" = "false"
);

单个flink 任务写入大概26 张表,三个flink 任务一起写,总涉及 78 张表。

doris配置:

内核版本
2.1(tencent-cdw-doris-2.1.10-a836df5-2506121)
高可用
读高可用
FE节点配置
标准型,4核16G / 3个节点 / 增强型SSD云硬盘200GB
BE节点配置
标准型,4核16G / 3个节点 / 增强型SSD云硬盘500GB

我这里光写入数据基本耗费所有cpu 资源,如何优化呢?

2 Answers

SELECT version() 返回的是 MySQL 协议兼容层的版本号,并不是 Doris 内核版本。Doris 的真实版本可以通过 show backends 查看 Version 字段。

涉及到 78张表的写入,集群总的核数只有 24c ,cpu资源太少。如果想进行资源隔离,你可以看看 workload group ,Workload Group 提供进程内的资源隔离能力。

不过你这个资源太少了,还得扩一下吧

哦,好,我看看相关资料,谢谢哦