Skip to content

Instantly share code, notes, and snippets.

@inactivist
Forked from diogoffmelo/clean_up_target.py
Created August 25, 2020 12:34
Show Gist options
  • Save inactivist/56e76ac21975e97a80b9b04de725d0a5 to your computer and use it in GitHub Desktop.
Save inactivist/56e76ac21975e97a80b9b04de725d0a5 to your computer and use it in GitHub Desktop.
from collections import defaultdict, OrderedDict
import luigi
from luigi.task import flatten, getpaths
def topological_sorting(struct, outnodes_funct, transform_funct):
    """Walk a graph depth-first and return its nodes in topological order.

    Every node reachable from ``struct`` is visited exactly once. All of a
    node's successors are processed before ``transform_funct`` is applied to
    the node itself. The result is the reverse of that post-order, so (for an
    acyclic graph) each node precedes all of its successors.

    The walk uses an explicit stack instead of recursion. Long dependency
    chains (e.g. a task that requires the previous day's instance of itself)
    are therefore not bounded by the interpreter's recursion limit.

    Args:
        struct: the start node(s). For a dict its keys are used; anything else
            is passed through luigi's ``flatten``.
        outnodes_funct: callable returning the successor(s) of a node; the
            result is passed through ``flatten``.
        transform_funct: callable applied once per node, after all of that
            node's successors; its return value becomes the node's value.

    Returns:
        OrderedDict mapping each visited node to ``transform_funct(node)``,
        in reverse post-order.

    Raises:
        ValueError: if a cycle is reachable from ``struct``.
    """
    roots = flatten(struct.keys()) if isinstance(struct, dict) else flatten(struct)
    visited = OrderedDict()
    on_path = set()  # nodes whose successors are still being explored

    def enter(node):
        # Mark ``node`` as in progress and return an iterator over its successors.
        on_path.add(node)
        return iter(flatten(outnodes_funct(node)))

    for root in roots:
        if root in visited:
            continue
        stack = [(root, enter(root))]
        while stack:
            node, successors = stack[-1]
            for succ in successors:
                if succ in visited:
                    continue
                if succ in on_path:
                    # Reaching a node that is still being explored means a cycle.
                    raise ValueError('cycle detected in graph at node %r' % (succ,))
                # Descend into ``succ``; this node's iterator resumes after it.
                stack.append((succ, enter(succ)))
                break
            else:
                # All successors of ``node`` are finished: emit it (post-order).
                stack.pop()
                on_path.discard(node)
                visited[node] = transform_funct(node)
    return OrderedDict(reversed(visited.items()))
def to_dag(struct, outnodes_funct):
    """Build the forward and inverse adjacency maps of the graph reachable from ``struct``.

    Args:
        struct: start node(s), as accepted by ``topological_sorting``.
        outnodes_funct: callable returning the successor(s) of a node.

    Returns:
        ``(dag, inv_dag)``. ``dag`` is an OrderedDict, in topological order,
        mapping every reachable node to its flattened list of successors.
        ``inv_dag`` is a ``defaultdict(list)`` mapping a node to the nodes that
        list it as a successor.
    """
    inv_dag = defaultdict(list)
    successors_of = {}

    def outnodes(node):
        # Evaluate outnodes_funct once per node. Both the traversal and the
        # edge recording below need the successors. Caching avoids calling
        # e.g. Task.requires() twice, and guarantees the recorded edges are
        # exactly the ones that were traversed.
        if node not in successors_of:
            successors_of[node] = flatten(outnodes_funct(node))
        return successors_of[node]

    def inv_visit_function(root):
        children = outnodes(root)
        for child in children:
            inv_dag[child].append(root)
        return children

    dag = topological_sorting(struct, outnodes, inv_visit_function)
    return dag, inv_dag
def clear_task_output(task):
    """Delete every output target of ``task`` that currently exists.

    Each target from ``task.output()`` (flattened) is checked with
    ``exists()`` and, if present, deleted with ``remove()``.
    """
    targets = flatten(task.output())
    for target in targets:
        # Written with LocalTarget outputs in mind. Target classes that need a
        # different notion of "clear" should be special-cased here.
        if not target.exists():
            continue
        target.remove()
def clear_task_dag_output(struct, dag):
    """Clear the outputs of the tasks in ``struct`` and of everything reachable from them.

    Args:
        struct: task(s) to start from; passed through luigi's ``flatten``.
        dag: mapping from a task to the tasks to visit next, e.g. the forward
            or inverse map built by ``to_dag``. It is looked up with
            ``dag[task]``, so a task missing from a plain dict raises KeyError.
    """
    def outnodes_funct(root):
        return dag[root]

    # Walk from all start tasks in a single traversal. The shared visited set
    # then clears a task reachable from several start tasks only once, instead
    # of re-walking its subgraph and re-querying exists() per start task.
    topological_sorting(flatten(struct), outnodes_funct, clear_task_output)
def task_outnodes_funct(task):
    """Return the direct requirements of ``task`` as a flat list."""
    requirements = task.requires()
    return flatten(requirements)
class DAG(object):
    """Dependency graph of luigi tasks, used to clear task outputs.

    At construction it builds a forward map (task -> its requirements) and an
    inverse map (task -> tasks that require it). The ``clean_*`` methods then
    clear outputs upstream, downstream, or in both directions.
    """

    def __init__(self, lasttask):
        # ``lasttask`` is the task (or tasks) executed last: nothing depends on it.
        self.struct = lasttask
        self._build()

    def _build(self):
        forward, inverse = to_dag(self.struct, task_outnodes_funct)
        self.dag = forward
        self.inv_dag = inverse

    def clean_backward(self, tasks):
        """Clear ``tasks`` and, recursively, everything they depend on."""
        return self._clean(tasks, direction='backward')

    def clean_forward(self, tasks):
        """Clear ``tasks`` and, recursively, everything that depends on them."""
        return self._clean(tasks, direction='forward')

    def clean_all(self, tasks):
        """Clear ``tasks`` plus both their dependencies and their dependents."""
        return self._clean(tasks, direction='all')

    def _clean(self, tasks, direction=None):
        """Clear ``tasks`` in the given direction.

        ``direction`` is 'backward', 'forward' or 'all'; any other value
        (including the default None) clears nothing.
        """
        go_backward = direction in ('all', 'backward')
        go_forward = direction in ('all', 'forward')
        # For 'all', the dependency side is cleared before the dependent side.
        if go_backward:
            clear_task_dag_output(tasks, self.dag)
        if go_forward:
            clear_task_dag_output(tasks, self.inv_dag)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment