

The Benefits of Dynamic Pipeline Generation in Airflow

As data engineers on a team focused on delivering key customer metrics, we on the FLYR Cloud team challenge ourselves to build flexible ways of managing our pipelines.

Google Cloud is our platform of choice, so raw data is loaded into BigQuery before being transformed into our company’s internal unified schema. Our transformation logic is implemented in SQL and executed on Google BigQuery. Whenever we generate new data, downstream systems are notified via a PubSub message.

Our pipelines are orchestrated by Apache Airflow, and – in the case of pipelines managed by our team – Directed Acyclic Graphs (DAGs) themselves do not contain any logic except to organize the steps of execution. This means that our meat and potatoes are in SQL transformations.

Balancing Skill Sets

Our team is not homogeneous in terms of skills. In addition to the data engineers, we also have data analysts. The commonality among these team members is strong SQL skills.

Based on previous experience, such a team structure can lead to difficulties in sprint planning and in delivering predictable increments when one skill set is overloaded while another is blocked internally.

The obvious preventive measure is to leverage our common SQL strengths and let all team members focus on writing SQL, rather than setting up structurally similar pipelines over and over again.

However, writing the Python side remained a problem: it can be very repetitive when adding or extending pipelines. This is even more true for entry-level programmers, especially those with no Python background.

An Airflow Directed Acyclic Graph (DAG) Is Actually Python Code

If you study Airflow beyond a basic tutorial, you will see that it doesn’t matter how a DAG is constructed, as long as the result is an object of the DAG class representing a valid directed acyclic graph.

For Airflow, a DAG whose tasks are declared one by one and a DAG whose tasks are generated in a loop are identical from the DAG structure perspective, as long as they produce the same graph.

The biggest takeaway so far has been that an Airflow DAG file (*.py) doesn’t have to be (and should not be) used as a purely declarative pipeline definition. You can, and should, use the full power of Python when working with Airflow, keeping in mind the guiding principles outlined in PEP 20. Because of Python’s flexibility, we can make our DAGs dynamic and control their structure or behavior at runtime or through configuration. This approach, which could be considered metaprogramming, has many benefits, such as a smaller code footprint, reduced development time, and greater flexibility to handle new situations without modifying the base code. It also creates the possibility of minimizing risk.

Generating a Pipeline

According to Murphy’s Law, “Anything that can go wrong will go wrong.”

Our main motivation for generating pipelines is to reduce the risk of mistakes, which grows with a team’s size and increases rapidly under time constraints and tight delivery timelines.

Since SQL is our team’s common language, we decided to define the pipeline structure through the file structure of the SQL scripts.

A given file structure of SQL scripts should then be interpreted directly as the structure of the generated Airflow graph.

So we expect that each *.sql file is represented as a task, and each directory organizes tasks into sequences (based on alphabetical order).
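For instance (directory and file names here are illustrative, not from our actual repository), the ordering rule for one directory segment can be expressed in a few lines:

```python
from pathlib import Path

# Hypothetical layout of the SQL root:
#
#   sql/
#     revenue.sql          -> a single-task segment
#     bookings/            -> a multi-task segment
#       01_staging.sql
#       02_final.sql

def task_order(segment_dir: Path) -> list[str]:
    """Task names for one directory segment, in alphabetical execution order."""
    return [path.stem for path in sorted(segment_dir.glob("*.sql"))]
```

Numeric prefixes such as 01_, 02_ make the alphabetical ordering explicit and easy to change.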

Now let’s jump into some code. The first thing to do is to represent the file structure as a Python structure. The discovery step returns a list of QueryPipeline and SequencePipeline objects, representing a single query or a multi-stage query sequence respectively. We’ll explain more about these in the following sections.
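A minimal sketch of such a discovery function might look like the following. The QueryPipeline and SequencePipeline stand-ins are reduced to bare dataclasses here; only the discovery logic is shown.

```python
from dataclasses import dataclass
from pathlib import Path

@dataclass
class QueryPipeline:        # stand-in: one *.sql file = one task
    sql_file: Path

@dataclass
class SequencePipeline:     # stand-in: one directory = an ordered sequence
    sql_files: list

def discover_pipelines(sql_root: Path) -> list:
    """Translate the file structure under sql_root into pipeline objects."""
    pipelines = []
    for entry in sorted(sql_root.iterdir()):
        if entry.is_dir():
            pipelines.append(SequencePipeline(sorted(entry.glob("*.sql"))))
        elif entry.suffix == ".sql":
            pipelines.append(QueryPipeline(entry))
    return pipelines
```

Because the function only reads the filesystem, adding a pipeline segment is just a matter of committing a new SQL file or folder.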

Now we need to build the DAG from that structure. Since the Python structure mirrors the file structure, it is effectively the DAG structure already. This is where the first part of the magic happens: we build the pipeline definition while the *.py code is being executed.

Let’s take a look at the Pipeline class first. This is the base class for QueryPipeline and SequencePipeline.

It also introduces something new into the pipeline: after executing, each pipeline segment (a first-level file or directory) should publish a notification to the common notification bus (PubSub). For the sake of simplicity, this is represented by a DummyOperator and can be considered a post-transformation task.

A few assumptions and simplifications in QueryPipeline: each *.sql file is actually a template with well-known placeholders (BigQuery project and dataset), and its name points to the output table. Thanks to that, generating such a segment of the pipeline is straightforward.

The __query_string method is responsible for filling out all placeholders inside the query. For the sake of simplicity, we are only using two variables.

The build_pipeline method is responsible for building a whole pipeline (a stream in the DAG) that delivers data. It is worth mentioning that, in our case, delivery always requires at least two tasks: a transformation task and a post_transformation task, which is simplified here to sending the delivery notification over PubSub.

The get_transform method is responsible for building and configuring the transformation task, which always executes BigQuery SQL.

SequencePipeline is no more than a series of QueryPipeline transformations.

The extra logic in SequencePipeline exists because we build the pipeline from the get_transform tasks of the QueryPipelines created from the individual, alphabetically ordered SQL files in a folder, plus a single post_transformation task.

Setting Up Sensors

Once we are set up to generate a pipeline, we need to embed it into the global data pipeline by setting dependencies. This ensures that the pipeline starts only when all the data sources it depends on are fresh.

We don’t want any code or configuration other than SQL to be written for the purpose of adding or modifying a DAG. That way, we avoid the mistake of setting (or failing to set) the required dependencies by hand. We achieve this by parsing our SQL code in search of references to tables in specific projects and datasets, which is enough for our purposes.

The simplified naming convention is that table {customer_project_id}.{dataset}.{table_name} is delivered by the DAG {customer_code}.{dataset}.{table_name}, where both customer_code and customer_project_id are part of the deployment configuration. Any reference to a table project.dataset.table in SQL code should therefore result in a sensor waiting for the same-day execution of the corresponding customer_code.dataset.table DAG.

This automation is implemented by a simple SQL query template parser in the QueryPipeline class, with the results aggregated in SequencePipeline.

The get_dependencies method allows us to identify all source-table dependencies and, through the naming convention described above, the DAGs that deliver them.
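A simplified, regex-based version of such a parser might look like this (the pattern and the project-to-customer mapping are illustrative; the real parser is tailored to our SQL conventions):

```python
import re

# Matches fully qualified references like `project-id.dataset.table`
# (backticks optional). Deliberately simple; good enough for the convention.
TABLE_REFERENCE = re.compile(r"`?([\w-]+)\.(\w+)\.(\w+)`?")

def get_dependencies(query: str, project_to_customer: dict) -> set:
    """Map each referenced source table to the DAG that delivers it."""
    dag_ids = set()
    for project, dataset, table in TABLE_REFERENCE.findall(query):
        customer_code = project_to_customer.get(project)
        if customer_code is not None:
            dag_ids.add(f"{customer_code}.{dataset}.{table}")
    return dag_ids
```

Restricting the match to known project ids keeps incidental dotted expressions in the SQL from being mistaken for table references.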

Now setting sensors is easy!

For each metric and each dependency, we create an ExternalTaskSensor and associate it with the corresponding dependent metric’s task group.


Airflow lets us leverage Python, so a *.py file can be more than a static, declarative DAG definition. The example above shows that you can build or extend an Airflow pipeline without writing a single line of Python code. That brings us a few steps closer to “Ship Day One” and also allows for more skill-set diversity within the team.

However, to quote Isaac Asimov, “it’s a poor atom blaster that won’t point both ways.” In other words, there are bad consequences as well as good ones. With the high flexibility of metaprogramming, it is possible to introduce code that is hard to debug, hard to read, and that behaves contrary to expectation. Used poorly, this technique can cause severe issues. A naming convention and a well-defined DAG topology are two convenient ways to limit such problems.
