@rabernat · Created December 8, 2022 00:08
S3 Listing Benchmarks
import asyncio
import functools
import json
import uuid
from math import log
from time import perf_counter

from rich import print

import modal
import modal.aio

s3_image = modal.Image.debian_slim().pip_install(["aiobotocore", "numpy"])
stub = modal.aio.AioStub("s3-test", image=s3_image)
def execution_timer(func):
    """Wrap an async function so it returns a timing record (runtime plus its kwargs)
    instead of its own result."""

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        tic = perf_counter()
        await func(*args, **kwargs)
        toc = perf_counter()
        return {"function": func.__name__, "time": (toc - tic), **kwargs}

    return wrapper
def generate_tree(depth=1, leaves=2, root: str = "root"):
    """Yield the object keys of a tree with `leaves` children per node and `depth` levels."""
    if depth == 0:
        return
    if depth == 1:
        for n in range(leaves):
            yield "/".join((root, f"{n}.leaf"))
    else:
        for n in range(leaves):
            new_root = "/".join((root, str(n)))
            yield from generate_tree(depth=depth - 1, leaves=leaves, root=new_root)
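# For example (illustrative values, not part of a benchmark run),
# generate_tree(depth=2, leaves=2, root="root") yields four keys:
#   root/0/0.leaf, root/0/1.leaf, root/1/0.leaf, root/1/1.leaf
# i.e. leaves**depth objects, matching tree_size(2, 2) below.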
def tree_size(depth, leaves):
    return leaves**depth


# size = leaves**depth
# depth = log(size, leaves)
@stub.function(secret=modal.Secret.from_name("ryan-earthmover-aws-keys"))
@execution_timer
async def create_tree(*, depth: int, leaves: int, root: str):
    """Create every object in the tree with concurrent put_object calls."""
    from aiobotocore.session import get_session

    bucket = "arraylake-test"
    session = get_session()
    async with session.create_client("s3", region_name="us-east-1") as client:
        await asyncio.gather(
            *(
                client.put_object(Bucket=bucket, Key=key, Body=b"x01")
                for key in generate_tree(depth, leaves, root)
            ),
            return_exceptions=True,
        )
async def list_objects_and_directories(client, bucket, root, include_subdirs=True):
    """Paginate through list_objects_v2 under `root`.

    With include_subdirs=True, list every object below the prefix in one flat pass;
    otherwise use Delimiter="/" so only the immediate objects and sub-prefixes come back.
    """
    if not root.endswith("/"):
        root += "/"
    is_finished = False
    continuation_token = None
    objects = []
    directories = []
    while not is_finished:
        kwargs = {} if include_subdirs else {"Delimiter": "/"}
        if continuation_token:
            kwargs["ContinuationToken"] = continuation_token
        result = await client.list_objects_v2(Bucket=bucket, Prefix=root, **kwargs)
        objects += result.get("Contents", [])
        directories += result.get("CommonPrefixes", [])
        is_finished = not result["IsTruncated"]
        continuation_token = result.get("NextContinuationToken")
    return objects, [item["Prefix"] for item in directories]
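# Rough illustration (assumed values, not real output): for the depth=2, leaves=2 tree
# sketched above, listing "root/" with include_subdirs=True returns all four ".leaf"
# objects in a single flat listing, while include_subdirs=False returns no objects and
# the two sub-prefixes "root/0/" and "root/1/", which count_objects_recursive below
# then descends into concurrently.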
async def count_objects_recursive(client, bucket, root) -> int:
    """Count objects by listing one directory level at a time and recursing into sub-prefixes."""
    objects, directories = await list_objects_and_directories(
        client, bucket, root, include_subdirs=False
    )
    count = len(objects)
    subdir_counts = await asyncio.gather(
        *(count_objects_recursive(client, bucket, dir) for dir in directories)
    )
    return count + sum(subdir_counts)
@stub.function(secret=modal.Secret.from_name("ryan-earthmover-aws-keys"))
@execution_timer
async def list_flat(*, depth: int, leaves: int, root: str):
    """Benchmark a single flat (non-delimited) listing of the whole tree."""
    from aiobotocore.session import get_session

    bucket = "arraylake-test"
    session = get_session()
    async with session.create_client("s3", region_name="us-east-1") as client:
        objects, directories = await list_objects_and_directories(
            client, bucket, root, include_subdirs=True
        )
        count = len(objects)
        expected = tree_size(depth, leaves)
        if count / expected < 0.9:
            raise ValueError(f"Expected {expected} objects, got {count} objects")
@stub.function(secret=modal.Secret.from_name("ryan-earthmover-aws-keys"))
@execution_timer
async def list_recursive(*, depth: int, leaves: int, root: str):
    """Benchmark a recursive, delimiter-based listing of the tree."""
    from aiobotocore.session import get_session

    bucket = "arraylake-test"
    session = get_session()
    async with session.create_client("s3", region_name="us-east-1") as client:
        count = await count_objects_recursive(client, bucket, root)
        expected = tree_size(depth, leaves)
        if count / expected < 0.9:
            raise ValueError(f"Expected {expected} objects, got {count} objects")
@stub.function(secret=modal.Secret.from_name("ryan-earthmover-aws-keys"))
@execution_timer
async def delete_tree(*, depth: int, leaves: int, root: str):
    """Delete every object in the tree with concurrent delete_object calls."""
    from aiobotocore.session import get_session

    bucket = "arraylake-test"
    session = get_session()
    async with session.create_client("s3", region_name="us-east-1") as client:
        await asyncio.gather(
            *(
                client.delete_object(Bucket=bucket, Key=key)
                for key in generate_tree(depth, leaves, root)
            ),
            return_exceptions=True,
        )
@stub.function(secret=modal.Secret.from_name("ryan-earthmover-aws-keys"))
async def clear_all(*, root: str):
    """Delete any leftover objects under `root` from previous runs."""
    from aiobotocore.session import get_session

    bucket = "arraylake-test"
    session = get_session()
    async with session.create_client("s3", region_name="us-east-1") as client:
        objects, _ = await list_objects_and_directories(
            client, bucket, root, include_subdirs=True
        )
        print("Deleting", len(objects), "objects")
        await asyncio.gather(
            *(
                client.delete_object(Bucket=bucket, Key=obj["Key"])
                for obj in objects
            ),
            return_exceptions=True,
        )
async def benchmark_run(*, depth: int, leaves: int, nruns=4):
    """Create a tree, time flat and recursive listings nruns times each, then delete it."""
    root = "tree/" + uuid.uuid4().hex
    print("Total Nodes:", tree_size(depth, leaves), "Depth:", depth, "Leaves:", leaves, "Root:", root)
    create_time = await create_tree(depth=depth, leaves=leaves, root=root)
    results = [create_time]
    print(create_time)
    await asyncio.sleep(1)  # give s3 some time to catch up
    for run in range(nruns):
        list_time_flat = await list_flat(depth=depth, leaves=leaves, root=root)
        print(list_time_flat)
        results.append(list_time_flat)
        list_time_recursive = await list_recursive(depth=depth, leaves=leaves, root=root)
        print(list_time_recursive)
        results.append(list_time_recursive)
    delete_time = await delete_tree(depth=depth, leaves=leaves, root=root)
    print(delete_time)
    results.append(delete_time)
    return results
async def main():
    async with stub.run():
        await clear_all(root="root")
        coros = []
        for max_depth in range(14, 17, 2):
            size = 2**max_depth
            # Sweep over tree shapes that all hold the same total number of objects:
            # for every depth dividing max_depth, choose leaves = 2**(max_depth // depth)
            # so that leaves**depth == 2**max_depth.
            for depth in range(1, max_depth + 1):
                if max_depth % depth != 0:
                    continue
                f = max_depth // depth
                leaves = 2**f
                assert leaves**depth == size
                coros.append(benchmark_run(depth=depth, leaves=leaves))
        for task in asyncio.as_completed(coros):
            result = await task
            # result[0] is the create_tree timing dict, which carries depth and leaves.
            fname = f"s3_listing_results/depth-{result[0]['depth']}_leaves-{result[0]['leaves']}.json"
            print("Writing", fname)
            with open(fname, mode="w") as fp:
                json.dump(result, fp)


if __name__ == "__main__":
    asyncio.run(main())
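# Usage sketch (assumptions: Modal is configured locally, the "ryan-earthmover-aws-keys"
# secret exists in your Modal account, and an s3_listing_results/ directory exists in the
# working directory). The entry point is plain asyncio, so the script is run directly:
#
#   python s3_listing_benchmark.py   # filename is hypothetical; use whatever you saved it as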