Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save tfayyaz/adbfa389d761743cbb40ca77fbf66d30 to your computer and use it in GitHub Desktop.
Save tfayyaz/adbfa389d761743cbb40ca77fbf66d30 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"# Copyright 2020 Google LLC\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# https://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# 1.1. BigQuery Storage & Spark DataFrames - Python"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create Dataproc Cluster with Jupyter\n",
"\n",
"This notebook is designed to be run on Google Cloud Dataproc.\n",
"\n",
"Follow the links below for instructions on how to create a Dataproc Cluster with the Juypter component installed.\n",
"\n",
"* [Tutorial - Install and run a Jupyter notebook on a Dataproc cluster](https://cloud.google.com/dataproc/docs/tutorials/jupyter-notebook)\n",
"* [Blog post - Apache Spark and Jupyter Notebooks made easy with Dataproc component gateway](https://medium.com/google-cloud/apache-spark-and-jupyter-notebooks-made-easy-with-dataproc-component-gateway-fa91d48d6a5a)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Python 3 Kernel\n",
"\n",
"Use a Python 3 kernel (not PySpark) to allow you to configure the SparkSession in the notebook and include the [spark-bigquery-connector](https://github.com/GoogleCloudDataproc/spark-bigquery-connector) required to use the [BigQuery Storage API](https://cloud.google.com/bigquery/docs/reference/storage)."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Scala Version\n",
"\n",
"Check what version of Scala you are running so you can include the correct spark-bigquery-connector jar "
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"cat: /release: No such file or directory\n",
"Scala code runner version 2.11.12 -- Copyright 2002-2017, LAMP/EPFL\n"
]
}
],
"source": [
"!scala -version"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create Spark Session\n",
"\n",
"Include the correct version of the spark-bigquery-connector jar\n",
"\n",
"Scala version 2.11 - `'gs://spark-lib/bigquery/spark-bigquery-latest.jar'`.\n",
"\n",
"Scala version 2.12 - `'gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar'`."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession\n",
"spark = SparkSession.builder \\\n",
" .appName('1.1. BigQuery Storage & Spark DataFrames - Python')\\\n",
" .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-latest.jar') \\\n",
" .getOrCreate()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Enable repl.eagerEval\n",
"\n",
"This will output the results of DataFrames in each step without the new need to show `df.show()` and also improves the formatting of the output"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"spark.conf.set(\"spark.sql.repl.eagerEval.enabled\",True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Read BigQuery table into Spark DataFrame\n",
"\n",
"Use `filter()` to query data from a partitioned table."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- datehour: timestamp (nullable = true)\n",
" |-- wiki: string (nullable = true)\n",
" |-- title: string (nullable = true)\n",
" |-- views: long (nullable = true)\n",
"\n"
]
}
],
"source": [
"table = \"bigquery-public-data.wikipedia.pageviews_2020\"\n",
"df_wiki_pageviews = spark.read \\\n",
" .format(\"bigquery\") \\\n",
" .option(\"table\", table) \\\n",
" .option(\"filter\", \"datehour >= '2020-03-01' AND datehour < '2020-03-02'\") \\\n",
" .load()\n",
"\n",
"df_wiki_pageviews.printSchema()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Select required columns and apply a filter using `where()` which is an alias for `filter()` then cache the table"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table border='1'>\n",
"<tr><th>title</th><th>wiki</th><th>views</th></tr>\n",
"<tr><td>2020_Democratic_P...</td><td>en</td><td>3242</td></tr>\n",
"<tr><td>Eurovision_Song_C...</td><td>en</td><td>2368</td></tr>\n",
"<tr><td>Colin_McRae</td><td>en</td><td>2360</td></tr>\n",
"<tr><td>Donald_trump</td><td>en</td><td>2223</td></tr>\n",
"<tr><td>Comparison_of_onl...</td><td>en</td><td>1398</td></tr>\n",
"<tr><td>Coronavirus</td><td>en</td><td>1872</td></tr>\n",
"<tr><td>-</td><td>en</td><td>136620</td></tr>\n",
"<tr><td>Bombshell_(2019_f...</td><td>en</td><td>1084</td></tr>\n",
"<tr><td>Brooklyn</td><td>en</td><td>1946</td></tr>\n",
"<tr><td>2019–20_coronavir...</td><td>en</td><td>8313</td></tr>\n",
"<tr><td>2019–20_Wuhan_cor...</td><td>en</td><td>1084</td></tr>\n",
"<tr><td>Apple_Network_Server</td><td>en</td><td>3524</td></tr>\n",
"<tr><td>Catholic_moral_th...</td><td>en</td><td>1328</td></tr>\n",
"<tr><td>Bernie_Sanders</td><td>en</td><td>1297</td></tr>\n",
"<tr><td>2019–20_coronavir...</td><td>en</td><td>1968</td></tr>\n",
"<tr><td>Brooklyn</td><td>en</td><td>1139</td></tr>\n",
"<tr><td>Charlie&#x27;s_Angels_...</td><td>en</td><td>1006</td></tr>\n",
"<tr><td>Corrupted_Blood_i...</td><td>en</td><td>1511</td></tr>\n",
"<tr><td>Donald_trump</td><td>en</td><td>1526</td></tr>\n",
"<tr><td>Coronavirus_disea...</td><td>en</td><td>1405</td></tr>\n",
"</table>\n",
"only showing top 20 rows\n"
],
"text/plain": [
"+--------------------+----+------+\n",
"| title|wiki| views|\n",
"+--------------------+----+------+\n",
"|2020_Democratic_P...| en| 3242|\n",
"|Eurovision_Song_C...| en| 2368|\n",
"| Colin_McRae| en| 2360|\n",
"| Donald_trump| en| 2223|\n",
"|Comparison_of_onl...| en| 1398|\n",
"| Coronavirus| en| 1872|\n",
"| -| en|136620|\n",
"|Bombshell_(2019_f...| en| 1084|\n",
"| Brooklyn| en| 1946|\n",
"|2019–20_coronavir...| en| 8313|\n",
"|2019–20_Wuhan_cor...| en| 1084|\n",
"|Apple_Network_Server| en| 3524|\n",
"|Catholic_moral_th...| en| 1328|\n",
"| Bernie_Sanders| en| 1297|\n",
"|2019–20_coronavir...| en| 1968|\n",
"| Brooklyn| en| 1139|\n",
"|Charlie's_Angels_...| en| 1006|\n",
"|Corrupted_Blood_i...| en| 1511|\n",
"| Donald_trump| en| 1526|\n",
"|Coronavirus_disea...| en| 1405|\n",
"+--------------------+----+------+\n",
"only showing top 20 rows"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df_wiki_en = df_wiki_pageviews \\\n",
" .select(\"title\", \"wiki\", \"views\") \\\n",
" .where(\"views > 1000 AND wiki in ('en', 'en.m')\") \\\n",
" .cache()\n",
"\n",
"df_wiki_en"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Group by title and order by page views to see the top pages"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table border='1'>\n",
"<tr><th>title</th><th>total_views</th></tr>\n",
"<tr><td>Main_Page</td><td>10939337</td></tr>\n",
"<tr><td>United_States_Senate</td><td>5619797</td></tr>\n",
"<tr><td>-</td><td>3852360</td></tr>\n",
"<tr><td>Special:Search</td><td>1538334</td></tr>\n",
"<tr><td>2019–20_coronavir...</td><td>407042</td></tr>\n",
"<tr><td>2020_Democratic_P...</td><td>260093</td></tr>\n",
"<tr><td>Coronavirus</td><td>254861</td></tr>\n",
"<tr><td>The_Invisible_Man...</td><td>233718</td></tr>\n",
"<tr><td>Super_Tuesday</td><td>201077</td></tr>\n",
"<tr><td>Colin_McRae</td><td>200219</td></tr>\n",
"<tr><td>David_Byrne</td><td>189989</td></tr>\n",
"<tr><td>2019–20_coronavir...</td><td>156803</td></tr>\n",
"<tr><td>John_Mulaney</td><td>155605</td></tr>\n",
"<tr><td>2020_South_Caroli...</td><td>152137</td></tr>\n",
"<tr><td>AEW_Revolution</td><td>140503</td></tr>\n",
"<tr><td>Boris_Johnson</td><td>120957</td></tr>\n",
"<tr><td>Tom_Steyer</td><td>120926</td></tr>\n",
"<tr><td>Dyatlov_Pass_inci...</td><td>117704</td></tr>\n",
"<tr><td>Spanish_flu</td><td>108335</td></tr>\n",
"<tr><td>2020_coronavirus_...</td><td>107653</td></tr>\n",
"</table>\n",
"only showing top 20 rows\n"
],
"text/plain": [
"+--------------------+-----------+\n",
"| title|total_views|\n",
"+--------------------+-----------+\n",
"| Main_Page| 10939337|\n",
"|United_States_Senate| 5619797|\n",
"| -| 3852360|\n",
"| Special:Search| 1538334|\n",
"|2019–20_coronavir...| 407042|\n",
"|2020_Democratic_P...| 260093|\n",
"| Coronavirus| 254861|\n",
"|The_Invisible_Man...| 233718|\n",
"| Super_Tuesday| 201077|\n",
"| Colin_McRae| 200219|\n",
"| David_Byrne| 189989|\n",
"|2019–20_coronavir...| 156803|\n",
"| John_Mulaney| 155605|\n",
"|2020_South_Caroli...| 152137|\n",
"| AEW_Revolution| 140503|\n",
"| Boris_Johnson| 120957|\n",
"| Tom_Steyer| 120926|\n",
"|Dyatlov_Pass_inci...| 117704|\n",
"| Spanish_flu| 108335|\n",
"|2020_coronavirus_...| 107653|\n",
"+--------------------+-----------+\n",
"only showing top 20 rows"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import pyspark.sql.functions as F\n",
"\n",
"df_wiki_en_totals = df_wiki_en \\\n",
".groupBy(\"title\") \\\n",
".agg(F.sum('views').alias('total_views'))\n",
"\n",
"df_wiki_en_totals.orderBy('total_views', ascending=False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Write Spark Dataframe to BigQuery table\n",
"\n",
"Write the Spark Dataframe to BigQuery table using BigQuery Storage connector. This will also create the table if it does not exist. The GCS bucket and BigQuery dataset must already exist.\n",
"\n",
"If the GCS bucket and BigQuery dataset do not exist they will need to be created before running `df.write`\n",
"\n",
"- [Instructions here for creating a GCS bucket](https://cloud.google.com/storage/docs/creating-buckets)\n",
"- [Instructions here for creating a BigQuery Dataset](https://cloud.google.com/bigquery/docs/datasets) "
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"# Update to your GCS bucket\n",
"gcs_bucket = 'dataproc-bucket-name'\n",
"\n",
"# Update to your BigQuery dataset name you created\n",
"bq_dataset = 'dataset_name'\n",
"\n",
"# Enter BigQuery table name you want to create or overwite. \n",
"# If the table does not exist it will be created when you run the write function\n",
"bq_table = 'wiki_total_pageviews'\n",
"\n",
"df_wiki_en_totals.write \\\n",
" .format(\"bigquery\") \\\n",
" .option(\"table\",\"{}.{}\".format(bq_dataset, bq_table)) \\\n",
" .option(\"temporaryGcsBucket\", gcs_bucket) \\\n",
" .mode('overwrite') \\\n",
" .save()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Use BigQuery magic to query table\n",
"\n",
"Use the [BigQuery magic](https://googleapis.dev/python/bigquery/latest/magics.html) to check if the data was created successfully in BigQuery. This will run the SQL query in BigQuery and the return the results"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>title</th>\n",
" <th>total_views</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>Main_Page</td>\n",
" <td>10939337</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>United_States_Senate</td>\n",
" <td>5619797</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>-</td>\n",
" <td>3852360</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>Special:Search</td>\n",
" <td>1538334</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>2019–20_coronavirus_outbreak</td>\n",
" <td>407042</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>2020_Democratic_Party_presidential_primaries</td>\n",
" <td>260093</td>\n",
" </tr>\n",
" <tr>\n",
" <th>6</th>\n",
" <td>Coronavirus</td>\n",
" <td>254861</td>\n",
" </tr>\n",
" <tr>\n",
" <th>7</th>\n",
" <td>The_Invisible_Man_(2020_film)</td>\n",
" <td>233718</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <td>Super_Tuesday</td>\n",
" <td>201077</td>\n",
" </tr>\n",
" <tr>\n",
" <th>9</th>\n",
" <td>Colin_McRae</td>\n",
" <td>200219</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" title total_views\n",
"0 Main_Page 10939337\n",
"1 United_States_Senate 5619797\n",
"2 - 3852360\n",
"3 Special:Search 1538334\n",
"4 2019–20_coronavirus_outbreak 407042\n",
"5 2020_Democratic_Party_presidential_primaries 260093\n",
"6 Coronavirus 254861\n",
"7 The_Invisible_Man_(2020_film) 233718\n",
"8 Super_Tuesday 201077\n",
"9 Colin_McRae 200219"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%bigquery\n",
"SELECT title, total_views\n",
"FROM dataset_name.wiki_total_pageviews\n",
"ORDER BY total_views DESC\n",
"LIMIT 10"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.10"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment