新建了两张表:
CREATE TABLE dwd_equip_charge_log
(
operator_id
varchar(200) NOT NULL COMMENT '运营商编码',
connector_id
varchar(200) NOT NULL COMMENT '充电设备接口编码',
station_id
varchar(32) NULL COMMENT '充电站编码',
equipment_id
varchar(32) NULL COMMENT '充电设备编码',
start_charge_seq
varchar(200) NOT NULL COMMENT '充电订单号',
start_charge_seq_stat
smallint NOT NULL COMMENT '充电订单状态| 1,启动中 2,充电中 3,停止中 4,已结束 5,未知',
connector_status
smallint NOT NULL COMMENT '充电设备接口状态| 1,空闲 2,占用(未充电) 3,占用(充电中) 4,占用(预约锁定) 255,故障',
current_a
decimal(10,2) NOT NULL COMMENT '电流A',
current_b
decimal(10,2) NULL COMMENT '电流B',
current_c
decimal(10,2) NULL COMMENT '电流C',
voltage_a
decimal(10,2) NOT NULL COMMENT '电压A',
voltage_b
decimal(10,2) NULL COMMENT '电压B',
voltage_c
decimal(10,2) NULL COMMENT '电压C',
soc
decimal(10,2) NOT NULL COMMENT 'SOC电池剩余电量',
start_time
datetime NOT NULL COMMENT '充电开始时间',
end_time
datetime NOT NULL COMMENT '充电结束时间',
total_power
decimal(10,2) NOT NULL COMMENT '总充电量',
elec_money
decimal(10,2) NULL COMMENT '充电金额',
service_money
decimal(10,2) NULL COMMENT '服务费',
total_money
decimal(10,2) NULL COMMENT '总金额',
sum_period
int NULL COMMENT '总充电时长',
detail_start_time
datetime NULL COMMENT '充电明细开始时间',
detail_end_time
datetime NULL COMMENT '充电明细结束时间',
elec_power
decimal(10,2) NULL COMMENT '充电明细电量',
service_price
decimal(10,2) NULL COMMENT '服务费单价',
detail_power
decimal(10,2) NULL COMMENT '充电明细电量',
detail_elec_money
decimal(10,2) NULL COMMENT '充电明细金额',
detail_service_money
decimal(10,2) NULL COMMENT '充电明细服务费',
table_insert_time
datetime NULL DEFAULT CURRENT_TIMESTAMP COMMENT '技术字段,数据插入时间',
power
decimal(10,2) NULL COMMENT '功率'
) ENGINE=OLAP
DUPLICATE KEY(operator_id
)
COMMENT '充电设备流水表_明细层'
PARTITION BY RANGE(table_insert_time
)
(PARTITION p202412 VALUES [('2024-12-01 00:00:00'), ('2025-01-01 00:00:00')),
PARTITION p202501 VALUES [('2025-01-01 00:00:00'), ('2025-02-01 00:00:00')),
PARTITION p202502 VALUES [('2025-02-01 00:00:00'), ('2025-03-01 00:00:00')),
PARTITION p202503 VALUES [('2025-03-01 00:00:00'), ('2025-04-01 00:00:00')),
PARTITION p202504 VALUES [('2025-04-01 00:00:00'), ('2025-05-01 00:00:00')),
PARTITION p202505 VALUES [('2025-05-01 00:00:00'), ('2025-06-01 00:00:00')),
PARTITION p202506 VALUES [('2025-06-01 00:00:00'), ('2025-07-01 00:00:00')),
PARTITION p202507 VALUES [('2025-07-01 00:00:00'), ('2025-08-01 00:00:00')),
PARTITION p202508 VALUES [('2025-08-01 00:00:00'), ('2025-09-01 00:00:00')),
PARTITION p202509 VALUES [('2025-09-01 00:00:00'), ('2025-10-01 00:00:00')),
PARTITION p202510 VALUES [('2025-10-01 00:00:00'), ('2025-11-01 00:00:00')),
PARTITION p202511 VALUES [('2025-11-01 00:00:00'), ('2025-12-01 00:00:00')),
PARTITION p202512 VALUES [('2025-12-01 00:00:00'), ('2026-01-01 00:00:00')),
PARTITION p202601 VALUES [('2026-01-01 00:00:00'), ('2026-02-01 00:00:00')),
PARTITION p202602 VALUES [('2026-02-01 00:00:00'), ('2026-03-01 00:00:00')),
PARTITION p202603 VALUES [('2026-03-01 00:00:00'), ('2026-04-01 00:00:00')),
PARTITION p202604 VALUES [('2026-04-01 00:00:00'), ('2026-05-01 00:00:00')),
PARTITION p202605 VALUES [('2026-05-01 00:00:00'), ('2026-06-01 00:00:00')),
PARTITION p202606 VALUES [('2026-06-01 00:00:00'), ('2026-07-01 00:00:00')),
PARTITION p202607 VALUES [('2026-07-01 00:00:00'), ('2026-08-01 00:00:00')),
PARTITION p202608 VALUES [('2026-08-01 00:00:00'), ('2026-09-01 00:00:00')),
PARTITION p202609 VALUES [('2026-09-01 00:00:00'), ('2026-10-01 00:00:00')),
PARTITION p202610 VALUES [('2026-10-01 00:00:00'), ('2026-11-01 00:00:00')),
PARTITION p202611 VALUES [('2026-11-01 00:00:00'), ('2026-12-01 00:00:00')),
PARTITION p202612 VALUES [('2026-12-01 00:00:00'), ('2027-01-01 00:00:00')),
PARTITION p202701 VALUES [('2027-01-01 00:00:00'), ('2027-02-01 00:00:00')),
PARTITION p202702 VALUES [('2027-02-01 00:00:00'), ('2027-03-01 00:00:00')),
PARTITION p202703 VALUES [('2027-03-01 00:00:00'), ('2027-04-01 00:00:00')),
PARTITION p202704 VALUES [('2027-04-01 00:00:00'), ('2027-05-01 00:00:00')),
PARTITION p202705 VALUES [('2027-05-01 00:00:00'), ('2027-06-01 00:00:00')),
PARTITION p202706 VALUES [('2027-06-01 00:00:00'), ('2027-07-01 00:00:00')),
PARTITION p202707 VALUES [('2027-07-01 00:00:00'), ('2027-08-01 00:00:00')),
PARTITION p202708 VALUES [('2027-08-01 00:00:00'), ('2027-09-01 00:00:00')),
PARTITION p202709 VALUES [('2027-09-01 00:00:00'), ('2027-10-01 00:00:00')),
PARTITION p202710 VALUES [('2027-10-01 00:00:00'), ('2027-11-01 00:00:00')),
PARTITION p202711 VALUES [('2027-11-01 00:00:00'), ('2027-12-01 00:00:00')),
PARTITION p202712 VALUES [('2027-12-01 00:00:00'), ('2028-01-01 00:00:00')),
PARTITION p202801 VALUES [('2028-01-01 00:00:00'), ('2028-02-01 00:00:00')),
PARTITION p202802 VALUES [('2028-02-01 00:00:00'), ('2028-03-01 00:00:00')),
PARTITION p202803 VALUES [('2028-03-01 00:00:00'), ('2028-04-01 00:00:00')),
PARTITION p202804 VALUES [('2028-04-01 00:00:00'), ('2028-05-01 00:00:00')),
PARTITION p202805 VALUES [('2028-05-01 00:00:00'), ('2028-06-01 00:00:00')),
PARTITION p202806 VALUES [('2028-06-01 00:00:00'), ('2028-07-01 00:00:00')),
PARTITION p202807 VALUES [('2028-07-01 00:00:00'), ('2028-08-01 00:00:00')),
PARTITION p202808 VALUES [('2028-08-01 00:00:00'), ('2028-09-01 00:00:00')),
PARTITION p202809 VALUES [('2028-09-01 00:00:00'), ('2028-10-01 00:00:00')),
PARTITION p202810 VALUES [('2028-10-01 00:00:00'), ('2028-11-01 00:00:00')),
PARTITION p202811 VALUES [('2028-11-01 00:00:00'), ('2028-12-01 00:00:00')),
PARTITION p202812 VALUES [('2028-12-01 00:00:00'), ('2029-01-01 00:00:00')))
DISTRIBUTED BY HASH(start_time
) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 2",
"is_being_synced" = "false",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "MONTH",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-2147483648",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.replication_allocation" = "tag.location.default: 2",
"dynamic_partition.buckets" = "8",
"dynamic_partition.create_history_partition" = "false",
"dynamic_partition.history_partition_num" = "-1",
"dynamic_partition.hot_partition_num" = "0",
"dynamic_partition.reserved_history_periods" = "NULL",
"dynamic_partition.storage_policy" = "",
"dynamic_partition.start_day_of_month" = "1",
"storage_medium" = "hdd",
"storage_format" = "V2",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false",
"enable_mow_light_delete" = "false"
);
CREATE TABLE dwd_equip_charge_status
(
operator_id
varchar(200) NOT NULL COMMENT '运营商编码',
connector_id
varchar(200) NOT NULL COMMENT '充电设备接口编码',
station_id
varchar(32) NULL COMMENT '充电站编码',
equipment_id
varchar(32) NULL COMMENT '充电设备编码',
start_charge_seq
varchar(200) NOT NULL COMMENT '充电订单号',
start_charge_seq_stat
smallint NOT NULL COMMENT '充电订单状态| 1,启动中 2,充电中 3,停止中 4,已结束 5,未知',
connector_status
smallint NOT NULL COMMENT '充电设备接口状态| 1,空闲 2,占用(未充电) 3,占用(充电中) 4,占用(预约锁定) 255,故障',
current_a
decimal(10,2) NOT NULL COMMENT '电流A',
current_b
decimal(10,2) NULL COMMENT '电流B',
current_c
decimal(10,2) NULL COMMENT '电流C',
voltage_a
decimal(10,2) NOT NULL COMMENT '电压A',
voltage_b
decimal(10,2) NULL COMMENT '电压B',
voltage_c
decimal(10,2) NULL COMMENT '电压C',
soc
decimal(10,2) NOT NULL COMMENT 'SOC电池剩余电量',
start_time
datetime NOT NULL COMMENT '充电开始时间',
end_time
datetime NOT NULL COMMENT '充电结束时间',
total_power
decimal(10,2) NOT NULL COMMENT '总充电量',
elec_money
decimal(10,2) NULL COMMENT '充电金额',
service_money
decimal(10,2) NULL COMMENT '服务费',
total_money
decimal(10,2) NULL COMMENT '总金额',
sum_period
int NULL COMMENT '总充电时长',
detail_start_time
datetime NULL COMMENT '充电明细开始时间',
detail_end_time
datetime NULL COMMENT '充电明细结束时间',
elec_power
decimal(10,2) NULL COMMENT '充电明细电量',
service_price
decimal(10,2) NULL COMMENT '服务费单价',
detail_power
decimal(10,2) NULL COMMENT '充电明细电量',
detail_elec_money
decimal(10,2) NULL COMMENT '充电明细金额',
detail_service_money
decimal(10,2) NULL COMMENT '充电明细服务费',
table_insert_time
datetime NULL DEFAULT CURRENT_TIMESTAMP COMMENT '技术字段,积累插入时间'
) ENGINE=OLAP
UNIQUE KEY(operator_id
, connector_id
)
COMMENT 'dwd_设备充电状态'
DISTRIBUTED BY HASH(operator_id
) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 2",
"is_being_synced" = "false",
"storage_medium" = "hdd",
"storage_format" = "V2",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false",
"enable_mow_light_delete" = "false"
);
后新建了两个routine load 任务,具体如下:
CREATE ROUTINE LOAD dwd_equip_charge_status_task ON dwd_equip_charge_status
WITH APPEND
COLUMNS(operator_id,station_id,equipment_id,connector_id,start_charge_seq,start_charge_seq_stat,connector_status,current_a,current_b,current_c,voltage_a,voltage_b,voltage_c,soc,start_time,end_time,total_power,elec_money,service_money,total_money,sum_period,detail_start_time,detail_end_time,elec_power,service_price,detail_power,detail_elec_money,detail_service_money)
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_error_number" = "0",
"max_filter_ratio" = "1.0",
"max_batch_interval" = "10",
"max_batch_rows" = "20000000",
"max_batch_size" = "1073741824",
"format" = "json",
"jsonpaths" = "[
"$.operatorId",
"$.stationId",
"$.equipmentId",
"$.EquipmentChargeStatus.ConnectorID",
"$.EquipmentChargeStatus.StartChargeSeq",
"$.EquipmentChargeStatus.StartChargeSeqStat",
"$.EquipmentChargeStatus.ConnectorStatus",
"$.EquipmentChargeStatus.CurrentA",
"$.EquipmentChargeStatus.CurrentB",
"$.EquipmentChargeStatus.CurrentC",
"$.EquipmentChargeStatus.VoltageA",
"$.EquipmentChargeStatus.VoltageB",
"$.EquipmentChargeStatus.VoltageC",
"$.EquipmentChargeStatus.Soc",
"$.EquipmentChargeStatus.StartTime",
"$.EquipmentChargeStatus.EndTime",
"$.EquipmentChargeStatus.TotalPower",
"$.EquipmentChargeStatus.ElecMoney",
"$.EquipmentChargeStatus.ServiceMoney",
"$.EquipmentChargeStatus.TotalMoney",
"$.EquipmentChargeStatus.SumPeriod"
]",
"strip_outer_array" = "false",
"num_as_string" = "false",
"fuzzy_parse" = "false",
"strict_mode" = "false",
"timezone" = "Asia/Shanghai",
"exec_mem_limit" = "2147483648"
)
FROM KAFKA
(
"kafka_broker_list" = "192.168.22.10:9092,192.168.22.11:9092,192.168.22.12:9092",
"kafka_topic" = "tcec-business-equip-charge-status",
"property.kafka_default_offsets" = "OFFSET_END",
"property.group.id" = "dwd_equip_charge_status_task_6436ea2e-de45-4611-af19-efa9f7fc4e12",
"kafka_partitions" = "0, 1, 2",
"kafka_offsets" = "17768925, 15395929, 15315065"
);
CREATE ROUTINE LOAD dwd_equip_charge_log_task ON dwd_equip_charge_log
WITH APPEND
COLUMNS(operator_id,station_id,equipment_id,connector_id,start_charge_seq,start_charge_seq_stat,connector_status,current_a,current_b,current_c,voltage_a,voltage_b,voltage_c,soc,start_time,end_time,total_power,elec_money,service_money,total_money,sum_period,detail_start_time,detail_end_time,elec_power,service_price,detail_power,detail_elec_money,detail_service_money,power=ifnull(((((((ifnull(current_a
, 0) * ifnull(voltage_a
, 0))) + ((ifnull(current_b
, 0) * ifnull(voltage_b
, 0))))) + ((ifnull(current_c
, 0) * ifnull(voltage_c
, 0))))), 0))
PROPERTIES
(
"desired_concurrent_number" = "256",
"max_error_number" = "0",
"max_filter_ratio" = "1.0",
"max_batch_interval" = "10",
"max_batch_rows" = "20000000",
"max_batch_size" = "1073741824",
"format" = "json",
"jsonpaths" = "["$.operatorId","$.stationId","$.equipmentId","$.EquipmentChargeStatus.ConnectorID","$.EquipmentChargeStatus.StartChargeSeq","$.EquipmentChargeStatus.StartChargeSeqStat","$.EquipmentChargeStatus.ConnectorStatus","$.EquipmentChargeStatus.CurrentA","$.EquipmentChargeStatus.CurrentB","$.EquipmentChargeStatus.CurrentC","$.EquipmentChargeStatus.VoltageA","$.EquipmentChargeStatus.VoltageB","$.EquipmentChargeStatus.VoltageC","$.EquipmentChargeStatus.Soc","$.EquipmentChargeStatus.StartTime","$.EquipmentChargeStatus.EndTime","$.EquipmentChargeStatus.TotalPower","$.EquipmentChargeStatus.ElecMoney","$.EquipmentChargeStatus.ServiceMoney","$.EquipmentChargeStatus.TotalMoney","$.EquipmentChargeStatus.SumPeriod"]",
"strip_outer_array" = "false",
"num_as_string" = "false",
"fuzzy_parse" = "false",
"strict_mode" = "false",
"timezone" = "Asia/Shanghai",
"exec_mem_limit" = "2147483648"
)
FROM KAFKA
(
"kafka_broker_list" = "192.168.22.10:9092,192.168.22.11:9092,192.168.22.12:9092",
"kafka_topic" = "tcec-business-equip-charge-status",
"property.kafka_default_offsets" = "OFFSET_END",
"property.group.id" = "dwd_equip_charge_log_task_34725864-4c14-4ff4-919d-056bd2582553",
"kafka_partitions" = "0, 1, 2",
"kafka_offsets" = "17768942, 15391032, 15311819"
);
但是偶尔会出现同一事件时间的数据,写入到dwd_equip_charge_log中的时间会慢于 dwd_equip_charge_status 的时间,这种情况,有一定的偶尔性,并不是每次都会出现,想请教一下如何解决?以及是啥原因导致的。