Skip to content

Instantly share code, notes, and snippets.

@Bl3f
Last active October 19, 2018 13:41
Show Gist options
  • Save Bl3f/acd3d4b251eb565c96168635d84d0513 to your computer and use it in GitHub Desktop.
Save Bl3f/acd3d4b251eb565c96168635d84d0513 to your computer and use it in GitHub Desktop.
Airflow unit testing DAG (topological order)
import pytest
from airflow.hooks.base_hook import BaseHook
from airflow.models import Connection
def create_connection(conn_id, schema=None, *args, **kwargs):
if schema is None:
schema = conn_id
return Connection(conn_id=conn_id, schema=schema, *args, **kwargs)
@pytest.fixture()
def gcp_connection(mocker):
get_connection = mocker.patch.object(BaseHook, 'get_connection')
get_connection.return_value = create_connection('gcp')
return get_connection
from airflow import DAG
from airflow.operators.sensors import ExternalTaskSensor
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
dag = DAG('my_dag')
sensor_1 = ExternalTaskSensor(
external_dag_id='some_other_dag_1',
external_task_id='check_task',
task_id='sensor_1',
dag=dag,
)
bq_processing = BigQueryOperator(
task_id='bq_processing',
sql='SELECT * FROM my_table',
bigquery_conn_id='gcp',
dag=dag,
)
bq_processing.set_upstream(sensor_1)
import unittest
import pytest
from airflow.operators.sensors import ExternalTaskSensor
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from my_dag import dag
@pytest.mark.usefixtures("gcp_connection")
class TestMyDag(unittest.TestCase):
def test_dag_tasks_order(self):
topo_order = dag.topological_sort()
expected_order = [
('sensor_1', ExternalTaskSensor),
('bq_processing', BigQueryOperator),
]
for index, (name, klass) in enumerate(expected_order):
task = topo_order[index]
assert isinstance(task, klass)
assert task.task_id == name
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment