Kafka Routine load 中报错 Socket is closed by peer

Viewed 15

routine load 的语句为:
CREATE ROUTINE LOAD equip_charge_log_task ON equip_charge_log
WITH APPEND
COLUMNS(operator_id,station_id,equipment_id,connector_id,start_charge_seq,start_charge_seq_stat,connector_status,current_a,current_b,current_c,voltage_a,voltage_b,voltage_c,soc,start_time,end_time,total_power,elec_money,service_money,total_money,sum_period,detail_start_time,detail_end_time,elec_power,service_price,detail_power,detail_elec_money,detail_service_money,power=ifnull((((ifnull(current_a, 0) * ifnull(voltage_a, 0)) + (ifnull(current_b, 0) * ifnull(voltage_b, 0))) + (ifnull(current_c, 0) * ifnull(voltage_c, 0))), 0))
PROPERTIES
(
"desired_concurrent_number" = "256",
"max_error_number" = "0",
"max_filter_ratio" = "1.0",
"max_batch_interval" = "10",
"max_batch_rows" = "20000000",
"max_batch_size" = "1073741824",
"format" = "json",
"jsonpaths" = "[

                "$.operatorId",

                "$.stationId",

                "$.equipmentId",

                "$.EquipmentChargeStatus.ConnectorID",

                "$.EquipmentChargeStatus.StartChargeSeq",

                "$.EquipmentChargeStatus.StartChargeSeqStat",

                "$.EquipmentChargeStatus.ConnectorStatus",

                "$.EquipmentChargeStatus.CurrentA",

                "$.EquipmentChargeStatus.CurrentB",

                "$.EquipmentChargeStatus.CurrentC",

                "$.EquipmentChargeStatus.VoltageA",

                "$.EquipmentChargeStatus.VoltageB",

                "$.EquipmentChargeStatus.VoltageC",

                "$.EquipmentChargeStatus.Soc",

                "$.EquipmentChargeStatus.StartTime",

                "$.EquipmentChargeStatus.EndTime",

                "$.EquipmentChargeStatus.TotalPower",

                "$.EquipmentChargeStatus.ElecMoney",

                "$.EquipmentChargeStatus.ServiceMoney",

                "$.EquipmentChargeStatus.TotalMoney",

                "$.EquipmentChargeStatus.SumPeriod"

                ]",

"strip_outer_array" = "false",
"num_as_string" = "false",
"fuzzy_parse" = "false",
"strict_mode" = "false",
"timezone" = "Asia/Shanghai",
"exec_mem_limit" = "2147483648"
)
FROM KAFKA
(
"kafka_broker_list" = "192.168.22.10:9092,192.168.22.11:9092,192.168.22.12:9092",
"kafka_topic" = "tcec-business-equip-charge-status",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.group.id" = "equip_charge_log_task_33f22893-2e76-42f9-b3f7-72895f292c6f",
"kafka_partitions" = "0, 1, 2",
"kafka_offsets" = "9866760, 7663526, 8406198"
);

建表语句为:
CREATE TABLE equip_charge_log (
operator_id varchar(200) NOT NULL COMMENT '运营商编码',
connector_id varchar(200) NOT NULL COMMENT '充电设备接口编码',
station_id varchar(32) NULL COMMENT '充电站编码',
equipment_id varchar(32) NULL COMMENT '充电设备编码',
start_charge_seq varchar(200) NOT NULL COMMENT '充电订单号',
start_charge_seq_stat smallint NOT NULL COMMENT '充电订单状态| 1,启动中 2,充电中 3,停止中 4,已结束 5,未知',
connector_status smallint NOT NULL COMMENT '充电设备接口状态| 1,空闲 2,占用(未充电) 3,占用(充电中) 4,占用(预约锁定) 255,故障',
current_a decimal(10,2) NOT NULL COMMENT '电流A',
current_b decimal(10,2) NULL COMMENT '电流B',
current_c decimal(10,2) NULL COMMENT '电流C',
voltage_a decimal(10,2) NOT NULL COMMENT '电压A',
voltage_b decimal(10,2) NULL COMMENT '电压B',
voltage_c decimal(10,2) NULL COMMENT '电压C',
soc decimal(10,2) NOT NULL COMMENT 'SOC电池剩余电量',
start_time datetime NOT NULL COMMENT '充电开始时间',
end_time datetime NOT NULL COMMENT '充电结束时间',
total_power decimal(10,2) NOT NULL COMMENT '总充电量',
elec_money decimal(10,2) NULL COMMENT '充电金额',
service_money decimal(10,2) NULL COMMENT '服务费',
total_money decimal(10,2) NULL COMMENT '总金额',
sum_period int NULL COMMENT '总充电时长',
detail_start_time datetime NULL COMMENT '充电明细开始时间',
detail_end_time datetime NULL COMMENT '充电明细结束时间',
elec_power decimal(10,2) NULL COMMENT '充电明细电量',
service_price decimal(10,2) NULL COMMENT '服务费单价',
detail_power decimal(10,2) NULL COMMENT '充电明细电量',
detail_elec_money decimal(10,2) NULL COMMENT '充电明细金额',
detail_service_money decimal(10,2) NULL COMMENT '充电明细服务费',
power decimal(10,2) NULL COMMENT '功率'
) ENGINE=OLAP
DUPLICATE KEY(operator_id)
COMMENT '充电设备流水表'
PARTITION BY RANGE(start_time)
(PARTITION p202412 VALUES [('2024-12-01 00:00:00'), ('2025-01-01 00:00:00')),
PARTITION p202501 VALUES [('2025-01-01 00:00:00'), ('2025-02-01 00:00:00')),
PARTITION p202502 VALUES [('2025-02-01 00:00:00'), ('2025-03-01 00:00:00')),
PARTITION p202503 VALUES [('2025-03-01 00:00:00'), ('2025-04-01 00:00:00')),
PARTITION p202504 VALUES [('2025-04-01 00:00:00'), ('2025-05-01 00:00:00')),
PARTITION p202505 VALUES [('2025-05-01 00:00:00'), ('2025-06-01 00:00:00')),
PARTITION p202506 VALUES [('2025-06-01 00:00:00'), ('2025-07-01 00:00:00')),
PARTITION p202507 VALUES [('2025-07-01 00:00:00'), ('2025-08-01 00:00:00')),
PARTITION p202508 VALUES [('2025-08-01 00:00:00'), ('2025-09-01 00:00:00')))
DISTRIBUTED BY HASH(operator_id) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 2",
"is_being_synced" = "false",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "MONTH",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-2147483648",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.replication_allocation" = "tag.location.default: 2",
"dynamic_partition.buckets" = "8",
"dynamic_partition.create_history_partition" = "false",
"dynamic_partition.history_partition_num" = "-1",
"dynamic_partition.hot_partition_num" = "0",
"dynamic_partition.reserved_history_periods" = "NULL",
"dynamic_partition.storage_policy" = "",
"dynamic_partition.start_day_of_month" = "1",
"storage_medium" = "hdd",
"storage_format" = "V2",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false",
"enable_mow_light_delete" = "false"
);

最近数据无法采集,直接报错为 errCode = 2, detailMessage = failed to send task: Socket is closed by peer.

同样的建表语句下,有的任务还会报错 errCode = 2, detailMessage = failed to send task: java.net.SocketException: Broken pipe (Write failed)

核验了,网络是正常的

1 Answers

这个看着是网络的问题呀,之前 routineload job 是正常跑的吗?