ROUTINE LOAD 消费kafka数据 内层json的时间字段如何解析出来

Viewed 2

doris版本:3.0.8
建表语句:
CREATE TABLE IF NOT EXISTS k8s_events_log1
(

event_timestamp DATETIME,
event_value TEXT,
first_timestamp DATETIME,
last_timestamp DATETIME,
creation_timestamp DATETIME

)
DUPLICATE KEY(event_timestamp )
PARTITION BY RANGE(event_timestamp)()
DISTRIBUTED BY HASH(creation_timestamp) BUCKETS 10
PROPERTIES (
"replication_num" = "1",
"storage_medium" = "SSD",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-30",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "10",
"dynamic_partition.replication_allocation" = "tag.location.default: 3"
);
ROUTINE LOAD任务:
CREATE ROUTINE LOAD k8s_events_kafka_load1 ON k8s_events_log1
COLUMNS(
event_value,
event_timestamp,
first_timestamp ,
last_timestamp = json_extract(event_value, '$.last_timestamp'),
creation_timestamp = json_extract(event_value, '$.metadata.creationTimestamp')
)
PROPERTIES
(
"format" = "json",
"jsonpaths" = "[
"$.EventValue",
"$.EventTimestamp",
"$.event_value.firstTimestamp",
"$.event_value.lastTimestamp"
]",
"desired_concurrent_number" = "10",
"max_batch_interval" = "3",
"max_batch_rows" = "1000000",
"max_batch_size" = "500000000",
"max_error_number" = "10000"
)
FROM KAFKA
(
"kafka_broker_list" = "xxxx:9092,exxxx:9092,xxxx:9092",
"kafka_topic" = "k8s-cluster-event-log",
"property.group.id" = "doris_k8s_events_consumer1",
"property.fetch.message.max.bytes" = "20971520",
"property.max.partition.fetch.bytes" = "20971520",
"property.fetch.max.bytes" = "104857600",
"property.max.poll.records" = "50000",
"property.session.timeout.ms" = "60000",
"property.heartbeat.interval.ms" = "20000",
"property.auto.offset.reset" = "earliest"
);

kafka中的数据示例

{
    "EventValue": "{\n \"metadata\": {  \"creationTimestamp\": \"2025-11-25T13:32:10Z\"\n }\n \"firstTimestamp\": \"2025-11-25T13:32:10Z\",\n \"lastTimestamp\": \"2025-12-01T11:15:30Z\"}",
    "EventTimestamp": "2025-12-01T11:15:30Z"
}

load后数据示例
first_timestamp last_timestamp creation_timestamp 三个内层json的时间字段数据无法解析,
event_timestamp 时间正常
image.png

0 Answers