How should deletes be configured when writing data from Spark to Doris?


doris-2.1.10, scala-2.12
I previously ran a Spark application that wrote some data into Doris. Now I have used withColumn to add a delete mark to some of that data:
df.withColumn("should_delete", when(col("op") === "d", lit(1)).otherwise(lit(0))).drop("op")
When op is "d", it means the row was written earlier and should now be deleted.
Then, following what I found in the docs, I changed my write method as below, but it has no effect:
df
.write
.format("doris")
.option("doris.table.identifier", table)
.option("doris.fenodes", feNodes)
.option("user", user)
.option("password", pwd)
.option("sink.batch.size", sinkBatchSize)
.option("doris.delete.sign.column", "should_delete")
.option("doris.delete.sign.value", "1")
.option("doris.request.connect.timeout.ms", dorisConnectTimeOut)
.option("doris.request.read.timeout.ms", dorisConnectTimeOut)
.option("doris.request.query.timeout.s", dorisQueryTimeOut)
.option("doris.sink.task.partition.size", partitionSize)
.option("doris.sink.task.use.repartition", value = true)
.save()
Can you point out where the problem is? The table I am writing to is a Unique Key model table.

1 Answer

Refer to this example:

import org.apache.spark.sql.{SparkSession, SaveMode}
import org.apache.spark.sql.functions.lit

object DorisBatchDeleteApp {

  /*
   * Table DDL:
   *
   * CREATE TABLE test.example_table (
   *     id BIGINT NOT NULL,
   *     value STRING
   * )
   * UNIQUE KEY(id)
   * DISTRIBUTED BY HASH(id) BUCKETS 10
   * PROPERTIES(
   *     "enable_unique_key_merge_on_write" = "true",
   *     "replication_num" = "1"
   * );
   *
   * insert into test.example_table values(1, "xxxxx");
   *
   * set show_hidden_columns=true;
   *
   * desc test.example_table;
   *
   * Goal: delete the row with id = 1.
   *
   * Dependencies:
   *
   * <dependencies>
   *     <dependency>
   *         <groupId>org.apache.doris</groupId>
   *         <artifactId>spark-doris-connector-spark-2</artifactId>
   *         <version>25.2.0</version>
   *     </dependency>
   *     <dependency>
   *         <groupId>org.apache.spark</groupId>
   *         <artifactId>spark-core_2.11</artifactId>
   *         <version>2.4.8</version>
   *     </dependency>
   *     <dependency>
   *         <groupId>org.apache.spark</groupId>
   *         <artifactId>spark-sql_2.11</artifactId>
   *         <version>2.4.8</version>
   *     </dependency>
   * </dependencies>
   */

  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession (local mode, convenient for running in an IDE)
    val spark = SparkSession.builder()
      .appName("Doris Upsert + Delete Demo")
      .master("local[1]")
      .config("spark.sql.adaptive.enabled", "false") // avoid small-file issues (optional)
      .getOrCreate()

    import spark.implicits._

    // 2. Build 10 new rows and add __DORIS_DELETE_SIGN__ = 0 (normal upsert)
    val newData = (2 to 11).zipWithIndex.map { case (id, idx) =>
      (id.toLong, ('a' + idx).toChar.toString * 3)
    }
    val newDF = newData.toDF("id", "namexxx") // note: temporary column name to avoid a clash
      .withColumn("__DORIS_DELETE_SIGN__", lit(0)) // 0 means "not deleted"
      .select($"id", $"namexxx".as("name"), $"__DORIS_DELETE_SIGN__") // rename and reorder

    // 3. Build the record to delete: id = 1 (name can be anything, e.g. "xxx"), marked as deleted
    val deleteDF = Seq((1L, "xxx")).toDF("id", "name")
      .withColumn("__DORIS_DELETE_SIGN__", lit(1))

    // 4. Both DataFrames now have 3 columns: id, name, __DORIS_DELETE_SIGN__
    val unionDF = newDF.union(deleteDF)

    // 5. Preview the rows
    println(">>> Final rows to write (including delete marks):")
    unionDF.show(false)

    // 6. Write to Doris
    unionDF.write
      .format("doris")
      .option("doris.table.identifier", "test.example_table")         // replace with your db.table
      .option("doris.fenodes", "10.16.10.6:18739")                    // replace with your FE address
      .option("user", "root")
      .option("password", "")                                         // set as needed
      .option("doris.write.properties.columns", "id,name,__DORIS_DELETE_SIGN__")
      .option("doris.write.properties.merge_type", "MERGE")           // must be MERGE
      .mode(SaveMode.Append)
      .save()

    println(">>> Data written to Doris!")
    spark.stop()
  }
}
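
Adapting this to the question's own DataFrame: instead of a custom `should_delete` column plus `doris.delete.sign.column` options, write directly into Doris's hidden `__DORIS_DELETE_SIGN__` column and pass the merge options used above. A minimal sketch, assuming `df`, `table`, `feNodes`, `user` and `pwd` are the question's existing variables (untested against your cluster):

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, lit, when}

// Map the CDC-style "op" flag onto Doris's hidden delete-sign column:
// rows with __DORIS_DELETE_SIGN__ = 1 are deleted on merge, 0 are upserted.
val out = df
  .withColumn("__DORIS_DELETE_SIGN__", when(col("op") === "d", lit(1)).otherwise(lit(0)))
  .drop("op")

out.write
  .format("doris")
  .option("doris.table.identifier", table)
  .option("doris.fenodes", feNodes)
  .option("user", user)
  .option("password", pwd)
  // list every column explicitly, including the hidden delete-sign column
  .option("doris.write.properties.columns", out.columns.mkString(","))
  .option("doris.write.properties.merge_type", "MERGE")
  .mode(SaveMode.Append)
  .save()
```

The other tuning options from the question (batch size, timeouts, repartitioning) can be kept as they were; the key change is carrying the delete mark in `__DORIS_DELETE_SIGN__` together with `merge_type = MERGE`.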