ROUTINE LOAD kafka可以将json转成struct类型吗

Viewed 1

求助大佬,kafka里消息存储的是maxwell同步过来的binlog,我想将binlog用ruotine load存储到表里,但是data字段一直解析不了,表结构和routine load如下所示

CREATE TABLE test.ods_test_inc (
    `dt` DATE COMMENT '分区日期yyyy-MM-dd',
    `type` STRING COMMENT '数据操作类型 insert/update/delete',
    `ts` BIGINT COMMENT 'binlog时间戳(秒级)',
    `data` STRUCT<
        id:STRING,
        wholesale_name:STRING
    > COMMENT '业务数据详情',
    `old` MAP<STRING,STRING> COMMENT '变更前数据'
)
ENGINE=OLAP
DUPLICATE KEY(`dt`)
COMMENT ''
PARTITION BY RANGE (`dt`)  
()
DISTRIBUTED BY HASH(`dt`)
PROPERTIES (
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.start" = "-183",
    "dynamic_partition.end" = "3",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "8"
);
CREATE ROUTINE LOAD test.ods_test_inc_job 
ON ods_test_inc
COLUMNS(
    `type`,
    `ts`,
    `data`,
    `old`,
    dt=from_unixtime(ts, 'yyyy-MM-dd')
)
PROPERTIES
(
    "format" = "json",  
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "30",
    "max_batch_rows" = "200000"
)
FROM KAFKA
(
    "kafka_broker_list" = "192.168.0.54:9092,192.168.0.55:9092",
    "kafka_topic" = "test"
);
0 Answers