如题,2.1.8版本Doris,创建routine load如下:
CREATE ROUTINE LOAD load_test ON test_fact_cml
COLUMNS(kafka_message_row_uuid, kafka_biz_object_key, push_kafka_timestamp, session_uuid, kafka_biz_object_count,
session_total_count, matnr, werks, lifnr, datab, datbi, kbetr_usd, kbetr, konwa, kpein, ebeln, ebelp, update_timestamp,
ebeln_last_po, ebeln_last_po_item, last_po_date, system_id)
PROPERTIES(
"format"="json"
)
FROM KAFKA(
"kafka_broker_list" = "test1.com:30902,test2.com:30902,test3.com:30902",
"kafka_topic" = "ff.test_fact_cml",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.security.protocol"="SASL_SSL",
"property.sasl.mechanism"="SCRAM-SHA-512",
"property.sasl.username"="xxx",
"property.sasl.password"="xxx",
"property.group.id" = "xxx",
"property.ssl.ca.location" = "FILE:xxx.pem"
);
show routine load,查看状态PAUSED,报错信息:ReasonOfStateChanged: ErrorReason{code=errCode = 4, msg='errCode = 2, detailMessage = org.apache.doris.common.LoadException: errCode = 2, detailMessage = Failed to get real offsets of kafka topic: ff.test_fact_cml. error: errCode = 2, detai
lMessage = Failed to get info'}
be节点warn日志:
W20250422 17:42:42.120007 3802755 data_consumer.cpp:471] failed to get latest offset for partition: 0, err: Local: Timed out
W20250422 17:42:42.120244 3802755 status.h:415] meet error status: [INTERNAL_ERROR]failed to get latest offset for partition: 0, err: Local: Timed out
0# doris::KafkaDataConsumer::get_real_offsets_for_partitions(std::vector<doris::PIntegerPair, std::allocator<doris::PIntegerPair> > const&, std::vector<doris::PIntegerPair, std::allocator<doris::PIntegerPair> >*, int) at /var/local/ldb-toolchain
/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/basic_string.h:187
1# doris::RoutineLoadTaskExecutor::get_kafka_real_offsets_for_partitions(doris::PKafkaMetaProxyRequest const&, std::vector<doris::PIntegerPair, std::allocator<doris::PIntegerPair> >*, int) at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-
gnu/11/../../../../include/c++/11/bits/stl_vector.h:680
2# std::_Function_handler<void (), doris::PInternalServiceImpl::get_info(google::protobuf::RpcController*, doris::PProxyRequest const*, doris::PProxyResult*, google::protobuf::Closure*)::$_0>::_M_invoke(std::_Any_data const&) at /home/zcp/repo_c
enter/doris_release/doris/be/src/common/status.h:494
3# doris::ThreadPool::dispatch_thread() at /home/zcp/repo_center/doris_release/doris/be/src/util/threadpool.cpp:0
4# doris::Thread::supervise_thread(void*) at /var/local/ldb-toolchain/bin/../usr/include/pthread.h:562
5# start_thread
6# __GI___clone