1. Introduction
When working in Apache Spark, we often deal with more than one DataFrame and want to combine their data into a new DataFrame. Spark enables us to do this by way of joins.
In this tutorial, we’ll learn different ways of joining two Spark DataFrames.
2. Setup
Let’s create two sample DataFrames that we’ll be using throughout this article:
import org.apache.spark.sql.SparkSession
val spark =
SparkSession.builder().appName("Joins").master("local").getOrCreate()
We import SparkSession, the entry point for accessing the DataFrame API. Its builder accepts configuration options such as appName and master; setting master to "local" signifies that we're running Spark on the local machine:
import spark.implicits._
The spark.implicits._ import provides the toDF() method, which converts sequences to Spark DataFrames:
val TopHorrorsIGN2022 = Seq(
(9, "Pearl"),
(6, "The Sadness"),
(6, "Offseason"),
(7, "Hatching"),
(8, "x")
).toDF("IMDB Rating", "IGN Movie Picks")
val TopHorrorsTheAVClub2022 = Seq(
(7, "Nope"),
(9, "Pearl"),
(8, "x"),
(5, "Barbarian"),
(5, "Bones And All")
).toDF("IMDB Rating", "AVC Movie Picks")
We created two DataFrames, TopHorrorsIGN2022 and TopHorrorsTheAVClub2022, which hold data on the top five horror movies of 2022 according to IGN and the AVClub, respectively.
3. Joins
In this section, we’ll go through eight methods of joining two DataFrames, namely inner joins, outer joins, left outer joins, right outer joins, left semi joins, left anti joins, cartesian/cross joins, and self joins.
3.1. Inner Joins
An inner join will merge rows whenever matching values are common to both DataFrames. Values that don’t match won’t appear in the resulting DataFrame:
val innerJoin =
TopHorrorsIGN2022.join(TopHorrorsTheAVClub2022, Seq("IMDB Rating"))
innerJoin.show()
/** | IMDB Rating | IGN Movie Picks | AVC Movie Picks |
* |:------------|:----------------|:----------------|
* | 9 | Pearl | Pearl |
* | 7 | Hatching | Nope |
* | 8 | x | x |
*/
Here, we display pairs of movies that share the same IMDB rating across IGN and the AVClub. Seq("IMDB Rating") specifies the column on which to join the two DataFrames.
Inner joins are the default join type in Spark. We can explicitly specify the join type we need as a third parameter:
val innerJoin_v2 =
TopHorrorsIGN2022.join(TopHorrorsTheAVClub2022, Seq("IMDB Rating"), "inner")
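To build intuition for what the inner join computes, here's an illustrative sketch of the same matching logic using plain Scala collections rather than Spark (the variable names are hypothetical, and the tuples mirror our sample data):

```scala
// Plain-Scala sketch of inner-join semantics, no Spark session needed
val ignPicks = Seq(9 -> "Pearl", 6 -> "The Sadness", 6 -> "Offseason", 7 -> "Hatching", 8 -> "x")
val avcPicks = Seq(7 -> "Nope", 9 -> "Pearl", 8 -> "x", 5 -> "Barbarian", 5 -> "Bones And All")

// Pair every row with every row, keeping only pairs whose rating keys match
val innerJoined = for {
  (ignRating, ignMovie) <- ignPicks
  (avcRating, avcMovie) <- avcPicks
  if ignRating == avcRating
} yield (ignRating, ignMovie, avcMovie)
```

The result holds exactly the three matched rows from the table above: ratings 9, 7, and 8.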
3.2. Outer Joins
An outer join brings together all rows from both DataFrames, whether or not they have matching column values. When there's no corresponding row in the left or right DataFrame, Spark inserts null to fill the gap:
import org.apache.spark.sql.functions.col
val cols = List(col("IGN Movie Picks"), col("AVC Movie Picks"))
val query = TopHorrorsIGN2022(
"IGN Movie Picks"
) === TopHorrorsTheAVClub2022("AVC Movie Picks")
val outerJoin = TopHorrorsIGN2022
.join(TopHorrorsTheAVClub2022, query, "outer")
.select(cols: _*)
outerJoin.show()
/** | IGN Movie Picks | AVC Movie Picks |
* |:----------------|:----------------|
* | null | Barbarian |
* | null | Bones And All |
* | Hatching | null |
* | null | Nope |
* | Offseason | null |
* | Pearl | Pearl |
* | The Sadness | null |
* | x | x |
*/
In this example, we pass the list of columns cols to the select() method to choose which columns of the outerJoin DataFrame to display.
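Conceptually, a full outer join takes every key from either side and fills in the missing side with null. A minimal plain-Scala sketch of that behavior, using Option in place of Spark's null (names here are illustrative):

```scala
// Plain-Scala sketch of full-outer-join semantics over movie titles
val ignMovies = Seq("Pearl", "The Sadness", "Offseason", "Hatching", "x")
val avcMovies = Seq("Nope", "Pearl", "x", "Barbarian", "Bones And All")

// Every title from either list; a missing side becomes None (Spark shows null)
val allTitles = (ignMovies ++ avcMovies).distinct
val outerJoined = allTitles.map { title =>
  (ignMovies.find(_ == title), avcMovies.find(_ == title))
}
```

Just like the Spark output above, titles present on both sides appear twice filled in, while one-sided titles pair with None.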
3.3. Left Outer Joins
Left outer joins include all the rows of the left DataFrame and only the rows from the right that match. Similar to outer joins, left outer joins insert a null on the right when there’s no value matching the one on the left:
val leftOuterJoin = TopHorrorsIGN2022
.join(TopHorrorsTheAVClub2022, query, "left_outer")
.select(cols: _*)
leftOuterJoin.show()
/** | IGN Movie Picks | AVC Movie Picks |
* |:----------------|:----------------|
* | Pearl | Pearl |
* | The Sadness | null |
* | Offseason | null |
* | Hatching | null |
* | x | x |
*/
Notably, this makes it easy to see which movies are common to both DataFrames and which appear only in IGN's list.
3.4. Right Outer Joins
Right outer joins are the opposite of left outer joins. This method includes all rows of the right DataFrame and only the rows from the left that match:
val rightOuterJoin = TopHorrorsIGN2022
.join(TopHorrorsTheAVClub2022, query, "right_outer")
.select(cols: _*)
rightOuterJoin.show()
/** | IGN Movie Picks | AVC Movie Picks |
* |:----------------|:----------------|
* | null | Nope |
* | Pearl | Pearl |
* | x | x |
* | null | Barbarian |
* | null | Bones And All |
*/
3.5. Left Semi Joins
Left semi joins uniquely don't include any columns from the right DataFrame. The resultant DataFrame only includes rows from the left DataFrame that have a match on the right. If a matching value appears more than once on the left, all the duplicate rows appear in the result:
val leftSemiJoin = TopHorrorsIGN2022
.join(TopHorrorsTheAVClub2022, query, "left_semi")
.select("IGN Movie Picks", "IMDB Rating")
leftSemiJoin.show()
/** | IGN Movie Picks | IMDB Rating |
* |:----------------|:------------|
* | Pearl | 9 |
* | x | 8 |
*/
Here we see IGN movies that were also selected by AVClub.
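A left semi join behaves much like a filter: it keeps left rows whose key exists on the right and discards everything else. A plain-Scala sketch of that idea, with illustrative names:

```scala
// Left-semi-join semantics as a filter: keep left rows with a match on the right
val ignMovies = Seq("Pearl", "The Sadness", "Offseason", "Hatching", "x")
val avcMovies = Set("Nope", "Pearl", "x", "Barbarian", "Bones And All")

// No right-side columns survive; only matching left rows do
val semiJoined = ignMovies.filter(avcMovies.contains)
```

As in the Spark result above, only "Pearl" and "x" survive.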
3.6. Left Anti Joins
Left anti joins are the inverse of left semi joins. Unlike the latter, which retains rows that correspond to the right DataFrame, a left anti join only keeps rows from the left DataFrame that don’t have corresponding values on the right:
val leftAntiJoin = TopHorrorsIGN2022
.join(TopHorrorsTheAVClub2022, query, "left_anti")
.select("IGN Movie Picks", "IMDB Rating")
leftAntiJoin.show()
/** | IGN Movie Picks | IMDB Rating |
* |:----------------|:------------|
* | The Sadness | 6 |
* | Offseason | 6 |
* | Hatching | 7 |
*/
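Continuing the filter analogy from the previous section, a left anti join corresponds to filterNot: it keeps only the left rows with no match on the right. A hedged plain-Scala sketch:

```scala
// Left-anti-join semantics: keep left rows with NO match on the right
val ignMovies = Seq("Pearl", "The Sadness", "Offseason", "Hatching", "x")
val avcMovies = Set("Nope", "Pearl", "x", "Barbarian", "Bones And All")

val antiJoined = ignMovies.filterNot(avcMovies.contains)
```

This yields the three IGN-only titles shown in the Spark output above.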
3.7. Cartesian/Cross Joins
This is a unique join method that's used in very specific cases. Cross joins create a cartesian product of the two DataFrames. In other words, every possible combination of rows appears in the resultant DataFrame. Joining two DataFrames of 500 rows each therefore yields a resultant DataFrame of 250,000 (500 * 500) rows; this explosion in row count is the main reason to be careful with this join method:
val crossJoin = TopHorrorsIGN2022.crossJoin(TopHorrorsTheAVClub2022)
crossJoin.show(5)
/** | IMDB Rating | IGN Movie Picks | IMDB Rating | AVC Movie Picks |
* |:------------|:----------------|:------------|:----------------|
* | 9 | Pearl | 7 | Nope |
* | 9 | Pearl | 9 | Pearl |
* | 9 | Pearl | 8 | x |
* | 9 | Pearl | 5 | Barbarian |
* | 9 | Pearl | 5 | Bones And All |
* only showing top 5 rows
*/
We only display the first five rows; the full resultant DataFrame contains 25 (5 * 5) rows.
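The cartesian product is easy to sketch with a plain-Scala for comprehension over our two lists of titles (names are illustrative):

```scala
// Cartesian product: every left row paired with every right row
val ignMovies = Seq("Pearl", "The Sadness", "Offseason", "Hatching", "x")
val avcMovies = Seq("Nope", "Pearl", "x", "Barbarian", "Bones And All")

val crossJoined = for {
  ign <- ignMovies
  avc <- avcMovies
} yield (ign, avc)
```

With five rows on each side, the result holds 25 pairs, mirroring the row-count explosion described above.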
3.8. Self Joins
Self joins are a way of joining data within the same DataFrame. Let’s create a new DataFrame to see how self joins work:
val TopHorrors2022 = Seq(
("Nope", "Jordan Peele", "Pearl", "Ti West"),
("Pearl", "Ti West", "The Sadness", "Rob Jabbaz"),
("x", "Ti West", "Offseason", "Robert Cole"),
("Barbarian", "Zach Cregger", "Hatching", "Hanna Bergholm"),
("Bones And All", "Luca Guadagnino", "x", "Ti West")
).toDF("AVC Movie Picks", "Director_AVC", "IGN Movie Picks", "Director_IGN")
Self joins aren't a separate join type: we can apply any of the join types described previously to the same DataFrame:
val selfJoin = TopHorrors2022
.alias("df1")
.join(
TopHorrors2022.alias("df2"),
col("df1.Director_AVC") === col("df2.Director_IGN"),
"left_semi"
)
.select("AVC Movie Picks", "Director_AVC")
Here we make use of the alias() method to simulate two DataFrames when using the join() method:
selfJoin.show()
/** | AVC Movie Picks | Director_AVC |
* |:----------------|:-------------|
* | Pearl | Ti West |
* | x | Ti West |
*/
The result shows AVClub movies with directors that also directed IGN movies.
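To make the self-join condition concrete, here's an illustrative plain-Scala sketch of the same left-semi logic: keep AVClub rows whose director also appears among the IGN directors (the simplified tuples below are hypothetical stand-ins for the DataFrame rows):

```scala
// Simplified view of TopHorrors2022: (AVC movie, AVC director) pairs
val avcSide = Seq(
  "Nope" -> "Jordan Peele",
  "Pearl" -> "Ti West",
  "x" -> "Ti West",
  "Barbarian" -> "Zach Cregger",
  "Bones And All" -> "Luca Guadagnino"
)
// The set of directors appearing in the IGN columns
val ignDirectors = Set("Ti West", "Rob Jabbaz", "Robert Cole", "Hanna Bergholm")

// Left-semi self join: AVC rows whose director is also an IGN director
val selfSemi = avcSide.filter { case (_, director) => ignDirectors.contains(director) }
```

This reproduces the two Ti West rows shown in the Spark output above.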
4. Conclusion
In this article, we learned eight ways of joining two Spark DataFrames, namely, inner joins, outer joins, left outer joins, right outer joins, left semi joins, left anti joins, cartesian/cross joins, and self joins.
These join types come in handy whenever we need to combine two DataFrames: depending on the kind of output we need, there's a join type that fits the case.
As always, all the code can be found over on GitHub.