如何在写入hive orc表时合并spark中的小文件

我正在从s3读取csv文件并将其作为orc写入hive表。在写作时,它写的是很多小文件。我需要合并所有这些文件。我有以下属性集:

 spark.sql("SET hive.merge.sparkfiles = true")
 spark.sql("SET hive.merge.mapredfiles = true")
 spark.sql("SET hive.merge.mapfiles = true")
 spark.sql("set hive.merge.smallfiles.avgsize = 128000000")
 spark.sql("set hive.merge.size.per.task = 128000000")

除了这些配置,我尝试重新分区(1)和coalesce(1),它将合并到单个文件中,但它删除了hive表并再次创建它。

 masterFile.repartition(1).write.mode(SaveMode.Overwrite).partitionBy(<partitioncolumn>).orc(<HIVEtablePath>);

如果我使用Append模式而不是Overwrite,它会在每个分区下创建复制文件。

  masterFile.repartition(1).write.mode(SaveMode.Append).partitionBy(<partitioncolumn>).orc(<HIVEtablePath>);

在这两种情况下,spark作业都会运行两次而在第二次执行时会失败

有没有什么方法可以使用Append模式重新分区/合并而不会在每个分区中重复部分文件?

0
投票
masterFile.repartition(1).write.mode(SaveMode.Overwrite).partitionBy(<partitioncolumn>).orc(<HIVEtablePath>)

.orc()方法将数据写为文件而不是触摸元信息。所以它无法覆盖HIVE中的表格。

如果您想覆盖hive表中的数据使用方法.insertInto(hive_table_name),其中hive_table_name是HIVE中表的全名(schema + table_name)

根据你的例子

masterFile.repartition(1).write.mode(SaveMode.Overwrite).partitionBy(<partitioncolumn>).insertInto(hiveTableName)

也可以用元数据信息覆盖数据。具有覆盖修饰符的方法.saveAsTable(hive_table_name)也将覆盖Metastore中的数据。

masterFile.repartition(1).write.mode(SaveMode.Overwrite).partitionBy(<partitioncolumn>).saveAsTable(hiveTableName)