public void insert(List<String> rowData){
long start = System.currentTimeMillis();
final String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
DORIS_HOST,
DORIS_HTTP_PORT,
DORIS_DB,
DORIS_TABLE);
CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault();
try {
httpclient.start();
long startTime=System.currentTimeMillis();
AsyncRequestProducer requestProducer= AsyncRequestBuilder.put(loadUrl)
.addHeader(org.apache.hc.core5.http.HttpHeaders.EXPECT, "100-continue")
.addHeader(org.apache.hc.core5.http.HttpHeaders.AUTHORIZATION, basicAuthHeader(DORIS_USER, DORIS_PASSWORD))
.setEntity(new AsyncEntityProducer() {
private boolean isFirstSend=true;
@Override
public int available() {
return Integer.MAX_VALUE;
}
@Override
public void produce(DataStreamChannel channel) {
try {
if(isFirstSend) {
channel.endStream();
isFirstSend=false;
}
else {
long transTime = System.currentTimeMillis();
for (String oneRow : rowData) {
ByteBuffer buffer = ByteBuffer.wrap(oneRow.getBytes());
while(buffer.hasRemaining()){
channel.write(buffer);
}
}
logger.info("String stream trans spent:{}ms",System.currentTimeMillis()-transTime);
channel.endStream();
}
}catch (Exception e){
logger.error("produce failed",e);
}
}
@Override
public void releaseResources() {
}
@Override
public long getContentLength() {
return 0;
}
@Override
public String getContentType() {
return null;
}
@Override
public String getContentEncoding() {
return null;
}
@Override
public boolean isChunked() {
return true;
}
@Override
public Set<String> getTrailerNames() {
return null;
}
@Override
public boolean isRepeatable() {
return true;
}
@Override
public void failed(Exception cause) {
logger.error("AsyncEntityProducer failed",cause);
}
}).build();
// 创建异步响应消费者
FutureCallback<SimpleHttpResponse> callback = new FutureCallback<SimpleHttpResponse>() {
@Override
public void completed(SimpleHttpResponse response) {
logger.info("completed async request spent:{}ms" ,System.currentTimeMillis()-startTime);
}
@Override
public void failed(Exception ex) {
logger.error("async consumer failed ",ex);
}
@Override
public void cancelled() {
logger.info("cancelled");
}
};
// 发送异步请求
Future<SimpleHttpResponse> future = httpclient.execute(requestProducer,
SimpleResponseConsumer.create(),callback);
// 等待请求完成
SimpleHttpResponse httpResponse = future.get();
if (200==httpResponse.getCode()){
logger.info(new String(httpResponse.getBodyBytes()));
logger.info("insert to Doris-tableName:{} count:{}, spent:{}ms",DORIS_TABLE,rowData.size(),System.currentTimeMillis()-start);
}
} catch (Exception e) {
logger.error(e.getMessage(),e);
}finally {
try{
httpclient.close();
} catch (IOException e) {
logger.error("connection closing failed",e);
}
}
}
报错在往channel 写buffer时候报错,入库有时会报这个问题,频率不是很高,但是会丢失数据