描述:
我通过 Java 程序以每批 200 条的方式向 MySQL 表写入共 10000 条数据,约 40 秒即写入完成;但经由 MySQL → Debezium → Kafka → Doris Kafka Connector 这条实时同步链路,Doris 内部表需要 20 多分钟才能同步完成。另外,我调整了 Doris Kafka Connector 中 buffer.count.records 与 buffer.flush.time 两个参数的值,对同步速度均没有明显影响。请使用过该链路做数据实时同步的大佬指教。
1、部署信息:一台 8 核 32 GB 的 Linux 服务器,Doris 的 FE 与 BE 都部署在该服务器上;ZooKeeper、Kafka、Kafka Connect 也通过 Docker Compose 部署在同一台服务器上。
版本信息:Doris 4.0.1,Kafka Connect 3.5.1,Debezium 2.4.2,Doris Kafka Connector 使用的包为 doris-kafka-connector-25.0.0.jar。
2、MySQL建表语句如下:
-- Source table (MySQL, schema `kafka_manager`) whose binlog changes are captured
-- by the Debezium MySqlConnector and published to the topic
-- dbserver1.kafka_manager.user_behavior_detail.
-- Notes:
--   * Several integer columns are UNSIGNED; the Doris target needs the next wider
--     signed type for them (e.g. TINYINT UNSIGNED 0..255 does not fit Doris TINYINT).
--   * String lengths here are in characters (utf8mb4, up to 4 bytes each).
--   * The TINYINT(1) display width on is_vip / deleted is kept deliberately:
--     NOTE(review): some drivers/connectors use that width to treat the column
--     as boolean — confirm how Debezium 2.4 maps it before changing.
--   * AUTO_INCREMENT starts around 2.0e18, i.e. ids are large but still below
--     the signed BIGINT maximum (2^63 - 1).
CREATE TABLE `user_behavior_detail` (
    `id`               BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键ID',
    `user_id`          VARCHAR(50) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '用户ID',
    `user_name`        VARCHAR(100) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '用户名',
    `email`            VARCHAR(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '邮箱',
    `phone`            CHAR(11) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '手机号',
    `age`              TINYINT(3) UNSIGNED DEFAULT NULL COMMENT '年龄',
    `gender`           ENUM('MALE','FEMALE','OTHER') COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '性别',
    `birth_date`       DATE DEFAULT NULL COMMENT '出生日期',
    `register_time`    DATETIME NOT NULL COMMENT '注册时间',
    `last_login_time`  TIMESTAMP NULL DEFAULT NULL COMMENT '最后登录时间',
    `total_amount`     DECIMAL(18,2) NOT NULL DEFAULT '0.00' COMMENT '总金额',
    `available_amount` DECIMAL(12,2) DEFAULT '0.00' COMMENT '可用金额',
    `frozen_amount`    DECIMAL(12,2) DEFAULT '0.00' COMMENT '冻结金额',
    `height`           FLOAT(5,2) DEFAULT NULL COMMENT '身高(米)',
    `weight`           DOUBLE(6,2) DEFAULT NULL COMMENT '体重(kg)',
    `is_vip`           TINYINT(1) NOT NULL DEFAULT '0' COMMENT '是否VIP',
    `is_active`        BIT(1) NOT NULL DEFAULT b'1' COMMENT '是否激活',
    `user_level`       SMALLINT(5) UNSIGNED DEFAULT '1' COMMENT '用户等级',
    `login_count`      INT(11) NOT NULL DEFAULT '0' COMMENT '登录次数',
    `order_count`      MEDIUMINT(8) UNSIGNED DEFAULT '0' COMMENT '订单数',
    `country_code`     CHAR(2) COLLATE utf8mb4_unicode_ci DEFAULT 'CN' COMMENT '国家代码',
    `province`         VARCHAR(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '省份',
    `city`             VARCHAR(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '城市',
    `district`         VARCHAR(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '区县',
    `address`          VARCHAR(500) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '详细地址',
    `postal_code`      VARCHAR(10) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '邮政编码',
    `id_card_no`       VARCHAR(18) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '身份证号',
    `bank_card_no`     VARCHAR(20) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '银行卡号',
    `profile_json`     JSON DEFAULT NULL COMMENT '用户画像JSON',
    `interests`        SET('MUSIC','SPORTS','READING','GAMES','TRAVEL') COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '兴趣爱好',
    `introduction`     TEXT COLLATE utf8mb4_unicode_ci COMMENT '个人介绍',
    `signature`        VARCHAR(500) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '个性签名',
    `credit_score`     SMALLINT(6) DEFAULT '100' COMMENT '信用分',
    `account_status`   VARCHAR(20) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT 'NORMAL' COMMENT '账户状态',
    `create_time`      TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    `update_time`      TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    `deleted`          TINYINT(1) NOT NULL DEFAULT '0' COMMENT '删除标记',
    `source_system`    VARCHAR(30) COLLATE utf8mb4_unicode_ci DEFAULT 'WEB' COMMENT '来源系统',
    `device_type`      VARCHAR(20) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '设备类型',
    `ip_address`       VARCHAR(45) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT 'IP地址',
    `longitude`        DECIMAL(10,7) DEFAULT NULL COMMENT '经度',
    `latitude`         DECIMAL(10,7) DEFAULT NULL COMMENT '纬度',
    PRIMARY KEY (`id`),
    UNIQUE KEY `uk_user_id` (`user_id`),
    KEY `idx_email` (`email`),
    KEY `idx_phone` (`phone`),
    KEY `idx_register_time` (`register_time`),
    KEY `idx_city_level` (`city`, `user_level`),
    KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB
  AUTO_INCREMENT=2004065754502139322
  DEFAULT CHARSET=utf8mb4
  COLLATE=utf8mb4_unicode_ci
  COMMENT='用户行为详情表';
3、Debezium MySQL Connector(Kafka Connect Source 端)配置如下:
{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "ip",
"database.port": "3306",
"database.user": "root",
"database.password": "pwd",
"database.server.id": "184054",
"database.connectionTimeZone": "Asia/Shanghai",
"database.include.list": "kafka_manager",
"topic.prefix": "dbserver1",
"include.schema.changes": "true",
"schema.history.internal.kafka.bootstrap.servers": "kafka-standalone:9092",
"schema.history.internal.kafka.topic": "schema-changelog",
"schema.history.internal.skip.unparseable.ddl": "true",
"snapshot.mode": "schema_only",
"snapshot.max.threads": "11",
"time.precision.mode": "connect",
"max.batch.size": "2000",
"max.queue.size": "32768",
"poll.interval.ms": "1000"
}
4、Doris Kafka Connector(Kafka Connect Sink 端)配置如下:
{
"connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
"tasks.max": "3",
"topics": "dbserver1.kafka_manager.user_behavior_detail",
"doris.topic2table.map": "dbserver1.kafka_manager.user_behavior_detail:user_behavior_detail_doris3",
"doris.urls": "ip",
"doris.http.port": "8030",
"doris.query.port": "9030",
"doris.user": "root",
"doris.password": "pwd",
"doris.database": "testdb",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"converter.mode": "debezium_ingestion",
"enable.delete": "true",
"buffer.count.records": "10000",
"buffer.size.bytes": "20000000",
"buffer.flush.time": "10",
"enable.combine.flush": "true",
"sink.properties.group_commit": "async_mode"
}
5、Doris建表语句如下:
-- Doris sink table for topic dbserver1.kafka_manager.user_behavior_detail
-- (mapped via doris.topic2table.map), mirroring the MySQL table user_behavior_detail.
-- Type-mapping rules applied:
--   * Doris VARCHAR(M) length is counted in BYTES, while MySQL utf8mb4
--     VARCHAR/CHAR(M) is counted in CHARACTERS (up to 4 bytes each). String
--     columns are therefore sized 4x the MySQL length so multi-byte text
--     (e.g. Chinese names/addresses) is not rejected by the load as too long.
--     NOTE(review): check the Stream Load ErrorURL for "too long" rejections.
--   * MySQL UNSIGNED integers need the next wider signed Doris type:
--     TINYINT UNSIGNED (0..255) -> SMALLINT, SMALLINT UNSIGNED (0..65535) -> INT,
--     MEDIUMINT UNSIGNED (0..16777215) -> INT.
--   * id is BIGINT UNSIGNED in MySQL; Doris BIGINT holds ids up to 2^63-1
--     (current AUTO_INCREMENT ~2.0e18 fits); LARGEINT would be required beyond that.
--   * MySQL TEXT (up to 65535 bytes) and JSON (unbounded) exceed VARCHAR's
--     65533-byte maximum, so they map to STRING.
--   * NOTE(review): MySQL TIMESTAMP columns (last_login_time, create_time,
--     update_time) are emitted by Debezium as UTC ZonedTimestamp values —
--     confirm they are converted to Asia/Shanghai before landing in DATETIME.
CREATE TABLE user_behavior_detail_doris3 (
    id               BIGINT         NOT NULL COMMENT "主键ID",      -- key column; source PK is NOT NULL
    user_id          VARCHAR(200)   NOT NULL COMMENT "用户ID",
    register_time    DATETIME       NULL COMMENT "注册时间",
    user_name        VARCHAR(400)   NULL COMMENT "用户名",
    email            VARCHAR(1020)  NULL COMMENT "邮箱",
    phone            VARCHAR(44)    NULL COMMENT "手机号",
    age              SMALLINT       NULL COMMENT "年龄",            -- MySQL TINYINT UNSIGNED (0..255)
    gender           VARCHAR(10)    NULL COMMENT "性别",            -- ENUM values are ASCII, longest 'FEMALE'
    birth_date       DATE           NULL COMMENT "出生日期",
    last_login_time  DATETIME       NULL COMMENT "最后登录时间",
    total_amount     DECIMAL(18,2)  NULL COMMENT "总金额",
    available_amount DECIMAL(12,2)  NULL COMMENT "可用金额",
    frozen_amount    DECIMAL(12,2)  NULL COMMENT "冻结金额",
    height           FLOAT          NULL COMMENT "身高(米)",
    weight           DOUBLE         NULL COMMENT "体重(kg)",
    is_vip           BOOLEAN        NULL COMMENT "是否VIP",
    is_active        BOOLEAN        NULL COMMENT "是否激活",
    user_level       INT            NULL COMMENT "用户等级",        -- MySQL SMALLINT UNSIGNED (0..65535)
    login_count      INT            NULL COMMENT "登录次数",
    order_count      INT            NULL COMMENT "订单数",          -- MySQL MEDIUMINT UNSIGNED fits in INT
    country_code     VARCHAR(8)     NULL COMMENT "国家代码",
    province         VARCHAR(200)   NULL COMMENT "省份",
    city             VARCHAR(200)   NULL COMMENT "城市",
    district         VARCHAR(200)   NULL COMMENT "区县",
    address          VARCHAR(2000)  NULL COMMENT "详细地址",
    postal_code      VARCHAR(40)    NULL COMMENT "邮政编码",
    id_card_no       VARCHAR(72)    NULL COMMENT "身份证号",
    bank_card_no     VARCHAR(80)    NULL COMMENT "银行卡号",
    profile_json     STRING         NULL COMMENT "用户画像JSON",    -- MySQL JSON has no length cap
    interests        VARCHAR(100)   NULL COMMENT "兴趣爱好",        -- SET values are ASCII, full set is 33 chars
    introduction     STRING         NULL COMMENT "个人介绍",        -- MySQL TEXT may exceed 65533 bytes
    signature        VARCHAR(2000)  NULL COMMENT "个性签名",
    credit_score     SMALLINT       NULL COMMENT "信用分",          -- MySQL SMALLINT is signed
    account_status   VARCHAR(80)    NULL COMMENT "账户状态",
    create_time      DATETIME       NULL COMMENT "创建时间",
    update_time      DATETIME       NULL COMMENT "更新时间",
    deleted          BOOLEAN        NULL COMMENT "删除标记",
    source_system    VARCHAR(120)   NULL COMMENT "来源系统",
    device_type      VARCHAR(80)    NULL COMMENT "设备类型",
    ip_address       VARCHAR(180)   NULL COMMENT "IP地址",
    longitude        DECIMAL(10,7)  NULL COMMENT "经度",
    latitude         DECIMAL(10,7)  NULL COMMENT "纬度"
)
ENGINE=OLAP
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1",
    "min_load_replica_num" = "-1",
    "is_being_synced" = "false",
    "storage_medium" = "hdd",
    "storage_format" = "V2",
    "inverted_index_storage_format" = "V3",
    "enable_unique_key_merge_on_write" = "true",
    "light_schema_change" = "true",
    "disable_auto_compaction" = "false",
    "enable_single_replica_compaction" = "false",
    "group_commit_interval_ms" = "10000",
    "group_commit_data_bytes" = "134217728",
    "enable_mow_light_delete" = "false"
);