Routine Load 的创建语句为:
-- Routine Load job that consumes JSON messages from the Kafka topic
-- "tcec-business-equip-charge-status" and appends them to equip_charge_log.
CREATE ROUTINE LOAD equip_charge_log_task ON equip_charge_log
WITH APPEND
COLUMNS
(
    -- Source columns; they are paired with the "jsonpaths" entries below by position.
    operator_id,
    station_id,
    equipment_id,
    connector_id,
    start_charge_seq,
    start_charge_seq_stat,
    connector_status,
    current_a,
    current_b,
    current_c,
    voltage_a,
    voltage_b,
    voltage_c,
    soc,
    start_time,
    end_time,
    total_power,
    elec_money,
    service_money,
    total_money,
    sum_period,
    -- NOTE(review): the seven columns below have no matching "jsonpaths" entry
    -- (21 paths vs. 28 source columns). Presumably they are loaded as NULL --
    -- confirm, or remove them from this list.
    detail_start_time,
    detail_end_time,
    elec_power,
    service_price,
    detail_power,
    detail_elec_money,
    detail_service_money,
    -- Derived column: sum of current * voltage over phases A, B and C, where a
    -- NULL reading counts as 0. The outer ifnull is kept on purpose so that a
    -- NULL result of the arithmetic is still stored as 0.
    power = ifnull(
        (
            (ifnull(current_a, 0) * ifnull(voltage_a, 0))
            + (ifnull(current_b, 0) * ifnull(voltage_b, 0))
        )
        + (ifnull(current_c, 0) * ifnull(voltage_c, 0)),
        0
    )
)
PROPERTIES
(
    -- Only three Kafka partitions are consumed (see "kafka_partitions"), so
    -- presumably a higher value adds no parallelism -- confirm against the
    -- Routine Load concurrency rules.
    "desired_concurrent_number" = "3",
    "max_error_number" = "0",
    "max_filter_ratio" = "1.0",
    "max_batch_interval" = "10",
    "max_batch_rows" = "20000000",
    "max_batch_size" = "1073741824",
    "format" = "json",
    -- The inner double quotes must be escaped; unescaped, the first inner quote
    -- terminates the property value and the statement no longer parses.
    "jsonpaths" = "[
        \"$.operatorId\",
        \"$.stationId\",
        \"$.equipmentId\",
        \"$.EquipmentChargeStatus.ConnectorID\",
        \"$.EquipmentChargeStatus.StartChargeSeq\",
        \"$.EquipmentChargeStatus.StartChargeSeqStat\",
        \"$.EquipmentChargeStatus.ConnectorStatus\",
        \"$.EquipmentChargeStatus.CurrentA\",
        \"$.EquipmentChargeStatus.CurrentB\",
        \"$.EquipmentChargeStatus.CurrentC\",
        \"$.EquipmentChargeStatus.VoltageA\",
        \"$.EquipmentChargeStatus.VoltageB\",
        \"$.EquipmentChargeStatus.VoltageC\",
        \"$.EquipmentChargeStatus.Soc\",
        \"$.EquipmentChargeStatus.StartTime\",
        \"$.EquipmentChargeStatus.EndTime\",
        \"$.EquipmentChargeStatus.TotalPower\",
        \"$.EquipmentChargeStatus.ElecMoney\",
        \"$.EquipmentChargeStatus.ServiceMoney\",
        \"$.EquipmentChargeStatus.TotalMoney\",
        \"$.EquipmentChargeStatus.SumPeriod\"
    ]",
    "strip_outer_array" = "false",
    "num_as_string" = "false",
    "fuzzy_parse" = "false",
    "strict_mode" = "false",
    "timezone" = "Asia/Shanghai",
    "exec_mem_limit" = "2147483648"
)
FROM KAFKA
(
    "kafka_broker_list" = "192.168.22.10:9092,192.168.22.11:9092,192.168.22.12:9092",
    "kafka_topic" = "tcec-business-equip-charge-status",
    "property.group.id" = "equip_charge_log_task_33f22893-2e76-42f9-b3f7-72895f292c6f",
    -- Explicit per-partition start offsets. "property.kafka_default_offsets" is
    -- intentionally not set: it cannot be combined with "kafka_offsets".
    -- NOTE(review): verify these offsets still exist in the topic (retention)
    -- before re-creating the job.
    "kafka_partitions" = "0, 1, 2",
    "kafka_offsets" = "9866760, 7663526, 8406198"
);
建表语句为:
-- Charging-equipment status log table (OLAP engine, DUPLICATE KEY model).
-- Range-partitioned on start_time with one partition per calendar month and
-- hash-distributed on operator_id; dynamic partitioning is enabled in PROPERTIES.
CREATE TABLE equip_charge_log
(
    -- Identification of the operator, connector, station, equipment and order.
    operator_id            varchar(200)   NOT NULL COMMENT '运营商编码',
    connector_id           varchar(200)   NOT NULL COMMENT '充电设备接口编码',
    station_id             varchar(32)    NULL     COMMENT '充电站编码',
    equipment_id           varchar(32)    NULL     COMMENT '充电设备编码',
    start_charge_seq       varchar(200)   NOT NULL COMMENT '充电订单号',
    start_charge_seq_stat  smallint       NOT NULL COMMENT '充电订单状态| 1,启动中 2,充电中 3,停止中 4,已结束 5,未知',
    connector_status       smallint       NOT NULL COMMENT '充电设备接口状态| 1,空闲 2,占用(未充电) 3,占用(充电中) 4,占用(预约锁定) 255,故障',

    -- Per-phase electrical readings; only phase A is mandatory.
    current_a              decimal(10,2)  NOT NULL COMMENT '电流A',
    current_b              decimal(10,2)  NULL     COMMENT '电流B',
    current_c              decimal(10,2)  NULL     COMMENT '电流C',
    voltage_a              decimal(10,2)  NOT NULL COMMENT '电压A',
    voltage_b              decimal(10,2)  NULL     COMMENT '电压B',
    voltage_c              decimal(10,2)  NULL     COMMENT '电压C',
    soc                    decimal(10,2)  NOT NULL COMMENT 'SOC电池剩余电量',

    -- Session window (start_time is also the partition column) and totals.
    start_time             datetime       NOT NULL COMMENT '充电开始时间',
    end_time               datetime       NOT NULL COMMENT '充电结束时间',
    total_power            decimal(10,2)  NOT NULL COMMENT '总充电量',
    elec_money             decimal(10,2)  NULL     COMMENT '充电金额',
    service_money          decimal(10,2)  NULL     COMMENT '服务费',
    total_money            decimal(10,2)  NULL     COMMENT '总金额',
    sum_period             int            NULL     COMMENT '总充电时长',

    -- Charging-detail columns, all nullable.
    -- NOTE(review): elec_power and detail_power carry the same column comment;
    -- one of them may be mislabeled -- confirm with the data owner.
    detail_start_time      datetime       NULL     COMMENT '充电明细开始时间',
    detail_end_time        datetime       NULL     COMMENT '充电明细结束时间',
    elec_power             decimal(10,2)  NULL     COMMENT '充电明细电量',
    service_price          decimal(10,2)  NULL     COMMENT '服务费单价',
    detail_power           decimal(10,2)  NULL     COMMENT '充电明细电量',
    detail_elec_money      decimal(10,2)  NULL     COMMENT '充电明细金额',
    detail_service_money   decimal(10,2)  NULL     COMMENT '充电明细服务费',

    power                  decimal(10,2)  NULL     COMMENT '功率'
)
ENGINE=OLAP
DUPLICATE KEY(operator_id)
COMMENT '充电设备流水表'
PARTITION BY RANGE(start_time)
(
    -- Monthly partitions; each range is left-closed, right-open.
    PARTITION p202412 VALUES [('2024-12-01 00:00:00'), ('2025-01-01 00:00:00')),
    PARTITION p202501 VALUES [('2025-01-01 00:00:00'), ('2025-02-01 00:00:00')),
    PARTITION p202502 VALUES [('2025-02-01 00:00:00'), ('2025-03-01 00:00:00')),
    PARTITION p202503 VALUES [('2025-03-01 00:00:00'), ('2025-04-01 00:00:00')),
    PARTITION p202504 VALUES [('2025-04-01 00:00:00'), ('2025-05-01 00:00:00')),
    PARTITION p202505 VALUES [('2025-05-01 00:00:00'), ('2025-06-01 00:00:00')),
    PARTITION p202506 VALUES [('2025-06-01 00:00:00'), ('2025-07-01 00:00:00')),
    PARTITION p202507 VALUES [('2025-07-01 00:00:00'), ('2025-08-01 00:00:00')),
    PARTITION p202508 VALUES [('2025-08-01 00:00:00'), ('2025-09-01 00:00:00'))
)
-- NOTE(review): the table-level bucket count is 3 while
-- "dynamic_partition.buckets" below is 8 -- confirm the difference is intended.
DISTRIBUTED BY HASH(operator_id) BUCKETS 3
PROPERTIES
(
    "replication_allocation" = "tag.location.default: 2",
    "is_being_synced" = "false",

    -- Dynamic partitioning: monthly unit, partition-name prefix "p".
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "MONTH",
    "dynamic_partition.time_zone" = "Asia/Shanghai",
    "dynamic_partition.start" = "-2147483648",
    "dynamic_partition.end" = "3",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.replication_allocation" = "tag.location.default: 2",
    "dynamic_partition.buckets" = "8",
    "dynamic_partition.create_history_partition" = "false",
    "dynamic_partition.history_partition_num" = "-1",
    "dynamic_partition.hot_partition_num" = "0",
    "dynamic_partition.reserved_history_periods" = "NULL",
    "dynamic_partition.storage_policy" = "",
    "dynamic_partition.start_day_of_month" = "1",

    -- Storage and compaction settings.
    "storage_medium" = "hdd",
    "storage_format" = "V2",
    "light_schema_change" = "true",
    "disable_auto_compaction" = "false",
    "enable_single_replica_compaction" = "false",
    "enable_mow_light_delete" = "false"
);
最近数据无法采集,任务直接报错:errCode = 2, detailMessage = failed to send task: Socket is closed by peer.
在同样的建表语句下,有的任务还会报错:errCode = 2, detailMessage = failed to send task: java.net.SocketException: Broken pipe (Write failed)
已核验,网络是正常的。