Airflow: running your DAG — #03

Adenilson Castro
5 min read · Nov 21, 2021

In this post, we’ll wrap up this short Airflow series. We started by learning some basic Airflow concepts, then went through building a simple ETL, and now we’ll automate that ETL process using Airflow itself.

First of all, you should install Airflow on your machine. I highly recommend using Docker for this, but you can also install it locally. The documentation has a clear tutorial on how to run Airflow with Docker Compose. In addition to the volumes created in that tutorial, I’ve created a local folder to keep the SQLite database on my machine so I can access the DB using DBeaver.

To automate the ETL execution, we need to create an Airflow DAG. It will be responsible for the schedule on which our script runs, for calling the tasks in the order we define, among other things. First, let’s define some important constants that will help our development:

  • DAG_DESC = “A short description of our DAG”
  • DAG_ID = “the DAG unique id”
  • OWNER = “the owner of the DAG”
  • START_DATE = the date in which our DAG will be executed for the first time
  • SCHEDULE = a CRON expression that defines the frequency of our DAG

For our case, it would be something like this:

  • DAG_DESC = “DAG to extract news from the NewsAPI and insert them into an SQLite database”
  • DAG_ID = “news_api”
  • OWNER = “Adenilson Castro”
  • START_DATE = datetime(2021, 11, 21)
  • SCHEDULE = “0 17 * * *” (which will run every day at 17:00; check this site for more details)
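
As a quick sketch, these constants can simply sit at the top of the DAG file (the values follow the list above):

from datetime import datetime

DAG_DESC = "DAG to extract news from the NewsAPI and insert them into an SQLite database"
DAG_ID = "news_api"
OWNER = "Adenilson Castro"
START_DATE = datetime(2021, 11, 21)
SCHEDULE = "0 17 * * *"  # every day at 17:00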

Once these constants are defined, we can create the default_args of our DAG, which will hold some important definitions about how the DAG should be executed:

The ‘depends_on_past’ flag defines whether a DAG run depends on the success of its previous runs; ‘retries’ specifies how many times Airflow should retry a task after a failure, working together with the ‘retry_delay’ parameter, which defines the interval between those attempts; and ‘priority_weight’ defines the priority in the executor queue. These are not all the parameters that Airflow accepts, but they are enough for this simple ETL. For a more complex extraction, parameters such as the e-mail on failure and the failure/success callbacks are very helpful. A list of all the parameters is available here.
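
A minimal default_args dictionary covering those parameters could look like the sketch below (the concrete values are my assumption, not necessarily the ones from the original project):

from datetime import timedelta

default_args = {
    "owner": OWNER,              # defined with the other constants above
    "start_date": START_DATE,
    "depends_on_past": False,    # each run is independent of the previous ones
    "retries": 2,                # retry a failed task twice...
    "retry_delay": timedelta(minutes=5),  # ...waiting 5 minutes between attempts
    "priority_weight": 1,        # priority of the tasks in the executor queue
}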

Once our DAG is parameterized, we can now create the functions that will actually call the Python script we built previously to extract the data and insert it into the database. Our first function is just dummy code, there only to demonstrate that our DAG has actually started:
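
A sketch of that dummy function (the log message itself is just an example):

import logging


def start_dag(**kwargs):
    # Dummy task: only logs that the DAG run has started.
    logging.info("Starting the news_api DAG run.")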

The second function will import our Python module with the ETL code and call its main function, which will start the ETL process:
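
Assuming the ETL built in the previous post lives in a module called news_etl with a main() entry point (both names are hypothetical; use whatever your module is actually called), it could look like this:

import news_etl  # hypothetical name for the ETL module from the previous post


def run_etl(**kwargs):
    # Delegate the whole extract/transform/load work to the ETL module.
    news_etl.main()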

With these functions ready, we can now create the DAG object itself and the tasks that will be executed. In our case, we will use the PythonOperator, as we are executing Python code:

We first use the previously created constants and then define the tasks: each one is a PythonOperator object that receives a unique id, a Python callable (in our case, our start and extract functions) and a third parameter, provide_context, which passes a few extra keyword arguments, mainly used with Jinja templating.
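
Putting the pieces together, a sketch of the DAG and its two tasks could look like this (the task ids and the catchup flag are my choices; provide_context matches the text above, although Airflow 2 no longer requires it):

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id=DAG_ID,
    description=DAG_DESC,
    default_args=default_args,
    schedule_interval=SCHEDULE,
    catchup=False,  # assumption: don't backfill runs between START_DATE and today
) as dag:

    start_task = PythonOperator(
        task_id="start",
        python_callable=start_dag,
        provide_context=True,  # required on Airflow 1.10; deprecated (effectively a no-op) on 2.x
    )

    etl_task = PythonOperator(
        task_id="extract_news",
        python_callable=run_etl,
        provide_context=True,
    )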

As the DAG object is now created, we should define the order in which our tasks will be executed. To do so, at the end of our file, we write a short sequential statement like this:

start_task >> etl_task

This tells Airflow to execute “start_task” first and then “etl_task”, exactly in that order, as indicated by the “>>” operator. If we had more tasks, it would look like “task1 >> task2 >> task3 >> task_n”, or we could group tasks that must all finish before another one starts, like “[task1, task2] >> task3”, which runs task1 and task2 in parallel and only starts task3 after both of them have completed. Airflow lets you build whatever dependency logic you want here.
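
As a small illustration of that fan-in pattern (the task names here are placeholders):

# task1 and task2 have no dependency between them, so they can run in parallel;
# task3 only starts after both of them finish successfully.
[task1, task2] >> task3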

That’s it! Once finished, put the DAG Python file inside the ‘dags’ folder that was created where you started your Docker container, and within a few seconds the Airflow scheduler will make it available in your Airflow instance. To visualize it, access http://localhost:8080/, the Airflow home page, and search for your DAG, which will look something like this:

Once you find your DAG, the UI provides many details about its execution, including the description we added in our DAG definition, a tree view with the status of each task execution, a graph view that shows the DAG execution flow more explicitly, highlighting the task dependencies, your DAG code and, more importantly, the execution logs, where you can check whether an error occurred or everything is running fine. It is important to note that your DAG will execute according to the schedule you defined. If you want to force an execution, you can press the “Trigger DAG” button.

Now that we have our DAG up and running, you can open the database using DBeaver or any other program you like to visualize the data, which will look something like this (in Portuguese, because I’m in Brazil):
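
If you prefer to check the data from Python instead of a GUI, a quick peek can be done with the standard sqlite3 module (the database path and table name below are assumptions; adjust them to whatever your ETL actually uses):

import sqlite3

conn = sqlite3.connect("data/news.db")  # hypothetical path to the SQLite file
for row in conn.execute("SELECT * FROM news LIMIT 5"):  # hypothetical table name
    print(row)
conn.close()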

And that’s it! You now have a simple DAG running on your local Airflow instance. We’ve been through many technologies, and this tutorial is far from giving you the skills to deploy an Airflow instance in production on AWS cloud infrastructure, for example. However, you’ve seen the initial concepts involved in the process (Airflow itself, Python, Docker, SQLite…), and it is a starting point for your personal development. I really hope you have learned something!

If you have any questions, the code I used is available on my GitHub page, and you can contact me any time. See ya!
