记stream_load 发送异步请求时经常报Broken pipe问题

Viewed 19
/**
 * Sends the given rows to Doris in a single asynchronous HTTP Stream Load
 * ({@code PUT /api/{db}/{table}/_stream_load}) and blocks until the response arrives.
 *
 * <p>Rows are written to the request body back to back, exactly as supplied; no
 * separator is inserted between them. Rows are encoded as UTF-8.
 *
 * <p>Failures are logged, not thrown: a non-200 response, an I/O error while
 * streaming the body, or an interruption while waiting all end up in the log.
 * NOTE(review): an HTTP 200 presumably does not by itself prove that Doris
 * committed the load; the logged response body should be checked for the load
 * status - confirm against the Doris Stream Load documentation.
 *
 * @param rowData rows to load; a {@code null} or empty list is skipped without
 *                sending a request
 */
public void insert(List<String> rowData){
    if (rowData == null || rowData.isEmpty()) {
        logger.info("insert to Doris-tableName:{} skipped, no rows to send", DORIS_TABLE);
        return;
    }
    long start = System.currentTimeMillis();
    final String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
            DORIS_HOST,
            DORIS_HTTP_PORT,
            DORIS_DB,
            DORIS_TABLE);
    CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault();
    try {
        httpclient.start();
        long startTime = System.currentTimeMillis();

        AsyncRequestProducer requestProducer = AsyncRequestBuilder.put(loadUrl)
                .addHeader(org.apache.hc.core5.http.HttpHeaders.EXPECT, "100-continue")
                .addHeader(org.apache.hc.core5.http.HttpHeaders.AUTHORIZATION, basicAuthHeader(DORIS_USER, DORIS_PASSWORD))
                .setEntity(new AsyncEntityProducer() {
                    // NOTE(review): the very first produce() call ends the stream without
                    // sending any row. Presumably this is a workaround for the first request
                    // being answered with a redirect, so that the rows are only streamed on
                    // the repeated request - confirm against the actual deployment. If the
                    // first request is NOT redirected, no data is sent at all.
                    private boolean isFirstSend = true;
                    // Rows not yet handed to the channel; created lazily when streaming starts.
                    private java.util.Iterator<String> remainingRows;
                    // Encoded row that the channel has not fully accepted yet.
                    private ByteBuffer pending;
                    // Wall-clock time at which streaming of the rows started.
                    private long transTime;

                    @Override
                    public int available() {
                        // Always claim to have data so produce() keeps being invoked
                        // until the stream has been ended.
                        return Integer.MAX_VALUE;
                    }

                    /**
                     * Writes as many rows as the channel accepts right now. When the channel
                     * takes no more bytes the method returns and resumes from the same
                     * position on the next invocation instead of spinning on the I/O thread.
                     * I/O errors propagate so that the exchange is failed rather than
                     * silently continuing with a half-written body.
                     */
                    @Override
                    public void produce(DataStreamChannel channel) throws IOException {
                        if (isFirstSend) {
                            channel.endStream();
                            isFirstSend = false;
                            return;
                        }
                        if (remainingRows == null) {
                            remainingRows = rowData.iterator();
                            transTime = System.currentTimeMillis();
                        }
                        for (;;) {
                            if (pending != null && pending.hasRemaining()) {
                                if (channel.write(pending) == 0) {
                                    // Channel is full for now; continue on the next produce() call.
                                    return;
                                }
                                continue;
                            }
                            if (!remainingRows.hasNext()) {
                                break;
                            }
                            pending = ByteBuffer.wrap(
                                    remainingRows.next().getBytes(java.nio.charset.StandardCharsets.UTF_8));
                        }
                        logger.info("String stream trans spent:{}ms", System.currentTimeMillis() - transTime);
                        channel.endStream();
                    }

                    @Override
                    public void releaseResources() {
                        // Forget the streaming position so that a repeated send starts
                        // again from the first row. isFirstSend is deliberately kept.
                        remainingRows = null;
                        pending = null;
                    }

                    @Override
                    public long getContentLength() {
                        // Length is unknown up front; the body is sent chunked.
                        return -1;
                    }

                    @Override
                    public String getContentType() {
                        return null;
                    }

                    @Override
                    public String getContentEncoding() {
                        return null;
                    }

                    @Override
                    public boolean isChunked() {
                        return true;
                    }

                    @Override
                    public Set<String> getTrailerNames() {
                        return null;
                    }

                    @Override
                    public boolean isRepeatable() {
                        return true;
                    }

                    @Override
                    public void failed(Exception cause) {
                        logger.error("AsyncEntityProducer failed", cause);
                    }

                }).build();

        // Callback that only logs the outcome of the asynchronous exchange.
        FutureCallback<SimpleHttpResponse> callback = new FutureCallback<SimpleHttpResponse>() {
            @Override
            public void completed(SimpleHttpResponse response) {
                logger.info("completed async request spent:{}ms", System.currentTimeMillis() - startTime);
            }

            @Override
            public void failed(Exception ex) {
                logger.error("async consumer failed ", ex);
            }

            @Override
            public void cancelled() {
                logger.info("cancelled");
            }
        };
        // Submit the asynchronous request.
        Future<SimpleHttpResponse> future = httpclient.execute(requestProducer,
                SimpleResponseConsumer.create(), callback);

        // Block until the request has completed.
        SimpleHttpResponse httpResponse = future.get();
        byte[] bodyBytes = httpResponse.getBodyBytes();
        String body = bodyBytes == null
                ? ""
                : new String(bodyBytes, java.nio.charset.StandardCharsets.UTF_8);
        if (200 == httpResponse.getCode()) {
            logger.info(body);
            logger.info("insert to Doris-tableName:{} count:{}, spent:{}ms", DORIS_TABLE, rowData.size(), System.currentTimeMillis() - start);
        } else {
            // Previously a non-200 answer was dropped without any trace.
            logger.error("insert to Doris-tableName:{} failed, http code:{}, body:{}", DORIS_TABLE, httpResponse.getCode(), body);
        }

    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can react to it.
        Thread.currentThread().interrupt();
        logger.error("interrupted while waiting for stream load response", e);
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    } finally {
        try {
            httpclient.close();
        } catch (IOException e) {
            logger.error("connection closing failed", e);
        }
    }
}

image.png
错误发生在往 channel 写 buffer 的时候。入库时偶尔会出现这个问题,频率不是很高,但是会丢失数据。
image.png

1 Answer

Doris具体什么版本呢,方便私聊(hhj_0530)看看吗?