Error when reading a Doris table from Spark

I'm developing a Spark job in IDEA to read data from a Doris table, but it fails with an error. Could anyone point me in the right direction?
Doris version: doris-2.0.2-rc05
Spark version: 3.3.1
Connector dependency:

<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>spark-doris-connector-spark-3.1</artifactId>
    <version>25.0.1</version>
</dependency>
package doris.read

import config.ConfigHelper
import org.apache.doris.spark.sparkContextFunctions
import org.apache.spark.{SparkConf, SparkContext}

object DorisReaderRDDJobs {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf()
        conf.setAppName("doris_rdd_reader").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        // sc.dorisRDD is added by the sparkContextFunctions implicit import
        // and reads the table through the connector's RDD API.
        val dorisSparkRDD = sc.dorisRDD(
            tableIdentifier = Some("ecommerce_xxb.user_op_log"),
            cfg = Some(Map(
                "doris.fenodes" -> ConfigHelper.get("doris_fenodes").get(),
                "doris.request.auth.user" -> ConfigHelper.get("doris_username").get(),
                "doris.request.auth.password" -> ConfigHelper.get("doris_password").get(),
                "doris.request.connect.timeout.ms" -> Long.MaxValue.toString,
                "doris.request.read.timeout.ms" -> Long.MaxValue.toString
            ))
        )
        // Trigger the read and print each row (runs on the executors;
        // with local[*] the output goes to the driver console).
        dorisSparkRDD.foreach(println)
    }
}

Error message:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (D-20230630YDGEA executor driver): org.apache.spark.util.TaskCompletionListenerException: null

Previous exception in task: null
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
org.apache.doris.spark.rdd.AbstractDorisRDDIterator.reader$lzycompute(AbstractDorisRDDIterator.scala:43)
org.apache.doris.spark.rdd.AbstractDorisRDDIterator.reader(AbstractDorisRDDIterator.scala:36)
org.apache.doris.spark.rdd.AbstractDorisRDDIterator.hasNext(AbstractDorisRDDIterator.scala:56)
scala.collection.Iterator.foreach(Iterator.scala:943)
scala.collection.Iterator.foreach$(Iterator.scala:943)
org.apache.doris.spark.rdd.AbstractDorisRDDIterator.foreach(AbstractDorisRDDIterator.scala:27)
org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1003)
org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1003)
org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
org.apache.spark.scheduler.Task.run(Task.scala:136)
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:206)
at org.apache.spark.TaskContextImpl.invokeTaskCompletionListeners(TaskContextImpl.scala:143)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:136)
at org.apache.spark.scheduler.Task.run(Task.scala:146)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
at org.apache.spark.rdd.RDD.$anonfun$foreach$1(RDD.scala:1003)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:1001)
at doris.read.DorisReaderRDDJobs$.main(DorisReaderRDDJobs.scala:22)
at doris.read.DorisReaderRDDJobs.main(DorisReaderRDDJobs.scala)
Caused by: org.apache.spark.util.TaskCompletionListenerException: null

Previous exception in task: null
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
org.apache.doris.spark.rdd.AbstractDorisRDDIterator.reader$lzycompute(AbstractDorisRDDIterator.scala:43)
org.apache.doris.spark.rdd.AbstractDorisRDDIterator.reader(AbstractDorisRDDIterator.scala:36)
org.apache.doris.spark.rdd.AbstractDorisRDDIterator.hasNext(AbstractDorisRDDIterator.scala:56)
scala.collection.Iterator.foreach(Iterator.scala:943)
scala.collection.Iterator.foreach$(Iterator.scala:943)
org.apache.doris.spark.rdd.AbstractDorisRDDIterator.foreach(AbstractDorisRDDIterator.scala:27)
org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1003)
org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1003)
org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
org.apache.spark.scheduler.Task.run(Task.scala:136)
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:206)
at org.apache.spark.TaskContextImpl.invokeTaskCompletionListeners(TaskContextImpl.scala:143)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:136)
at org.apache.spark.scheduler.Task.run(Task.scala:146)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

1 Answer

See the spark-demo on GitHub: https://github.com/apache/doris/tree/master/samples/doris-demo/spark-demo
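
One thing worth checking against the setup above: the pom pulls in spark-doris-connector-spark-3.1, but the job runs on Spark 3.3.1. Recent connector releases publish a separate artifact per Spark minor version, so a build against Spark 3.3 would normally declare the 3.3 artifact instead (a sketch, assuming 25.0.1 is also published for that artifact):

<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>spark-doris-connector-spark-3.3</artifactId>
    <version>25.0.1</version>
</dependency>

The linked demo also reads through the DataFrame API rather than dorisRDD. A minimal sketch following that pattern (the FE address and credentials below are placeholders, not values from the question):

import org.apache.spark.sql.SparkSession

object DorisReaderDataFrameJob {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
            .appName("doris_df_reader")
            .master("local[*]")
            .getOrCreate()

        // "doris" is the connector's DataSource short name; doris.fenodes
        // points at the FE HTTP port (8030 by default).
        val dorisDF = spark.read.format("doris")
            .option("doris.table.identifier", "ecommerce_xxb.user_op_log")
            .option("doris.fenodes", "your-fe-host:8030")
            .option("user", "your_user")
            .option("password", "your_password")
            .load()

        dorisDF.show()
        spark.stop()
    }
}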