In part 1, we went through basic DAGs that read, logged, and wrote to custom files, and got an overall sense of where files live in Airflow. A lot of the work was getting Airflow running locally, and then at the end of the post, a quick start in having it do work.
In part 2 here, we’re going to start doing some reads and writes to a database, and show how tasks can run together in a directed, acyclic manner. Even though both tasks will work with a single database, you can imagine stretching the same pattern across other systems.
Again, this post assumes you’re going through and writing this code on your own, with some copy and paste. If you’d rather use what I have written, check out the GitHub repo here.
This post is going to write to Postgres. We already created a database for Airflow itself to use, but we want to leave that alone.
So before we get to any of the Python code, go and create the new database, add a new user with a password, and then create the dts (short name for datetimes, since that’s all we’re doing here) table.
bigishdata=> create table dts (id serial primary key, run_time timestamp, execution_time timestamp);
CREATE TABLE
bigishdata=> \dt
            List of relations
 Schema | Name  | Type  |     Owner
--------+-------+-------+----------------
 public | dts   | table | bigishdatauser
(1 rows)

bigishdata=# \d dts
                                      Table "public.dts"
     Column     |            Type             | Collation | Nullable |             Default
----------------+-----------------------------+-----------+----------+---------------------------------
 id             | integer                     |           | not null | nextval('dts_id_seq'::regclass)
 run_time       | timestamp without time zone |           |          |
 execution_time | timestamp without time zone |           |          |
Indexes:
    "dts_pkey" PRIMARY KEY, btree (id)
Adding the Connection
Connection is a well-named term you’ll see all over in Airflow speak. Connections are defined as “[t]he connection information to external systems”, which could mean usernames, passwords, ports, etc. for when you want to connect to things like databases, AWS, Google Cloud, or various data lakes and warehouses. Anything that requires information to connect to, you’ll be able to put that information in a Connection.
With airflow webserver running, go to the UI, find the Admin dropdown on the top navbar, and click Connections. Like the example DAGs, you’ll see many default Connections, which are a great way to see what information each kind of connection needs, and which platforms you can move data to and from.
Take the values for the database we’re using here: the host (localhost), schema (meaning database name), login (username), password, and port. Put those into the Connection form. At the top, you’ll see Conn Id; in that input, create a name for the connection. This name matters, because it’s how we later say which Connection we want.
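To make the form fields a bit more concrete, here is a minimal sketch of how the same values fit together as a connection URI. Airflow can also read a Connection from an environment variable named AIRFLOW_CONN_ followed by the upper-cased Conn Id, with a URI as the value. The Conn Id postgres_bigishdata and the password below are made up for illustration, and the port assumes the Postgres default of 5432.

```python
from urllib.parse import quote, urlsplit


def build_conn_uri(conn_type, login, password, host, port, schema):
    """Assemble a connection URI from the same fields the form asks for."""
    # Percent-encode the parts that may contain characters like '@' or ':'.
    return (
        f"{conn_type}://{quote(login, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(schema, safe='')}"
    )


# Hypothetical values: swap in your own Conn Id, user, and password.
conn_id = "postgres_bigishdata"
uri = build_conn_uri(
    conn_type="postgres",
    login="bigishdatauser",
    password="p@ss:word",
    host="localhost",
    port=5432,
    schema="bigishdata",  # "schema" in the form means the database name
)

# The environment variable name Airflow would look for with this Conn Id.
env_name = f"AIRFLOW_CONN_{conn_id.upper()}"

print(f"{env_name}={uri}")
```

The form in the UI is the route this post takes; the URI is only another view of the same five values, and it shows why the Conn Id is the one piece you need to remember later.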
When you save this, you can go to the Airflow database, find the connection table, and see the values you entered in that form. You’ll also probably see that your password is there in plain text. For this post, I’m not going to talk about encrypting it, but you’re able to do that, and should, of course.
One more thing to look at is the source code: the Connection model, form, and view. It’s a Flask app! Reading the source is a great way to get a much better understanding of something like adding information for a connection.
In order to use the information in a Connection, we use what is called a Hook. A Hook takes the information in the Connection, and hooks you up with the service that you created the Connection with. Another nicely named term.
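As a rough sketch of what that looks like in code, the function below uses PostgresHook to insert one row into the dts table. It assumes an Airflow 1.10-style import path (in Airflow 2 the hook lives in airflow.providers.postgres.hooks.postgres), and the function name write_dts and the Conn Id in the usage comment are placeholders of mine, not names from this post.

```python
from datetime import datetime

# Parameterized insert for the dts table created earlier.
INSERT_DTS = "INSERT INTO dts (run_time, execution_time) VALUES (%s, %s)"


def write_dts(conn_id, run_time, execution_time):
    """Insert one row into dts using the Connection named by conn_id."""
    # Imported inside the function so the module loads without Airflow.
    # Assumption: Airflow 1.10 import path; adjust for your version.
    from airflow.hooks.postgres_hook import PostgresHook

    # The hook looks up host, login, password, etc. from the Connection.
    hook = PostgresHook(postgres_conn_id=conn_id)
    hook.run(INSERT_DTS, parameters=(run_time, execution_time))


# Example call (needs Airflow plus a saved Connection, so it is commented out):
# write_dts("postgres_bigishdata", datetime.now(), datetime(2019, 1, 1))
```

Notice that no hostname or password appears in the code. The Conn Id is the only thing the task needs to know, which is the whole point of splitting Connections from Hooks.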