Apache Airflow Part 2 — Connections, Hooks, reading and writing to Postgres, and XComs

In part 1, we went through writing basic DAGs that read, logged, and wrote to custom files, and got an overall sense of where files live and how the pieces fit together in Airflow. A lot of the work was getting Airflow running locally, and then at the end of the post, a quick start in having it do some work.

In part 2 here, we're going to start reading from and writing to a database, and show how tasks can run together in a directed, acyclic manner. Even though both will use a single database, you can imagine stretching the same pattern out to other situations.

Again, this post will assume you're writing this code on your own, with some copy and paste. If you'd rather use what I have already written, check out the GitHub repo here.

Creating the database

This post is going to read from and write to Postgres. We already created a database for Airflow itself to use, but we want to leave that one alone.

So before we get to any of the Python code, go create the new database, add a new user with a password, and then create the dts table (short for datetimes, since that's all we're dealing with here).
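If you haven't set up a database and user like this before, here's a minimal psql sketch using the names from this post (the password is a placeholder, so pick your own):

create database bigishdata;
create user bigishdatauser with password 'some-password-here';
grant all privileges on database bigishdata to bigishdatauser;

With that in place, connect to the new database and create the table: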

bigishdata=> create table dts (id serial primary key, run_time timestamp, execution_time timestamp);
CREATE TABLE
bigishdata=> \dt
            List of relations
 Schema | Name  | Type  |     Owner
--------+-------+-------+----------------
 public | dts   | table | bigishdatauser
(1 row)

bigishdata=# \d dts
                                          Table "public.dts"
     Column     |            Type             | Collation | Nullable |             Default
----------------+-----------------------------+-----------+----------+---------------------------------
 id             | integer                     |           | not null | nextval('dts_id_seq'::regclass)
 run_time       | timestamp without time zone |           |          |
 execution_time | timestamp without time zone |           |          |
Indexes:
    "dts_pkey" PRIMARY KEY, btree (id)

Adding the Connection

Connection is a well-named term you'll see all over in Airflow speak. Connections are defined as "[t]he connection information to external systems", which could mean usernames, passwords, hosts, ports, etc. for when you want to connect to things like databases, AWS, Google Cloud, or various data lakes and warehouses. Anything that requires information to connect to it can have that information stored in a Connection.

With airflow webserver running, go to the UI, find the Admin dropdown on the top navbar, and click Connections. Like the example DAGs, you'll see many default Connections, which are really useful for seeing what information each type of connection needs, and also for seeing which platforms you can move data to and from.

Take the values for the database we're using here, the (local)host, schema (meaning database name), login (username), password, and port, and put them into the form shown below. At the top you'll see Conn Id; in that input, create a name for the connection. This name matters, because it's what we'll use to say which Connection we want.

[Screenshot: the Connection form in the Airflow admin UI, filled in for this database.]

When you save this, you can go to the Airflow database, find the connection table, and see the values you entered in that form. You'll also probably notice that your password is there in plain text. For this post, I'm not going to talk about encrypting it, but you're able to do that, and should, of course.
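For example, a quick query against the Airflow database (not our bigishdata one) shows the row. Treat this as a sketch, since the exact column names can differ a little between Airflow versions:

select conn_id, conn_type, host, "schema", login, password, port from connection;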

One more thing worth looking at is the source code: the Connection model, form, and view. It's a Flask app! Reading the source gives a much better understanding of something like adding the information for a connection.

Hooks

In order to use the information in a Connection, we use what is called a Hook. A Hook takes the information in the Connection and hooks you up with the service that the Connection describes. Another nicely named term.
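To give a quick preview of what that looks like in code, here's a minimal sketch using the PostgresHook. The Conn Id 'bigishdata' is an assumption on my part, so use whatever you typed into the Conn Id field above:

from airflow.hooks.postgres_hook import PostgresHook

# the postgres_conn_id matches the Conn Id we created in the admin UI
hook = PostgresHook(postgres_conn_id='bigishdata')

# the Hook connects with the stored credentials and runs the query for us
rows = hook.get_records('select count(*) from dts;')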


Apache Airflow Part 1 — Introduction, setup, and writing data to files

When searching for "Python ETL pipeline frameworks", you'll see tons of posts about all of the different solutions and products available, where people throw around terms with only small explanations of them.

As you go through those articles, the one you will see over and over is Apache Airflow. It's defined on Wikipedia as a "platform created by community to programmatically author, schedule and monitor workflows". I'd call Airflow big, well used, and worth getting started with, because knowing your way around a running Airflow environment really does help with tons of data work at any scale.

It isn't quick to get started with the necessary understanding, but I've found that once you get over the initial hump, knowing Airflow is well worth it for the number of use cases it covers.

Searching for Airflow tutorials, I found most posts were super basic, enough to get started and then leave you hanging, not knowing where to go next. That's good for a start, but not as far as I want to see written.

They also tend to stray by talking about things like Operators or Plugins as if everyone already knows about them. When starting out, I'd want a process that starts with the basics and only takes on the more advanced topics once there's a good background in place. The goal of this series is to get you over that initial hump.

To do that, this will be a series that starts basic like the other tutorials, but by the end we'll have gone through all the steps of creating an Apache Airflow project: getting it running locally, writing to files, using different Hooks, Connections, and Operators to write to different data stores, and writing custom Plugins that can then be used for larger, more specific tasks.

These posts talk through getting Airflow set up and running locally, and are written as if you're starting out and writing your own code along with the examples. If you'd rather have the code first, not write it yourself, and focus on getting Airflow running, go ahead and clone the full repo for the whole series here, and get to the point where you can run the tasks and see the outputs. That's a big enough part on its own.


Part 1 Overview

I know I said that so many of the intro posts are intros only, and I'll admit right away that this is an intro post as well. If you've already gone through this kind of setup, skip ahead to part 2 (to be posted soon), or even to part 3, which will be much more about what a larger implementation looks like and how it works. I still felt it was worth writing this first part to get everyone on the same page before going further in the next posts. Starting with the more technical work isn't good practice if not everyone is there to begin with.

Here in part 1, we're going to talk through getting Airflow set up and running locally, and create a very basic single task: writing dates to a file. That seems like two quick parts, but going through the full process on something small will lead to better understanding.

As always, get in contact if you think something I wrote is wrong, and I'll edit in the fix.

Get Airflow Running

Open up a new terminal session and pwd. You'll find you're in the home directory for your user. As with all Python projects, we want a virtual environment to keep everything packaged up; I'll use virtualenv. With the following commands, I'll set that up, install Airflow, and get the Airflow config created.

jds:~ jackschultz$ pwd
/Users/jackschultz
jds:~ jackschultz$ mkdir venvs
jds:~ jackschultz$ virtualenv -p python3 venvs/bidaf # stands for Bigish Data Airflow; in some of the screenshots it's a different value, so ignore that
.....
jds:~ jackschultz$ source venvs/bidaf/bin/activate
(bidaf) jds:~ jackschultz$ pip install 'apache-airflow[postgres]' # the postgres extra is needed for the Airflow db
.....
(bidaf) jds:~ jackschultz$ mkdir ~/airflow && cd ~/airflow
(bidaf) jds:airflow jackschultz$ airflow version
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
**whatever the current version is**
(bidaf) jds:airflow jackschultz$ ls

The reason we're in ~/airflow is because that's the default AIRFLOW_HOME env variable value. If you don't want to be in the base directory, you can export AIRFLOW_HOME=~/dev/bigishdata/airflow and use that as the directory instead. Do that if you'd like, but make sure that export AIRFLOW_HOME line is in ~/.bash_profile, ~/.bashrc, ~/.zshrc, or whatever ~/.*rc file you use for your terminal, because we're going to be using a bunch of tabs and want them all to have the same AIRFLOW_HOME. If you're not going to use ~/airflow as the home, you're going to have problems unless you always export this env var.

I'm kind of making a big deal about AIRFLOW_HOME, but that's because it caused me some problems when I started. For example, some of the screenshots will show different directories; that's because I played around with this for a while before settling on a final setup.

Airflow needs a database where it stores all the information about the tasks: when they were run, their statuses, and a ton of other information you're going to see. It defaults to SQLite. That's quick and easy to get going, but I'd say go right to Postgres. In order to change that default, we need to edit the config file that the airflow version command created.

First though, create a database (I call it airflow), a user (airflowuser), and a password for that user (airflowpassword). Search elsewhere for examples of how to create databases and users, or use something like the sketch below.
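If it helps, here's a minimal psql sketch using those names (again, pick your own password):

create database airflow;
create user airflowuser with password 'airflowpassword';
grant all privileges on database airflow to airflowuser;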

Above, when you called airflow version, a config file was created at $AIRFLOW_HOME/airflow.cfg. With the database created, build the connection URL and replace the default sql_alchemy_conn value:

sql_alchemy_conn = postgresql+psycopg2://airflowuser:airflowpassword@localhost:5432/airflow

Back to the command line, and run:

(bidaf) jds:airflow jackschultz$ airflow initdb

Then go back to the psql console, list the tables, and you'll see the following:

[Screenshot: psql listing the tables that airflow initdb created in the airflow database.]

With this you can see some of the complexity in Airflow, and seeing these tables confirms Airflow is set up. If you're going through this series, you probably won't understand the tables yet; by the end of the series you'll know a lot about these relations.

Go again back to the command line and run:

(bidaf) jds:airflow jackschultz$ airflow webserver --port 8080

and see:

[Screenshot: terminal output of airflow webserver starting up.]

Then go to localhost:8080 for the admin screen, which is the highly touted UI. Like the table names, the UI will look more than a little complex at the start, but it becomes very understandable with experience.

[Screenshot: the Airflow admin UI with the list of example DAGs.]

Simple DAG — Directed Acyclic Graph

In terms of terminology, you'll see the abbreviation DAG all the time. A DAG is a way to define which tasks are run and in which order. A task, in turn, is the unit of work that actually gets run.
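Our first DAG below has only a single task, but to make the ordering part concrete, here's a tiny sketch of how dependencies between tasks are usually declared. The task names and the dag variable are placeholders; our real DAG file follows below:

from airflow.operators.dummy_operator import DummyOperator

# dag would be a DAG instance like the one defined in the file below
extract = DummyOperator(task_id='extract', dag=dag)
transform = DummyOperator(task_id='transform', dag=dag)
load = DummyOperator(task_id='load', dag=dag)

# the bit-shift operator sets the order: extract first, then transform, then load
extract >> transform >> load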

Looking at the admin UI, you can see the example DAGs that come with Airflow to help you get started. When writing DAGs, you'll probably go through many of those to see how they're set up and what's required to have them run. Don't feel bad about that; these example DAGs are fantastic to learn from.

Below is the full file we'll have running. Look through it a little; you'll probably understand some of what's going on. When you get to the bottom, keep reading and I'll walk through what it's like to write this.

# AIRFLOW_HOME/dags/write_to_file.py

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import datetime as dt

filename = 'self_logs/dt.txt'

def write_to_file(**kwargs):
    kwarg_filename = kwargs['filename']
    execution_time = kwargs['ts']
    dtn = dt.datetime.now()
    with open(kwarg_filename, 'a') as f:  # 'a' means to append
        f.write(f'{dtn}, {execution_time}\n')

default_args = {
    'owner': 'airflow',
    'retries': 0
}
 
dag = DAG('writing_to_file',
          default_args=default_args,
          start_date=dt.datetime.now(),
          schedule_interval=dt.timedelta(seconds=10)
          )

write_to_file_operator = PythonOperator(task_id='write_to_file',
                                        python_callable=write_to_file,
                                        provide_context=True,
                                        op_kwargs={'filename': filename}, dag=dag)
 
write_to_file_operator

Start at the bottom and see the write_to_file_operator variable and how it’s an instance of PythonOperator.

An Operator is a class that "determines what actually gets done by a task". A PythonOperator, when run, will run the Python code that comes from the python_callable function. A BashOperator will run a bash command. There are tons of open source Operators that perform all sorts of tasks. You'll see a few examples of these in the series, and by the end you'll have written your own. For now, just go with the definition above: Operators hold the code for what a task does.
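For a quick taste of a different Operator (not one we use in this DAG), a small BashOperator sketch might look like this:

from airflow.operators.bash_operator import BashOperator

# when this task runs, it executes the bash command and the output goes to the task log
print_date = BashOperator(task_id='print_date',
                          bash_command='date',
                          dag=dag)  # dag being a DAG instance like the one above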

One thing about Operators you'll see in examples is how most of them take keyword arguments that describe what to do. I'm not really a fan of that, because it makes it seem like Airflow is only driven by configuration, which is one of the things I want to avoid with these tasks. I want to have the code written out rather than fully relying on cryptic Operators.

For now though, look at the PythonOperator kwargs and you'll see a few things. First check python_callable, which is the function the Operator will call; here it's the write_to_file function written above. Next, check provide_context, which we set to True. This flag tells Airflow to give the callable information about the execution of the DAG. You also see op_kwargs, which will be passed along to the python_callable; with this, we're telling the function where to write the dates.
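To make that concrete, here's a rough sketch of what ends up inside kwargs when write_to_file runs. It only shows a few of the context values, and the exact set depends on your Airflow version:

def show_context(**kwargs):
    # from provide_context=True: details about this run of the DAG
    print(kwargs['ts'])              # the execution time as an ISO-formatted string
    print(kwargs['execution_date'])  # the execution time as a datetime-like object
    # from op_kwargs: whatever we passed in ourselves
    print(kwargs['filename'])        # 'self_logs/dt.txt'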

As for the task_id, it's the name that will show up in the tree and graph views in the webserver. It can be whatever name you want, and there are some considerations around keeping it consistent across versions of a DAG, but for now I'm keeping it the same as the python_callable.

Going up to the callable itself, you'll first see the filename we're going to write to; that comes from the op_kwargs in the PythonOperator instantiation. You then see two timestamps. The first is the execution time, the timestamp Airflow assigns to that scheduled run. When running this DAG and looking at the values, you'll see that time carries microseconds, but the values are always exactly 10 seconds apart. The second timestamp, taken when the code actually runs, will be a varying number of seconds after the execution time, because of the work it takes to get the code running. Keep this in mind when using timestamps in Operators in the future. The rest of the function writes the two timestamps to the file.

Run First DAG

With the DAG file created, we want to run it and see what’s going on with the output.

First step is to open a new tab in the terminal, activate the venv, make sure you have the correct value for AIRFLOW_HOME, and run

(bidaf) jds:airflow jackschultz$ airflow scheduler

Go back to the browser and the admin page, and you'll see writing_to_file in the DAG column, which means the webserver found the new DAG file with the name we gave it.

Click on the link for writing_to_file, which should take you to http://localhost:8080/admin/airflow/tree?dag_id=writing_to_file, and you should see this.

[Screenshot: the Tree View for the writing_to_file DAG.]

This is the Tree View, and you can see the one operator is write_to_file which is the task_id we gave the PythonOperator.

In the upper left, click the toggle from 'Off' to 'On' to get the task running. To watch this happen, go to the terminal and you'll see the scheduler start to throw out logs every 10 seconds. Then go back to the browser, reload the tree view page, and you'll see red marks because of failures.

[Screenshot: the Tree View showing red, failed task runs.]

The DAG is running, but why is it failing?

Debugging with logs

We can get to testing in the future, but for now, we’re going to debug using the logs.

In order to see what the issue is, go to logs/writing_to_file/write_to_file in the Finder and you'll see new folders being created every 10 seconds, one for each task run. Open one of the logs and you'll see the error.

[Screenshot: a task log with the traceback for the failure.]

It turns out we're trying to write to a file in a directory that doesn't exist, because we haven't created the self_logs/ directory. Go to another terminal, cd to your AIRFLOW_HOME, and mkdir self_logs. With the scheduler still running, look back at the log directory and watch the logs for the newly executed tasks.

[Screenshot: a task log from a successful run.]

Much better: a correct-looking log where we can see the task going through.

Finally, open self_logs/dt.txt and watch the datetimes come through. (The timestamps are from when I was writing this post.)

[Screenshot: the contents of self_logs/dt.txt with pairs of timestamps.]

One last step, and this one is about logging. When running code, you'll often want to print log statements, and in Airflow, printed values go to those log files. To show this, go back to the python_callable and add the following print line just before the file write:

print('Times to be written to file:', dtn, execution_time)

Save the file, and go back to watch the logs come in. What you’ll see is this line being added:

[2020-03-29 15:46:59,416] {logging_mixin.py:112} INFO - Times to be written to file: 2020-03-29 15:46:59.415603 2020-03-29T20:46:45.041274+00:00

This shows two things. First, you can print and log to find errors during initial local development. Second, code updates are picked up on each execution of the task; the scheduler picked up the added line. In some web frameworks, for example, if you change the code you might have to restart your local server to have the changes included. Here we don't have to, and the values come through in the logs.

Summary

If you got this far, you're set up with a first DAG that writes to a file. We went through the steps to get Airflow running locally, then got a basic self-written task up and going and watched the activity.

That doesn't sound like a lot, but with how big Airflow is, going from nothing to an initial setup, no matter how small, is a big part of the battle.

In Part 2 of this series, we’re going to take these tasks, hook them up to a different database, write the datetimes there, and have another task in the DAG format the time that was written. With that, you’ll be much more comfortable with being able to connect to services anywhere.