create routine load报错

Viewed 12

如题,2.1.8版本Doris,创建routine load如下:

-- Routine load job `load_test`: loads JSON-formatted messages from the Kafka
-- topic `ff.test_fact_cml` into the table `test_fact_cml`.
CREATE ROUTINE LOAD load_test ON test_fact_cml
-- Target columns, listed one per line.
COLUMNS (
    kafka_message_row_uuid,
    kafka_biz_object_key,
    push_kafka_timestamp,
    session_uuid,
    kafka_biz_object_count,
    session_total_count,
    matnr,
    werks,
    lifnr,
    datab,
    datbi,
    kbetr_usd,
    kbetr,
    konwa,
    kpein,
    ebeln,
    ebelp,
    update_timestamp,
    ebeln_last_po,
    ebeln_last_po_item,
    last_po_date,
    system_id
)
PROPERTIES (
    "format" = "json"
)
FROM KAFKA (
    -- Brokers and topic to consume from.
    "kafka_broker_list" = "test1.com:30902,test2.com:30902,test3.com:30902",
    "kafka_topic" = "ff.test_fact_cml",
    -- Start from the earliest available offset.
    "property.kafka_default_offsets" = "OFFSET_BEGINNING",
    -- SASL over TLS with SCRAM-SHA-512 credentials (placeholders redacted as xxx).
    "property.security.protocol" = "SASL_SSL",
    "property.sasl.mechanism" = "SCRAM-SHA-512",
    "property.sasl.username" = "xxx",
    "property.sasl.password" = "xxx",
    "property.group.id" = "xxx",
    -- NOTE(review): the FILE: prefix presumably refers to a CA file registered
    -- in Doris beforehand (e.g. via CREATE FILE) — confirm it exists and is
    -- reachable from every BE node.
    "property.ssl.ca.location" = "FILE:xxx.pem"
);

执行 show routine load 查看,作业状态为 PAUSED,报错信息如下:
ReasonOfStateChanged: ErrorReason{code=errCode = 4, msg='errCode = 2, detailMessage = org.apache.doris.common.LoadException: errCode = 2, detailMessage = Failed to get real offsets of kafka topic: ff.test_fact_cml. error: errCode = 2, detailMessage = Failed to get info'}

be节点warn日志:

W20250422 17:42:42.120007 3802755 data_consumer.cpp:471] failed to get latest offset for partition: 0, err: Local: Timed out
W20250422 17:42:42.120244 3802755 status.h:415] meet error status: [INTERNAL_ERROR]failed to get latest offset for partition: 0, err: Local: Timed out

        0#  doris::KafkaDataConsumer::get_real_offsets_for_partitions(std::vector<doris::PIntegerPair, std::allocator<doris::PIntegerPair> > const&, std::vector<doris::PIntegerPair, std::allocator<doris::PIntegerPair> >*, int) at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/basic_string.h:187
        1#  doris::RoutineLoadTaskExecutor::get_kafka_real_offsets_for_partitions(doris::PKafkaMetaProxyRequest const&, std::vector<doris::PIntegerPair, std::allocator<doris::PIntegerPair> >*, int) at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_vector.h:680
        2#  std::_Function_handler<void (), doris::PInternalServiceImpl::get_info(google::protobuf::RpcController*, doris::PProxyRequest const*, doris::PProxyResult*, google::protobuf::Closure*)::$_0>::_M_invoke(std::_Any_data const&) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:494
        3#  doris::ThreadPool::dispatch_thread() at /home/zcp/repo_center/doris_release/doris/be/src/util/threadpool.cpp:0
        4#  doris::Thread::supervise_thread(void*) at /var/local/ldb-toolchain/bin/../usr/include/pthread.h:562
        5#  start_thread
        6#  __GI___clone
1 Answers