问题:用 spark 远程访问,jdbc 方式正确;Spark Doris Connector 方式报错,我把代码贴上,请高人指点:
第一种方式(jdbc),可以正确查询到数据
/**
 * Demo 1: read a Doris table over plain JDBC (MySQL protocol).
 * Works from outside the cluster — presumably because this path only talks to
 * the FE endpoint exposed on port 31930 (likely a NodePort; confirm against the
 * Service definition), never directly to the BEs.
 */
public static void main(String[] args) {
    // Local Spark context for the demo run.
    SparkConf conf = new SparkConf()
            .setAppName("SparkDorisConnectorDemo")
            .setMaster("local[*]");
    SparkSession spark = SparkSession.builder()
            .config(conf)
            .getOrCreate();

    // Subquery-with-alias form required by the JDBC source's "dbtable" option.
    String query = "(select * from test_events) tmp ";

    // Read the whole table through the MySQL JDBC driver.
    Dataset<Row> rows = spark.read()
            .format("jdbc")
            .option("url", "jdbc:mysql://******:31930/testdb?rewriteBatchedStatements=true")
            .option("fetchsize", "500000")
            .option("driver", "com.mysql.cj.jdbc.Driver")
            .option("user", "root")
            .option("password", "PleaseChangeMe")
            .option("dbtable", query)
            .load();

    rows.show();
    spark.close();
}
第二种方式:Spark Doris Connector 测试没有通过
/**
 * Demo 2: read the same table through the Spark Doris Connector.
 *
 * NOTE(review): as written this fails with
 * UnknownHostException: doris-cluster-be-0.doris-cluster-be-internal.doris.svc.cluster.local
 * (see the stack trace below). The connector contacts the FE first, then opens a
 * Thrift connection DIRECTLY to each BE on be_port 9060. Because the FE runs with
 * enable_fqdn_mode = true, it hands back the BEs' in-cluster DNS names, which a
 * client outside Kubernetes cannot resolve. The JDBC demo succeeds because it
 * never talks to a BE. The fix is environmental (make the BE addresses
 * resolvable/reachable from the Spark driver, e.g. host mappings or exposed BE
 * endpoints), not a change to this code.
 */
