flink 1.8.1 写入doris,写入65个表,经常报错Connection reset,flink重试后正常写入

Viewed 8

2026-03-19 08:59:56.093 [pool-29-thread-1] INFO DorisSinkFunction - 调用doris接口返回结果:{TxnId=51986622, Label=159c734d-cfe0-434e-98ae-35c775c02810, Comment=, TwoPhaseCommit=false, Status=Success, Message=OK, NumberTotalRows=7, NumberLoadedRows=7, NumberFilteredRows=0, NumberUnselectedRows=0, LoadBytes=12472, LoadTimeMs=2030, BeginTxnTimeMs=0, StreamLoadPutTimeMs=3, ReadDataTimeMs=0, WriteDataTimeMs=867, ReceiveDataTimeMs=5, CommitAndPublishTimeMs=1157}
2026-03-19 08:59:59.688 [pool-30-thread-1] ERROR DorisSinkFunction - Writing records to doris failed.
java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:210) ~[na:1.8.0_202]
at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[na:1.8.0_202]
at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:280) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:157) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at com.flink.connector.function.DorisSinkFunction.doHttpPut(DorisSinkFunction.java:329) [fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at com.flink.connector.function.DorisSinkFunction.handleList(DorisSinkFunction.java:244) [fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at com.flink.connector.function.DorisSinkFunction.flush(DorisSinkFunction.java:202) [fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at com.flink.connector.function.DorisSinkFunction.lambda$open$0(DorisSinkFunction.java:119) [fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_202]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[na:1.8.0_202]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_202]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[na:1.8.0_202]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_202]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_202]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_202]
2026-03-19 08:59:59.689 [pool-30-thread-1] ERROR DorisSinkFunction - 异常数据写入中,当前异常数据数量:7
2026-03-19 09:00:00.319 [Sink: DorisSink (5/15)#0] WARN DorisSinkFunction - 关闭任务尝试flush失败
java.lang.RuntimeException: Writing records to doris failed.
at com.flink.connector.function.DorisSinkFunction.checkFlushException(DorisSinkFunction.java:164) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at com.flink.connector.function.DorisSinkFunction.flush(DorisSinkFunction.java:183) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at com.flink.connector.function.DorisSinkFunction.close(DorisSinkFunction.java:176) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) [f32cb9a713bf46098ad901b48e354377.jar:na]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:115) [f32cb9a713bf46098ad901b48e354377.jar:na]
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163) [f32cb9a713bf46098ad901b48e354377.jar:na]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125) [f32cb9a713bf46098ad901b48e354377.jar:na]
at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1062) [f32cb9a713bf46098ad901b48e354377.jar:na]
at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255) ~[f32cb9a713bf46098ad901b48e354377.jar:na]
at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72) ~[f32cb9a713bf46098ad901b48e354377.jar:na]
at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127) ~[f32cb9a713bf46098ad901b48e354377.jar:na]
at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:970) [f32cb9a713bf46098ad901b48e354377.jar:na]
at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:935) ~[flink-dist-1.18.1.jar:1.18.1]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) ~[flink-dist-1.18.1.jar:1.18.1]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:935) ~[flink-dist-1.18.1.jar:1.18.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) ~[flink-dist-1.18.1.jar:1.18.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-dist-1.18.1.jar:1.18.1]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_202]
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:210) ~[na:1.8.0_202]
at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[na:1.8.0_202]
at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:280) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:157) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at com.flink.connector.function.DorisSinkFunction.doHttpPut(DorisSinkFunction.java:329) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at com.flink.connector.function.DorisSinkFunction.handleList(DorisSinkFunction.java:244) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at com.flink.connector.function.DorisSinkFunction.flush(DorisSinkFunction.java:202) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at com.flink.connector.function.DorisSinkFunction.lambda$open$0(DorisSinkFunction.java:119) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_202]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[na:1.8.0_202]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_202]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[na:1.8.0_202]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_202]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_202]
... 1 common frames omitted
2026-03-19 09:00:00.322 [Sink: DorisSink (5/15)#0] WARN Task - Sink: DorisSink (5/15)#0 (8a18297aaa2464895a4e3c0057ffc3fa_e6af7a5afde7b150425a19b307d3e10d_4_0) switched from RUNNING to FAILED with failure cause:
java.lang.RuntimeException: Writing records to doris failed.
at com.flink.connector.function.DorisSinkFunction.checkFlushException(DorisSinkFunction.java:164) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at com.flink.connector.function.DorisSinkFunction.invoke(DorisSinkFunction.java:139) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at com.flink.connector.function.DorisSinkFunction.invoke(DorisSinkFunction.java:38) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) ~[f32cb9a713bf46098ad901b48e354377.jar:na]
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) ~[f32cb9a713bf46098ad901b48e354377.jar:na]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) ~[f32cb9a713bf46098ad901b48e354377.jar:na]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[f32cb9a713bf46098ad901b48e354377.jar:na]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[f32cb9a713bf46098ad901b48e354377.jar:na]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) ~[f32cb9a713bf46098ad901b48e354377.jar:na]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[f32cb9a713bf46098ad901b48e354377.jar:na]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) ~[f32cb9a713bf46098ad901b48e354377.jar:na]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) ~[f32cb9a713bf46098ad901b48e354377.jar:na]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) [flink-dist-1.18.1.jar:1.18.1]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) [flink-dist-1.18.1.jar:1.18.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) [flink-dist-1.18.1.jar:1.18.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.18.1.jar:1.18.1]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_202]
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:210) ~[na:1.8.0_202]
at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[na:1.8.0_202]
at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:280) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:157) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at com.flink.connector.function.DorisSinkFunction.doHttpPut(DorisSinkFunction.java:329) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at com.flink.connector.function.DorisSinkFunction.handleList(DorisSinkFunction.java:244) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at com.flink.connector.function.DorisSinkFunction.flush(DorisSinkFunction.java:202) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at com.flink.connector.function.DorisSinkFunction.lambda$open$0(DorisSinkFunction.java:119) ~[fb84c2f8ac4141b9b5a796cfed404774@connector-doris.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_202]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[na:1.8.0_202]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_202]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[na:1.8.0_202]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_202]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_202]
... 1 common frames omitted
2026-03-19 09:00:00.323 [Sink: DorisSink (5/15)#0] INFO Task - Freeing task resources for Sink: DorisSink (5/15)#0 (8a18297aaa2464895a4e3c0057ffc3fa_e6af7a5afde7b150425a19b307d3e10d_4_0).
2026-03-19 09:00:00.335 [flink-pekko.actor.default-dispatcher-2344] INFO TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Sink: DorisSink (5/15)#0 8a18297aaa2464895a4e3c0057ffc3fa_e6af7a5afde7b150425a19b307d3e10d_4_0.
2026-03-19 09:00:00.353 [pool-33-thread-1] INFO DorisSinkFunction - 调用doris接口返回结果:{TxnId=51986741, Label=951dd473-cc20-43cb-890c-ae3f4afb22dc, Comment=, TwoPhaseCommit=false, Status=Success, Message=OK, NumberTotalRows=5, NumberLoadedRows=5, NumberFilteredRows=0, NumberUnselectedRows=0, LoadBytes=9024, LoadTimeMs=3637, BeginTxnTimeMs=0, StreamLoadPutTimeMs=3, ReadDataTimeMs=0, WriteDataTimeMs=1232, ReceiveDataTimeMs=4, CommitAndPublishTimeMs=2400}

0 Answers