求助大佬:Kafka 里存储的消息是 Maxwell 同步过来的 binlog,我想用 Routine Load 将 binlog 导入到表里,但是 data 字段一直解析不了。表结构和 Routine Load 语句如下所示:
-- ODS table holding raw binlog change events, one row per event.
-- Range-partitioned by day on `dt`; partitions are managed automatically
-- through the dynamic_partition.* properties below.
CREATE TABLE test.ods_test_inc (
    `dt`   DATE   COMMENT '分区日期yyyy-MM-dd',
    `type` STRING COMMENT '数据操作类型 insert/update/delete',
    `ts`   BIGINT COMMENT 'binlog时间戳(秒级)',
    `data` STRUCT<
        id:STRING,
        wholesale_name:STRING
    > COMMENT '业务数据详情',
    `old`  MAP<STRING,STRING> COMMENT '变更前数据'
)
ENGINE=OLAP
DUPLICATE KEY(`dt`)
COMMENT ''
PARTITION BY RANGE (`dt`)
()
-- Bucketing must not hash on `dt`: each daily partition contains a single
-- `dt` value, so HASH(`dt`) would send every row of the partition to one
-- bucket and leave the others empty. Random distribution spreads rows
-- evenly across buckets and is permitted for a DUPLICATE KEY table.
DISTRIBUTED BY RANDOM
PROPERTIES (
    "dynamic_partition.enable"    = "true",
    "dynamic_partition.time_unit" = "DAY",
    -- Offsets are in units of time_unit (days), relative to today:
    -- partitions older than 183 days are dropped, 3 days are pre-created.
    "dynamic_partition.start"     = "-183",
    "dynamic_partition.end"       = "3",
    "dynamic_partition.prefix"    = "p",
    -- Bucket count used for each dynamically created partition.
    "dynamic_partition.buckets"   = "8"
);
-- Continuously consumes JSON messages from Kafka topic `test` and loads
-- them into test.ods_test_inc.
-- No "jsonpaths" property is set, so top-level JSON keys are matched to the
-- source columns listed in COLUMNS by name.
-- NOTE(review): `data` / `old` are presumably nested JSON objects in the
-- message; whether they convert into the STRUCT / MAP target columns may
-- depend on the engine version -- confirm against the job's error log
-- (SHOW ROUTINE LOAD) before relying on this mapping.
CREATE ROUTINE LOAD test.ods_test_inc_job
ON ods_test_inc
COLUMNS (
    `type`,
    `ts`,
    `data`,
    `old`,
    -- Derive the partition date from the epoch-seconds `ts` field.
    -- Uses date_format-style specifiers instead of the legacy
    -- 'yyyy-MM-dd' alias; the resulting string is the same (YYYY-MM-DD).
    dt = from_unixtime(ts, '%Y-%m-%d')
)
PROPERTIES (
    "format"                    = "json",
    "desired_concurrent_number" = "3",
    "max_batch_interval"        = "30",
    "max_batch_rows"            = "200000"
)
FROM KAFKA (
    "kafka_broker_list" = "192.168.0.54:9092,192.168.0.55:9092",
    "kafka_topic"       = "test"
);