Doris 2.1.5 版本:通过 Thrift 协议读取 datetime 类型的字段时,读到的值为零值

Viewed 31

doris版本
2.1.5

语言
java jdk 17

thrift
libthrift version 0.16.0

2 Answers

具体如何执行的,读取的什么字段呢?请给出一个完整复现步骤

package com.yichenkeji.doris;
 
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.*;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.types.Types;
import org.apache.commons.codec.binary.Base64;
import org.apache.doris.sdk.thrift.*;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.thrift.TConfiguration;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
 
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;
 
public class QueryPlanMain {
    private static String queryPlanUrlFormat = "%s/api/%s/%s/_query_plan";
 
 
    public static void main(String[] args) throws Exception {
        String dorisUrl = "http://127.0.0.1:8030";
        String username = "root";
        String password = "1234564";
        String database = "article";
        String table = "dim_user";
//        String[] columns = {"id","name","merger_name","area_code","level"};
        String[] columns = {"*"};
        String sql ="select "+String.join(",",columns)+" from "+database+"."+table
//                +" where id<= 10 "
//                + " order by id " //不支持order by
//                + " limit 10" //不支持limit
                ;
        JSONObject queryPlanResult = getQueryPlan(dorisUrl,username,password,database,table,sql);
        System.out.println("查询计划执行结果:"+queryPlanResult);
        //根据查询计划结果,查询数据
        if (queryPlanResult != null){
            readData(queryPlanResult,username,password,database,table);
        }
    }
 
    private static JSONObject getQueryPlan(String dorisUrl, String username,String password,String database, String table, String sql) throws Exception {
        try (CloseableHttpClient client = HttpClients.custom().build()) {
 
            String queryPlanUrl = String.format(queryPlanUrlFormat, dorisUrl, database, table);
 
 
            HttpPost post = new HttpPost(queryPlanUrl);
            System.out.println("执行查询计划url: "+queryPlanUrl);
            post.setHeader(HttpHeaders.EXPECT, "100-continue");
            post.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(username, password));
 
            // The param is specific SQL, and the query plan is returned
            Map<String, String> params = new HashMap<>();
            params.put("sql", sql);
            System.out.println("执行查询计划参数: "+ params);
            StringEntity entity = new StringEntity(JSON.toJSONString(params));
            post.setEntity(entity);
 
            try (CloseableHttpResponse response = client.execute(post)) {
 
                if (response.getEntity() != null) {
                    //解析查询计划
                    JSONObject queryPlanJSONObject = JSONObject.parseObject(EntityUtils.toString(response.getEntity()));
                    System.out.println("执行查询计划response: " + queryPlanJSONObject);
                    JSONObject dataJSONObject = queryPlanJSONObject.getJSONObject("data");
                    //查询计划异常返回exception信息
                    if (dataJSONObject.containsKey("exception")) {
                        throw new RuntimeException(dataJSONObject.getString("exception"));
                    }
                    return dataJSONObject;
                }
            }
        }
        return null;
    }
 
    private static String basicAuthHeader(String username, String password) {
        final String info = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(info.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded);
    }
    private static void readData(JSONObject data,String username,String password,String database,String table) throws Exception {
 
        String queryPlan = data.getString("opaqued_query_plan");
        JSONObject partitions = data.getJSONObject("partitions");
        long readTotal = 0L;
        long totalTime = 0L;
        for (Map.Entry<String, Object> tablet : partitions.entrySet()) {
            System.out.println("tablet信息:"+ tablet);
            long startTime = System.currentTimeMillis();
            Long tabletId = Long.parseLong(tablet.getKey());
            JSONObject value = JSONObject.parseObject(JSON.toJSONString(tablet.getValue()));
            //get first backend
            String routingsBackend = value.getJSONArray("routings").getString(0);
            String backendHost = routingsBackend.split(":")[0];
            String backendPort = routingsBackend.split(":")[1];
 
            //connect backend
            TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
            TTransport transport = new TSocket(new TConfiguration(), backendHost, Integer.parseInt(backendPort));
            TProtocol protocol = factory.getProtocol(transport);
            TDorisExternalService.Client client = new TDorisExternalService.Client(protocol);
            if (!transport.isOpen()) {
                transport.open();
            }
 
            //build params
            TScanOpenParams params = new TScanOpenParams();
            params.cluster = "default_cluster";
            params.database = database;
            params.table = table;
            params.tablet_ids = Collections.singletonList(tabletId);
            params.opaqued_query_plan = queryPlan;
            // max row number of one read batch
            params.setBatchSize(1000);
            params.setQueryTimeout(3600);
            params.setMemLimit(2147483648L);
            params.setUser(username);
            params.setPasswd(password);
 
            //open scanner
            TScanOpenResult tScanOpenResult = client.openScanner(params);
            if (!TStatusCode.OK.equals(tScanOpenResult.getStatus().getStatusCode())) {
                throw new RuntimeException(String.format("The status of open scanner result from %s is '%s', error message is: %s.",
                        routingsBackend, tScanOpenResult.getStatus().getStatusCode(), tScanOpenResult.getStatus().getErrorMsgs()));
            }
 
            TScanNextBatchParams nextBatchParams = new TScanNextBatchParams();
            nextBatchParams.setContextId(tScanOpenResult.getContextId());
            boolean eos = false;
            //read data
            int offset = 0;
            long nums = 0L;
            while (!eos) {
                nextBatchParams.setOffset(offset);
                TScanBatchResult next = client.getNext(nextBatchParams);
                if (!TStatusCode.OK.equals(next.getStatus().getStatusCode())) {
                    throw new RuntimeException(String.format("The status of get next result from %s is '%s', error message is: %s.",
                            routingsBackend, next.getStatus().getStatusCode(), next.getStatus().getErrorMsgs()));
                }
                eos = next.isEos();
                if (!eos) {
                    int i = convertArrow(next);
                    offset += i;
                    nums += i;
                }
            }
            readTotal += nums;
            long cost_time =(System.currentTimeMillis() - startTime);
            totalTime +=cost_time;
            System.out.println("tabletId["+tabletId+"]任务结束,总数据量:"+nums+",总花费:"+cost_time+"ms");
            //close
            TScanCloseParams closeParams = new TScanCloseParams();
            closeParams.setContextId(tScanOpenResult.getContextId());
            client.closeScanner(closeParams);
            if (transport.isOpen()) {
                transport.close();
            }
        }
        System.out.println("总任务结束,总数据量:"+readTotal+",总花费:"+totalTime+"ms");
 
    }
 
 
    private static int convertArrow(TScanBatchResult nextResult) throws Exception {
        long startTime = System.currentTimeMillis();
        int offset = 0;
        RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE);
        ArrowStreamReader arrowStreamReader = new ArrowStreamReader(new ByteArrayInputStream(nextResult.getRows()), rootAllocator);
        VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot();
        while (arrowStreamReader.loadNextBatch()) {
            List<FieldVector> fieldVectors = root.getFieldVectors();
            //total data rows
            int rowCountInOneBatch = root.getRowCount();
 
            //按行获取数据
//            for (int row = 0; row < rowCountInOneBatch; row++) {
//                List<Object> rowData = new ArrayList<>();
//                for (FieldVector fieldVector : fieldVectors) {
//                    Types.MinorType minorType = fieldVector.getMinorType();
//                    Object v = convertValue(row, minorType, fieldVector);
//                    rowData.add(v);
//                }
                System.out.println(rowData);
//            }
            //按列获取数据
            List<Object> rowData = new ArrayList<>();
            for (FieldVector fieldVector : fieldVectors) {
                Types.MinorType minorType = fieldVector.getMinorType();
                List<Object> filedData = new ArrayList<>();
                for (int row = 0; row < rowCountInOneBatch; row++) {
                    Object v = convertValue(row, minorType, fieldVector);
                    filedData.add(v);
 
                }
//                System.out.println(filedData);
            }
            offset += root.getRowCount();
        }
 
        //处理完之后要关闭,否则容易内存溢出
        arrowStreamReader.close();
 
        return offset;
    }
 
    private static Object convertValue(int rowIndex,
                                       Types.MinorType minorType,
                                       FieldVector fieldVector) {
        Object fieldValue;
        switch (minorType) {
            case BIT:
                BitVector bitVector = (BitVector) fieldVector;
                fieldValue = bitVector.isNull(rowIndex) ? null : bitVector.get(rowIndex) != 0;
                break;
            case TINYINT:
                TinyIntVector tinyIntVector = (TinyIntVector) fieldVector;
                fieldValue = tinyIntVector.isNull(rowIndex) ? null : tinyIntVector.get(rowIndex);
                break;
            case SMALLINT:
                SmallIntVector smallIntVector = (SmallIntVector) fieldVector;
                fieldValue = smallIntVector.isNull(rowIndex) ? null : smallIntVector.get(rowIndex);
                break;
            case INT:
                IntVector intVector = (IntVector) fieldVector;
                fieldValue = intVector.isNull(rowIndex) ? null : intVector.get(rowIndex);
                break;
            case BIGINT:
                BigIntVector bigIntVector = (BigIntVector) fieldVector;
                fieldValue = bigIntVector.isNull(rowIndex) ? null : bigIntVector.get(rowIndex);
                break;
            case FLOAT4:
                Float4Vector float4Vector = (Float4Vector) fieldVector;
                fieldValue = float4Vector.isNull(rowIndex) ? null : float4Vector.get(rowIndex);
                break;
            case FLOAT8:
                Float8Vector float8Vector = (Float8Vector) fieldVector;
                fieldValue = float8Vector.isNull(rowIndex) ? null : float8Vector.get(rowIndex);
                break;
            case VARBINARY:
                VarBinaryVector varBinaryVector = (VarBinaryVector) fieldVector;
                fieldValue = varBinaryVector.isNull(rowIndex) ? null : varBinaryVector.get(rowIndex);
                break;
            case DECIMAL:
                DecimalVector decimalVector = (DecimalVector) fieldVector;
                fieldValue = decimalVector.getObject(rowIndex).stripTrailingZeros();
                break;
            case VARCHAR:
                VarCharVector date = (VarCharVector) fieldVector;
                fieldValue = new String(date.get(rowIndex));
                break;
            case LIST:
                ListVector listVector = (ListVector) fieldVector;
                fieldValue = listVector.isNull(rowIndex) ? null : listVector.getObject(rowIndex);
                break;
            default:
                fieldValue = fieldVector.isNull(rowIndex) ? null : fieldVector.getObject(rowIndex);
        }
        return fieldValue;
    }
}

这是测试样例:article.dim_user 表中有一个名为 event_time 的字段,类型为 datetime。