public static void main(String[] args) {
    SparkConf conf = new SparkConf()
            .setAppName("SparkDorisConnectorDemo")
            .setMaster("local[*]");
    SparkSession spark = SparkSession.builder()
            .config(conf)
            .getOrCreate();

    // FE HTTP endpoint (http_port 8030, exposed here on 31830) + table identifier.
    Dataset<Row> df = spark.read()
            .format("doris")
            .option("doris.table.identifier", "testdb.test_events")
            .option("doris.fenodes", "******:31830")
            .option("user", "root")
            .option("password", "PleaseChangeMe")
            .load();

    df.show(5);
    spark.close();
}
以下是错误信息
25/08/16 22:49:13 WARN BackendClient: Connect to doris Doris BE{host='doris-cluster-be-0.doris-cluster-be-internal.doris.svc.cluster.local', port=9060} failed.
org.apache.doris.shaded.org.apache.thrift.transport.TTransportException: java.net.UnknownHostException: doris-cluster-be-0.doris-cluster-be-internal.doris.svc.cluster.local
at org.apache.doris.shaded.org.apache.thrift.transport.TSocket.open(TSocket.java:255)
at org.apache.doris.spark.backend.BackendClient.open(BackendClient.java:87)
at org.apache.doris.spark.backend.BackendClient.(BackendClient.java:72)
at org.apache.doris.spark.rdd.ScalaValueReader.client$lzycompute(ScalaValueReader.scala:50)
at org.apache.doris.spark.rdd.ScalaValueReader.client(ScalaValueReader.scala:50)
at org.apache.doris.spark.rdd.ScalaValueReader.org$apache$doris$spark$rdd$ScalaValueReader$$lockClient(ScalaValueReader.scala:239)
at org.apache.doris.spark.rdd.ScalaValueReader.(ScalaValueReader.scala:138)
at org.apache.doris.spark.sql.ScalaDorisRowValueReader.(ScalaDorisRowValueReader.scala:32)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.doris.spark.rdd.AbstractDorisRDDIterator.reader$lzycompute(AbstractDorisRDDIterator.scala:43)
at org.apache.doris.spark.rdd.AbstractDorisRDDIterator.reader(AbstractDorisRDDIterator.scala:36)
at org.apache.doris.spark.rdd.AbstractDorisRDDIterator.hasNext(AbstractDorisRDDIterator.scala:56)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:350)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.UnknownHostException: doris-cluster-be-0.doris-cluster-be-internal.doris.svc.cluster.local
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:196)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:162)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:394)
at java.net.Socket.connect(Socket.java:606)
at org.apache.doris.shaded.org.apache.thrift.transport.TSocket.open(TSocket.java:250)
... 32 more
--------------
说明一下:doris on k8s 安装(参考https://www.cnblogs.com/kubesphere/p/18597667 运维实战:K8s 上的 Doris 高可用集群最佳实践 )
root@k8s-master:/data/plugins/doris# vi 05-doris-cluster-fe-conf.yaml
# store metadata, must be created before start FE.
# Default value is ${DORIS_HOME}/doris-meta
# meta_dir = ${DORIS_HOME}/doris-meta
# Default dirs to put jdbc drivers,default value is ${DORIS_HOME}/jdbc_drivers
# jdbc_drivers_dir = ${DORIS_HOME}/jdbc_drivers
http_port = 8030
rpc_port = 9020
query_port = 9030
edit_log_port = 9010
arrow_flight_sql_port = -1
# Choose one if there are more than one ip except loopback address.
# Note that there should at most one ip match this list.
# If no ip match this rule, will choose one randomly.
# use CIDR format, e.g. 10.10.10.0/24 or IP format, e.g. 10.10.10.1
# Default value is empty.
# priority_networks = 10.244.0.0/16;192.168.0.0/16
# Advanced configurations
log_roll_size_mb = 1024
# INFO, WARN, ERROR, FATAL
sys_log_level = INFO
# NORMAL, BRIEF, ASYNC
sys_log_mode = ASYNC
# sys_log_roll_num = 10
# sys_log_verbose_modules = org.apache.doris
# audit_log_dir = $LOG_DIR
# audit_log_modules = slow_query, query
# audit_log_roll_num = 10
# meta_delay_toleration_second = 10
# qe_max_connection = 1024
# qe_query_timeout_second = 300
# qe_slow_log_ms = 5000
enable_fqdn_mode = true
initial_root_password = *aa7530f7c48740e92a4c0d2138324611e314d397
root@k8s-master:/data/plugins/doris# vi 06-doris-cluster-be-conf.yaml
# https://github.com/apache/doris/blob/master/docs/zh-CN/community/developer-guide/debug-tool.md#jemalloc-heap-profile
# https://jemalloc.net/jemalloc.3.html
JEMALLOC_PROF_PRFIX=""
# ports for admin, web, heartbeat service
be_port = 9060
webserver_port = 8040
heartbeat_service_port = 9050
brpc_port = 8060
arrow_flight_sql_port = -1
# HTTPS configures
enable_https = false
# path of certificate in PEM format.
ssl_certificate_path = "$DORIS_HOME/conf/cert.pem"
# path of private key in PEM format.
ssl_private_key_path = "$DORIS_HOME/conf/key.pem"
# Choose one if there are more than one ip except loopback address.
# Note that there should at most one ip match this list.
# If no ip match this rule, will choose one randomly.
# use CIDR format, e.g. 10.10.10.0/24 or IP format, e.g. 10.10.10.1
# Default value is empty.
# priority_networks = 10.244.0.0/16;192.168.0.0/16
# data root path, separate by ';'
# You can specify the storage type for each root path, HDD (cold data) or SSD (hot data)
# eg:
# storage_root_path = /home/disk1/doris;/home/disk2/doris;/home/disk2/doris
# storage_root_path = /home/disk1/doris,medium:SSD;/home/disk2/doris,medium:SSD;/home/disk2/doris,medium:HDD
# /home/disk2/doris,medium:HDD(default)
#
# you also can specify the properties by setting '<property>:<value>', separate by ','
# property 'medium' has a higher priority than the extension of path
#
# Default value is ${DORIS_HOME}/storage, you should create it by hand.
# storage_root_path = ${DORIS_HOME}/storage
# Default dirs to put jdbc drivers,default value is ${DORIS_HOME}/jdbc_drivers
# jdbc_drivers_dir = ${DORIS_HOME}/jdbc_drivers
# Advanced configurations
# INFO, WARNING, ERROR, FATAL
sys_log_level = INFO
# sys_log_roll_mode = SIZE-MB-1024
# sys_log_roll_num = 10
# sys_log_verbose_modules = *
# log_buffer_level = -1
# aws sdk log level
# Off = 0,
# Fatal = 1,
# Error = 2,
# Warn = 3,
# Info = 4,
# Debug = 5,
# Trace = 6
# Default to turn off aws sdk log, because aws sdk errors that need to be cared will be output through Doris logs
aws_log_level=0
## If you are not running in aws cloud, you can disable EC2 metadata
AWS_EC2_METADATA_DISABLED=true
-----------------------------
root@k8s-master:/data/plugins/doris# vi 08-doris-cluster-fe.yaml
secretName: doris-cluster-secret
defaultMode: 420
- name: doris-cluster-fe-conf
configMap:
name: doris-cluster-fe-conf
defaultMode: 420
# nodeName: node1
containers:
- name: doris-cluster-fe
# image: 'docker.io/apache/doris:fe-3.0.3'
# image: 'docker.io/apache/doris:fe-3.0.6.1'
image: 'docker.io/selectdb/doris.fe-ubuntu:3.0.3'
imagePullPolicy: IfNotPresent
command:
- /opt/apache-doris/fe_entrypoint.sh
args:
- $(ENV_FE_ADDR)
ports:
- name: http-port
containerPort: 8030
protocol: TCP
- name: edit-log-port
containerPort: 9010
protocol: TCP
- name: rpc-port
containerPort: 9020
protocol: TCP
- name: query-port
containerPort: 9030
protocol: TCP
env:
- name: TZ
value: Asia/Shanghai
- name: POD_NAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.name
- name: POD_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
- name: HOST_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.hostIP
- name: POD_NAMESPACE
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.namespace
- name: CONFIGMAP_MOUNT_PATH
value: /etc/doris
- name: USER
value: root
- name: DORIS_ROOT
value: /opt/apache-doris
- name: ENV_FE_ADDR
value: doris-cluster-fe-service
- name: FE_QUERY_PORT
value: '9030'
- name: ELECT_NUMBER
value: '1'
resources:
limits:
cpu: '1'
memory: 4Gi
requests:
cpu: '1'
memory: 1Gi
volumeMounts:
- name: podinfo
mountPath: /etc/podinfo
- name: log
mountPath: /opt/apache-doris/fe/log
- name: meta
mountPath: /opt/apache-doris/fe/doris-meta
- name: doris-cluster-fe-conf
mountPath: /etc/doris
- name: basic-auth
mountPath: /etc/basic_auth
livenessProbe:
tcpSocket:
port: 9030
initialDelaySeconds: 80
timeoutSeconds: 180
periodSeconds: 5
successThreshold: 1
failureThreshold: 3
readinessProbe:
httpGet:
path: /api/health
port: 8030
scheme: HTTP
timeoutSeconds: 1
periodSeconds: 5
successThreshold: 1
failureThreshold: 3
startupProbe:
tcpSocket:
port: 9030
timeoutSeconds: 1
periodSeconds: 5
successThreshold: 1
failureThreshold: 60
lifecycle:
preStop:
exec:
command:
- /opt/apache-doris/fe_prestop.sh
terminationMessagePath: /dev/termination-log
terminationMessagePolicy: File
restartPolicy: Always
terminationGracePeriodSeconds: 30
dnsPolicy: ClusterFirst
securityContext: {}
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchExpressions:
- key: app.kubernetes.io/component
operator: In
values:
- doris-cluster-fe
topologyKey: kubernetes.io/hostname
schedulerName: default-scheduler
volumeClaimTemplates:
- kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: meta
spec:
accessModes:
- ReadWriteOnce
storageClassName: doris-storageclass-meta
resources:
requests:
storage: 5G
volumeMode: Filesystem
- kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: log
spec:
accessModes:
- ReadWriteOnce
storageClassName: doris-storageclass-log
resources:
requests:
storage: 5G
volumeMode: Filesystem
root@k8s-master:/data/plugins/doris# vi 09-doris-cluster-be.yaml
- name: basic-auth
secret:
secretName: doris-cluster-secret
defaultMode: 420
- name: doris-cluster-be-conf
configMap:
name: doris-cluster-be-conf
defaultMode: 420
initContainers:
- name: default-init
# image: 'docker.io/alpine:latest'
image: 'docker.io/selectdb/alpine:latest'
imagePullPolicy: IfNotPresent
command:
- /bin/sh
args:
- '-c'
- sysctl -w vm.max_map_count=2000000 && swapoff -a
resources: {}
terminationMessagePath: /dev/termination-log
terminationMessagePolicy: File
securityContext:
privileged: true
#nodeName: node2
containers:
- name: be
# image: 'docker.io/apache/doris:be-3.0.3'
# image: 'docker.io/apache/doris:be-3.0.6.1'
image: 'docker.io/selectdb/doris.be-ubuntu:3.0.3'
imagePullPolicy: IfNotPresent
command:
- /opt/apache-doris/be_entrypoint.sh
args:
- $(ENV_FE_ADDR)
ports:
- name: webserver-port
containerPort: 8040
protocol: TCP
- name: brpc-port
containerPort: 8060
protocol: TCP
- name: heartbeat-port
containerPort: 9050
protocol: TCP
- name: be-port
containerPort: 9060
protocol: TCP
env:
- name: TZ
value: Asia/Shanghai
- name: POD_NAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.name
- name: POD_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
- name: HOST_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.hostIP
- name: POD_NAMESPACE
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.namespace
- name: CONFIGMAP_MOUNT_PATH
value: /etc/doris
- name: USER
value: root
- name: DORIS_ROOT
value: /opt/apache-doris
- name: ENV_FE_ADDR
value: doris-cluster-fe-service
- name: FE_QUERY_PORT
value: '9030'
resources:
limits:
cpu: '2'
memory: 4Gi
requests:
cpu: '1'
memory: 1Gi
volumeMounts:
- name: podinfo
mountPath: /etc/podinfo
- name: be-storage
mountPath: /opt/apache-doris/be/storage
- name: be-log
mountPath: /opt/apache-doris/be/log
- name: doris-cluster-be-conf
mountPath: /etc/doris
- name: basic-auth
mountPath: /etc/basic_auth
livenessProbe:
tcpSocket:
port: 9050
initialDelaySeconds: 80
timeoutSeconds: 180
periodSeconds: 5
successThreshold: 1
failureThreshold: 3
readinessProbe:
httpGet:
path: /api/health
port: 8040
scheme: HTTP
timeoutSeconds: 1
periodSeconds: 5
successThreshold: 1
failureThreshold: 3
startupProbe:
tcpSocket:
port: 9050
timeoutSeconds: 1
periodSeconds: 5
successThreshold: 1
failureThreshold: 60
lifecycle:
preStop:
exec:
command:
- /opt/apache-doris/be_prestop.sh
terminationMessagePath: /dev/termination-log
terminationMessagePolicy: File
restartPolicy: Always
terminationGracePeriodSeconds: 30
dnsPolicy: ClusterFirst
securityContext: {}
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchExpressions:
- key: app.kubernetes.io/component
operator: In
values:
- doris-cluster-be
topologyKey: kubernetes.io/hostname
schedulerName: default-scheduler
volumeClaimTemplates:
- kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: be-storage
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 20G
storageClassName: doris-storageclass-meta
volumeMode: Filesystem
- kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: be-log
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 5G
storageClassName: doris-storageclass-meta
volumeMode: Filesystem