Skip to content

Instantly share code, notes, and snippets.

@canimus
Created December 21, 2021 22:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save canimus/1817d7d37755e85e28cbae9e7e3f0ff1 to your computer and use it in GitHub Desktop.
PySpark FFill Implementation
import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from pyspark.sql import Window as W
from pyspark.sql.window import WindowSpec
# Export both public fill helpers; backward_fill was defined but missing here.
__all__ = ["forward_fill", "backward_fill"]
def _window_all_previous_rows(partition, order) -> WindowSpec:
    """Window covering every row from the partition start up to the current row."""
    ordered = W.partitionBy(partition).orderBy(order)
    return ordered.rowsBetween(W.unboundedPreceding, W.currentRow)
def _window_all_following_rows(partition, order) -> WindowSpec:
    """Window covering the current row and every later row in the partition."""
    ordered = W.partitionBy(partition).orderBy(order)
    return ordered.rowsBetween(W.currentRow, W.unboundedFollowing)
def forward_fill(
    df: DataFrame, partition: str, order: str, variable: str, number_of_days: int = 7
) -> DataFrame:
    """Forward-fill nulls in ``variable`` from the most recent non-null value.

    Weekend rows are dropped, then each remaining null in ``variable`` is
    replaced by the last non-null value seen in the same ``partition``
    (ordered by the ``order`` date column), provided that value is no more
    than ``number_of_days`` days old.  Equivalent to a bounded pandas
    ``ffill``.

    Args:
        df: Input frame; must contain the ``partition``, ``order`` (date)
            and ``variable`` columns.
        partition: Column delimiting independent fill groups.
        order: Date column giving chronological order within a partition.
        variable: Column whose nulls are filled.
        number_of_days: Maximum age, in days, of a value that may be carried
            forward.  Defaults to 7.

    Returns:
        ``df`` restricted to business days, with helper columns
        (``day_of_week``, ``is_weekend``, ``last_date_with_non_null_value``,
        ``history``) and the filled result in a new ``ffill`` column.

    Raises:
        ValueError: If any ``(partition, order)`` pair occurs more than once.
    """
    # Each (partition, order) pair must be unique or the window ordering is
    # ambiguous.  The original hard-coded ["name", "date"] here; use the
    # caller-supplied column names instead.
    if df.count() > df.dropDuplicates([partition, order]).count():
        raise ValueError("Data has duplicated order values for partition.")
    _previous_rows = _window_all_previous_rows(partition, order)
    # Spark's dayofweek() ranges 1 (Sunday) .. 7 (Saturday), so the weekend
    # is {1, 7} -- NOT "> 5", which would drop Friday and keep Sunday.
    _is_weekend = F.col("day_of_week").isin(1, 7)
    _is_business_day = ~_is_weekend
    # Row date only when the variable is present; used to measure how stale
    # the value being carried forward is.
    _date_with_value = F.when(F.col(variable).isNotNull(), F.col(order))
    # RULE: only propagate a value observed at most `number_of_days` before
    # the considered row.
    _last_recent_value = F.when(
        F.col("history") <= number_of_days,
        F.last(variable, ignorenulls=True).over(_previous_rows),
    )
    return (
        df.withColumn("day_of_week", F.dayofweek(order))
        .withColumn("is_weekend", _is_weekend)
        .filter(_is_business_day)
        .withColumn(
            "last_date_with_non_null_value",
            F.last(_date_with_value, ignorenulls=True).over(_previous_rows),
        )
        # Age in days of the candidate fill value (0 when the row itself has one).
        .withColumn("history", F.datediff(order, "last_date_with_non_null_value"))
        .na.fill(value=0, subset=["history"])
        .withColumn(
            "ffill",
            F.coalesce(F.col(variable), _last_recent_value),
        )
    )
def backward_fill(
    df: DataFrame, partition: str, order: str, variable: str, number_of_days: int = 7
) -> DataFrame:
    """Backward-fill nulls in ``variable`` from the next non-null value.

    Weekend rows are dropped, then each remaining null in ``variable`` is
    replaced by the first non-null value that follows it in the same
    ``partition`` (ordered by the ``order`` date column), provided that value
    is no more than ``number_of_days`` days ahead.  Equivalent to a bounded
    pandas ``bfill``.

    Args:
        df: Input frame; must contain the ``partition``, ``order`` (date)
            and ``variable`` columns.
        partition: Column delimiting independent fill groups.
        order: Date column giving chronological order within a partition.
        variable: Column whose nulls are filled.
        number_of_days: Maximum distance, in days, of a value that may be
            carried backward.  Defaults to 7.

    Returns:
        ``df`` restricted to business days, with helper columns
        (``day_of_week``, ``is_weekend``, ``next_date_with_non_null_value``,
        ``history``) and the filled result in a new ``bfill`` column.

    Raises:
        ValueError: If any ``(partition, order)`` pair occurs more than once.
    """
    # Each (partition, order) pair must be unique or the window ordering is
    # ambiguous.  The original hard-coded ["name", "date"] here; use the
    # caller-supplied column names instead.
    if df.count() > df.dropDuplicates([partition, order]).count():
        raise ValueError("Data has duplicated order values for partition.")
    _following_rows = _window_all_following_rows(partition, order)
    # Spark's dayofweek() ranges 1 (Sunday) .. 7 (Saturday), so the weekend
    # is {1, 7} -- NOT "> 5", which would drop Friday and keep Sunday.
    _is_weekend = F.col("day_of_week").isin(1, 7)
    _is_business_day = ~_is_weekend
    # Row date only when the variable is present; used to measure how far
    # ahead the value being carried backward is.
    _date_with_value = F.when(F.col(variable).isNotNull(), F.col(order))
    # RULE: only propagate a value observed at most `number_of_days` after
    # the considered row.  Use F.first over the following window -- the
    # original used F.last, which grabbed the *final* future value instead of
    # the next one and so disagreed with `next_date_with_non_null_value`.
    _next_recent_value = F.when(
        F.col("history") <= number_of_days,
        F.first(variable, ignorenulls=True).over(_following_rows),
    )
    return (
        df.withColumn("day_of_week", F.dayofweek(order))
        .withColumn("is_weekend", _is_weekend)
        .filter(_is_business_day)
        .withColumn(
            "next_date_with_non_null_value",
            F.first(_date_with_value, ignorenulls=True).over(_following_rows),
        )
        # Distance in days to the candidate fill value (0 when the row itself has one).
        .withColumn("history", F.datediff("next_date_with_non_null_value", order))
        .na.fill(value=0, subset=["history"])
        .withColumn(
            "bfill",
            F.coalesce(F.col(variable), _next_recent_value),
        )
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment