1. 基本信息
doris版本:2.1.9/2.1.10
flink doris connector版本:25.1.0
flink任务写入doris
问题:flink sink doris报错,在2.1.9与2.1.10均出现该现象
2. 问题描述
Flink Sink Doris数据写入失败,报错
{
"TxnId": 51221,
"Label": "_saas_vehicle_iot_1045_realtime_dwd_security_event_data_0_1_075b3fa9-9ad1-43ea-94b7-a340857c46fe",
"Comment": "",
"TwoPhaseCommit": "false",
"Status": "Fail",
"Message": "[ANALYSIS_ERROR]TStatus: errCode = 2, detailMessage = can not cast from origin type varchar(65533) to target type=invalid_type",
"NumberTotalRows": 0,
"NumberLoadedRows": 0,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 0,
"LoadTimeMs": 0,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 3,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 0,
"ReceiveDataTimeMs": 0,
"CommitAndPublishTimeMs": 0
}
3. 操作过程
-
初始化表SQL
-- Initial DDL of the Flink sink target table (Doris duplicate-key model).
-- The partition list is left empty; with dynamic partitioning enabled below,
-- daily partitions are expected to be created by Doris itself.
create table if not exists realtime_dwd_security_event_data
(
    -- Key columns: listed first, in the same order as in "duplicate key(...)".
    process_time        datetime     not null comment "入库时间",
    happen_time         datetime     not null comment "happen time",
    vin                 varchar(128) comment "车辆唯一标识",
    kafka_partition     int          not null comment "kafka topic partitions",

    -- Event classification.
    event_level         int          not null default "0" comment "事件级别",
    -- NOTE(review): part_type carries the same comment text as risk_type;
    -- looks like a copy-paste leftover -- confirm the intended description.
    part_type           int          comment "风险类型",
    risk_type           int          comment "风险类型",
    risk_subtype        int          comment "风险子类型",

    -- Kafka source coordinates of the record.
    kafka_topic         varchar(256) comment "kafka topic",
    kafka_offset        bigint       comment "kafka topic partitions offset",
    kafka_timestamp     bigint       comment "kafka timestamp",

    -- Vehicle dimensions (the three *_name columns are indexed below).
    vehicle_brand_name  varchar(128) comment "车品牌",
    vehicle_series_name varchar(128) comment "车系",
    vehicle_model_name  varchar(128) comment "车型",
    vehicle_brand_id    bigint       comment "车品牌id",
    vehicle_series_id   bigint       comment "车系id",
    vehicle_model_id    bigint       comment "车型id",

    -- Event identity and payload.
    metri_tag_pk_id     bigint       not null comment "唯一id",
    event_id            varchar(64)  comment "事件id,唯一值",
    process_time_ms     bigint       comment "入库时间戳",
    source_ip           varchar(128) comment "source_ip",
    target_ip           varchar(128) comment "target_ip",
    event_name          varchar(128) comment "事件名称",
    message             string       comment "事件详情",
    model_id            bigint       comment "碰撞模型id",
    tenant_id           bigint       comment "租户id",
    device_receive_time bigint       comment "设备服务接收时间",
    split_id            bigint       comment "拆分规则id",
    rule_ids            string       comment "规则模型碰撞id",
    other_field         string       comment "数据表转化字段",

    -- IDS node and software/hardware versions.
    ids_type            varchar(128) comment "IDS节点厂商",
    ids_name            varchar(128) comment "IDS节点名称",
    ids_version         varchar(128) comment "IDS节点版本",
    part_id             varchar(128) comment "part_id",
    ota_version         varchar(128) comment "ota版本",
    software_version    varchar(128) comment "软件版本",
    hardware_version    varchar(128) comment "硬件版本",

    -- Rule / model matching results.
    match_number        int          default "0" comment "碰撞次数",
    match_resource_id   string       comment "统计模型碰撞源id",
    window_time         string       comment "窗口时间",
    match_type          int          comment "1-解析规则,2-规则模型,3-统计模型,4-关联模型",

    -- Inverted indexes with the unicode parser on the vehicle name columns.
    index idx_vehicle_brand (vehicle_brand_name) using inverted properties("parser" = "unicode"),
    index idx_vehicle_series (vehicle_series_name) using inverted properties("parser" = "unicode"),
    index idx_vehicle_model (vehicle_model_name) using inverted properties("parser" = "unicode")
)
engine = olap
duplicate key(process_time, happen_time, vin, kafka_partition)
partition by range(process_time) ()
distributed by hash(kafka_partition) buckets auto
properties (
    -- Day-granularity dynamic partitions named with prefix "p".
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "day",
    "dynamic_partition.end" = "7",
    "dynamic_partition.prefix" = "p",
    -- Applies to dynamically created partitions only; no table-level
    -- replication setting is given in this statement.
    "dynamic_partition.replication_num" = "1"
);
-
为表新增以下字段,分多次新增
-- Columns added to realtime_dwd_security_event_data after the initial creation,
-- one ALTER per column, executed in several batches.
-- Column names follow the final table structure shown later in this report so
-- that replaying these statements reproduces the table the error occurred on.
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xq_text` STRING NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xq_int` INT NULL COMMENT '';
-- The live table has this column as `xq_varhcar` (misspelled); kept as-is on purpose.
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xq_varhcar` VARCHAR(32) NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xqq_string` STRING NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xqq_varchar` VARCHAR(65533) NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xqq_intt` INT NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xqq_decimal` DECIMAL(38,10) NULL COMMENT '';
-- The live table has this column as `xq1_varhcar` (misspelled); kept as-is on purpose.
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xq1_varhcar` VARCHAR(32) NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xq_datetime` DATETIME NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xq_date` DATE NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xq_decimal` DECIMAL(38,10) NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xq_bigint` BIGINT NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xq_intt` INT NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xq_smallint` SMALLINT NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xq_tinyint` TINYINT NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xqtestzuhu` DECIMAL(38,10) NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `status` VARCHAR(64) NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xqtttt` DECIMAL(5,4) NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xqtinyy` TINYINT NULL COMMENT '';
-- NOTE(review): the final table structure also lists the four columns below,
-- but the report did not include their ALTER statements. They are reconstructed
-- here from that structure (names, types, order) -- confirm against the actual
-- change history before relying on them.
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xq_varcharr` VARCHAR(65533) NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xq_varcharrt` VARCHAR(65533) NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xq_varcharrr` VARCHAR(65533) NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xqtestint` INT NULL COMMENT '';
- 上报数据
{
"event_name": "yutestAA基本",
"event_level": 2,
"part_type": 3,
"source_ip": "115.192.34.121",
"target_ip": "129.227.150.140",
"risk_type": 1,
"ids_name": "testCCU",
"ids_type": "CCU_idstype",
"ids_version": "1.1",
"risk_subtype": 3,
"other_field": "{\"cpu_usage_ERR\":null,\"status_ERR\":\"攻击\",\"yutest_int_ERR\":\"201\"}",
"match_type": 1,
"event_id": "27590c68b8f24dfe9846ca4dac9041fa",
"tenant_id": 1045,
"receive_id": 1432431817547776,
"metri_tag_pk_id": 1931272596202901505,
"device_receive_time": 1749286242901,
"vehicle_series_name": "宝马5系",
"vehicle_brand_id": 1433008500195328,
"vehicle_model_id": 1433008536379392,
"vehicle_series_id": 1433008511131648,
"vin": "YU202500000000041",
"vehicle_brand_name": "宝马",
"first_active_time": "2025-06-07 15:27:41",
"vehicle_model_name": "宝马5系高配车型",
"rule_ids": "[1432431372640256]",
"happen_time": "2025-06-07 16:50:43",
"kafka_topic": "dwd-security-event-data-1045",
"kafka_partition": 1,
"kafka_offset": 18,
"kafka_timestamp": 1749468757650,
"process_time_ms": 1749468758619,
"process_time": "2025-06-09 19:32:38"
}
- 最终表结构
-- Table definition as it exists after the ALTER TABLE steps, in fully expanded,
-- backtick-quoted form (presumably SHOW CREATE TABLE output -- confirm).
-- Kept verbatim: it is the observed state of the table the load failed on.
CREATE TABLE `realtime_dwd_security_event_data` (
`process_time` datetime NOT NULL COMMENT "入库时间",
`happen_time` datetime NOT NULL COMMENT "happen time",
`vin` varchar(128) NULL COMMENT "车辆唯一标识",
`kafka_partition` int NOT NULL COMMENT "kafka topic partitions",
`event_level` int NOT NULL DEFAULT "0" COMMENT "事件级别",
`part_type` int NULL COMMENT "风险类型",
`risk_type` int NULL COMMENT "风险类型",
`risk_subtype` int NULL COMMENT "风险子类型",
`kafka_topic` varchar(256) NULL COMMENT "kafka topic",
`kafka_offset` bigint NULL COMMENT "kafka topic partitions offset",
`kafka_timestamp` bigint NULL COMMENT "kafka timestamp",
`vehicle_brand_name` varchar(128) NULL COMMENT "车品牌",
`vehicle_series_name` varchar(128) NULL COMMENT "车系",
`vehicle_model_name` varchar(128) NULL COMMENT "车型",
`vehicle_brand_id` bigint NULL COMMENT "车品牌id",
`vehicle_series_id` bigint NULL COMMENT "车系id",
`vehicle_model_id` bigint NULL COMMENT "车型id",
`metri_tag_pk_id` bigint NOT NULL COMMENT "唯一id",
`event_id` varchar(64) NULL COMMENT "事件id,唯一值",
`process_time_ms` bigint NULL COMMENT "入库时间戳",
`source_ip` varchar(128) NULL COMMENT "source_ip",
`target_ip` varchar(128) NULL COMMENT "target_ip",
`event_name` varchar(128) NULL COMMENT "事件名称",
`message` text NULL COMMENT "事件详情",
`model_id` bigint NULL COMMENT "碰撞模型id",
`tenant_id` bigint NULL COMMENT "租户id",
`device_receive_time` bigint NULL COMMENT "设备服务接收时间",
`split_id` bigint NULL COMMENT "拆分规则id",
`rule_ids` text NULL COMMENT "规则模型碰撞id",
`other_field` text NULL COMMENT "数据表转化字段",
`ids_type` varchar(128) NULL COMMENT "IDS节点厂商",
`ids_name` varchar(128) NULL COMMENT "IDS节点名称",
`ids_version` varchar(128) NULL COMMENT "IDS节点版本",
`part_id` varchar(128) NULL COMMENT "part_id",
`ota_version` varchar(128) NULL COMMENT "ota版本",
`software_version` varchar(128) NULL COMMENT "软件版本",
`hardware_version` varchar(128) NULL COMMENT "硬件版本",
`match_number` int NULL DEFAULT "0" COMMENT "碰撞次数",
`match_resource_id` text NULL COMMENT "统计模型碰撞源id",
`window_time` text NULL COMMENT "窗口时间",
`match_type` int NULL COMMENT "1-解析规则,2-规则模型,3-统计模型,4-关联模型",
-- Columns from here on are not part of the initial DDL; they were added
-- afterwards and carry no COMMENT.
-- NOTE(review): the reported cast error names origin type varchar(65533); four
-- of the added columns have exactly that type (xqq_varchar, xq_varcharr,
-- xq_varcharrt, xq_varcharrr) -- possibly related, unverified.
`xq_text` text NULL,
`xq_int` int NULL,
`xq_varhcar` varchar(32) NULL,
`xqq_string` text NULL,
`xqq_varchar` varchar(65533) NULL,
`xqq_intt` int NULL,
`xqq_decimal` decimal(38,10) NULL,
`xq1_varhcar` varchar(32) NULL,
`xq_datetime` datetime NULL,
`xq_date` date NULL,
`xq_decimal` decimal(38,10) NULL,
`xq_bigint` bigint NULL,
`xq_intt` int NULL,
`xq_smallint` smallint NULL,
`xq_tinyint` tinyint NULL,
`xqtestzuhu` decimal(38,10) NULL,
`status` varchar(64) NULL,
`xqtttt` decimal(5,4) NULL,
`xqtinyy` tinyint NULL,
`xq_varcharr` varchar(65533) NULL,
`xq_varcharrt` varchar(65533) NULL,
`xq_varcharrr` varchar(65533) NULL,
`xqtestint` int NULL,
-- Index properties "lower_case" and "support_phrase" appear here although the
-- initial DDL only set "parser".
INDEX idx_vehicle_brand (`vehicle_brand_name`) USING INVERTED PROPERTIES("parser" = "unicode", "lower_case" = "true", "support_phrase" = "true"),
INDEX idx_vehicle_series (`vehicle_series_name`) USING INVERTED PROPERTIES("parser" = "unicode", "lower_case" = "true", "support_phrase" = "true"),
INDEX idx_vehicle_model (`vehicle_model_name`) USING INVERTED PROPERTIES("parser" = "unicode", "lower_case" = "true", "support_phrase" = "true")
) ENGINE=OLAP
DUPLICATE KEY(`process_time`, `happen_time`, `vin`, `kafka_partition`)
PARTITION BY RANGE(`process_time`)
-- Daily range partitions, each covering [day 00:00:00, next day 00:00:00),
-- from 2025-05-30 through 2025-06-17.
(PARTITION p20250530 VALUES [('2025-05-30 00:00:00'), ('2025-05-31 00:00:00')),
PARTITION p20250531 VALUES [('2025-05-31 00:00:00'), ('2025-06-01 00:00:00')),
PARTITION p20250601 VALUES [('2025-06-01 00:00:00'), ('2025-06-02 00:00:00')),
PARTITION p20250602 VALUES [('2025-06-02 00:00:00'), ('2025-06-03 00:00:00')),
PARTITION p20250603 VALUES [('2025-06-03 00:00:00'), ('2025-06-04 00:00:00')),
PARTITION p20250604 VALUES [('2025-06-04 00:00:00'), ('2025-06-05 00:00:00')),
PARTITION p20250605 VALUES [('2025-06-05 00:00:00'), ('2025-06-06 00:00:00')),
PARTITION p20250606 VALUES [('2025-06-06 00:00:00'), ('2025-06-07 00:00:00')),
PARTITION p20250607 VALUES [('2025-06-07 00:00:00'), ('2025-06-08 00:00:00')),
PARTITION p20250608 VALUES [('2025-06-08 00:00:00'), ('2025-06-09 00:00:00')),
PARTITION p20250609 VALUES [('2025-06-09 00:00:00'), ('2025-06-10 00:00:00')),
PARTITION p20250610 VALUES [('2025-06-10 00:00:00'), ('2025-06-11 00:00:00')),
PARTITION p20250611 VALUES [('2025-06-11 00:00:00'), ('2025-06-12 00:00:00')),
PARTITION p20250612 VALUES [('2025-06-12 00:00:00'), ('2025-06-13 00:00:00')),
PARTITION p20250613 VALUES [('2025-06-13 00:00:00'), ('2025-06-14 00:00:00')),
PARTITION p20250614 VALUES [('2025-06-14 00:00:00'), ('2025-06-15 00:00:00')),
PARTITION p20250615 VALUES [('2025-06-15 00:00:00'), ('2025-06-16 00:00:00')),
PARTITION p20250616 VALUES [('2025-06-16 00:00:00'), ('2025-06-17 00:00:00')),
PARTITION p20250617 VALUES [('2025-06-17 00:00:00'), ('2025-06-18 00:00:00')))
DISTRIBUTED BY HASH(`kafka_partition`) BUCKETS AUTO
PROPERTIES (
-- Table-level replica allocation is 3 here, while the dynamic-partition
-- replica allocation further down is 1.
"replication_allocation" = "tag.location.default: 3",
"min_load_replica_num" = "-1",
"is_being_synced" = "false",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "day",
"dynamic_partition.time_zone" = "Asia/Shanghai",
-- -2147483648 is the minimum 32-bit integer; presumably means "no lower bound",
-- i.e. old partitions are never dropped -- confirm against the Doris docs.
"dynamic_partition.start" = "-2147483648",
"dynamic_partition.end" = "7",
"dynamic_partition.prefix" = "p",
"dynamic_partition.replication_allocation" = "tag.location.default: 1",
"dynamic_partition.buckets" = "10",
"dynamic_partition.create_history_partition" = "false",
"dynamic_partition.history_partition_num" = "-1",
"dynamic_partition.hot_partition_num" = "0",
"dynamic_partition.reserved_history_periods" = "NULL",
"dynamic_partition.storage_policy" = "",
"storage_medium" = "hdd",
"storage_format" = "V2",
"inverted_index_storage_format" = "V1",
-- Light schema change is enabled on this table.
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false",
"group_commit_interval_ms" = "10000",
"group_commit_data_bytes" = "134217728"
);
4. 分析
- 经过分析,上报数据中各字段值的类型均与表结构中对应字段的类型一致,也不存在超出字段长度的值
- 报错:can not cast from origin type varchar(65533) to target type=invalid_type
- 在doris 2.1.9与2.1.10均出现该问题
- 删表重新创建后,就写入成功了