Doris 4.0.1 升级后 Arrow Flight SQL 查询报错 IllegalArgumentException

Viewed 17

升级到 Doris 4.0.1 后,脚本和表结构均未做任何改动,直接运行原有测试脚本即报错。
报错信息如下:
root@gg-doris-00001:/opt/doris/test/readTest# python3 db_arrow.py "select * from ODS.ods_iot_device_inverter_realtime_inc limit 1"
/usr/local/lib/python3.10/dist-packages/adbc_driver_manager/dbapi.py:329: Warning: Cannot disable autocommit; conn will not be DB-API 2.0 compliant
warnings.warn(
数据库查询出错: INTERNAL: [FlightSQL] get flight info statement failed, after executeQueryStatement handleQuery, java.lang.RuntimeException: after executeQueryStatement handleQuery, error code: ERR_UNKNOWN_ERROR, error msg: IllegalArgumentException, msg: null (Internal; ExecuteQuery)
getData()函数执行耗时: 0.6600 秒

import os
import time
from pathlib import Path

import adbc_driver_flightsql.dbapi as flight_sql
import adbc_driver_manager
import pandas as pd

# Connection settings for the Doris FE Arrow Flight SQL endpoint.
# Each value can be overridden via an environment variable so credentials do
# not have to live in source control; the literals are the fallback defaults.
#   DORIS_USER / DORIS_PASSWORD  -> login credentials
#   DORIS_HOST                   -> FE host address
#   DORIS_FLIGHT_PORT            -> Arrow Flight SQL port (presumably the FE
#                                   `arrow_flight_sql_port`; confirm in fe.conf)
SETTING = {
    "Doris_USER": os.environ.get("DORIS_USER", "root"),
    "Doris_PASSWORD": os.environ.get("DORIS_PASSWORD", "root@123"),
    "Doris_Dev_In_HOST": os.environ.get("DORIS_HOST", "172.20.171.122"),
    # Cast to int so the value has the same type as the original literal.
    "Doris_Dev_In_PORT": int(os.environ.get("DORIS_FLIGHT_PORT", "8070")),
}

def getData(sql: str, batch_size: int) -> pd.DataFrame | None:
"""
如果提供了sql和env参数,则使用这些参数执行查询
否则使用默认查询
"""

# 检查SQL是否为空
if not sql or sql.strip() == "":
    print("SQL语句为空,无法执行查询")
    return None
        
df: pd.DataFrame | None = None
try:
    conn = flight_sql.connect(
        uri=f"grpc://{SETTING['Doris_Dev_In_HOST']}:{SETTING['Doris_Dev_In_PORT']}",
        db_kwargs={
            adbc_driver_manager.DatabaseOptions.USERNAME.value: SETTING["Doris_USER"],
            adbc_driver_manager.DatabaseOptions.PASSWORD.value: SETTING["Doris_PASSWORD"]
                        },
    )
    with conn.cursor() as cur:
        cur.execute(f"set batch_size = {batch_size}")
        cur.execute(sql)
        df = cur.fetchallarrow().to_pandas()
    conn.close()
except Exception as e:
    print(f"数据库查询出错: {e}")

return df

def main(query: str, batch_size: int) -> None:
    """Run ``query`` through getData, time it, and print a summary of the result.

    Prints the elapsed time of the getData() call, then, if any rows came back,
    the row count, the deep memory footprint of the DataFrame, and its first row.
    Prints a notice and returns early when getData() yields ``None`` or an
    empty DataFrame.

    Args:
        query: SQL statement passed through to getData().
        batch_size: Session ``batch_size`` passed through to getData().
    """
    # perf_counter is monotonic and high-resolution; time.time() follows the
    # wall clock and can jump (NTP adjustments), skewing durations.
    start_time = time.perf_counter()
    df = getData(query, batch_size)
    execution_time = time.perf_counter() - start_time
    print(f"getData()函数执行耗时: {execution_time:.4f} 秒")

    if df is None or df.empty:
        print("未获取到数据")
        return

    print(f"获取到 {len(df)} 条数据记录")
    # deep=True also counts the payload of object columns (e.g. Python strings),
    # not just the per-cell pointers.
    data_size_bytes = df.memory_usage(deep=True).sum()
    data_size_mb = data_size_bytes / (1024 * 1024)
    print(f"数据内存大小: {data_size_bytes} 字节 (~{data_size_mb:.2f} MB)")
    print("\n第一行数据:")
    print(df.iloc[0])

# Script entry point. The guard must use the dunder names; a bare `name`
# is undefined and raises NameError, so the script would never run.
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="使用 Arrow 查询 Doris 并打印结果概览")
    parser.add_argument("sql", nargs="?", help="要执行的 SQL 语句(可选,若提供 --sql-file 则忽略此参数)")
    parser.add_argument(
        "-f", "--sql-file",
        dest="sql_file",
        help="从当前目录读取的 SQL 文件名(优先于直接传入的 SQL 字符串)",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=1024,
        # Help text previously claimed a default of 512; the real default is 1024.
        help="设置 Arrow batch_size (默认 1024)",
    )
    args = parser.parse_args()

    # The contents of --sql-file take precedence over the positional SQL string.
    sql_query: str | None = args.sql
    if args.sql_file:
        # Joining onto cwd resolves relative names against the current directory;
        # an absolute sql_file replaces the cwd prefix entirely.
        file_path = Path.cwd() / args.sql_file
        try:
            sql_query = file_path.read_text(encoding="utf-8").strip()
        except (OSError, UnicodeDecodeError) as e:
            print(f"读取SQL文件出错: {e}")
            raise

    if not sql_query:
        print("未提供可执行的 SQL(请传入 SQL 字符串或使用 --sql-file 指定文件)")
        raise SystemExit(1)

    main(sql_query, args.batch_size)
0 Answers