doris-flink-connector: filter pushdown error when querying Doris


Flink version: 1.16.2; flink-doris-connector-1.16 version: 24.0.1; Doris version: 2.1.6 or 2.1.7.
Using the doris.filter.query parameter produces the following error:

Caused by: org.apache.doris.flink.exception.DorisException: Parse Doris FE's response to json failed. res: "malformedjson:{\"sql\":\"select`event_time`,`thread_id`,`write_table`,`other_info`,`write_cnts`,`comments`from`ec_data_management`.`hive_data_monitor`wheredate_trunc(`event_time`,\"day\")=\"2024-12-18\"\"}"
	at org.apache.doris.flink.rest.RestService.getQueryPlan(RestService.java:626)
	at org.apache.doris.flink.rest.RestService.findPartitions(RestService.java:597)
	at org.apache.doris.flink.source.DorisSource.createEnumerator(DorisSource.java:99)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:222)
	... 33 more
Caused by: org.apache.doris.shaded.com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `org.apache.doris.flink.rest.models.QueryPlan` (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('malformedjson:{"sql":"select`event_time`,`thread_id`,`write_table`,`other_info`,`write_cnts`,`comments`from`ec_data_management`.`hive_data_monitor`wheredate_trunc(`event_time`,"day")="2024-12-18""}')
 at [Source: (String)""malformedjson:{\"sql\":\"select`event_time`,`thread_id`,`write_table`,`other_info`,`write_cnts`,`comments`from`ec_data_management`.`hive_data_monitor`wheredate_trunc(`event_time`,\"day\")=\"2024-12-18\"\"}""; line: 1, column: 1]
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1728)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1353)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.std.StdDeserializer._deserializeFromString(StdDeserializer.java:311)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromString(BeanDeserializerBase.java:1495)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:196)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:186)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4674)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3629)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3597)
	at org.apache.doris.flink.rest.RestService.getQueryPlan(RestService.java:622)
	... 36 more
2 Answers

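This kind of conversion cannot be done inside doris.filter.query; that option only supports simple predicate pushdown. For a case like this, find the corresponding Flink function and do the conversion on the Flink side.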
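If the goal is only to read one day of data, one option is to replace the date_trunc call with a plain range comparison on event_time, which stays a simple predicate. The sketch below is a minimal illustration, not connector code: the class DorisFilters and its methods dayRange and asDdlOption are hypothetical names. It assumes Doris accepts the range form as a filter. The "malformedjson" text in the error above also suggests that the double quotes in the filter broke the JSON request body, so the sketch uses single-quoted literals; that reading is an assumption as well.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper for building doris.filter.query values; not part of the connector. */
final class DorisFilters {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    /** Builds a half-open range predicate [day 00:00:00, next day 00:00:00) on the given column. */
    static String dayRange(String column, LocalDate day) {
        String start = DAY.format(day);
        String end = DAY.format(day.plusDays(1));
        return column + " >= '" + start + " 00:00:00' AND "
                + column + " < '" + end + " 00:00:00'";
    }

    /** Renders the option for a Flink DDL WITH clause, doubling single quotes inside the SQL string literal. */
    static String asDdlOption(String filter) {
        return "'doris.filter.query' = '" + filter.replace("'", "''") + "'";
    }

    public static void main(String[] args) {
        String filter = dayRange("event_time", LocalDate.of(2024, 12, 18));
        System.out.println(filter);
        System.out.println(asDdlOption(filter));
    }
}
```

The asDdlOption output can be concatenated into the WITH clause in place of a hand-written 'doris.filter.query' line. Any further date logic can then stay in the Flink SQL query itself.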

Then, if I want to filter by partition, how should it be used:

tenv.executeSql(
                " CREATE TABLE flink_hive_data_monitor  (                         " +
                        " 	event_time     TIMESTAMP(3),                             " +
                        " 	thread_id     STRING,                                    " +
                        " 	write_table    STRING,                                   " +
                        " 	other_info    STRING,                                    " +
                        " 	write_cnts      int,                                     " +
                        "   `comments`       STRING                                  " +
                        "     )                                                      " +
                        "     WITH (                                                 " +
                        "       'connector' = 'doris',                               " +
                        "       'fenodes' = '" + fenodes + "',                       " +
                        "       'table.identifier' = '" + sinkTableName + "',        " +
                        "       'username' = '" + dorisUsername + "',                " +
                        "       'password' = '" + dorisPassword + "',                " +
                        "        'doris.filter.query' = 'partition(`p20241217000000`)',                       "   +
//                        "        'doris.filter.query' = 'write_cnts>1000',                       "   +
                        "   'sink.label-prefix' = 'doris_label" + System.currentTimeMillis() + "'        " +
                        " );                                                         "
        );

The error is as follows:

	... 13 more
Caused by: org.apache.doris.flink.exception.DorisException: Parse Doris FE's response to json failed. res: {"exception":"errCode=2,detailMessage=Syntaxerrorinline1:\n...`hive_data_monitor`wherepartition(`p20241217000000`)\n^\nEncountered:PARTITION\nExpected:PARTITIONiskeyword,maybe`PARTITION`\n","status":400}
	at org.apache.doris.flink.rest.RestService.getQueryPlan(RestService.java:626)
	at org.apache.doris.flink.rest.RestService.findPartitions(RestService.java:597)
	at org.apache.doris.flink.source.DorisSource.createEnumerator(DorisSource.java:99)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:222)
	... 33 more
Caused by: org.apache.doris.shaded.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "exception" (class org.apache.doris.flink.rest.models.QueryPlan), not marked as ignorable (3 known properties: "partitions", "status", "opaqued_query_plan"])
 at [Source: (String)"{"exception":"errCode=2,detailMessage=Syntaxerrorinline1:\n...`hive_data_monitor`wherepartition(`p20241217000000`)\n^\nEncountered:PARTITION\nExpected:PARTITIONiskeyword,maybe`PARTITION`\n","status":400}"; line: 1, column: 15] (through reference chain: org.apache.doris.flink.rest.models.QueryPlan["exception"])
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:1127)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:2023)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1700)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownVanilla(BeanDeserializerBase.java:1678)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:319)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:176)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4674)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3629)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3597)
	at org.apache.doris.flink.rest.RestService.getQueryPlan(RestService.java:622)
	... 36 more
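
The error text shows the filter being appended directly after "where", so partition(...) is rejected as a syntax error in that position. One possible workaround is to express the partition as a range on the partition column instead. The sketch below makes several assumptions that need checking against the actual table: that the partition column is event_time, that the partition name is "p" followed by the lower bound in yyyyMMddHHmmss form, and that each partition covers exactly one day. The class PartitionRangeFilter and its method fromPartitionName are hypothetical names, not connector APIs.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper; assumes one-day range partitions named "p" + lower bound (yyyyMMddHHmmss). */
final class PartitionRangeFilter {
    private static final DateTimeFormatter NAME = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
    private static final DateTimeFormatter LITERAL = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Turns a partition name such as p20241217000000 into a range predicate on the given column. */
    static String fromPartitionName(String column, String partitionName) {
        if (!partitionName.startsWith("p")) {
            throw new IllegalArgumentException("unexpected partition name: " + partitionName);
        }
        // Assumption: the digits after "p" are the partition's inclusive lower bound.
        LocalDateTime lower = LocalDateTime.parse(partitionName.substring(1), NAME);
        // Assumption: the partition spans exactly one day.
        LocalDateTime upper = lower.plusDays(1);
        return column + " >= '" + LITERAL.format(lower) + "' AND "
                + column + " < '" + LITERAL.format(upper) + "'";
    }

    public static void main(String[] args) {
        System.out.println(fromPartitionName("event_time", "p20241217000000"));
    }
}
```

If the range matches the partition bounds, Doris may prune to that partition on its own, but that depends on how the table is actually partitioned. When the result is placed inside the DDL string, the single quotes need to be doubled.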