@mbiemann
Last active April 24, 2022 20:23
Databricks Kinesis PySpark - shardsPerTask
# Sizing check for the Databricks Kinesis source: verify that the number of
# cluster workers fits the stream's open shard count, given the derived
# shardsPerTask value. Assumes `spark` (Databricks notebook context) and
# `source_stream` (Kinesis stream name) are already defined.
import boto3

print("check || cluster_workers >= target || cluster_workers >= stream_shards / shards_per_task || cluster_workers >= stream_shards / (stream_shards / cluster_workers)")

# Number of workers in the current cluster, read from the cluster usage tags.
cluster_workers = int(spark.sparkContext.getConf().get("spark.databricks.clusterUsageTags.clusterWorkers"))

# Number of open shards in the source Kinesis stream.
stream_shards = boto3.client("kinesis").describe_stream_summary(StreamName=source_stream)["StreamDescriptionSummary"]["OpenShardCount"]

# How many shards each task will read; must be at least 1.
shards_per_task = stream_shards // cluster_workers
if shards_per_task < 1:
    raise Exception(f"Sizing Error: Cluster Workers can't be {cluster_workers}. It must be at most {stream_shards}.")

# Minimum worker count needed to cover all shards at this shards-per-task ratio.
target = stream_shards // shards_per_task
check = cluster_workers >= target
print(f"{check} || {cluster_workers} >= {target} || {cluster_workers} >= {stream_shards} / {shards_per_task} || {cluster_workers} >= {stream_shards} / ({stream_shards} / {cluster_workers})")
if not check:
    raise Exception(f"Sizing Error: Cluster Workers can't be {cluster_workers}. It must be {target}.")
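The sizing arithmetic above can be exercised without a live cluster or Kinesis stream. The sketch below wraps it in a hypothetical helper (`sizing_check` is not part of the gist, just an illustration of the same floor-division logic) so the check is easy to unit-test:

```python
# Hypothetical helper reproducing the sizing arithmetic from the gist,
# so the check can be tested with plain integers.
def sizing_check(cluster_workers: int, stream_shards: int):
    """Return (shards_per_task, target, ok) for the given sizes."""
    # Each task reads this many shards; zero means too many workers.
    shards_per_task = stream_shards // cluster_workers
    if shards_per_task < 1:
        raise ValueError(
            f"Cluster workers ({cluster_workers}) exceed open shards ({stream_shards})."
        )
    # Minimum worker count that covers every shard at this ratio.
    target = stream_shards // shards_per_task
    return shards_per_task, target, cluster_workers >= target

# 4 workers reading 8 shards: 2 shards per task, 4 workers needed, check passes.
print(sizing_check(4, 8))  # → (2, 4, True)
```

Note the failure mode: with 4 workers and 10 shards, `shards_per_task` floors to 2, so `target` becomes 5 and the check fails, matching the gist's second `Sizing Error`.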