# PySpark SQL comparison
# DATA ########################################################
''' test table
['str_col', 'int_col']
('abc', 1)
('def1', 2)
('def1', 3)
('def1', 3)
+-------+-------+
|str_col|int_col|
+-------+-------+
| abc| 1|
| def1| 2|
| def1| 3|
| def1| 3|
+-------+-------+
test_2 table:
['int_col', 'dt_col', 'str_col']
(3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
(3, datetime.datetime(2019, 1, 1, 0, 0), 'def1')
(4, datetime.datetime(2020, 1, 1, 0, 0), 'kil')
+-------+-------------------+------------+
|int_col| dt_col| str_col|
+-------+-------------------+------------+
| 3|2019-01-01 00:00:00|non-matching|
| 3|2019-01-01 00:00:00| def1|
| 4|2020-01-01 00:00:00| kil|
+-------+-------------------+------------+
'''
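# SETUP #######################################################
# a minimal setup sketch (not part of the comparisons below): the snippets
# assume a SparkSession named spark, DataFrames df (test) and df_2 (test_2)
# built from the data above, and temp views so that a stat could run via
# spark.sql(stat); note that some stat strings use PostgreSQL syntax
# (~, ::date, extract) that Spark SQL does not accept verbatim
import datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
spark = SparkSession.builder.appName('pyspark-sql-comparison').getOrCreate()
df = spark.createDataFrame(
[('abc', 1), ('def1', 2), ('def1', 3), ('def1', 3)]
, ['str_col', 'int_col']
)
df_2 = spark.createDataFrame(
[
(3, datetime.datetime(2019, 1, 1), 'non-matching')
, (3, datetime.datetime(2019, 1, 1), 'def1')
, (4, datetime.datetime(2020, 1, 1), 'kil')
]
, ['int_col', 'dt_col', 'str_col']
)
df.createOrReplaceTempView('test')
df_2.createOrReplaceTempView('test_2')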
# SELECT DISTINCT #############################################
stat = (
'select distinct str_col '
+ 'from test '
#+ 'where test.str_col ~ \'[0-9]\''
)
df_res = df.select('str_col').distinct()
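# equivalent (a sketch): dropDuplicates() behaves like distinct() here
df_res = df.select('str_col').dropDuplicates()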
# REGEX #######################################################
stat = (
'select * '
+ 'from test '
+ 'where test.str_col ~ \'[0-9]\''
)
df_res = df.filter(df['str_col'].rlike('[0-9]'))
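# note: ~ is PostgreSQL's regex operator; in Spark SQL the same filter would
# use rlike (a sketch, assuming the temp view registered in SETUP)
df_res = spark.sql("select * from test where str_col rlike '[0-9]'")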
# LIMIT #######################################################
stat = (
'select * '
+ 'from test '
+ 'limit 1'
)
df_res = df.limit(1)
# GROUPING AND AGGREGATING ####################################
# grouping is used to compute information F about each group
# a group by must be followed by an aggregation F over the group
# group by A, aggregate F over A, show (A, F)
# showing other cols requires grouping by them as well
# (how would I display the agg result alongside more cols?)
stat = (
'select round(avg(int_col), 2) as avg '
+ 'from test '
)
df_res = df.agg({"int_col": "avg"})
df_res = df.agg(F.round(F.avg(df['int_col']), 2).alias('avg'))
stat = (
'select round(avg(int_col), 2) as avg '
+ 'from test '
+ 'group by str_col'
)
df_res = (
df
.groupBy('str_col')
.agg(F.round(F.avg(df['int_col']), 2).alias('avg'))
)
stat = (
'select str_col '
+ 'from test '
+ 'group by str_col '
)
df_res = (
df
.groupBy('str_col')
.agg(F.count(df['int_col'])) # an aggregate function must be specified
.select('str_col')
)
stat = (
'select * '
+ 'from test '
+ 'group by str_col, int_col '
)
df_res = (
df
.groupBy('str_col', 'int_col')
.agg(F.count(df['int_col'])) # an aggregate function must be specified
.select('str_col', 'int_col')
)
''' result
['str_col', 'int_col'] [('def1', 3), ('abc', 1), ('def1', 2)]
+-------+-------+
|str_col|int_col|
+-------+-------+
| def1| 2|
| abc| 1|
| def1| 3|
+-------+-------+
'''
stat = (
'select max(int_col) '
+ 'from test '
)
df_res = df.agg({"int_col": "max"})
stat = (
'select max(int_col) '
+ 'from test '
+ 'group by str_col'
)
df_res = df.groupBy('str_col').agg({"int_col": "max"})
stat = (
'select max(int_col) as max_int '
+ 'from test '
+ 'group by str_col '
+ 'having max(int_col) < 3 ' # the alias max_int cannot be used in having
)
df_res = (
df
.groupBy('str_col')
.agg(F.max(df['int_col']).alias('max_int'))
.filter(F.col('max_int') < 3) # df['max_int'] does not exist; the alias must be resolved with F.col
)
# ORDER/SORT ################################################
stat = (
'select * '
+ 'from test '
+ 'order by int_col'
)
df_res = df.sort(df['int_col'])
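# descending (a sketch): 'order by int_col desc' maps to
df_res = df.sort(df['int_col'].desc())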
# CAST ######################################################
stat = (
'select dt_col::date '
+ 'from test_2 '
)
df_res = (
df_2
.withColumn(
'new_col'
, F.date_format('dt_col', 'yyyy-MM-dd') # yields a string, not a date
)
.select('int_col', 'new_col')
.withColumnRenamed('new_col', 'dt_col')
)
''' result
['dt_col'] [(datetime.date(2019, 1, 1),)]
+-------+----------+
|int_col| dt_col|
+-------+----------+
| 3|2019-01-01|
+-------+----------+
'''
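# alternative (a sketch): casting the column gives a true date type,
# closer to ::date than the string produced by date_format
df_res = df_2.select(F.col('dt_col').cast('date').alias('dt_col'))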
# JOIN ######################################################
# A.B (on 1 column)
stat = (
'select * '
+ 'from test as t '
+ 'join test_2 as t2 ' # defaults to inner join
+ 'on t.int_col = t2.int_col;'
)
df_res = (
df
.join(df_2, 'int_col') # defaults to inner join
)
''' result
['str_col', 'int_col', 'int_col', 'dt_col', 'str_col']
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
+-------+-------+-------------------+------------+
|int_col|str_col| dt_col| str_col|
+-------+-------+-------------------+------------+
| 3| def1|2019-01-01 00:00:00|non-matching|
| 3| def1|2019-01-01 00:00:00| def1|
| 3| def1|2019-01-01 00:00:00|non-matching|
| 3| def1|2019-01-01 00:00:00| def1|
+-------+-------+-------------------+------------+
'''
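# variant (a sketch): an explicit join condition, instead of a column name,
# keeps the key column from both sides in the result
df_res = df.join(df_2, df['int_col'] == df_2['int_col']) # defaults to inner join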
# A.B (on 2 columns)
stat = (
'select * '
+ 'from test as t '
+ 'join test_2 as t2 '
+ 'on t.int_col = t2.int_col '
+ 'and t.str_col = t2.str_col'
)
df_res = (
df
.join(df_2, ['int_col', 'str_col'], how = 'inner')
)
''' result
['str_col', 'int_col', 'int_col', 'dt_col', 'str_col']
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'def1')
+-------+-------+-------------------+
|int_col|str_col| dt_col|
+-------+-------+-------------------+
| 3| def1|2019-01-01 00:00:00|
| 3| def1|2019-01-01 00:00:00|
+-------+-------+-------------------+
'''
# A+A.B
stat = (
'select * '
+ 'from test as t '
+ 'left join test_2 as t2 '
+ 'on t.int_col = t2.int_col;'
)
df_res = (
df
.join(df_2, 'int_col', 'left')
)
''' result
['str_col', 'int_col', 'int_col', 'dt_col', 'str_col']
('abc', 1, None, None, None)
('def1', 2, None, None, None)
('def1', 3, 3, datetime.datetime(2019, 1, 21, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
('def1', 3, 3, datetime.datetime(2019, 1, 21, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
+-------+-------+-------------------+------------+
|int_col|str_col| dt_col| str_col|
+-------+-------+-------------------+------------+
| 1| abc| null| null|
| 3| def1|2019-01-01 00:00:00|non-matching|
| 3| def1|2019-01-21 00:00:00| def1|
| 3| def1|2019-01-01 00:00:00|non-matching|
| 3| def1|2019-01-21 00:00:00| def1|
| 2| def1| null| null|
+-------+-------+-------------------+------------+
'''
# A-B
stat = (
'select * '
+ 'from test as t '
+ 'left join test_2 as t2 '
+ 'on t.int_col = t2.int_col '
+ 'where dt_col is null;' # the table need not be specified
)
df_res = (
df
.join(df_2, 'int_col', 'left')
.filter(F.col('dt_col').isNull()) # F.col resolves against the joined result, unless the name is ambiguous
)
''' result
['str_col', 'int_col', 'int_col', 'dt_col', 'str_col']
('abc', 1, None, None, None)
('def1', 2, None, None, None)
+-------+-------+------+-------+
|int_col|str_col|dt_col|str_col|
+-------+-------+------+-------+
| 1| abc| null| null|
| 2| def1| null| null|
+-------+-------+------+-------+
'''
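# idiomatic alternative (a sketch): a left anti join expresses A-B directly,
# keeping only the df rows with no match in df_2 (and only df's columns)
df_res = df.join(df_2, 'int_col', 'left_anti')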
# B+A.B
stat = (
'select * '
+ 'from test as t '
+ 'right join test_2 as t2 '
+ 'on t.int_col = t2.int_col;'
)
df_res = (
df
.join(df_2, 'int_col', 'right')
)
''' result
['str_col', 'int_col', 'int_col', 'dt_col', 'str_col']
('def1', 3, 3, datetime.datetime(2019, 1, 21, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
('def1', 3, 3, datetime.datetime(2019, 1, 21, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
(None, None, 4, datetime.datetime(2020, 1, 2, 0, 0), 'kil')
+-------+-------+-------------------+------------+
|int_col|str_col| dt_col| str_col|
+-------+-------+-------------------+------------+
| 3| def1|2019-01-01 00:00:00|non-matching|
| 3| def1|2019-01-01 00:00:00|non-matching|
| 3| def1|2019-01-21 00:00:00| def1|
| 3| def1|2019-01-21 00:00:00| def1|
| 4| null|2020-01-02 00:00:00| kil|
+-------+-------+-------------------+------------+
'''
# B-A
stat = (
'select * '
+ 'from test as t '
+ 'right join test_2 as t2 '
+ 'on t.int_col = t2.int_col '
+ 'where str_col is null;'
)
df_res = (
df
.join(df_2, 'int_col', 'right')
.filter(df['str_col'].isNull())
)
''' result
['str_col', 'int_col', 'int_col', 'dt_col', 'str_col']
(None, None, 4, datetime.datetime(2020, 1, 2, 0, 0), 'kil')
+-------+-------+-------------------+-------+
|int_col|str_col| dt_col|str_col|
+-------+-------+-------------------+-------+
| 4| null|2020-01-02 00:00:00| kil|
+-------+-------+-------------------+-------+
'''
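# idiomatic alternative (a sketch): B-A is the same anti join with the sides swapped
df_res = df_2.join(df, 'int_col', 'left_anti')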
# A+B
stat = (
'select * '
+ 'from test as t '
+ 'full join test_2 as t2 '
+ 'on t.int_col = t2.int_col;'
)
df_res = (
df
.join(df_2, 'int_col', 'full')
)
''' result
['str_col', 'int_col', 'int_col', 'dt_col', 'str_col']
('abc', 1, None, None, None)
('def1', 2, None, None, None)
('def1', 3, 3, datetime.datetime(2019, 1, 21, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
('def1', 3, 3, datetime.datetime(2019, 1, 21, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
(None, None, 4, datetime.datetime(2020, 1, 2, 0, 0), 'kil')
+-------+-------+-------------------+------------+
|int_col|str_col| dt_col| str_col|
+-------+-------+-------------------+------------+
| 1| abc| null| null|
| 3| def1|2019-01-01 00:00:00|non-matching|
| 3| def1|2019-01-21 00:00:00| def1|
| 3| def1|2019-01-01 00:00:00|non-matching|
| 3| def1|2019-01-21 00:00:00| def1|
| 2| def1| null| null|
| 4| null|2020-01-02 00:00:00| kil|
+-------+-------+-------------------+------------+
'''
# (A-B)+(B-A)
stat = (
'select * '
+ 'from test as t '
+ 'full join test_2 as t2 '
+ 'on t.int_col = t2.int_col '
+ 'where str_col is null '
+ 'or dt_col is null;' # mind the OR instead of AND
)
df_res = (
df
.join(df_2, 'int_col', 'full')
.where(df['str_col'].isNull() | df_2['dt_col'].isNull())
)
''' result
['str_col', 'int_col', 'int_col', 'dt_col', 'str_col']
('abc', 1, None, None, None)
('def1', 2, None, None, None)
(None, None, 4, datetime.datetime(2020, 1, 2, 0, 0), 'kil')
+-------+-------+-------------------+-------+
|int_col|str_col| dt_col|str_col|
+-------+-------+-------------------+-------+
| 1| abc| null| null|
| 2| def1| null| null|
| 4| null|2020-01-02 00:00:00| kil|
+-------+-------+-------------------+-------+
'''
# UNION/INTERSECTION/DIFFERENCE ###############################
stat = (
'select t.int_col, t.str_col '
+ 'from test as t '
+ 'union all ' # all == include duplicate rows
+ 'select t2.int_col, t2.str_col '
+ 'from test_2 as t2 '
)
df_res = (
df.select('int_col', 'str_col')
.union(df_2.select('int_col', 'str_col'))
)
''' result
['int_col', 'str_col']
(1, 'abc')
(2, 'def1')
(3, 'def1')
(3, 'def1')
(3, 'non-matching')
(3, 'def1')
(4, 'kil')
+-------+------------+
|int_col| str_col|
+-------+------------+
| 1| abc|
| 2| def1|
| 3| def1|
| 3| def1|
| 3|non-matching|
| 3| def1|
| 4| kil|
+-------+------------+
'''
stat = (
'select t.int_col, t.str_col '
+ 'from test as t '
+ 'union '
+ 'select t2.int_col, t2.str_col '
+ 'from test_2 as t2 '
)
df_res = (
df.select('int_col', 'str_col')
.union(df_2.select('int_col', 'str_col'))
.distinct() # remove duplicate rows
)
''' result
['int_col', 'str_col']
(3, 'def1')
(4, 'kil')
(2, 'def1')
(1, 'abc')
(3, 'non-matching')
+-------+------------+
|int_col| str_col|
+-------+------------+
| 3| def1|
| 4| kil|
| 1| abc|
| 3|non-matching|
| 2| def1|
+-------+------------+
'''
stat = (
'select t.int_col, t.str_col '
+ 'from test as t '
+ 'intersect '
+ 'select t2.int_col, t2.str_col '
+ 'from test_2 as t2 '
)
df_res = (
df.select('int_col', 'str_col')
.intersect(df_2.select('int_col', 'str_col'))
)
'''result
['str_col', 'int_col']
('def1', 3)
+-------+-------+
|str_col|int_col|
+-------+-------+
| def1| 3|
+-------+-------+
'''
stat = (
'select t.int_col, t.str_col '
+ 'from test as t '
+ 'except '
+ 'select t2.int_col, t2.str_col '
+ 'from test_2 as t2 '
)
df_res = (
df
.join(df_2, ['int_col', 'str_col'], 'left')
.filter(df_2['dt_col'].isNull()) # filter on a column that only df_2 has
.select('str_col', 'int_col')
)
'''result
['str_col', 'int_col']
('abc', 1)
('def1', 2)
+-------+-------+
|str_col|int_col|
+-------+-------+
| def1| 2|
| abc| 1|
+-------+-------+
'''
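# direct equivalents (a sketch): subtract() behaves like EXCEPT, deduplicating
# the result; exceptAll() (Spark 2.4+) keeps duplicates like EXCEPT ALL
df_res = (
df.select('str_col', 'int_col')
.subtract(df_2.select('str_col', 'int_col'))
)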
# DATA TYPES ##################################################
# date/time functions reference: https://www.postgresql.org/docs/9.1/functions-datetime.html
# EXERCISES ###################################################
stat = (
'select '
+ 'extract(year from dt_col) as year'
+ ', sum(int_col) ' # aggregate function
+ 'from test_2 '
+ 'group by year '
+ 'order by year desc' # the alias year is recognized in order by
)
df_res = (
df_2
.select(
'int_col' # int_col is selected explicitly because it is used below
, F.year('dt_col').alias('year'))
.groupBy('year')
.agg(F.sum('int_col').alias('sum')) # aggregate function
.sort(F.col('year').desc()) # the alias is resolved with F.col
)
''' result
['year', 'sum']
(2020.0, 4)
(2019.0, 6)
+----+---+
|year|sum|
+----+---+
|2020| 4|
|2019| 6|
+----+---+
'''
# CURRYING
stat = (
'select * '
+ 'from test as t '
+ 'where t.int_col in ('
+ 'select extract(day from dt_col) as day '
+ 'from test_2 as t2'
+ ')'
)
# the outer function must return a pyspark.sql.functions.udf;
# its own args may be of any type
def in_days(col_name, days_df):
    days = list(
        map(
            lambda _: _[col_name]
            , days_df.collect()
        )
    )
    # currying: return a function taking the not-yet-specified args,
    # which must be of type pyspark.sql.Column
    @F.udf(returnType=T.BooleanType())
    def in_days_curry(col_value):
        return col_value in days
    return in_days_curry
df_res = (
df
.filter(
in_days('day', df_2.select(F.dayofmonth('dt_col').alias('day'))) # currying with:
(df['int_col']) # int_col column
)
)
'''result
['str_col', 'int_col']
('abc', 1)
('def1', 2)
+-------+-------+
|str_col|int_col|
+-------+-------+
| abc| 1|
| def1| 2|
+-------+-------+
'''
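# udf-free alternative (a sketch): IN (subquery) maps to a left semi join,
# which keeps only the df rows with a match in days_df
days_df = df_2.select(F.dayofmonth('dt_col').alias('day')).distinct()
df_res = df.join(days_df, df['int_col'] == days_df['day'], 'left_semi')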
# take all df.str_col values, sum the int_col of the df_2 rows whose str_col
# is not among them into S, then keep the df rows whose int_col divides S*2
stat = (
'with S as ('
+ 'select sum(t2.int_col) as S '
+ 'from test_2 as t2 '
+ 'where t2.str_col not in ('
# all distinct test strings
+ 'select distinct t.str_col '
+ 'from test as t '
+ ')'
+ ') '
+ 'select * '
+ 'from test as t, S '
+ 'where ((S.S * 2) % t.int_col) = 0 ' # doubled so that 1 and all even values divide
)
def filt_str(df):
    # collect df's str_col values on the driver
    df_col_vals = list(map(lambda _: _.str_col, df.collect()))
    @F.udf(returnType = T.BooleanType())
    def filt_str_udf(str_col_val):
        return str_col_val not in df_col_vals
    return filt_str_udf

def filt_by_divisors(dividend):
    @F.udf(returnType = T.BooleanType())
    def filt_by_divisors_udf(col_val):
        return dividend % col_val == 0
    return filt_by_divisors_udf
df_res = (
df
.filter(
filt_by_divisors( # currying
df_2
.filter( # currying
filt_str(df.select('str_col'))
(df_2['str_col'])
)
.select(F.sum('int_col').alias('sum'))
.head().sum * 2 # doubled so that 1 and all even values divide
)
(df['int_col'])
)
)
'''result
['str_col', 'int_col', 's']
('abc', 1, 7)
('def1', 2, 7)
+-------+-------+
|str_col|int_col|
+-------+-------+
| abc| 1|
| def1| 2|
+-------+-------+
'''
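# udf-free alternative for the NOT IN part (a sketch): a left anti join
# replaces filt_str; the divisor test can stay a plain column expression
s = (
df_2
.join(df.select('str_col').distinct(), 'str_col', 'left_anti')
.agg(F.sum('int_col').alias('sum'))
.head()['sum']
)
df_res = df.filter((s * 2) % df['int_col'] == 0)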