spark 访问 doris on k8s 总是报错

Viewed 3

问题:用 Spark 远程访问 Doris,JDBC 方式正常;Spark Doris Connector 方式报错。我把代码贴上,请高人指点:

第一种方式(jdbc),可以正确查询到数据
/**
 * Reads the {@code test_events} table through Spark's generic {@code jdbc}
 * data source (MySQL Connector/J driver) and prints the result with
 * {@code show()}.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // Build the Spark configuration; the job runs with a local master.
    SparkConf sparkConf = new SparkConf()
            .setAppName("SparkDorisConnectorDemo")
            .setMaster("local[*]");

    // Passed as "dbtable": a parenthesised sub-query with an alias.
    String querystr = "(select * from test_events) tmp ";

    // try-with-resources closes the session even when load() or show() throws;
    // previously close() was only reached on the success path.
    try (SparkSession sparkSession = SparkSession
            .builder()
            .config(sparkConf)
            .getOrCreate()) {

        // Read from Doris over JDBC.
        Dataset<Row> allrowSparkDF = sparkSession
                .read()
                .format("jdbc")
                .option("url", "jdbc:mysql://******:31930/testdb?rewriteBatchedStatements=true")
                .option("fetchsize", "500000")
                .option("driver", "com.mysql.cj.jdbc.Driver")
                .option("user", "root")
                .option("password", "PleaseChangeMe")
                .option("dbtable", querystr)
                .load();

        allrowSparkDF.show();
    }
}

第二种方式:Spark Doris Connector 测试没有通过
	/**
	 * Reads {@code testdb.test_events} through the Spark Doris Connector
	 * ({@code format("doris")}) and prints up to five rows.
	 *
	 * <p>NOTE(review): according to the stack trace posted with this snippet,
	 * the read ends up opening a Thrift connection from the Spark process
	 * straight to the BE at the host name the cluster reports for it
	 * ({@code doris-cluster-be-0.doris-cluster-be-internal.doris.svc.cluster.local},
	 * port 9060). That name must therefore be resolvable and the port reachable
	 * from wherever this program runs; reaching only the FE address configured
	 * in {@code doris.fenodes} is not sufficient. Confirm against the connector
	 * documentation.
	 *
	 * @param args command-line arguments (unused)
	 */
	public static void main(String[] args) {
		// Build the Spark configuration; the job runs with a local master.
		SparkConf sparkConf = new SparkConf()
				.setAppName("SparkDorisConnectorDemo")
				.setMaster("local[*]");

		// try-with-resources closes the session even when load() or show() throws;
		// previously a failing show(5) skipped the close() call.
		try (SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()) {
			Dataset<Row> dorisSparkDF = sparkSession
					.read()
					.format("doris")
					.option("doris.table.identifier", "testdb.test_events")
					.option("doris.fenodes", "******:31830")
					.option("user", "root")
					.option("password", "PleaseChangeMe")
					.load();

			dorisSparkDF.show(5);
		}
	}

以下是错误信息
25/08/16 22:49:13 WARN BackendClient: Connect to doris Doris BE{host='doris-cluster-be-0.doris-cluster-be-internal.doris.svc.cluster.local', port=9060} failed.
org.apache.doris.shaded.org.apache.thrift.transport.TTransportException: java.net.UnknownHostException: doris-cluster-be-0.doris-cluster-be-internal.doris.svc.cluster.local
at org.apache.doris.shaded.org.apache.thrift.transport.TSocket.open(TSocket.java:255)
at org.apache.doris.spark.backend.BackendClient.open(BackendClient.java:87)
at org.apache.doris.spark.backend.BackendClient.<init>(BackendClient.java:72)
at org.apache.doris.spark.rdd.ScalaValueReader.client$lzycompute(ScalaValueReader.scala:50)
at org.apache.doris.spark.rdd.ScalaValueReader.client(ScalaValueReader.scala:50)
at org.apache.doris.spark.rdd.ScalaValueReader.org$apache$doris$spark$rdd$ScalaValueReader$$lockClient(ScalaValueReader.scala:239)
at org.apache.doris.spark.rdd.ScalaValueReader.<init>(ScalaValueReader.scala:138)
at org.apache.doris.spark.sql.ScalaDorisRowValueReader.<init>(ScalaDorisRowValueReader.scala:32)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.doris.spark.rdd.AbstractDorisRDDIterator.reader$lzycompute(AbstractDorisRDDIterator.scala:43)
at org.apache.doris.spark.rdd.AbstractDorisRDDIterator.reader(AbstractDorisRDDIterator.scala:36)
at org.apache.doris.spark.rdd.AbstractDorisRDDIterator.hasNext(AbstractDorisRDDIterator.scala:56)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:350)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.UnknownHostException: doris-cluster-be-0.doris-cluster-be-internal.doris.svc.cluster.local
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:196)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:162)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:394)
at java.net.Socket.connect(Socket.java:606)
at org.apache.doris.shaded.org.apache.thrift.transport.TSocket.open(TSocket.java:250)
... 32 more
--------------
说明一下:doris on k8s 安装(参考https://www.cnblogs.com/kubesphere/p/18597667 运维实战:K8s 上的 Doris 高可用集群最佳实践 )

root@k8s-master:/data/plugins/doris# vi 05-doris-cluster-fe-conf.yaml

# store metadata, must be created before start FE.
# Default value is ${DORIS_HOME}/doris-meta
# meta_dir = ${DORIS_HOME}/doris-meta

# Default dirs to put jdbc drivers,default value is ${DORIS_HOME}/jdbc_drivers
# jdbc_drivers_dir = ${DORIS_HOME}/jdbc_drivers

http_port = 8030
rpc_port = 9020
query_port = 9030
edit_log_port = 9010
arrow_flight_sql_port = -1

# Choose one if there are more than one ip except loopback address.
# Note that there should at most one ip match this list.
# If no ip match this rule, will choose one randomly.
# use CIDR format, e.g. 10.10.10.0/24 or IP format, e.g. 10.10.10.1
# Default value is empty.
#    priority_networks = 10.244.0.0/16;192.168.0.0/16

# Advanced configurations
log_roll_size_mb = 1024
# INFO, WARN, ERROR, FATAL
sys_log_level = INFO
# NORMAL, BRIEF, ASYNC
sys_log_mode = ASYNC
# sys_log_roll_num = 10
# sys_log_verbose_modules = org.apache.doris
# audit_log_dir = $LOG_DIR
# audit_log_modules = slow_query, query
# audit_log_roll_num = 10
# meta_delay_toleration_second = 10
# qe_max_connection = 1024
# qe_query_timeout_second = 300
# qe_slow_log_ms = 5000
enable_fqdn_mode = true
initial_root_password = *aa7530f7c48740e92a4c0d2138324611e314d397

root@k8s-master:/data/plugins/doris# vi 06-doris-cluster-be-conf.yaml

# https://github.com/apache/doris/blob/master/docs/zh-CN/community/developer-guide/debug-tool.md#jemalloc-heap-profile
# https://jemalloc.net/jemalloc.3.html
JEMALLOC_PROF_PRFIX=""

# ports for admin, web, heartbeat service
be_port = 9060
webserver_port = 8040
heartbeat_service_port = 9050
brpc_port = 8060
arrow_flight_sql_port = -1

# HTTPS configures
enable_https = false
# path of certificate in PEM format.
ssl_certificate_path = "$DORIS_HOME/conf/cert.pem"
# path of private key in PEM format.
ssl_private_key_path = "$DORIS_HOME/conf/key.pem"

# Choose one if there are more than one ip except loopback address.
# Note that there should at most one ip match this list.
# If no ip match this rule, will choose one randomly.
# use CIDR format, e.g. 10.10.10.0/24 or IP format, e.g. 10.10.10.1
# Default value is empty.
#  priority_networks = 10.244.0.0/16;192.168.0.0/16

# data root path, separate by ';'
# You can specify the storage type for each root path, HDD (cold data) or SSD (hot data)
# eg:
# storage_root_path = /home/disk1/doris;/home/disk2/doris;/home/disk2/doris
# storage_root_path = /home/disk1/doris,medium:SSD;/home/disk2/doris,medium:SSD;/home/disk2/doris,medium:HDD
# /home/disk2/doris,medium:HDD(default)
#
# you also can specify the properties by setting '<property>:<value>', separate by ','
# property 'medium' has a higher priority than the extension of path
#
# Default value is ${DORIS_HOME}/storage, you should create it by hand.
# storage_root_path = ${DORIS_HOME}/storage

# Default dirs to put jdbc drivers,default value is ${DORIS_HOME}/jdbc_drivers
# jdbc_drivers_dir = ${DORIS_HOME}/jdbc_drivers

# Advanced configurations
# INFO, WARNING, ERROR, FATAL
sys_log_level = INFO
# sys_log_roll_mode = SIZE-MB-1024
# sys_log_roll_num = 10
# sys_log_verbose_modules = *
# log_buffer_level = -1

# aws sdk log level
#    Off = 0,
#    Fatal = 1,
#    Error = 2,
#    Warn = 3,
#    Info = 4,
#    Debug = 5,
#    Trace = 6
# Default to turn off aws sdk log, because aws sdk errors that need to be cared will be output through Doris logs
aws_log_level=0
## If you are not running in aws cloud, you can disable EC2 metadata
AWS_EC2_METADATA_DISABLED=true


-----------------------------
root@k8s-master:/data/plugins/doris# vi 08-doris-cluster-fe.yaml 

        secretName: doris-cluster-secret
        defaultMode: 420
    - name: doris-cluster-fe-conf
      configMap:
        name: doris-cluster-fe-conf
        defaultMode: 420
          #      nodeName: node1 
  containers:
    - name: doris-cluster-fe
      #          image: 'docker.io/apache/doris:fe-3.0.3'
      # image: 'docker.io/apache/doris:fe-3.0.6.1'
      image: 'docker.io/selectdb/doris.fe-ubuntu:3.0.3'
      imagePullPolicy: IfNotPresent
      command:
        - /opt/apache-doris/fe_entrypoint.sh
      args:
        - $(ENV_FE_ADDR)
      ports:
        - name: http-port
          containerPort: 8030
          protocol: TCP
        - name: edit-log-port
          containerPort: 9010
          protocol: TCP
        - name: rpc-port
          containerPort: 9020
          protocol: TCP
        - name: query-port
          containerPort: 9030
          protocol: TCP
      env:
        - name: TZ
          value: Asia/Shanghai
        - name: POD_NAME
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.name
        - name: POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        - name: HOST_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.hostIP
        - name: POD_NAMESPACE
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.namespace
        - name: CONFIGMAP_MOUNT_PATH
          value: /etc/doris
        - name: USER
          value: root
        - name: DORIS_ROOT
          value: /opt/apache-doris
        - name: ENV_FE_ADDR
          value: doris-cluster-fe-service
        - name: FE_QUERY_PORT
          value: '9030'
        - name: ELECT_NUMBER
          value: '1'
      resources:
        limits:
          cpu: '1'
          memory: 4Gi
        requests:
          cpu: '1'
          memory: 1Gi
      volumeMounts:
        - name: podinfo
          mountPath: /etc/podinfo
        - name: log
          mountPath: /opt/apache-doris/fe/log
        - name: meta
          mountPath: /opt/apache-doris/fe/doris-meta
        - name: doris-cluster-fe-conf
          mountPath: /etc/doris
        - name: basic-auth
          mountPath: /etc/basic_auth
      livenessProbe:
        tcpSocket:
          port: 9030
        initialDelaySeconds: 80
        timeoutSeconds: 180
        periodSeconds: 5
        successThreshold: 1
        failureThreshold: 3
      readinessProbe:
        httpGet:
          path: /api/health
          port: 8030
          scheme: HTTP
        timeoutSeconds: 1
        periodSeconds: 5
        successThreshold: 1
        failureThreshold: 3
      startupProbe:
        tcpSocket:
          port: 9030
        timeoutSeconds: 1
        periodSeconds: 5
        successThreshold: 1
        failureThreshold: 60
      lifecycle:
        preStop:
          exec:
            command:
             - /opt/apache-doris/fe_prestop.sh
      terminationMessagePath: /dev/termination-log
      terminationMessagePolicy: File
  restartPolicy: Always
  terminationGracePeriodSeconds: 30
  dnsPolicy: ClusterFirst
  securityContext: {}
  affinity:
    podAntiAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
        - weight: 100
          podAffinityTerm:
            labelSelector:
              matchExpressions:
                - key: app.kubernetes.io/component
                  operator: In
                  values:
                    - doris-cluster-fe
            topologyKey: kubernetes.io/hostname
  schedulerName: default-scheduler

volumeClaimTemplates:
  - kind: PersistentVolumeClaim
    apiVersion: v1
    metadata:
      name: meta
    spec:
      accessModes:
        - ReadWriteOnce
      storageClassName: doris-storageclass-meta
      resources:
        requests:
          storage: 5G
      volumeMode: Filesystem
  - kind: PersistentVolumeClaim
    apiVersion: v1
    metadata:
      name: log
    spec:
      accessModes:
        - ReadWriteOnce
      storageClassName: doris-storageclass-log
      resources:
        requests:
          storage: 5G
      volumeMode: Filesystem

root@k8s-master:/data/plugins/doris# vi 09-doris-cluster-be.yaml

    - name: basic-auth
      secret:
        secretName: doris-cluster-secret
        defaultMode: 420
    - name: doris-cluster-be-conf
      configMap:
        name: doris-cluster-be-conf
        defaultMode: 420
  initContainers:
    - name: default-init
      # image: 'docker.io/alpine:latest'
      image: 'docker.io/selectdb/alpine:latest'
      imagePullPolicy: IfNotPresent
      command:
        - /bin/sh
      args:
        - '-c'
        - sysctl -w vm.max_map_count=2000000 && swapoff -a
      resources: {}
      terminationMessagePath: /dev/termination-log
      terminationMessagePolicy: File
      securityContext:
        privileged: true
          #nodeName: node2
  containers:
    - name: be
      #          image: 'docker.io/apache/doris:be-3.0.3'
      # image: 'docker.io/apache/doris:be-3.0.6.1'
      image: 'docker.io/selectdb/doris.be-ubuntu:3.0.3'
      imagePullPolicy: IfNotPresent
      command:
        - /opt/apache-doris/be_entrypoint.sh
      args:
        - $(ENV_FE_ADDR)
      ports:
        - name: webserver-port
          containerPort: 8040
          protocol: TCP
        - name: brpc-port
          containerPort: 8060
          protocol: TCP
        - name: heartbeat-port
          containerPort: 9050
          protocol: TCP
        - name: be-port
          containerPort: 9060
          protocol: TCP
      env:
        - name: TZ
          value: Asia/Shanghai
        - name: POD_NAME
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.name
        - name: POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        - name: HOST_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.hostIP
        - name: POD_NAMESPACE
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.namespace
        - name: CONFIGMAP_MOUNT_PATH
          value: /etc/doris
        - name: USER
          value: root
        - name: DORIS_ROOT
          value: /opt/apache-doris
        - name: ENV_FE_ADDR
          value: doris-cluster-fe-service
        - name: FE_QUERY_PORT
          value: '9030'
      resources:
        limits:
          cpu: '2'
          memory: 4Gi
        requests:
          cpu: '1'
          memory: 1Gi
      volumeMounts:
        - name: podinfo
          mountPath: /etc/podinfo
        - name: be-storage
          mountPath: /opt/apache-doris/be/storage
        - name: be-log
          mountPath: /opt/apache-doris/be/log
        - name: doris-cluster-be-conf
          mountPath: /etc/doris
        - name: basic-auth
          mountPath: /etc/basic_auth
      livenessProbe:
        tcpSocket:
          port: 9050
        initialDelaySeconds: 80
        timeoutSeconds: 180
        periodSeconds: 5
        successThreshold: 1
        failureThreshold: 3
      readinessProbe:
        httpGet:
          path: /api/health
          port: 8040
          scheme: HTTP
        timeoutSeconds: 1
        periodSeconds: 5
        successThreshold: 1
        failureThreshold: 3
      startupProbe:
        tcpSocket:
          port: 9050
        timeoutSeconds: 1
        periodSeconds: 5
        successThreshold: 1
        failureThreshold: 60
      lifecycle:
        preStop:
          exec:
            command:
              - /opt/apache-doris/be_prestop.sh
      terminationMessagePath: /dev/termination-log
      terminationMessagePolicy: File
  restartPolicy: Always
  terminationGracePeriodSeconds: 30
  dnsPolicy: ClusterFirst
  securityContext: {}
  affinity:
    podAntiAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
        - weight: 100
          podAffinityTerm:
            labelSelector:
              matchExpressions:
                - key: app.kubernetes.io/component
                  operator: In
                  values:
                    - doris-cluster-be
            topologyKey: kubernetes.io/hostname
  schedulerName: default-scheduler

volumeClaimTemplates:
  - kind: PersistentVolumeClaim
    apiVersion: v1
    metadata:
      name: be-storage
    spec:
      accessModes:
        - ReadWriteOnce
      resources:
        requests:
          storage: 20G
      storageClassName: doris-storageclass-meta
      volumeMode: Filesystem
  - kind: PersistentVolumeClaim
    apiVersion: v1
    metadata:
      name: be-log
    spec:
      accessModes:
        - ReadWriteOnce
      resources:
        requests:
          storage: 5G
      storageClassName: doris-storageclass-meta
      volumeMode: Filesystem

0 Answers