Skip to content

Instantly share code, notes, and snippets.

@matthewpick
Last active August 7, 2020 20:44
Show Gist options
  • Save matthewpick/d7ca9504179c84df945d37a8c34ed107 to your computer and use it in GitHub Desktop.
import io.delta.tables.DeltaTable
import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
object DeltaWriter {

  /** Regenerates the symlink manifest for the Delta table at `deltaPath`
    * (used by engines such as Presto/Athena that read via manifest files).
    */
  def generateSymlinkManifest(deltaPath: String, sparkSession: SparkSession): Unit = {
    val deltaTable = DeltaTable.forPath(sparkSession, deltaPath)
    deltaTable.generate("symlink_format_manifest")
  }

  /**
    * Writes `df` to `deltaPath` as a Delta table, then refreshes the symlink manifest.
    *
    * Behavior:
    *   - `overwrite = true`: replaces the table contents, merging schema changes.
    *   - table absent, empty (no committed transactions), or unreadable: plain initial write.
    *   - otherwise: upsert (merge) keyed on `primaryKey`; matched rows where
    *     `newData.deleted = true` are deleted, other matches updated, non-matches inserted.
    *
    * @param deltaPath  target path of the Delta table
    * @param df         data to write; must contain `primaryKey` (and a `deleted`
    *                   column when merging — TODO confirm all callers provide it)
    * @param primaryKey column name used as the merge join key
    * @param overwrite  when true, replace existing data instead of merging
    * @throws AnalysisException rethrown after printing both schemas when the merge
    *                           fails (typically a schema mismatch)
    */
  def write(deltaPath: String, df: DataFrame, primaryKey: String, overwrite: Boolean = false): Unit = {
    val spark = df.sparkSession

    // Resolve the existing merge target. "Exists but unreadable" and "exists but has
    // zero committed transactions" are both treated as absent, matching the original
    // fall-through to a fresh initial write.
    val existingTable: Option[DeltaTable] =
      if (DeltaTable.isDeltaTable(spark, deltaPath)) {
        try {
          val table = DeltaTable.forPath(spark, deltaPath)
          // history() lists committed transactions; zero entries means an empty table.
          if (table.history().count() == 0L) None else Some(table)
        } catch {
          case e: AnalysisException =>
            // Best-effort: log and fall back to a fresh write rather than failing here.
            println("Error with " + deltaPath)
            e.printStackTrace()
            None
        }
      } else {
        None
      }

    if (overwrite) {
      // Full replace; mergeSchema lets the incoming frame evolve the table schema.
      df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(deltaPath)
    } else {
      existingTable match {
        case None =>
          // First (or recovery) write: no merge target yet.
          df.write.format("delta").save(deltaPath)
        case Some(deltaTable) =>
          // Upsert with soft-delete support. The catch is scoped to the merge only,
          // so a manifest-generation failure is not misreported as a schema problem.
          try {
            deltaTable.as("oldData")
              .merge(df.as("newData"), s"oldData.${primaryKey} = newData.${primaryKey}")
              .whenMatched("newData.deleted = true")
              .delete()
              .whenMatched
              .updateAll()
              .whenNotMatched
              .insertAll()
              .execute()
          } catch {
            case e: AnalysisException =>
              // Schema mismatch is the usual cause; dump both schemas before rethrowing.
              println("Schemas for " + deltaPath)
              deltaTable.toDF.printSchema()
              df.printSchema()
              throw e
          }
      }
    }

    // Reached only on a successful write/merge (the rethrow above skips it),
    // which matches the original per-branch manifest generation.
    generateSymlinkManifest(deltaPath, spark)
  }

  /** Vacuums the Delta table at `deltaPath`, removing files no longer referenced
    * by the table log (the default retention threshold applies).
    */
  def vacuum(deltaPath: String, sparkSession: SparkSession): Unit = {
    val deltaTable = DeltaTable.forPath(sparkSession, deltaPath)
    deltaTable.vacuum()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment