# PySpark SQL comparison
# GitHub gist diogobaltazar/51e98e21a93db3cb2b6deb8774e2f834 (last active December 13, 2019).
# Each section pairs a SQL statement (`stat`) with the equivalent PySpark
# DataFrame expression (`df_res`), followed by the captured result.
# DATA ########################################################
# Two small fixture tables used by every example below:
#   test   (str_col, int_col)
#   test_2 (int_col, dt_col, str_col)
# Each is listed as its column names, its rows as Python tuples, and a
# printed table. Presumably `df` holds `test` and `df_2` holds `test_2`
# in the DataFrame examples (they mirror `from test` / `join test_2`).
''' test table
['str_col', 'int_col']
('abc', 1)
('def1', 2)
('def1', 3)
('def1', 3)
| abc| 1|
| def1| 2|
| def1| 3|
| def1| 3|
test_2 table:
['int_col', 'dt_col', 'str_col']
(3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
(3, datetime.datetime(2019, 1, 1, 0, 0), 'def1')
(4, datetime.datetime(2020, 1, 1, 0, 0), 'kil')
|int_col| dt_col| str_col|
| 3|2019-01-01 00:00:00|non-matching|
| 3|2019-01-01 00:00:00| def1|
| 4|2020-01-01 00:00:00| kil|
# SELECT DISTINCT #############################################
stat = (
'select distinct str_col '
+ 'from test '
#+ 'where test.str_col ~ \'[0-9]\''
# NOTE(review): the DataFrame expression below looks truncated by the page
# scrape (presumably df.select('str_col').distinct()) — restore from the gist.
df_res ='str_col').distinct()
# REGEX #######################################################
# `~` is PostgreSQL's regex-match operator; Spark SQL spells it RLIKE, and
# Column.rlike is the DataFrame counterpart.
stat = (
'select * '
+ 'from test '
+ 'where test.str_col ~ \'[0-9]\''
df_res = df.filter(df['str_col'].rlike('[0-9]'))
# LIMIT #######################################################
stat = (
'select * '
+ 'from test '
+ 'limit 1'
df_res = df.limit(1)
# GROUPING AND AGGREGATING ####################################
# Grouping collects the rows that share the grouping key(s); each group is
# then summarised by an aggregate function F.
# group by A, agg F by A, show (A, F)
# Any other selected column must also be grouped (or aggregated).
# (How would I display the aggregate alongside more columns? One option is
# to aggregate per group and join the result back onto the original rows,
# or to compute the aggregate over a window partitioned by the key.)
stat = (
'select round(avg(int_col), 2) as avg '
+ 'from test '
# The dict form is shorthand; the F.* form allows round/alias like the SQL.
df_res = df.agg({"int_col": "avg"})
df_res = df.agg(F.round(F.avg(df['int_col']), 2).alias('avg'))
stat = (
'select round(avg(int_col), 2) as avg '
+ 'from test '
+ 'group by str_col'
df_res = (
.agg(F.round(F.avg(df['int_col']), 2).alias('avg'))
stat = (
'select str_col '
+ 'from test '
+ 'group by str_col '
df_res = (
.agg(F.count(df['int_col']))# obliged to specify an agg f
# Grouping by every column deduplicates rows (like SELECT DISTINCT *): the
# duplicated ('def1', 3) row of test appears only once in the result below.
stat = (
'select * '
+ 'from test '
+ 'group by str_col, int_col '
df_res = (
.groupBy('str_col', 'int_col')
.agg(F.count(df['int_col'])) # obliged to specify an agg f
.select('str_col', 'int_col')
''' result
['str_col', 'int_col'] [('def1', 3), ('abc', 1), ('def1', 2)]
| def1| 2|
| abc| 1|
| def1| 3|
stat = (
'select max(int_col) '
+ 'from test '
df_res = df.agg({"int_col": "max"})
stat = (
'select max(int_col) '
+ 'from test '
+ 'group by str_col'
df_res = df.groupBy('str_col').agg({"int_col": "max"})
stat = (
'select max(int_col) as max_int '
+ 'from test '
+ 'group by str_col '
+ 'having max(int_col) < 3 ' # could not use max_int
df_res = (
.filter(F.col('max_int') < 3) # could not use df['max_int']: max_int exists only in the aggregated result, not in df
# ORDER/SORT ################################################
# DataFrame.sort and DataFrame.orderBy are interchangeable; both sort
# ascending by default, like ORDER BY.
stat = (
'select * '
+ 'from test '
+ 'order by int_col'
df_res = df.sort(df['int_col'])
# CAST ######################################################
stat = (
'select dt_col::date '
+ 'from test_2 '
df_res = (
, F.date_format('dt_col', "yyyy-MM-dd")
.select('int_col', 'new_col')
.withColumnRenamed('new_col', 'dt_col')
''' result
['dt_col'] [(, 1, 1),)]
|int_col| dt_col|
| 3|2019-01-01|
# JOIN ######################################################
# Set notation used below: A = test, B = test_2. A.B is the inner join,
# A-B the rows only in A, A+B everything (full outer join).
# A.B (on 1 column)
# Passing the key as a column name (or list of names) makes the DataFrame
# join keep a single copy of it: compare the SQL column list with the
# printed table header in each result.
stat = (
'select * '
+ 'from test as t '
+ 'join test_2 as t2 ' # defaults to inner join
+ 'on t.int_col = t2.int_col;'
df_res = (
.join(df_2, 'int_col') # defaults to inner join
''' result
['str_col', 'int_col', 'int_col', 'dt_col', 'str_col']
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
|int_col|str_col| dt_col| str_col|
| 3| def1|2019-01-01 00:00:00|non-matching|
| 3| def1|2019-01-01 00:00:00| def1|
| 3| def1|2019-01-01 00:00:00|non-matching|
| 3| def1|2019-01-01 00:00:00| def1|
# A.B (on 2 columns)
stat = (
'select * '
+ 'from test as t '
+ 'join test_2 as t2 '
+ 'on t.int_col = t2.int_col '
+ 'and t.str_col = t2.str_col'
df_res = (
.join(df_2, ['int_col', 'str_col'], how = 'inner')
''' result
['str_col', 'int_col', 'int_col', 'dt_col', 'str_col']
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'def1')
|int_col|str_col| dt_col|
| 3| def1|2019-01-01 00:00:00|
| 3| def1|2019-01-01 00:00:00|
# A+A.B
# Left join: every test row, null-padded where test_2 has no matching int_col.
# NOTE(review): the dt_col values in the result below (2019-01-21) differ
# from the DATA section (2019-01-01); the output was probably captured from
# an earlier version of test_2.
stat = (
'select * '
+ 'from test as t '
+ 'left join test_2 as t2 '
+ 'on t.int_col = t2.int_col;'
df_res = (
.join(df_2, 'int_col', 'left')
''' result
['str_col', 'int_col', 'int_col', 'dt_col', 'str_col']
('abc', 1, None, None, None)
('def1', 2, None, None, None)
('def1', 3, 3, datetime.datetime(2019, 1, 21, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
('def1', 3, 3, datetime.datetime(2019, 1, 21, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
|int_col|str_col| dt_col| str_col|
| 1| abc| null| null|
| 3| def1|2019-01-01 00:00:00|non-matching|
| 3| def1|2019-01-21 00:00:00| def1|
| 3| def1|2019-01-01 00:00:00|non-matching|
| 3| def1|2019-01-21 00:00:00| def1|
| 2| def1| null| null|
# A-B
stat = (
'select * '
+ 'from test as t '
+ 'left join test_2 as t2 '
+ 'on t.int_col = t2.int_col '
+ 'where dt_col is null; # need not specify the tbl
df_res = (
.join(df_2, 'int_col', 'left')
.filter(F.col('dt_col').isNull()) # col means whatever ds res col, UNLESS it's ambiguous
''' result
['str_col', 'int_col', 'int_col', 'dt_col', 'str_col']
('abc', 1, None, None, None)
('def1', 2, None, None, None)
| 1| abc| null| null|
| 2| def1| null| null|
# B+A.B
# Right join: every test_2 row, null-padded where test has no matching
# int_col (the int_col = 4 row). The SQL output shows t.int_col as None for
# that row, whereas the DataFrame output has a single int_col column holding 4.
stat = (
'select * '
+ 'from test as t '
+ 'right join test_2 as t2 '
+ 'on t.int_col = t2.int_col;'
df_res = (
.join(df_2, 'int_col', 'right')
''' result
['str_col', 'int_col', 'int_col', 'dt_col', 'str_col']
('def1', 3, 3, datetime.datetime(2019, 1, 21, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
('def1', 3, 3, datetime.datetime(2019, 1, 21, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
(None, None, 4, datetime.datetime(2020, 1, 2, 0, 0), 'kil')
|int_col|str_col| dt_col| str_col|
| 3| def1|2019-01-01 00:00:00|non-matching|
| 3| def1|2019-01-01 00:00:00|non-matching|
| 3| def1|2019-01-21 00:00:00| def1|
| 3| def1|2019-01-21 00:00:00| def1|
| 4| null|2020-01-02 00:00:00| kil|
# B-A
stat = (
'select * '
+ 'from test as t '
+ 'right join test_2 as t2 '
+ 'on t.int_col = t2.int_col '
+ 'where str_col is null;'
df_res = (
.join(df_2, 'int_col', 'right')
''' result
['str_col', 'int_col', 'int_col', 'dt_col', 'str_col']
(None, None, 4, datetime.datetime(2020, 1, 2, 0, 0), 'kil')
|int_col|str_col| dt_col|str_col|
| 4| null|2020-01-02 00:00:00| kil|
# A+B
# Full outer join: rows from both tables, null-padded on whichever side has
# no match ('full' is accepted as an alias of 'outer' / 'full_outer').
stat = (
'select * '
+ 'from test as t '
+ 'full join test_2 as t2 '
+ 'on t.int_col = t2.int_col;'
df_res = (
.join(df_2, 'int_col', 'full')
''' result
['str_col', 'int_col', 'int_col', 'dt_col', 'str_col']
('abc', 1, None, None, None)
('def1', 2, None, None, None)
('def1', 3, 3, datetime.datetime(2019, 1, 21, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
('def1', 3, 3, datetime.datetime(2019, 1, 21, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
(None, None, 4, datetime.datetime(2020, 1, 2, 0, 0), 'kil')
|int_col|str_col| dt_col| str_col|
| 1| abc| null| null|
| 3| def1|2019-01-01 00:00:00|non-matching|
| 3| def1|2019-01-21 00:00:00| def1|
| 3| def1|2019-01-01 00:00:00|non-matching|
| 3| def1|2019-01-21 00:00:00| def1|
| 2| def1| null| null|
| 4| null|2020-01-02 00:00:00| kil|
# (A-B)+(B-A)
stat = (
'select * '
+ 'from test as t '
+ 'full join test_2 as t2 '
+ 'on t.int_col = t2.int_col '
+ 'where str_col is null '
+ 'or dt_col is null;' # mind the OR instead of AND
def_res = (
.join(df_2, 'int_col', 'full')
.where(df['str_col'].isNull() | df['str_col'].isNull())
''' result
['str_col', 'int_col', 'int_col', 'dt_col', 'str_col']
('abc', 1, None, None, None)
('def1', 2, None, None, None)
(None, None, 4, datetime.datetime(2020, 1, 2, 0, 0), 'kil')
|int_col|str_col| dt_col|str_col|
| 1| abc| null| null|
| 2| def1| null| null|
| 4| null|2020-01-02 00:00:00| kil|
# UNION/INTERSECTION/DIFFERENCE ###############################
# DataFrame.union behaves like SQL UNION ALL (duplicates are kept, see the
# first result), so a plain UNION needs an extra .distinct().
stat = (
'select t.int_col, t.str_col '
+ 'from test as t '
+ 'union all ' # all == include duplicate rows
'select t2.int_col, t2.str_col '
+ 'from test_2 as t2 '
# NOTE(review): the DataFrame side of these examples looks truncated by the
# page scrape: union/intersect take another DataFrame (presumably a select
# of the same columns from df_2), not column names.
df_res = ('int_col', 'str_col')
.union('int_col', 'str_col'))
''' result
['int_col', 'str_col']
(1, 'abc')
(2, 'def1')
(3, 'def1')
(3, 'def1')
(3, 'non-matching')
(3, 'def1')
(4, 'kil')
|int_col| str_col|
| 1| abc|
| 2| def1|
| 3| def1|
| 3| def1|
| 3|non-matching|
| 3| def1|
| 4| kil|
stat = (
'select t.int_col, t.str_col '
+ 'from test as t '
+ 'union '
'select t2.int_col, t2.str_col '
+ 'from test_2 as t2 '
df_res = ('int_col', 'str_col')
.union('int_col', 'str_col'))
.distinct() # remove duplicate rows
''' result
['int_col', 'str_col']
(3, 'def1')
(4, 'kil')
(2, 'def1')
(1, 'abc')
(3, 'non-matching')
|int_col| str_col|
| 3| def1|
| 4| kil|
| 1| abc|
| 3|non-matching|
| 2| def1|
stat = (
'select t.int_col, t.str_col '
+ 'from test as t '
+ 'intersect '
'select t2.int_col, t2.str_col '
+ 'from test_2 as t2 '
df_res = ('int_col', 'str_col')
.intersect('int_col', 'str_col'))
['str_col', 'int_col']
('def1', 3)
| def1| 3|
# SQL EXCEPT removes duplicates (EXCEPT DISTINCT). The left join + null
# filter below does not; add .distinct() (or use DataFrame.subtract) if test
# can contain duplicate rows that are missing from test_2.
stat = (
'select t.int_col, t.str_col '
+ 'from test as t '
+ 'except '
'select t2.int_col, t2.str_col '
+ 'from test_2 as t2 '
df_res = (
.join(df_2, ['int_col', 'str_col'], 'left')
.filter(df_2['dt_col'].isNull()) # filter on a column only df_2 has: null means no match in test_2
.select('str_col', 'int_col')
['str_col', 'int_col']
('abc', 1)
('def1', 2)
| def1| 2|
| abc| 1|
# DATA TYPES ##################################################
# NOTE(review): placeholder — this section appears unfinished.
timestamp = ''
# EXERCISES ###################################################
# Total int_col per calendar year of dt_col, newest year first.
stat = (
'select '
+ 'extract(year from dt_col) as year'
+ ', sum(int_col) ' # aggregation funct
+ 'from test_2 '
+ 'group by year '
+ 'order by year desc' # alias col year is recognized
df_res = (
'int_col' # explicit col used up ahead
, F.year('dt_col').alias('year'))
.agg(F.sum('int_col').alias('sum')) # aggregation funct
.sort(F.col('year').desc()) # the alias 'year' can be referenced by name with F.col
# extract() yields a floating-point year in the SQL output (2020.0), while
# F.year yields an integer (2020).
''' result
['year', 'sum']
(2020.0, 4)
(2019.0, 6)
|2020| 4|
|2019| 6|
# Rows of test whose int_col equals the day of the month of some
# test_2.dt_col (IN with a subquery).
stat = (
'select * '
+ 'from test as t '
+ 'where t.int_col in ('
+ 'select extract(day from dt_col) as day '
+ 'from test_2 as t2'
+ ')'
def in_days(col_name, days_df):
    """Build a boolean UDF that tests membership in a column of *days_df*.

    Curried helper: the outer call collects the values of ``col_name`` from
    ``days_df`` to the driver. The returned UDF is then applied to a column,
    e.g. ``df.filter(in_days('day', days_df)(df['int_col']))``.

    Args:
        col_name: name of the column of ``days_df`` whose values form the
            lookup set.
        days_df: DataFrame holding that column. ``collect()`` is called on
            it, so it should be small enough to fit on the driver.

    Returns:
        A ``pyspark.sql.functions.udf`` (BooleanType) that returns True when
        the column value is one of the collected values.
    """
    # Collected once on the driver; a set gives O(1) lookups per row
    # instead of scanning a list for every row the UDF sees.
    days = {row[col_name] for row in days_df.collect()}

    # A plain Python function cannot be applied to a Column (``col in list``
    # would try to coerce a Column to bool); wrapping it in a UDF makes
    # Spark call it with each row's value instead.
    @F.udf(returnType=T.BooleanType())
    def in_days_curry(col_value):
        return col_value in days

    return in_days_curry
# NOTE(review): the in_days(...) arguments look truncated by the page scrape.
# in_days calls days_df.collect(), so its second argument must be a DataFrame
# (presumably one with a 'day' column derived from test_2.dt_col), not the
# string 'dt_col'.
df_res = (
in_days('day','dt_col').alias('day'))) # currying with:
(df['int_col']) # int_col column
# NOTE(review): with the DATA section's dates, every day-of-month is 1, so
# only ('abc', 1) would match. The ('def1', 2) row below fits the 2020-01-02
# date shown in the join results.
['str_col', 'int_col']
('abc', 1)
('def1', 2)
| abc| 1|
| def1| 2|
# S = sum of test_2.int_col over the rows whose str_col does NOT occur in
# test.str_col; then keep the test rows whose int_col divides 2 * S.
stat = (
'with S as ('
+ 'select sum(t2.int_col) as S '
+ 'from test_2 as t2 '
+ 'where t2.str_col not in ('
# all distinct test strings
+ 'select distinct t.str_col '
+ 'from test as t '
+ ')'
+ ') '
+ 'select * '
+ 'from test as t, S '
+ 'where ((S.S * 2) % t.int_col) = 0 ' # as to get all even and 1
def filt_str(df):
    """Build a boolean UDF that is True for strings *not* present in ``df.str_col``.

    The values of ``df``'s ``str_col`` column are collected to the driver
    once. The returned UDF then runs for every row it is applied to.

    Args:
        df: DataFrame with a ``str_col`` column (collected, so keep it small).

    Returns:
        A ``pyspark.sql.functions.udf`` (BooleanType).
    """
    # Values of df.str_col as a set, so each per-row check is O(1) rather
    # than a scan of the whole collected column.
    excluded = {row.str_col for row in df.collect()}

    # NOTE(review): unlike SQL ``NOT IN``, a None value here yields True, and
    # a None in ``excluded`` does not make every comparison unknown.
    @F.udf(returnType=T.BooleanType())
    def filt_str_udf(str_col_val):
        return str_col_val not in excluded

    return filt_str_udf
def filt_by_divisers(dividend):
    """Build a boolean UDF that is True when the column value divides ``dividend``.

    Args:
        dividend: integer whose divisors should be kept.

    Returns:
        A ``pyspark.sql.functions.udf`` (BooleanType). Null and zero column
        values yield False instead of raising.
    """
    @F.udf(returnType=T.BooleanType())
    def filt_by_divisers_udf(col_val):
        # ``dividend % None`` raises TypeError and ``dividend % 0`` raises
        # ZeroDivisionError inside the Python worker, failing the whole job.
        # Treat both as "not a divisor" so the rows are simply filtered out,
        # as a null ``%`` result would be in Spark SQL.
        if col_val is None or col_val == 0:
            return False
        return dividend % col_val == 0

    return filt_by_divisers_udf
# NOTE(review): this chain looks truncated by the page scrape. From the
# fragments it presumably filters df_2 with filt_str(df), reads the int_col
# sum back with .head().sum, doubles it, and keeps the df rows accepted by
# filt_by_divisers(...), mirroring the SQL statement above.
df_res = (
filt_by_divisers( # currying
.filter( # currying
.head().sum * 2 # as to get all even and 1
['str_col', 'int_col', 's']
('abc', 1, 7)
('def1', 2, 7)
| abc| 1|
| def1| 2|
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment