Spark notes

Pieces

Mostly taken from [3]

The RDD is how Spark simplifies complex operations like join or groupBy and hides the fact that, under the hood, you’re dealing with fragmented data.

The number of partitions is important because a stage in Spark will operate on one partition at a time (and load the data in that partition into memory). Consequently, if you have fewer partitions than active stages, you will wind up under-utilizing your cluster. Furthermore, since with fewer partitions there’s more data in each partition, you increase the memory pressure on your program. On the flip side, with too many partitions, your performance may degrade as you take a greater hit from network and disk I/O.
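For illustration, a minimal Scala sketch of inspecting and tuning partition counts (the path and the counts 64/256/16 are placeholders, not from these notes):

// Inspect and adjust the number of partitions of an RDD.
val rdd = sc.textFile("hdfs:///some/path", minPartitions = 64) // hint at read time
println(rdd.getNumPartitions)    // how many partitions we actually got
val wider = rdd.repartition(256) // full shuffle: raises parallelism
val narrower = rdd.coalesce(16)  // avoids a shuffle: merges existing partitions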

Parameters

  • yarn.nodemanager.resource.memory-mb controls the maximum sum of memory used by the containers on each node.
  • yarn.nodemanager.resource.cpu-vcores controls the maximum sum of cores used by the containers on each node.
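Both properties live in yarn-site.xml on each NodeManager host; a minimal illustrative snippet (the values are placeholders, size them to your hardware):

<!-- yarn-site.xml (illustrative values) -->
<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>49152</value> <!-- up to 48 GB for all containers on this node -->
</property>
<property>
  <name>yarn.nodemanager.resource.cpu-vcores</name>
  <value>12</value> <!-- up to 12 vcores for all containers on this node -->
</property>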

Tips

  • [1 and 7] Avoid groupByKey when performing an associative reductive operation (see the sketch after this list).
  • [1] Avoid reduceByKey when the input and output value types are different.
  • [1] Avoid the flatMap-join-groupBy pattern.
  • [4] In general, if you use some data twice, cache it.
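A minimal Scala sketch of the groupByKey, reduceByKey, and caching tips; the pairs data is hypothetical:

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3))) // hypothetical pair RDD

// Associative reduction: reduceByKey combines map-side before shuffling;
// groupByKey ships every value across the network first.
val sums = pairs.reduceByKey(_ + _)               // preferred
// val sums = pairs.groupByKey().mapValues(_.sum) // avoid

// Input and output value types differ (Int in, (sum, count) out):
// aggregateByKey avoids allocating an intermediate pair per record.
val avgs = pairs
  .aggregateByKey((0, 0))(
    (acc, v) => (acc._1 + v, acc._2 + 1),         // fold a value into the accumulator
    (a, b) => (a._1 + b._1, a._2 + b._2))         // merge two accumulators
  .mapValues { case (sum, count) => sum.toDouble / count }

// Data used twice: cache it so the second action reuses it.
val cached = sums.cache()
println(cached.count()) // first action computes and caches
println(cached.max())   // second action reads from the cache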

Shuffling

  • [1] One way to avoid shuffles when joining two datasets is to take advantage of broadcast variables. When one of the datasets is small enough to fit in memory in a single executor, it can be loaded into a hash table on the driver and then broadcast to every executor (sketch after this list).
  • [4] Accumulators are a way to efficiently update a variable in parallel during execution. Accumulators differ from broadcast variables in that they may only be read on the driver process, but they allow Spark programs to efficiently aggregate results.
  • [1] An extra shuffle can be advantageous to performance when it increases parallelism.
  • [4] You avoid shipping data by avoiding the operations that trigger shuffles: repartition and coalesce, ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.
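A minimal Scala sketch of that broadcast join, assuming bigRdd is an existing RDD[(String, String)] and small.csv is a hypothetical small dataset:

// Map-side join via a broadcast variable: no shuffle of the big side.
val small = sc.textFile("small.csv")
  .map { l => val p = l.split(";"); (p(0), p(1)) }
  .collectAsMap()                 // small side fits on the driver
val smallBc = sc.broadcast(small) // shipped once to each executor

val joined = bigRdd.flatMap { case (k, v) =>
  smallBc.value.get(k).map(s => (k, (v, s))) // emit only matching keys (inner join)
}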

Sorting

  • [1] Consider repartitionAndSortWithinPartitions when you need to repartition and then sort: it pushes the sort into the shuffle machinery, which is more efficient than repartitioning and sorting in separate steps (sketch below).
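A minimal sketch, assuming pairs is an RDD[(String, Int)]:

import org.apache.spark.HashPartitioner

// One shuffle does both jobs: records land in their target partition
// already sorted by key.
val sorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(8))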

Plan execution

Taken from the Introduction to Apache Spark on Databricks.

Example:

== Physical Plan ==
*Project [avg(price)#276,carat#282]
+- *BroadcastHashJoin [color#109], [color#284], Inner, BuildRight, None
   :- *TungstenAggregate(key=[cut#108,color#109], functions=[(avg(cast(price#113 as bigint)),mode=Final,isDistinct=false)], output=[color#109,avg(price)#276])
   :  +- Exchange hashpartitioning(cut#108, color#109, 200), None
   :     +- *TungstenAggregate(key=[cut#108,color#109], functions=[(avg(cast(price#113 as bigint)),mode=Partial,isDistinct=false)], output=[cut#108,color#109,sum#314,count#315L])
   :        +- *Project [cut#108,color#109,price#113]
   :           +- *Filter isnotnull(color#109)
   :              +- *Scan csv [cut#108,color#109,price#113] Format: CSV, InputPaths: dbfs:/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv, PushedFilters: [IsNotNull(color)], ReadSchema: struct<cut:string,color:string,price:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]))
      +- *Project [carat#282,color#284]
         +- *Filter isnotnull(color#284)
            +- *Scan csv [carat#282,color#284] Format: CSV, InputPaths: dbfs:/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv, PushedFilters: [IsNotNull(color)], ReadSchema: struct<carat:double,color:string>
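For context, such a plan is printed by Spark SQL’s explain facility; the sketch below is roughly the query that would yield the plan above (spark is the SparkSession; the exact query in the Databricks notebook may differ):

import org.apache.spark.sql.functions.avg

val diamonds = spark.read
  .option("header", "true").option("inferSchema", "true")
  .csv("dbfs:/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv")
val avgPrices = diamonds.groupBy("cut", "color").agg(avg("price"))
avgPrices.join(diamonds.select("carat", "color"), "color").explain()
// explain() prints the == Physical Plan == tree; explain(true) also shows
// the parsed, analyzed, and optimized logical plans.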

spark-jobserver

Bugs

Sources

Official documentation highlights

Articles

  1. How to tune your Apache Spark Jobs part 1.
  2. How to tune your Apache Spark Jobs part 2.
  3. Working with Apache Spark: Or, How I Learned to Stop Worrying and Love the Shuffle.
  4. Spark Shuffle Introduction.
  5. Spark Architecture: Shuffle.
  6. Spark Memory Management.
  7. Avoid GroupByKey.
  8. Best Practices for YARN Resource Management.

Tooling

// Examples with GitHub tops: https://github.com/JJ/top-github-users-data/
// Column layout, inferred from the fields used below: 0=login, 1=city, 2=contributions, 4=followers.
val file = sc.textFile("../top-github-users-data/data/all-users-data.csv")
// Keep only rows whose contributions field is numeric (drops the header and malformed rows).
val data = file.filter(line => line.split(";")(2).forall(_.isDigit))
// User with the most contributions: reduce keeps the pair with the larger count.
val biggestContributor = data.map { line => val splitted = line.split(";"); (splitted(0), splitted(2).toInt) }.reduce((a, b) => if (a._2 >= b._2) a else b)
// biggestContributor: (String, Int) = (mmoreram,5503)
// Sum of every user's contributions.
val totalContributions = data.map(line => line.split(";")(2).toInt).reduce((a, b) => a + b)
// totalContributions: Int = 496723
// Contributions aggregated per city with reduceByKey (map-side combining, per the tips above).
val contributionsByCity = data.map { line => val splitted = line.split(";"); (splitted(1).trim(), splitted(2).toInt) }.reduceByKey((a, b) => a + b)
contributionsByCity.collect()
// res16: Array[(String, Int)] = Array((Soria,229), (Las Palmas,3693), (Asturias,10475), (Pontevedra,3827), (León,909), (Tenerife,5720), (Cáceres,542), (Jaén,2366), (Sevilla,14970), (Valladolid,4876), (Granada,33739), (Burgos,671), (Córdoba,4978), (Ciudad Real,2199), (Zamora,45), (Gerona,2425), (Segovia,437), (Alicante,4941), (Bilbao,10791), (Salamanca,826), (Palencia,372), (Coruña,5162), (Cantabria,2598), (Castellón,1333), (Lugo,1222), (Tarragona,1778), (Málaga,9766), (Murcia,2100), (Barcelona,124291), (Ourense,1590), (Zaragoza,16643), (Cádiz,2989), (Badajoz,391), (Ávila,25), (Toledo,1275), (Madrid,170902), (Rioja,1159), (Cuenca,164), (Albacete,1193), (Huesca,707), (Lleida,1797), (Almería,4025), (Baleares,2489), (Donostia,5209), (Huelva,1924), (Valencia,21684), (Navarra,2945), (Álava,2331))
// City with the most contributions overall.
val biggestContributorCity = contributionsByCity.reduce((a, b) => if (a._2 >= b._2) a else b)
// biggestContributorCity: (String, Int) = (Madrid,170902)
// User with the most followers (field 4).
val mostFollowed = data.map { line => val splitted = line.split(";"); (splitted(0), splitted(4).toInt) }.reduce((a, b) => if (a._2 >= b._2) a else b)
// mostFollowed: (String, Int) = (mrdoob,5600)