-- Unique-key table with merge-on-write enabled, keyed on (k1, k2, k3).
-- id (AUTO_INCREMENT) and credate (DEFAULT CURRENT_TIMESTAMP) are non-key
-- columns: they are not listed in UNIQUE KEY and are filled in by the engine
-- whenever a writer does not supply them.
CREATE TABLE test
(
    k1      bigint   NOT NULL COMMENT 'k1',
    k2      bigint   NOT NULL COMMENT 'k2',
    k3      bigint   NOT NULL COMMENT 'k3',
    id      bigint   NOT NULL AUTO_INCREMENT(1) COMMENT '虚拟id',                  -- surrogate ("virtual") id
    credate datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间'          -- creation time
) ENGINE=OLAP
UNIQUE KEY(k1, k2, k3)
DISTRIBUTED BY HASH(k1, k2, k3) BUCKETS 3
PROPERTIES (
    "replication_allocation" = "tag.location.default: 3",
    "min_load_replica_num" = "-1",
    "is_being_synced" = "false",
    "storage_medium" = "hdd",
    "storage_format" = "V2",
    "inverted_index_storage_format" = "V1",
    "enable_unique_key_merge_on_write" = "true",
    "light_schema_change" = "true",
    "disable_auto_compaction" = "false",
    "enable_single_replica_compaction" = "false",
    "group_commit_interval_ms" = "10000",
    "group_commit_data_bytes" = "134217728",
    "enable_mow_light_delete" = "false"
);
Routine Load 任务脚本如下:
-- Routine Load job that consumes JSON messages from Kafka topic test_topic
-- and writes them into table test. Only k1, k2 and k3 are mapped from the
-- message; no other target column is supplied by this job.
-- NOTE(review): per the Doris auto-increment documentation, a whole-row load
-- into a merge-on-write unique table that omits an AUTO_INCREMENT column
-- presumably generates a fresh value on every upsert of the same key. Check
-- whether adding "partial_columns" = "true" to PROPERTIES keeps the existing
-- value on the target Doris version - TODO confirm.
CREATE ROUTINE LOAD test_load_json ON test
COLUMNS(k1, k2, k3)
PROPERTIES (
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json"
)
FROM KAFKA (
    "kafka_broker_list" = "127.0.0.1:9092",
    "kafka_topic" = "test_topic",
    "property.group.id" = "test_topic_01",
    "property.client.id" = "test_topic_01",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
Kafka 中 test_topic 的消息如下:
{
"k1": 1,
"k2": 1,
"k3": 1
}
该消息发送两次后,发现 test 表中的 id 发生了变化,这是为什么呢?