doris版本 2.0.4
streapark版本 2.0.0
kafka版本 kafka_2.12-2.4.1

flin报错详情
2024-03-19 09:33:02
java.lang.NullPointerException
at org.apache.doris.flink.rest.SchemaUtils.lambda$convertToSchema$0(SchemaUtils.java:36)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
at org.apache.doris.flink.rest.SchemaUtils.convertToSchema(SchemaUtils.java:36)
at org.apache.doris.flink.datastream.ScalaValueReader.(ScalaValueReader.scala:127)
at org.apache.doris.flink.table.DorisRowDataInputFormat.open(DorisRowDataInputFormat.java:96)
at org.apache.doris.flink.table.DorisRowDataInputFormat.open(DorisRowDataInputFormat.java:45)
at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:84)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333)
sql信息
set execution.runtime-mode = batch;
CREATE TABLE ods_lcyl_lis_lis_test_reg (
BARCODESTRING,
EXEC_SQNSTRING,
HIS_ITEMCODESTRING,
ACCEPTLISTPRINTFLAGSTRING,
ACUTESTRING,
ADJUSTSAMPLETESTERIDSTRING,
ADJUSTSAMPLETESTERNAMESTRING,
BEDNOSTRING,
BIRTHDAYBIGINT,
CLINICPRINTHOSTIPSTRING,
CONFIRMDOCIDSTRING,
CONFIRMDOCIPSTRING,
CONFIRMDOCNAMESTRING,
CONFIRMDOCTIMEBIGINT,
DIAGNOSESTRING,
DIAGNOSE_CYCLESTRING,
DOCTORDEPTIDSTRING,
DOCTORDEPTNAMESTRING,
DOCTORIDSTRING,
DOCTORNAMESTRING,
ERRHEALTHSTRING,
EXPDATESTRING,
FACTORYSTRING,
FEESEQIDSTRING,
FETUSSTRING,
FIRSTTRIALDATEBIGINT,
GERMINSPECTIONTYPESTRING,
GESTATIONSTRING,
GETSAMPLEDATEBIGINT,
GETSAMPLETESTERIDSTRING,
GETSAMPLETESTERNAMESTRING,
GROUPCODESTRING,
HEALTHDATEBIGINT,
HIS_ITEMNAMESTRING,
HOSPITALIDSTRING,
IDENNOSTRING,
INPATIENTNOSTRING,
ISDISTINGUISHSTRING ,
ISGREENSTRING,
ISHEALTHSTRING ,
ISPDFSTRING ,
ISWXHEALTHSTRING,
ISYGHEALTHSTRING,
ISYNPTSTRING ,
ITEM_NUMDOUBLE ,
LASTSAMPLEACCEPTERSTRING,
LASTSAMPLEACCEPTERIDSTRING,
LASTSAMPLEREACHTIMEBIGINT,
LMPBIGINT,
LSPTESTFORMSTRING,
MACHINECODESTRING,
MACHINE_MEMOSTRING,
MEMOSTRING,
NURSEIDSTRING,
NURSENAMESTRING,
NURSE_CELL_CODESTRING,
PATIENTAGESTRING,
PATIENTNAMESTRING,
PATIENTNAME1STRING,
PATIENTNOSTRING,
PATIENTSEXSTRING,
PATIENTTYPESTRING,
PDFNAMESTRING,
PEOPLETYPECODESTRING,
PEOPLETYPENAMESTRING,
PRICEDOUBLE ,
PRINTGERMRECODEFLAGSTRING,
PRINTTIMEBIGINT,
READFLAGSTRING ,
REPRINTFLAGSTRING,
RESULTDATEBIGINT,
SAMPLEACCEPTERSTRING,
SAMPLEACCEPTERIDSTRING,
SAMPLEACKERSTRING,
SAMPLEACKTIMEBIGINT,
SAMPLEPOSITIONSTRING,
SAMPLEREACHTIMEBIGINT,
SAMPLESTATESTRING,
SAMPLETIMEBIGINT,
SAMPLETYPESTRING,
STATESTRING,
TAKETYPESTRING,
TELNOSTRING,
TESTDATEBIGINT,
TESTDESCRIBESTRING,
TESTFORMNOSTRING,
TESTFORMNO1STRING,
TESTTYPESTRING,
TUBECOLORSTRING,
UPDATESAMPLETIMESTRING,
WARDIDSTRING,
WARDNAMESTRING,
WBBARCODESTRING,
WEIGTHSTRING,
YNPTDATEBIGINT,
ZJLXSTRING,
__source_ts_msBIGINT,
__opSTRING,
__tableSTRING,
__dbSTRING,
__deletedSTRING,
__dtSTRING
) WITH (
'connector' = 'doris',
'fenodes' = '${doris.cluster.http}',
'table.identifier' = 'dw_ddbAEqIiCdH.ods_lcyl_all_1lis_test_reg',
'username' = 'root',
'password' = '000000',
'doris.request.tablet.size' = '1'
);
CREATE TABLE dwd_lcyl_lis_test_reg ( test_item_code STRING, electronicrequestnoteid STRING, bgdh STRING, machinecode STRING, groupcode STRING, testdate BIGINT, doctorid STRING, doctorname STRING, doctordeptid STRING, doctordeptname STRING, patientname1 STRING, patientsex STRING, patienttype STRING, test_method_code STRING, acute STRING, resultdate BIGINT, accept_specimen_dt BIGINT, test_technician_code STRING, getsampletesterid STRING, adjustsampletesterid STRING, state STRING, patientno STRING, test_item_name STRING, testformno STRING, patientage STRING, wardname STRING, bedno STRING, diagnose STRING, testformno1 STRING, item_num INT, price INT, testtype STRING, machine_memo STRING, testdescribe STRING, printtime BIGINT, sampleaccepter STRING, sampleaccepterid STRING, sampleacktime BIGINT, sampleacker STRING, hospitalid STRING, idenno STRING, telno STRING, factory STRING, sampleposition STRING, lastsamplereachtime BIGINT, lastsampleaccepter STRING, lastsampleaccepterid STRING, nurse_cell_code STRING, inpatientno STRING, clinicprinthostip STRING, firsttrialdate BIGINT, readflag STRING, updatesampletime STRING, reprintflag STRING, lsptestform STRING, confirmdocid STRING, confirmdocname STRING, confirmdoctime BIGINT, confirmdocip STRING, feeseqid STRING, birthday INT, weigth STRING, gestation STRING, fetus STRING, expdate STRING, lmp BIGINT, printgermrecodeflag STRING, isdistinguish STRING, ishealth STRING, wbbarcode STRING, taketype STRING, peopletypename STRING, peopletypecode STRING, iswxhealth STRING, errhealth STRING, pdfname STRING, isynpt STRING, ynptdate BIGINT, zjlx STRING, isyghealth STRING, patientname STRING, healthdate BIGINT, ispdf STRING, germinspectiontype STRING, reportaudit STRING, bbcyrqsj TIMESTAMP, jsbbrqsj TIMESTAMP, test_technician STRING, getsampletestername STRING, inspection_report_remarks STRING, bbzt STRING, _transform_dt BIGINT ) with ( 'connector' = 'doris', 'fenodes' = '${doris.cluster.http}', 'table.identifier' = 'dw_ddbAEqIiCdH.dwd_lcyl_lis_test_reg', 'username' = 'root', 'password' = '000000', 'sink.batch.size' = '10000',
'sink.max-retries' = '2',
'sink.batch.interval' = '20000',
'sink.properties.strip_outer_array' = 'true',
'sink.properties.format' = 'json',
'sink.properties.columns' = '');
CREATE TABLE dwd_lcyl_lis_test_reg_topic ( PRIMARY KEY (test_item_code,bgdh,electronicrequestnoteid) NOT ENFORCED, test_item_code STRING, electronicrequestnoteid STRING, bgdh STRING, machinecode STRING, groupcode STRING, testdate BIGINT, doctorid STRING, doctorname STRING, doctordeptid STRING, doctordeptname STRING, patientname1 STRING, patientsex STRING, patienttype STRING, test_method_code STRING, acute STRING, resultdate BIGINT, accept_specimen_dt BIGINT, test_technician_code STRING, getsampletesterid STRING, adjustsampletesterid STRING, state STRING, patientno STRING, test_item_name STRING, testformno STRING, patientage STRING, wardname STRING, bedno STRING, diagnose STRING, testformno1 STRING, item_num INT, price INT, testtype STRING, machine_memo STRING, testdescribe STRING, printtime BIGINT, sampleaccepter STRING, sampleaccepterid STRING, sampleacktime BIGINT, sampleacker STRING, hospitalid STRING, idenno STRING, telno STRING, factory STRING, sampleposition STRING, lastsamplereachtime BIGINT, lastsampleaccepter STRING, lastsampleaccepterid STRING, nurse_cell_code STRING, inpatientno STRING, clinicprinthostip STRING, firsttrialdate BIGINT, readflag STRING, updatesampletime STRING, reprintflag STRING, lsptestform STRING, confirmdocid STRING, confirmdocname STRING, confirmdoctime BIGINT, confirmdocip STRING, feeseqid STRING, birthday INT, weigth STRING, gestation STRING, fetus STRING, expdate STRING, lmp BIGINT, printgermrecodeflag STRING, isdistinguish STRING, ishealth STRING, wbbarcode STRING, taketype STRING, peopletypename STRING, peopletypecode STRING, iswxhealth STRING, errhealth STRING, pdfname STRING, isynpt STRING, ynptdate BIGINT, zjlx STRING, isyghealth STRING, patientname STRING, healthdate BIGINT, ispdf STRING, germinspectiontype STRING, reportaudit STRING, bbcyrqsj TIMESTAMP, jsbbrqsj TIMESTAMP, test_technician STRING, getsampletestername STRING, inspection_report_remarks STRING, bbzt STRING, _transform_dt BIGINT ) with ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = '${kafka.cluster}', 'topic' = 'datadev.dw_ddbAEqIiCdH.dwd_lcyl_lis_test_reg', 'key.format' = 'json', 'value.format' = 'json');
insert into dwd_lcyl_lis_test_reg select CAST(HIS_ITEMCODE as STRING), CAST(EXEC_SQN as STRING), CAST(BARCODE as STRING), CAST(MACHINECODE as STRING), CAST(GROUPCODE as STRING), TESTDATE, CAST(DOCTORID as STRING), CAST(DOCTORNAME as STRING), CAST(DOCTORDEPTID as STRING), CAST(DOCTORDEPTNAME as STRING), CAST(PATIENTNAME1 as STRING), CAST(PATIENTSEX as STRING), CAST(PATIENTTYPE as STRING), CAST(SAMPLETYPE as STRING), CAST(ACUTE as STRING), RESULTDATE, GETSAMPLEDATE, CAST(NURSEID as STRING), CAST(GETSAMPLETESTERID as STRING), CAST(ADJUSTSAMPLETESTERID as STRING), CAST(STATE as STRING), CAST(PATIENTNO as STRING), CAST(HIS_ITEMNAME as STRING), CAST(TESTFORMNO as STRING), CAST(PATIENTAGE as STRING), CAST(WARDNAME as STRING), CAST(BEDNO as STRING), CAST(DIAGNOSE as STRING), CAST(TESTFORMNO1 as STRING), CAST(ITEM_NUM as INT), CAST(PRICE as INT), CAST(TESTTYPE as STRING), CAST(MACHINE_MEMO as STRING), CAST(TESTDESCRIBE as STRING), PRINTTIME, CAST(SAMPLEACCEPTER as STRING), CAST(SAMPLEACCEPTERID as STRING), SAMPLEACKTIME, CAST(SAMPLEACKER as STRING), CAST(HOSPITALID as STRING), CAST(IDENNO as STRING), CAST(TELNO as STRING), CAST(FACTORY as STRING), CAST(SAMPLEPOSITION as STRING), LASTSAMPLEREACHTIME, CAST(LASTSAMPLEACCEPTER as STRING), CAST(LASTSAMPLEACCEPTERID as STRING), CAST(NURSE_CELL_CODE as STRING), CAST(INPATIENTNO as STRING), CAST(CLINICPRINTHOSTIP as STRING), FIRSTTRIALDATE, CAST(READFLAG as STRING), CAST(UPDATESAMPLETIME as STRING), CAST(REPRINTFLAG as STRING), CAST(LSPTESTFORM as STRING), CAST(CONFIRMDOCID as STRING), CAST(CONFIRMDOCNAME as STRING), CONFIRMDOCTIME, CAST(CONFIRMDOCIP as STRING), CAST(FEESEQID as STRING), CAST(BIRTHDAY as INT), CAST(WEIGTH as STRING), CAST(GESTATION as STRING), CAST(FETUS as STRING), CAST(EXPDATE as STRING), LMP, CAST(PRINTGERMRECODEFLAG as STRING), CAST(ISDISTINGUISH as STRING), CAST(ISHEALTH as STRING), CAST(WBBARCODE as STRING), CAST(TAKETYPE as STRING), CAST(PEOPLETYPENAME as STRING), CAST(PEOPLETYPECODE as STRING), CAST(ISWXHEALTH as STRING), CAST(ERRHEALTH as STRING), CAST(PDFNAME as STRING), CAST(ISYNPT as STRING), YNPTDATE, CAST(ZJLX as STRING), CAST(ISYGHEALTH as STRING), CAST(PATIENTNAME as STRING), HEALTHDATE, CAST(ISPDF as STRING), CAST(GERMINSPECTIONTYPE as STRING), CAST(ADJUSTSAMPLETESTERNAME as STRING), CAST(FROM_UNIXTIME(SAMPLETIME/1000,'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP), CAST(FROM_UNIXTIME(SAMPLEREACHTIME/1000,'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP), CAST(NURSENAME as STRING), CAST(GETSAMPLETESTERNAME as STRING), CAST(MEMO as STRING), CAST(SAMPLESTATE as STRING), CAST (UNIX_TIMESTAMP()*1000 as BIGINT) from ods_lcyl_lis_lis_test_reg where 1=1 ;
insert into dwd_lcyl_lis_test_reg_topic select CAST(HIS_ITEMCODE as STRING), CAST(EXEC_SQN as STRING), CAST(BARCODE as STRING), CAST(MACHINECODE as STRING), CAST(GROUPCODE as STRING), TESTDATE, CAST(DOCTORID as STRING), CAST(DOCTORNAME as STRING), CAST(DOCTORDEPTID as STRING), CAST(DOCTORDEPTNAME as STRING), CAST(PATIENTNAME1 as STRING), CAST(PATIENTSEX as STRING), CAST(PATIENTTYPE as STRING), CAST(SAMPLETYPE as STRING), CAST(ACUTE as STRING), RESULTDATE, GETSAMPLEDATE, CAST(NURSEID as STRING), CAST(GETSAMPLETESTERID as STRING), CAST(ADJUSTSAMPLETESTERID as STRING), CAST(STATE as STRING), CAST(PATIENTNO as STRING), CAST(HIS_ITEMNAME as STRING), CAST(TESTFORMNO as STRING), CAST(PATIENTAGE as STRING), CAST(WARDNAME as STRING), CAST(BEDNO as STRING), CAST(DIAGNOSE as STRING), CAST(TESTFORMNO1 as STRING), CAST(ITEM_NUM as INT), CAST(PRICE as INT), CAST(TESTTYPE as STRING), CAST(MACHINE_MEMO as STRING), CAST(TESTDESCRIBE as STRING), PRINTTIME, CAST(SAMPLEACCEPTER as STRING), CAST(SAMPLEACCEPTERID as STRING), SAMPLEACKTIME, CAST(SAMPLEACKER as STRING), CAST(HOSPITALID as STRING), CAST(IDENNO as STRING), CAST(TELNO as STRING), CAST(FACTORY as STRING), CAST(SAMPLEPOSITION as STRING), LASTSAMPLEREACHTIME, CAST(LASTSAMPLEACCEPTER as STRING), CAST(LASTSAMPLEACCEPTERID as STRING), CAST(NURSE_CELL_CODE as STRING), CAST(INPATIENTNO as STRING), CAST(CLINICPRINTHOSTIP as STRING), FIRSTTRIALDATE, CAST(READFLAG as STRING), CAST(UPDATESAMPLETIME as STRING), CAST(REPRINTFLAG as STRING), CAST(LSPTESTFORM as STRING), CAST(CONFIRMDOCID as STRING), CAST(CONFIRMDOCNAME as STRING), CONFIRMDOCTIME, CAST(CONFIRMDOCIP as STRING), CAST(FEESEQID as STRING), CAST(BIRTHDAY as INT), CAST(WEIGTH as STRING), CAST(GESTATION as STRING), CAST(FETUS as STRING), CAST(EXPDATE as STRING), LMP, CAST(PRINTGERMRECODEFLAG as STRING), CAST(ISDISTINGUISH as STRING), CAST(ISHEALTH as STRING), CAST(WBBARCODE as STRING), CAST(TAKETYPE as STRING), CAST(PEOPLETYPENAME as STRING), CAST(PEOPLETYPECODE as STRING), CAST(ISWXHEALTH as STRING), CAST(ERRHEALTH as STRING), CAST(PDFNAME as STRING), CAST(ISYNPT as STRING), YNPTDATE, CAST(ZJLX as STRING), CAST(ISYGHEALTH as STRING), CAST(PATIENTNAME as STRING), HEALTHDATE, CAST(ISPDF as STRING), CAST(GERMINSPECTIONTYPE as STRING), CAST(ADJUSTSAMPLETESTERNAME as STRING), CAST(FROM_UNIXTIME(SAMPLETIME/1000,'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP), CAST(FROM_UNIXTIME(SAMPLEREACHTIME/1000,'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP), CAST(NURSENAME as STRING), CAST(GETSAMPLETESTERNAME as STRING), CAST(MEMO as STRING), CAST(SAMPLESTATE as STRING), CAST (UNIX_TIMESTAMP()*1000 as BIGINT) from ods_lcyl_lis_lis_test_reg where 1=1 and (BARCODE IS NOT NULL) and (EXEC_SQN IS NOT NULL) and (HIS_ITEMCODE IS NOT NULL) ;