# Created November 12, 2021 20:32
# Save tuulos/866d285def5082f45653131d6c1c6564 to your computer and use it in GitHub Desktop.
# This file contains bidirectional Unicode text that may be interpreted or
# compiled differently than what appears below. To review, open the file in
# an editor that reveals hidden Unicode characters.
# Learn more about bidirectional Unicode characters.
import datetime
import math
import random
import string
import sys
import time
import uuid
from decimal import Decimal

import requests
from metaflow import FlowSpec, IncludeFile, Parameter, catch, step
class CustomClass:
    """A user-defined object whose string form is deliberately huge.

    Both ``str()`` and ``repr()`` produce one mebibyte (1024**2 characters)
    of the letter ``'a'``.
    """

    def __str__(self):
        """Return a string of 1024**2 repetitions of ``'a'``."""
        return 'a' * 1024**2

    def __repr__(self):
        """Return the same text as ``str(self)``."""
        return str(self)
class DefaultCardFlow(FlowSpec):
    """
    A flow that produces many kinds of artifacts, logs, and graph shapes.

    The `start` step stores builtin Python values, downloaded images, an
    exception object, large objects, custom objects, a pandas DataFrame
    and a numpy array. Later steps build a nested foreach with a static
    branch inside it, create many small and many large artifacts, and
    finally fail on purpose inside a step guarded by `@catch`.
    """

    str_param = Parameter('str_param', default='刺身は美味しい')
    file_param = IncludeFile('file_param')
    # NOTE(review): this default is not well-formed JSON. It is declared as a
    # plain string parameter and is kept verbatim -- confirm it is intentional.
    json_param = Parameter('json_param', default='{"states": {[{"CA", 0}, {"NY", 1}]}')
    float_param = Parameter('float_param', default=math.pi)

    @step
    def start(self):
        """
        This step creates a bunch of artifacts of various kinds. They
        should show up nicely on the default card 🔬.
        """
        self.python_objects()
        self.images()
        self.raise_exception()
        self.large_python_objects()
        self.custom_python_objects()
        self.pandas()
        self.numpy()
        # 1000 lines on stdout followed by 2000 lines on stderr.
        for i in range(1000):
            print("%d) Let's produce some logs too 🐘🐘🐘" % i)
        for i in range(2000):
            print("%d) This is stderr 🐒🐒🐒" % i, file=sys.stderr)
        self.next(self.split)

    @step
    def split(self):
        """
        Spawn 10 foreach tasks.
        """
        self.foreach1 = list(range(1, 11))
        self.next(self.first_foreach, foreach='foreach1')

    @step
    def first_foreach(self):
        """
        Topmost foreach step: Launches a variable number of child tasks.
        """
        self.first_index = self.input
        # The fan-out width equals this task's own input value (1..10).
        self.foreach2 = list(range(self.input))
        self.next(self.second_foreach, foreach='foreach2')

    @step
    def second_foreach(self):
        """
        Innermost foreach: Splits into two static branches.
        """
        self.second_index = self.input
        self.combined = (self.first_index, self.second_index)
        self.next(self.static_a, self.static_b)

    @step
    def static_a(self):
        """
        Innermost static branch A. The `combined` artifact tells
        our index.
        """
        self.combined = ('a', self.first_index, self.second_index)
        self.next(self.join_static)

    @step
    def static_b(self):
        """
        Innermost static branch B. The `combined` artifact tells
        our index.
        """
        self.combined = ('b', self.first_index, self.second_index)
        self.next(self.join_static)

    @step
    def join_static(self, inputs):
        """
        Join the two static branches.
        """
        self.next(self.join_second_foreach)

    @step
    def join_second_foreach(self, inputs):
        """
        Join the innermost foreach.
        """
        self.next(self.join_first_foreach)

    @step
    def join_first_foreach(self, inputs):
        """
        Join the topmost foreach.
        """
        self.next(self.many_small_artifacts)

    @step
    def many_small_artifacts(self):
        """
        Create 1000 random small artifacts to test how a step
        with many artifacts renders on the default card.
        """
        choices = [None, 2454, datetime.datetime.utcnow(), {'a', 'b', 'c'}]
        for i in range(1000):
            setattr(self, 'small_artifact_%d' % i, random.choice(choices))
        self.next(self.many_large_artifacts)

    @step
    def many_large_artifacts(self):
        """
        Create 127 large, one megabyte artifacts. Interesting to see
        how they'll show up on the default card.
        """
        for i in range(127):
            # One ASCII byte (code point i) repeated 1024**2 times.
            blob = chr(i).encode('ascii') * 1024**2
            setattr(self, 'large_artifact_%d' % i, blob)
        self.next(self.catch_step)

    @catch(var='step_failed')
    @step
    def catch_step(self):
        """
        This step fails and stores the exception in an artifact, step_failed.
        Hopefully this artifact will be visible on the default card.
        """
        d = {}
        print("Fail!")
        # Deliberate KeyError: the dict is empty.
        print(d[3])
        self.next(self.end)

    @step
    def end(self):
        """
        The end.
        """
        pass

    def python_objects(self):
        """Store one artifact for each of the common builtin Python types."""
        self.py_int = 434
        self.py_float = math.pi
        self.py_complex = complex(1, 2)
        self.py_list = [1, 2, 3]
        self.py_tuple = (1, 2, 3)
        self.py_range = range(10)
        self.py_str = '刺身は美味しい'
        self.py_bytes = b'\x00\x01\x02'
        self.py_bytearray = bytearray(b'\xf0\xf1\xf2')
        self.py_set = {1, 2, 3}
        self.py_frozenset = frozenset({4, 5, 6})
        self.py_dict = {'a': 1, 'null': None, True: False}
        # type(str) is the metaclass `type` itself.
        self.py_type = type(str)
        self.py_bool = True
        self.py_none = None

    def large_python_objects(self):
        """Store artifacts that are large in size, key count, or nesting depth."""
        # A full 13-card suit: ace, 2 through 10, jack, queen, king.
        # range(2, 11) is needed so that the 10 is included.
        self.large_dict = {}
        for suit in ['clubs', 'diamonds', 'hearts', 'spades']:
            self.large_dict[suit] = (
                ['ace'] + list(range(2, 11)) + ['jack', 'queen', 'king']
            )
        self.large_int = 2**65
        # Large string (may be truncated).
        # A timeout keeps the step from hanging forever on a stalled server.
        self.large_str = requests.get(
            'https://www.usconstitution.net/const.txt', timeout=30
        ).text
        # Large dictionary with many keys (may be truncated)
        self.large_dict_many_keys = {str(uuid.uuid4()): time.time()
                                     for _ in range(1000000)}
        # Large dictionary with a large value (may be truncated)
        self.large_dict_large_val = {'constitution': self.large_str}
        # Large dictionary (may be truncated): 100 nested levels, where
        # level i holds the next level under key i and the innermost dict
        # holds the marker key 'bottom!'.
        level = {}
        self.large_dict_deep = level
        for i in range(100):
            child = {}
            level[i] = child
            level = child
        level['bottom!'] = True
        # Large blob
        self.large_blob = b'\x00' * (100 * 1024**2)

    def custom_python_objects(self):
        """Store artifacts that are instances of non-builtin classes."""
        # A python object from stdlib (just print repr())
        self.custom_datetime = datetime.datetime.utcnow()
        # A custom Python object
        self.custom_class = CustomClass()
        # A custom Python object (just print repr()).
        # NOTE(review): built from a float, so the value is the float's exact
        # binary expansion rather than exactly 0.1 -- kept as in the original.
        self.custom_decimal = Decimal(0.1)

    def images(self):
        """Download a gif, a jpg and a png and store their raw bytes."""
        # Bound every request so a stalled server cannot hang the step.
        timeout = 30
        # A gif file
        self.img_gif = requests.get(
            'https://www.gif-vif.com/hacker-cat.gif', timeout=timeout
        ).content
        # A jpg file
        self.img_jpg = requests.get(
            'https://www.nasa.gov/centers/goddard/images/content/638831main_globe_east_2048.jpg',
            timeout=timeout,
        ).content
        # A png file
        self.img_png = requests.get(
            'https://datavisdotblog.files.wordpress.com/2019/08/small-multiples.png',
            timeout=timeout,
        ).content

    def raise_exception(self):
        """Raise and catch an exception, storing the exception object."""
        try:
            raise Exception('This is an exception!')
        except Exception as x:
            # Exception object
            # We could print traceback too:
            # traceback.format_tb(self.exception.__traceback__)
            self.exception = x

    def pandas(self):
        """Store a 1000-row DataFrame with 26 random integer columns,
        a column of nulls and a column of timestamps."""
        import pandas
        # Column number i (0-based) draws integers from 1 to 10**i, so the
        # magnitude of the values grows with each letter.
        d = {'this is column %s' % x: [random.randint(1, 10**i) for _ in range(1000)]
             for i, x in enumerate(string.ascii_uppercase)}
        d['nulls'] = [None] * 1000
        d['times'] = [datetime.datetime.utcnow()] * 1000
        self.dataframe = pandas.DataFrame(d)

    def numpy(self):
        """Store a numpy array of ten million unsigned 64-bit integers."""
        import numpy
        self.np_array = numpy.arange(10000000, dtype='u8')
# Script entry point: instantiating the flow class hands control to Metaflow.
if __name__ == '__main__':
    DefaultCardFlow()
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.