Doris版本:2.0.8
目标表数据量:71669405条
kafka导入任务创建语句:
-- Routine Load job scada_jx1.jx1scada_error_time: consumes JSON messages from
-- the Kafka topic "error_time" into the table jx1_scada_error_time.
--   * One desired concurrent task, non-strict mode.
--   * Default offsets are OFFSET_BEGINNING; consumer group is jx1scada_kafka_doris.
--   * Brokers are reached with SASL_PLAINTEXT / PLAIN credentials (masked here).
CREATE ROUTINE LOAD scada_jx1.jx1scada_error_time ON jx1_scada_error_time
PROPERTIES (
    "desired_concurrent_number" = "1",
    "format" = "json",
    "strict_mode" = "false"
)
FROM KAFKA (
    "kafka_broker_list" = "*.*.*.*:****",
    "kafka_topic" = "error_time",
    "property.group.id" = "jx1scada_kafka_doris",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING",
    "property.security.protocol" = "SASL_PLAINTEXT",
    "property.sasl.mechanism" = "PLAIN",
    "property.sasl.username" = "***",
    "property.sasl.password" = "***"
);
目标表建表语句:
-- Target table of the routine load job: one row per (deviceCode, tagCode, Ts),
-- stored as a unique-key table with merge-on-write enabled.
-- The three key columns are VARCHAR(128); every other column is STRING.
-- Hash-distributed on deviceCode into 3 buckets, with 3 replicas.
-- NOTE(review): the key list spells the third column "TS" while it is declared
-- as "Ts"; presumably resolved case-insensitively -- confirm before renaming.
CREATE TABLE scada_jx1.jx1_scada_error_time (
    deviceCode   VARCHAR(128),
    tagCode      VARCHAR(128),
    Ts           VARCHAR(128),
    preValue     STRING,
    endValue     STRING,
    modelId      STRING,
    typeName     STRING,
    unionCode    STRING,
    min          STRING,
    startTime    STRING,
    startValue   STRING,
    processType  STRING,
    max          STRING,
    overrunFlag  STRING,
    topics       STRING,
    dataType     STRING,
    preTime      STRING,
    tagName      STRING,
    curValue     STRING,
    processFlag  STRING,
    ruleFlag     STRING,
    curTime      STRING,
    round        STRING,
    saveFlag     STRING,
    typeId       STRING,
    endTime      STRING,
    businessType STRING,
    `status`     STRING
)
UNIQUE KEY(deviceCode, tagCode, TS)
COMMENT "scada数据"
DISTRIBUTED BY HASH(deviceCode) BUCKETS 3
PROPERTIES (
    "replication_allocation" = "tag.location.default: 3",
    "enable_unique_key_merge_on_write" = "true"
);
-- Check the status of the routine load jobs.
-- NOTE(review): presumably this bare form lists only the jobs of the current
-- database; `SHOW ROUTINE LOAD FOR scada_jx1.jx1scada_error_time` should target
-- this job directly -- confirm against the Doris documentation.
SHOW ROUTINE LOAD;
详细信息
93537,jx1scada_error_time,2024-06-12 16:21:10,2024-07-22 12:58:10,,default_cluster:scada_jx1,jx1_scada_error_time,false,PAUSED,KAFKA,0,"{""max_batch_rows"":""200000"",""timezone"":""Asia/Shanghai"",""send_batch_parallelism"":""1"",""load_to_single_tablet"":""false"",""current_concurrent_number"":""0"",""delete"":""*"",""partial_columns"":""false"",""merge_type"":""APPEND"",""exec_mem_limit"":""2147483648"",""strict_mode"":""false"",""jsonpaths"":"""",""max_batch_interval"":""10"",""max_batch_size"":""104857600"",""fuzzy_parse"":""false"",""partitions"":""*"",""columnToColumnExpr"":"""",""whereExpr"":""*"",""desired_concurrent_number"":""1"",""precedingFilter"":""*"",""format"":""json"",""max_error_number"":""0"",""max_filter_ratio"":""1.0"",""json_root"":"""",""strip_outer_array"":""false"",""num_as_string"":""false""}","{""topic"":""error_time"",""currentKafkaPartitions"":"""",""brokerList"":""*.*.*.*:****""}","{""security.protocol"":""SASL_PLAINTEXT"",""sasl.username"":""***"",""sasl.mechanism"":""PLAIN"",""kafka_default_offsets"":""OFFSET_BEGINNING"",""group.id"":""jx1scada_kafka_doris"",""sasl.password"":""***""}","{""receivedBytes"":36317343660,""runningTxns"":[],""errorRows"":0,""committedTaskNum"":352322,""loadedRows"":64154130,""loadRowsRate"":13,""abortedTaskNum"":12,""errorRowsAfterResumed"":0,""totalRows"":64154130,""unselectedRows"":0,""receivedBytesRate"":7778,""taskExecuteTimeMs"":4668752299}","{""0"":""24937910"",""1"":""19756524"",""2"":""18726076""}","{""0"":-1,""1"":-1,""2"":-1}","","","",root,""
通过 Offset 查询 Kafka,发现有新数据:
原因分析:
导入作业被暂停。
疑问:没有手动暂停过该任务,但集群曾经重启过。应如何排查导入任务被暂停的原因?