doris 数据写入失败-can not cast from origin type varchar(65533) to target type=invalid_type

Viewed 3

1. 基本信息

doirs版本:2.1.9/2.1.10
flink doris connector版本:25.1.0
flink任务写入doris
问题:flink sink doirs报错,在2.1.9与2.1.10均出现该现象

2. 问题描述

Flink Sink Doris数据写入失败,报错

{
    "TxnId": 51221,
    "Label": "_saas_vehicle_iot_1045_realtime_dwd_security_event_data_0_1_075b3fa9-9ad1-43ea-94b7-a340857c46fe",
    "Comment": "",
    "TwoPhaseCommit": "false",
    "Status": "Fail",
    "Message": "[ANALYSIS_ERROR]TStatus: errCode = 2, detailMessage = can not cast from origin type varchar(65533) to target type=invalid_type",
    "NumberTotalRows": 0,
    "NumberLoadedRows": 0,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 0,
    "LoadTimeMs": 0,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 3,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 0,
    "ReceiveDataTimeMs": 0,
    "CommitAndPublishTimeMs": 0
  }

3. 操作过程

  1. 初始化表SQL
    create table if not exists realtime_dwd_security_event_data
    (
    process_time datetime not null comment "入库时间",
    happen_time datetime not null comment "happen time",
    vin varchar(128) comment "车辆唯一标识",
    kafka_partition int not null comment "kafka topic partitions",
    event_level int not null default "0" comment "事件级别",
    part_type int comment "风险类型",
    risk_type int comment "风险类型",
    risk_subtype int comment "风险子类型",
    kafka_topic varchar(256) comment "kafka topic",
    kafka_offset bigint comment "kafka topic partitions offset",
    kafka_timestamp bigint comment "kafka timestamp",
    vehicle_brand_name varchar(128) comment "车品牌",
    vehicle_series_name varchar(128) comment "车系",
    vehicle_model_name varchar(128) comment "车型",
    vehicle_brand_id bigint comment "车品牌id",
    vehicle_series_id bigint comment "车系id",
    vehicle_model_id bigint comment "车型id",
    metri_tag_pk_id bigint not null comment "唯一id",
    event_id varchar(64) comment "事件id,唯一值",
    process_time_ms bigint comment "入库时间戳",
    source_ip varchar(128) comment "source_ip",
    target_ip varchar(128) comment "target_ip",
    event_name varchar(128) comment "事件名称",
    message string comment "事件详情",
    model_id bigint comment "碰撞模型id",
    tenant_id bigint comment "租户id",
    device_receive_time bigint comment "设备服务接收时间",
    split_id bigint comment "拆分规则id",
    rule_ids String comment "规则模型碰撞id",
    other_field string comment "数据表转化字段",
    ids_type varchar(128) comment "IDS节点厂商",
    ids_name varchar(128) comment "IDS节点名称",
    ids_version varchar(128) comment "IDS节点版本",
    part_id varchar(128) comment "part_id",
    ota_version varchar(128) comment "ota版本",
    software_version varchar(128) comment "软件版本",
    hardware_version varchar(128) comment "硬件版本",
    match_number int default "0" comment "碰撞次数",
    match_resource_id string comment "统计模型碰撞源id",
    window_time string comment "窗口时间",
    match_type int comment "1-解析规则,2-规则模型,3-统计模型,4-关联模型",
    INDEX idx_vehicle_brand(vehicle_brand_name) USING INVERTED PROPERTIES("parser" = "unicode"),
    INDEX idx_vehicle_series(vehicle_series_name) USING INVERTED PROPERTIES("parser" = "unicode"),
    INDEX idx_vehicle_model(vehicle_model_name) USING INVERTED PROPERTIES("parser" = "unicode")
    )
    engine=olap
    duplicate key(process_time,happen_time,vin,kafka_partition)
    partition by range(process_time) ()
    distributed by hash(kafka_partition) buckets auto
    properties (
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "day",
    "dynamic_partition.end" = "7",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.replication_num" = "1"
    );

  2. 为表新增以下字段,分多次新增

ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xq_text` STRING NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xq_int` INT NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xq_varchar` VARCHAR(32) NULL COMMENT ''; -- 修正拼写
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xqq_string` STRING NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xqq_varchar` VARCHAR(65533) NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xqq_intt` INT NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xqq_decimal` DECIMAL(38,10) NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xq1_varchar` VARCHAR(32) NULL COMMENT ''; -- 修正拼写
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xq_datetime` DATETIME NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xq_date` DATE NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xq_decimal` DECIMAL(38,10) NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xq_bigint` BIGINT NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xq_intt` INT NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xq_smallint` SMALLINT NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xq_tinyint` TINYINT NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xqtestzuhu` DECIMAL(38,10) NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `status` VARCHAR(64) NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xqtttt` DECIMAL(5,4) NULL COMMENT '';
ALTER TABLE realtime_dwd_security_event_data ADD COLUMN `xqtinyy` TINYINT NULL COMMENT '';
  1. 上报数据
{
 "event_name": "yutestAA基本",
 "event_level": 2,
 "part_type": 3,
 "source_ip": "115.192.34.121",
 "target_ip": "129.227.150.140",
 "risk_type": 1,
 "ids_name": "testCCU",
 "ids_type": "CCU_idstype",
 "ids_version": "1.1",
 "risk_subtype": 3,
 "other_field": "{\"cpu_usage_ERR\":null,\"status_ERR\":\"攻击\",\"yutest_int_ERR\":\"201\"}",
 "match_type": 1,
 "event_id": "27590c68b8f24dfe9846ca4dac9041fa",
 "tenant_id": 1045,
 "receive_id": 1432431817547776,
 "metri_tag_pk_id": 1931272596202901505,
 "device_receive_time": 1749286242901,
 "vehicle_series_name": "宝马5系",
 "vehicle_brand_id": 1433008500195328,
 "vehicle_model_id": 1433008536379392,
 "vehicle_series_id": 1433008511131648,
 "vin": "YU202500000000041",
 "vehicle_brand_name": "宝马",
 "first_active_time": "2025-06-07 15:27:41",
 "vehicle_model_name": "宝马5系高配车型",
 "rule_ids": "[1432431372640256]",
 "happen_time": "2025-06-07 16:50:43",
 "kafka_topic": "dwd-security-event-data-1045",
 "kafka_partition": 1,
 "kafka_offset": 18,
 "kafka_timestamp": 1749468757650,
 "process_time_ms": 1749468758619,
 "process_time": "2025-06-09 19:32:38"
}
  1. 最终表结构
CREATE TABLE `realtime_dwd_security_event_data` (
  `process_time` datetime NOT NULL COMMENT "入库时间",
  `happen_time` datetime NOT NULL COMMENT "happen time",
  `vin` varchar(128) NULL COMMENT "车辆唯一标识",
  `kafka_partition` int NOT NULL COMMENT "kafka topic partitions",
  `event_level` int NOT NULL DEFAULT "0" COMMENT "事件级别",
  `part_type` int NULL COMMENT "风险类型",
  `risk_type` int NULL COMMENT "风险类型",
  `risk_subtype` int NULL COMMENT "风险子类型",
  `kafka_topic` varchar(256) NULL COMMENT "kafka topic",
  `kafka_offset` bigint NULL COMMENT "kafka topic partitions offset",
  `kafka_timestamp` bigint NULL COMMENT "kafka timestamp",
  `vehicle_brand_name` varchar(128) NULL COMMENT "车品牌",
  `vehicle_series_name` varchar(128) NULL COMMENT "车系",
  `vehicle_model_name` varchar(128) NULL COMMENT "车型",
  `vehicle_brand_id` bigint NULL COMMENT "车品牌id",
  `vehicle_series_id` bigint NULL COMMENT "车系id",
  `vehicle_model_id` bigint NULL COMMENT "车型id",
  `metri_tag_pk_id` bigint NOT NULL COMMENT "唯一id",
  `event_id` varchar(64) NULL COMMENT "事件id,唯一值",
  `process_time_ms` bigint NULL COMMENT "入库时间戳",
  `source_ip` varchar(128) NULL COMMENT "source_ip",
  `target_ip` varchar(128) NULL COMMENT "target_ip",
  `event_name` varchar(128) NULL COMMENT "事件名称",
  `message` text NULL COMMENT "事件详情",
  `model_id` bigint NULL COMMENT "碰撞模型id",
  `tenant_id` bigint NULL COMMENT "租户id",
  `device_receive_time` bigint NULL COMMENT "设备服务接收时间",
  `split_id` bigint NULL COMMENT "拆分规则id",
  `rule_ids` text NULL COMMENT "规则模型碰撞id",
  `other_field` text NULL COMMENT "数据表转化字段",
  `ids_type` varchar(128) NULL COMMENT "IDS节点厂商",
  `ids_name` varchar(128) NULL COMMENT "IDS节点名称",
  `ids_version` varchar(128) NULL COMMENT "IDS节点版本",
  `part_id` varchar(128) NULL COMMENT "part_id",
  `ota_version` varchar(128) NULL COMMENT "ota版本",
  `software_version` varchar(128) NULL COMMENT "软件版本",
  `hardware_version` varchar(128) NULL COMMENT "硬件版本",
  `match_number` int NULL DEFAULT "0" COMMENT "碰撞次数",
  `match_resource_id` text NULL COMMENT "统计模型碰撞源id",
  `window_time` text NULL COMMENT "窗口时间",
  `match_type` int NULL COMMENT "1-解析规则,2-规则模型,3-统计模型,4-关联模型",
  `xq_text` text NULL,
  `xq_int` int NULL,
  `xq_varhcar` varchar(32) NULL,
  `xqq_string` text NULL,
  `xqq_varchar` varchar(65533) NULL,
  `xqq_intt` int NULL,
  `xqq_decimal` decimal(38,10) NULL,
  `xq1_varhcar` varchar(32) NULL,
  `xq_datetime` datetime NULL,
  `xq_date` date NULL,
  `xq_decimal` decimal(38,10) NULL,
  `xq_bigint` bigint NULL,
  `xq_intt` int NULL,
  `xq_smallint` smallint NULL,
  `xq_tinyint` tinyint NULL,
  `xqtestzuhu` decimal(38,10) NULL,
  `status` varchar(64) NULL,
  `xqtttt` decimal(5,4) NULL,
  `xqtinyy` tinyint NULL,
  `xq_varcharr` varchar(65533) NULL,
  `xq_varcharrt` varchar(65533) NULL,
  `xq_varcharrr` varchar(65533) NULL,
  `xqtestint` int NULL,
  INDEX idx_vehicle_brand (`vehicle_brand_name`) USING INVERTED PROPERTIES("parser" = "unicode", "lower_case" = "true", "support_phrase" = "true"),
  INDEX idx_vehicle_series (`vehicle_series_name`) USING INVERTED PROPERTIES("parser" = "unicode", "lower_case" = "true", "support_phrase" = "true"),
  INDEX idx_vehicle_model (`vehicle_model_name`) USING INVERTED PROPERTIES("parser" = "unicode", "lower_case" = "true", "support_phrase" = "true")
) ENGINE=OLAP
DUPLICATE KEY(`process_time`, `happen_time`, `vin`, `kafka_partition`)
PARTITION BY RANGE(`process_time`)
(PARTITION p20250530 VALUES [('2025-05-30 00:00:00'), ('2025-05-31 00:00:00')),
PARTITION p20250531 VALUES [('2025-05-31 00:00:00'), ('2025-06-01 00:00:00')),
PARTITION p20250601 VALUES [('2025-06-01 00:00:00'), ('2025-06-02 00:00:00')),
PARTITION p20250602 VALUES [('2025-06-02 00:00:00'), ('2025-06-03 00:00:00')),
PARTITION p20250603 VALUES [('2025-06-03 00:00:00'), ('2025-06-04 00:00:00')),
PARTITION p20250604 VALUES [('2025-06-04 00:00:00'), ('2025-06-05 00:00:00')),
PARTITION p20250605 VALUES [('2025-06-05 00:00:00'), ('2025-06-06 00:00:00')),
PARTITION p20250606 VALUES [('2025-06-06 00:00:00'), ('2025-06-07 00:00:00')),
PARTITION p20250607 VALUES [('2025-06-07 00:00:00'), ('2025-06-08 00:00:00')),
PARTITION p20250608 VALUES [('2025-06-08 00:00:00'), ('2025-06-09 00:00:00')),
PARTITION p20250609 VALUES [('2025-06-09 00:00:00'), ('2025-06-10 00:00:00')),
PARTITION p20250610 VALUES [('2025-06-10 00:00:00'), ('2025-06-11 00:00:00')),
PARTITION p20250611 VALUES [('2025-06-11 00:00:00'), ('2025-06-12 00:00:00')),
PARTITION p20250612 VALUES [('2025-06-12 00:00:00'), ('2025-06-13 00:00:00')),
PARTITION p20250613 VALUES [('2025-06-13 00:00:00'), ('2025-06-14 00:00:00')),
PARTITION p20250614 VALUES [('2025-06-14 00:00:00'), ('2025-06-15 00:00:00')),
PARTITION p20250615 VALUES [('2025-06-15 00:00:00'), ('2025-06-16 00:00:00')),
PARTITION p20250616 VALUES [('2025-06-16 00:00:00'), ('2025-06-17 00:00:00')),
PARTITION p20250617 VALUES [('2025-06-17 00:00:00'), ('2025-06-18 00:00:00')))
DISTRIBUTED BY HASH(`kafka_partition`) BUCKETS AUTO
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"min_load_replica_num" = "-1",
"is_being_synced" = "false",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "day",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-2147483648",
"dynamic_partition.end" = "7",
"dynamic_partition.prefix" = "p",
"dynamic_partition.replication_allocation" = "tag.location.default: 1",
"dynamic_partition.buckets" = "10",
"dynamic_partition.create_history_partition" = "false",
"dynamic_partition.history_partition_num" = "-1",
"dynamic_partition.hot_partition_num" = "0",
"dynamic_partition.reserved_history_periods" = "NULL",
"dynamic_partition.storage_policy" = "",
"storage_medium" = "hdd",
"storage_format" = "V2",
"inverted_index_storage_format" = "V1",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false",
"group_commit_interval_ms" = "10000",
"group_commit_data_bytes" = "134217728"
);

4. 分析

  1. 经过分析,上报的数据value字段类型均与表结构字段类型一致,也不存在超字段的value值
    报错:can not cast from origin type varchar(65533) to target type=invalid_type
  2. 在doris 2.1.9与2.1.10均出现该问题
  3. 删表重新创建后,就写入成功了
0 Answers