ROUTINE LOAD consuming Kafka data: how can the time fields inside a nested JSON be parsed?


Doris version: 3.0.8
Table creation statement:
CREATE TABLE IF NOT EXISTS k8s_events_log1
(

event_timestamp DATETIME,
event_value TEXT,
first_timestamp DATETIME,
last_timestamp DATETIME,
creation_timestamp DATETIME

)
DUPLICATE KEY(event_timestamp )
PARTITION BY RANGE(event_timestamp)()
DISTRIBUTED BY HASH(creation_timestamp) BUCKETS 10
PROPERTIES (
"replication_num" = "1",
"storage_medium" = "SSD",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-30",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "10",
"dynamic_partition.replication_allocation" = "tag.location.default: 3"
);
ROUTINE LOAD job:
CREATE ROUTINE LOAD k8s_events_kafka_load1 ON k8s_events_log1
COLUMNS(
event_value,
event_timestamp,
first_timestamp ,
last_timestamp = json_extract(event_value, '$.last_timestamp'),
creation_timestamp = json_extract(event_value, '$.metadata.creationTimestamp')
)
PROPERTIES
(
"format" = "json",
"jsonpaths" = "[
"$.EventValue",
"$.EventTimestamp",
"$.event_value.firstTimestamp",
"$.event_value.lastTimestamp"
]",
"desired_concurrent_number" = "10",
"max_batch_interval" = "3",
"max_batch_rows" = "1000000",
"max_batch_size" = "500000000",
"max_error_number" = "10000"
)
FROM KAFKA
(
"kafka_broker_list" = "xxxx:9092,exxxx:9092,xxxx:9092",
"kafka_topic" = "k8s-cluster-event-log",
"property.group.id" = "doris_k8s_events_consumer1",
"property.fetch.message.max.bytes" = "20971520",
"property.max.partition.fetch.bytes" = "20971520",
"property.fetch.max.bytes" = "104857600",
"property.max.poll.records" = "50000",
"property.session.timeout.ms" = "60000",
"property.heartbeat.interval.ms" = "20000",
"property.auto.offset.reset" = "earliest"
);

Sample message in Kafka

{
    "EventValue": "{\n \"metadata\": {  \"creationTimestamp\": \"2025-11-25T13:32:10Z\"\n }\n \"firstTimestamp\": \"2025-11-25T13:32:10Z\",\n \"lastTimestamp\": \"2025-12-01T11:15:30Z\"}",
    "EventTimestamp": "2025-12-01T11:15:30Z"
}

Sample data after the load
The three time fields taken from the inner JSON (first_timestamp, last_timestamp, creation_timestamp) are not parsed,
while event_timestamp is loaded correctly.

2 Answers


This JSON looks problematic: the value of EventValue is a string, not a nested object, so a jsonpath cannot drill two levels down into it.
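One possible workaround, sketched here under assumptions rather than as a verified fix: map only the two top-level keys in jsonpaths, then derive the inner fields from the event_value string in the COLUMNS clause. The sketch assumes that get_json_string returns the unquoted string value, that the key names match the sample message (firstTimestamp, lastTimestamp, metadata.creationTimestamp), and that the resulting ISO 8601 string casts to DATETIME the same way EventTimestamp already does. The tuning properties and most Kafka properties from the original job are left out for brevity.

```sql
-- Sketch only: not tested against a live cluster.
-- jsonpaths lists just the top-level keys; inner fields are derived columns.
CREATE ROUTINE LOAD k8s_events_kafka_load1 ON k8s_events_log1
COLUMNS(
    event_value,
    event_timestamp,
    -- Assumption: get_json_string yields the value without surrounding quotes.
    first_timestamp = get_json_string(event_value, '$.firstTimestamp'),
    last_timestamp = get_json_string(event_value, '$.lastTimestamp'),
    creation_timestamp = get_json_string(event_value, '$.metadata.creationTimestamp')
)
PROPERTIES
(
    "format" = "json",
    -- Inner double quotes are escaped so the property value stays one string.
    "jsonpaths" = "[\"$.EventValue\",\"$.EventTimestamp\"]"
)
FROM KAFKA
(
    "kafka_broker_list" = "xxxx:9092",
    "kafka_topic" = "k8s-cluster-event-log"
);
```

The derived columns read from event_value, which is filled by the first jsonpath, so the order of the first two entries in COLUMNS has to match the order of the jsonpaths.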

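I trimmed the original JSON for this post and removed many of the other inner fields.
The event_value column in the Doris table can be parsed as follows (some irrelevant fields removed):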
{
"metadata": {
"resourceVersion": "810559006",
"creationTimestamp": "2025-12-05T16:32:05Z"
},
"reason": "NodeNotSchedulable",
"firstTimestamp": "2024-12-05T14:26:06Z",
"lastTimestamp": "2025-12-10T01:40:10Z",
"count": 321,
"type": "Normal",
"eventTime": null
}
I added two columns to the Doris table to check that the value of EventValue can in fact be parsed with a JSON path:
count BIGINT,
type VARCHAR(20)
In the ROUTINE LOAD I added extractions for a string field and an integer field:
count = json_extract(event_value, '$.count'),
type = json_extract(event_value, '$.type')
As far as I can tell, count and type are extracted into the Doris table correctly.
It looks like only the time fields fail to parse.
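
One explanation worth checking, offered as an assumption rather than a confirmed diagnosis: json_extract returns a JSON value, so a string result may keep its surrounding double quotes. A VARCHAR column would accept that and a number such as count has no quotes at all, but a quoted string may not cast to DATETIME. Separately, the job uses the path '$.last_timestamp' while the key in the data is lastTimestamp. The query below is a small sketch for comparing the two functions on a literal that mimics the sample data; the alias names are made up for illustration.

```sql
-- Diagnostic sketch: compare what each function returns for a string field.
SELECT
    json_extract(v, '$.lastTimestamp') AS via_json_extract,
    get_json_string(v, '$.lastTimestamp') AS via_get_json_string,
    CAST(get_json_string(v, '$.lastTimestamp') AS DATETIME) AS as_datetime
FROM (SELECT '{"lastTimestamp": "2025-12-10T01:40:10Z"}' AS v) t;
```

If the first column shows the value wrapped in quotes and the third column is a valid datetime, switching the time fields to get_json_string with the camelCase key names would be the next thing to try.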