Why does Doris Routine Load consume only some Kafka partitions and report "Offset out of range"?


Creation script:
CREATE ROUTINE LOAD threat_intelligence_log_routine_load ON threat_intelligence_log
COLUMNS (log_id, event_time, event_name, event_type, protocol_type, src_ip, src_port, dst_ip, dst_port, severity, attack_results, payload, event_description, filename, file_md5, dns_arecord, packet_data)
PROPERTIES (
"max_error_number" = "1000",
"max_filter_ratio" = "0.1",
"desired_concurrent_number" = "3",
"max_batch_interval" = "3",
"max_batch_rows" = "300000",
"max_batch_size" = "268435456",
"format" = "json",
"json_root" = "$.busiAnnotations"
)
FROM KAFKA(
"kafka_broker_list" = "XXX:9092,XXX:9092,XXX:9092",
"kafka_topic" = "XXXXX",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "OFFSET_BEGINNING,OFFSET_BEGINNING,OFFSET_BEGINNING",
"property.group.id" = "xxxxxx_log_topic_consumer_group_prod",
"property.client.id" = "xxx_doris_prod"
);

routine_load job status:
| Id | Name | CreateTime | PauseTime | EndTime | DbName | TableName | IsMultiTable | State | DataSourceType | CurrentTaskNum | JobProperties | DataSourceProperties | CustomProperties | Statistic | Progress | Lag | ReasonOfStateChanged | ErrorLogUrls | OtherMsg | User | Comment |
| 1763706395082 | xxxxxx_routine_load | 2025-11-25 01:24:54 | 2025-11-28 18:36:05 | NULL | sop | threat_intelligence_log | false | PAUSED | KAFKA | 0 | {"max_batch_rows":"300000","timezone":"Asia/Shanghai","send_batch_parallelism":"1","load_to_single_tablet":"false","delete":"","current_concurrent_number":"3","partial_columns":"false","merge_type":"APPEND","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"","max_batch_interval":"3","max_batch_size":"268435456","fuzzy_parse":"false","partitions":"","columnToColumnExpr":"log_id,event_time,event_name,event_type,protocol_type,src_ip,src_port,dst_ip,dst_port,severity,attack_results,payload,event_description,filename,file_md5,dns_arecord,packet_data","whereExpr":"","desired_concurrent_number":"3","precedingFilter":"","format":"json","max_error_number":"1000","max_filter_ratio":"0.1","sequence_col":"*","json_root":"$.busiAnnotations","strip_outer_array":"false","num_as_string":"false"} | {"topic":"XXX","currentKafkaPartitions":"0,1,2","brokerList":"XXX:9092,XXX:9092,XXX:9092"} | {"group.id":"XXX_log_topic_consumer_group_prod","client.id":"XXX_doris_prod"} | {"receivedBytes":218506594,"runningTxns":[],"errorRows":0,"committedTaskNum":32950,"loadedRows":147748,"loadRowsRate":0,"abortedTaskNum":1,"errorRowsAfterResumed":0,"totalRows":147748,"unselectedRows":0,"receivedBytesRate":680,"taskExecuteTimeMs":321061824} | {"0":"5854963","1":"5696258","2":"5860221"} | {"0":0,"1":84482,"2":0} | ErrorReason{code=errCode = 105, msg='be 1763706366490 abort task, task id: af1ec275-b81b-4859-9ef2-c038ca2761d3 job id: 1763706395082 with reason: [INTERNAL_ERROR]fetch failed due to requested offset not available on the broker: Broker: Offset out of range (broker 10001) the offset used by job does not exist in kafka, please check the offset, using the Alter ROUTINE LOAD command to modify it, and resume the job'} | | 2025-11-28 18:36:05:[INTERNAL_ERROR]fetch failed due to requested offset not available on the broker: Broker: Offset out of range (broker 10001) | root | |

Problem description:
1. The job xxxxxx_routine_load ran for a while, then paused at 2025-11-28 18:36:05 with the error "fetch failed due to requested offset not available on the broker: Broker: Offset out of range (broker 10001) the offset used by job does not exist in kafka, please check the offset"

2. The routine_load job shows a backlog on partition 1 of this topic (Progress | Lag columns): {"0":"5854963","1":"5696258","2":"5860221"} | {"0":0,"1":84482,"2":0}

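3. Checking the consumer group in Kafka shows that only partitions 0 and 2 have been consumed. Partition 1 has never been consumed, yet the offset recorded by routine_load for it was not updated, so when the job tries to consume from "1":"5696258" the offset cannot be found and the error is raised.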
[root@isms-cu-5 bin]# sh kafka-consumer-groups.sh --bootstrap-server xxxx:9092,xxxx:9092,xxxx:9092 --describe --group xxxxx_log_topic_consumer_group_prod
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
xxxx_log_topic_consumer_group_prod xxxxx_44 0 5854964 5878125 23161 - - -
xxxx_log_topic_consumer_group_prod xxxxx_44 2 5860222 5883221 22999 - - -
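
To make the gap explicit, here is a minimal Python sketch that parses the `--describe` output above and lists the configured partitions that have no committed offset in the consumer group. The helper name `committed_partitions` is my own, and the whitespace-separated column layout is assumed to match the output pasted here; it is only a diagnostic aid, not part of Doris or Kafka.

```python
# Output of kafka-consumer-groups.sh --describe, copied from the post.
describe_output = """\
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
xxxx_log_topic_consumer_group_prod xxxxx_44 0 5854964 5878125 23161 - - -
xxxx_log_topic_consumer_group_prod xxxxx_44 2 5860222 5883221 22999 - - -
"""


def committed_partitions(text):
    """Map partition -> committed offset (hypothetical helper).

    Assumes the first line is the header and that columns 3 and 4 are
    PARTITION and CURRENT-OFFSET, as in the output above.
    """
    rows = [line.split() for line in text.strip().splitlines()[1:]]
    return {int(r[2]): int(r[3]) for r in rows if len(r) >= 4}


# Partitions configured in the job: "kafka_partitions" = "0,1,2"
configured = {0, 1, 2}
committed = committed_partitions(describe_output)
missing = sorted(configured - set(committed))

print("committed offsets:", committed)
print("partitions with no committed offset:", missing)
```

With the data from this post, only partition 1 ends up in `missing`.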

4. Earliest offsets, queried from Kafka:
[root@isms-cu-5 bin]# sh kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server xxx:9092,xxxx:9092,xxxx:9092 --topic xxx_44 --time -2
xxx_44:0:5809937
xxx_44:1:5780741
xxx_44:2:5814919

5. Latest offsets, queried from Kafka:
[root@isms-cu-5 bin]# sh kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server xxx:9092,xxx:9092,xxx:9092 --topic xxxx_44 --time -1
xxx_44:0:5878125
xxx_44:1:5876548
xxx_44:2:5883225
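
Putting the numbers side by side, here is a small Python sketch that checks whether the next offset each partition would fetch still lies between the earliest and latest offsets reported by Kafka. It assumes the Progress column stores the last consumed offset, so the next fetch is Progress + 1; that assumption matches partitions 0 and 2, where CURRENT-OFFSET in the consumer group equals Progress + 1. The function name `out_of_range_partitions` is hypothetical.

```python
# Values copied from the post.
progress = {0: 5854963, 1: 5696258, 2: 5860221}  # Progress column of the job
earliest = {0: 5809937, 1: 5780741, 2: 5814919}  # GetOffsetShell --time -2
latest = {0: 5878125, 1: 5876548, 2: 5883225}    # GetOffsetShell --time -1


def out_of_range_partitions(progress, earliest, latest):
    """Return {partition: (next_offset, earliest, latest)} for every
    partition whose next offset falls outside [earliest, latest].

    Assumption: Progress holds the last consumed offset, so the next
    offset to fetch is Progress + 1.
    """
    bad = {}
    for partition, last_consumed in progress.items():
        next_offset = last_consumed + 1
        if next_offset < earliest[partition] or next_offset > latest[partition]:
            bad[partition] = (next_offset, earliest[partition], latest[partition])
    return bad


result = out_of_range_partitions(progress, earliest, latest)
for partition, (nxt, lo, hi) in result.items():
    print(f"partition {partition}: next offset {nxt} is outside [{lo}, {hi}]")
```

For this job, only partition 1 is flagged: its next offset 5696259 is below the earliest offset still available on the broker (5780741), which is consistent with the "Offset out of range" error. Partitions 0 and 2 are within range.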
