@rjurney
Created October 9, 2023 10:46
GraphFrames scales very well; however, it requires that all nodes share a single pyspark.sql.DataFrame schema, and likewise all edges :(
import pyspark.sql.functions as F
from pyspark.sql.types import StructField, IntegerType, LongType, StringType, TimestampType


def add_missing_columns(df, all_columns):
    """Add to df, as typed nulls, any column in all_columns that it lacks."""
    for col_name, schema_field in all_columns:
        if col_name not in df.columns:
            # A null literal cast to the column's type keeps the schemas aligned
            df = df.withColumn(col_name, F.lit(None).cast(schema_field.dataType))
    return df

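# Get all column names with their types, as (name, StructField) pairs; the set drops exact duplicates
all_columns = set(
    list(zip(posts_df.columns, posts_df.schema))
    + list(zip(users_df.columns, users_df.schema))
    + list(zip(votes_df.columns, votes_df.schema))
)
# Inspect the (name, type) pairs; this is displayed when run as the last line of a notebook cell
[(x[0], x[1].dataType) for x in all_columns]
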
# Now apply this function to each of your DataFrames to get a consistent schema
posts_df = add_missing_columns(posts_df, all_columns)
users_df = add_missing_columns(users_df, all_columns)
votes_df = add_missing_columns(votes_df, all_columns)
# Now UNION the three DataFrames to get our nodes
nodes_df = posts_df.unionByName(users_df).unionByName(votes_df)
nodes_df.limit(10).toPandas()
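
One caveat with the approach above: because `all_columns` is a set of (name, field) pairs, a column name that appears with different types in two DataFrames is kept twice, and `add_missing_columns` does nothing to reconcile the two. Spark may then coerce or reject the mismatched columns at union time. The sketch below is one possible pre-check, not part of the original gist. The helper name `find_type_conflicts` and the sample dtypes are hypothetical. It works on plain lists of (column name, type name) pairs, the shape that `DataFrame.dtypes` returns, so it runs without a Spark session.

```python
from collections import defaultdict


def find_type_conflicts(*dtype_lists):
    """Return {column name: sorted type names} for names seen with more than one type.

    Each argument is a list of (column name, type name) pairs, the shape that
    pyspark's DataFrame.dtypes returns.
    """
    types_by_name = defaultdict(set)
    for dtypes in dtype_lists:
        for col_name, type_name in dtypes:
            types_by_name[col_name].add(type_name)
    return {
        name: sorted(types)
        for name, types in types_by_name.items()
        if len(types) > 1
    }


# Hypothetical dtypes, standing in for posts_df.dtypes and users_df.dtypes
posts_dtypes = [("Id", "bigint"), ("CreationDate", "timestamp"), ("Body", "string")]
users_dtypes = [("Id", "int"), ("CreationDate", "timestamp"), ("DisplayName", "string")]

conflicts = find_type_conflicts(posts_dtypes, users_dtypes)
print(conflicts)  # {'Id': ['bigint', 'int']}
```

With the gist's DataFrames, the call would be `find_type_conflicts(posts_df.dtypes, users_df.dtypes, votes_df.dtypes)`, run before `add_missing_columns`. Any column it reports could then be cast to one agreed type in each DataFrame.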