Create a unique hash for an RDD
import hashlib

from pyspark.rdd import _parse_memory
from pyspark.shuffle import ExternalSorter


def hash_rdd(rdd, id_func=lambda el: repr(el), hash_function='sha256', num_partitions=200):
    """
    Return a hash representing all records in the RDD. The order of items does not affect the hash.

    params:
        id_func - a function that maps an RDD element to a unique string identifier.
                  Defaults to repr(element).
        hash_function - the name of the hashlib algorithm to use.
        num_partitions - how many partitions to use (this directly affects the length of the hash).
    returns:
        A hash for the given RDD. Its length is the hex digest length of the chosen hash
        algorithm times the number of non-empty partitions. Note that combining hash digests
        with any function reduces the collision resistance of the hashes:
        http://robotics.stanford.edu/~xb/crypto06b/blackboxhash.pdf
    """
    worker_memory = _parse_memory(rdd.ctx._conf.get("spark.python.worker.memory", "512m"))
    memory_utilization = 0.9
    serializer = rdd._jrdd_deserializer

    def sortPartitionAndHash(iterator):
        # Sort the partition by identifier (spilling to disk if needed) so the
        # per-partition digest is deterministic regardless of element order.
        sorter = ExternalSorter(worker_memory * memory_utilization, serializer).sorted
        sorted_items_iter = iter(sorter(iterator, key=id_func))
        hash_obj = hashlib.new(hash_function)
        count = 0
        for item in sorted_items_iter:
            hash_obj.update(id_func(item).encode('utf-8'))
            count += 1
        return [(hash_obj.hexdigest(), count)]

    sorted_element_hashes = rdd\
        .repartition(num_partitions)\
        .mapPartitions(sortPartitionAndHash, True)\
        .filter(lambda pair: pair[1] > 0)\
        .map(lambda pair: pair[0])\
        .collect()
    # Sort the per-partition digests so their concatenation does not depend on partition order.
    return "".join(sorted(sorted_element_hashes))