Apache Airflow: The ETL — #02
In the previous post, we learned some core concepts of Apache Airflow, such as DAGs and Operators. In this post, we'll build a simple ETL process that extracts the most recent news from the NewsAPI and inserts them into an SQLite database.
ETL stands for Extract, Transform, Load: a process in which data is extracted from a source, transformed according to the user's needs, and finally loaded into a storage destination. It's widely used by companies to collect data from the most varied sources and combine the cleaned information in one place, such as a data warehouse, for further use. It can scale to complex pipelines that deal with vast amounts of data, but, for now, we will build a simple ETL process so you can get a big picture of how everything connects.
First, create a folder to hold our script, then create a Python virtual environment and activate it. I use virtualenv, but you can choose whichever tool you're used to:
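A minimal setup sketch; the folder and environment names are just placeholders, so use whatever you prefer.

```bash
mkdir news_etl && cd news_etl
virtualenv venv               # or: python -m venv venv
source venv/bin/activate      # on Windows: venv\Scripts\activate
```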
Once it's activated, let's install the packages we'll use in our development:
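These are the libraries the script below relies on (Airflow itself was already covered in the previous post); pin versions if you like.

```bash
pip install requests pandas sqlalchemy
```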
Once the environment is up and running, let's get a NewsAPI key. Just go to https://newsapi.org/ and request a new key, which is free for development purposes. For our application, we will fetch the current top headlines for a country, Brazil in our case, using the following endpoint:
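The top-headlines route takes the country code and the API key as query parameters:

```
GET https://newsapi.org/v2/top-headlines?country=br&apiKey=YOUR_API_KEY
```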
With the API key and Airflow ready, let's get to the code. We'll create a Python script, news_etl.py, that will contain the ETL code. The news_etl file has three main purposes: call the API and get the news (the EXTRACT step), apply some transformations to the received data (the TRANSFORM step), and finally load the cleansed data into a database (the LOAD step).
First, we will use the Python requests library to make the HTTP request to the NewsAPI and parse the result as JSON, keeping the parameters COUNTRY and API_KEY as Python constants:
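The values below are placeholders, so replace API_KEY with your own key and adjust COUNTRY if needed.

```python
import requests

# Placeholder constants: use the key obtained from newsapi.org
API_KEY = "<your-newsapi-key>"
COUNTRY = "br"
URL = f"https://newsapi.org/v2/top-headlines?country={COUNTRY}&apiKey={API_KEY}"

# Make the HTTP request and parse the JSON body
response = requests.get(URL)
data = response.json()
```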
To store each article's information, we will use some auxiliary lists:
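One list per article field; the selection below follows the fields the NewsAPI returns, so keep only the ones you care about.

```python
# Auxiliary lists, one per article field
source = []
author = []
title = []
description = []
url = []
published_at = []
content = []
```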
And, finally, we will check whether our request got a successful response. If it did, we will store each article's information in our lists, like this:
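Something along these lines, where the field names follow the NewsAPI response and the else branch only reports the failure:

```python
if response.status_code == 200:
    # Store each article field in its corresponding list
    for article in data["articles"]:
        source.append(article["source"]["name"])
        author.append(article["author"])
        title.append(article["title"])
        description.append(article["description"])
        url.append(article["url"])
        published_at.append(article["publishedAt"])
        content.append(article["content"])
else:
    # Proper error handling is out of scope here; we only report the problem
    print(f"Request failed with status code {response.status_code}")
```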
If a request error occurs, we will only print an error message (this would demand proper error handling in a real pipeline, but let's keep the main focus for now).
Once the article data is stored in the lists, we will create a Python dictionary to organize it:
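Assuming the same list names as above, each key will become a column:

```python
news_dict = {
    "source": source,
    "author": author,
    "title": title,
    "description": description,
    "url": url,
    "published_at": published_at,
    "content": content,
}
```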
With this dictionary, we can use Pandas to create a News dataframe:
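Assuming we call it news_df:

```python
import pandas as pd

# Each dictionary key becomes a dataframe column
news_df = pd.DataFrame(news_dict)
print(news_df.head())
```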
Observe that the dataframe columns correspond to each piece of information available about the articles.
Now that we have our raw data, it is time to transform it, the second step in the ETL process. In this phase, a transform function is called, and there are many things that could be checked, such as whether some information is NULL or corrupted. A draft of this function is shown below:
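A possible draft, assuming the function is simply called transform and receives the dataframe:

```python
def transform(df):
    # TODO: clean the raw data here, e.g. check for NULL values,
    # fill blanks with a default value or drop corrupted rows
    return df


news_df = transform(news_df)
```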
Observe that there is no transformation code implemented. Well, that is up to you now! My suggestion is to check whether there are any NULL values in the dataframe, fill the blanks with a default value or remove those rows, and return the corrected dataframe.
Finally, once the news dataframe is clean, it is time to load it into a database. We will use a combination of SQLite and SQLAlchemy for this operation. First of all, we create an SQLite connection to our database file and an SQLAlchemy engine to work with the database from Pandas:
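A sketch, assuming the database file is named news.db:

```python
import sqlite3
from sqlalchemy import create_engine

# Assumed database file name: news.db
conn = sqlite3.connect("news.db")
engine = create_engine("sqlite:///news.db")
```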
As we are using a new database, we should first create a table to receive our data. For this, we will use a SQL query that creates a new table if it does not exist, with the following code:
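A sketch of the query, assuming the table is named news and mirrors the dataframe columns (all stored as TEXT for simplicity):

```python
CREATE_TABLE_QUERY = """
CREATE TABLE IF NOT EXISTS news (
    source TEXT,
    author TEXT,
    title TEXT,
    description TEXT,
    url TEXT,
    published_at TEXT,
    content TEXT
);
"""
```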
With our query ready, it is time to run it against SQLite. We will use a try/except statement for this, where the first step is to execute the query that creates the table:
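Something like this, reusing the conn and CREATE_TABLE_QUERY defined above:

```python
try:
    # Create the table if it does not exist yet
    conn.execute(CREATE_TABLE_QUERY)
except Exception as e:
    print(f"Could not create the news table: {e}")
```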
Once the table exists, we will use a Pandas method to load our dataframe into it. Yes, Pandas lets you write a dataframe directly to a SQL database! This operation is performed with the to_sql method:
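The call looks roughly like this, with the table and engine names assumed above:

```python
# Append the dataframe rows to the news table through the SQLAlchemy engine
news_df.to_sql("news", engine, index=False, if_exists="append")
```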
Here the first parameter is the table name, the second is the SQLAlchemy engine, index=False disables the use of the dataframe index as a column, and if_exists="append" appends the data to the existing table.
Finally, we can now commit our changes and close the connection:
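Using the same sqlite3 connection from before:

```python
conn.commit()
conn.close()
```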
Now you can open the SQLite file with DB Browser for SQLite or DBeaver to check whether the data was properly inserted into our table:
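If you prefer to stay in Python, a quick check with Pandas also works (assuming the same news.db file and news table):

```python
import sqlite3
import pandas as pd

# Read the table back to confirm the rows were inserted
conn = sqlite3.connect("news.db")
print(pd.read_sql("SELECT * FROM news", conn).head())
conn.close()
```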
That’s it, you’ve just created your first ETL!
Of course, this was just a didactic example. In real-world problems, there will be massive amounts of data from multiple sources and a lot of different situations to handle. But this simple ETL demonstrates the whole process of getting data from a source, transforming it (you applied the suggested transformations, right?), and loading it into a target, which gives you a big picture of one of the core Data Engineering activities.
That’s it for today. In the next post, we will see how to automate it using Airflow, constructing a DAG, and scheduling it to get the most recent news every day. See you there!