Apache Airflow Part 2 — Connections, Hooks, reading and writing to Postgres, and XComs

In part 1, we went through basic DAGs that read, logged, and wrote to custom files, and got an overall sense of file locations and places in Airflow. A lot of the work was getting Airflow running locally, and then at the end of the post, a quick start in having it do work.

In part 2 here, we’re going to start doing some reads and writes to a database, and show how tasks can run together in a directed, acyclic manner. Even though the reads and writes here both use a single database, you can imagine stretching the idea out to other situations.

Again, this post will assume you’re going through and writing this code on your own with some copy paste. If you’re on the path of using what I have written, check out the GitHub repo here.

Creating database

This post is going to go through and write to postgres. We already created a database for Airflow itself to use, but we want to leave that alone.

So before we get to any of the python code, go and create the new database, add a new user with password, and then create the dts (short name for datetimes, since that’s all we’re doing here) table.

bigishdata=> create table dts (id serial primary key, run_time timestamp, execution_time timestamp);
CREATE TABLE
bigishdata=> \dt
            List of relations
 Schema | Name  | Type  |     Owner
--------+-------+-------+----------------
 public | dts   | table | bigishdatauser
(1 row)

bigishdata=# \d dts
                                          Table "public.dts"
     Column     |            Type             | Collation | Nullable |             Default
----------------+-----------------------------+-----------+----------+---------------------------------
 id             | integer                     |           | not null | nextval('dts_id_seq'::regclass)
 run_time       | timestamp without time zone |           |          |
 execution_time | timestamp without time zone |           |          |
Indexes:
    "dts_pkey" PRIMARY KEY, btree (id)

Adding the Connection

Connections is a well-named term you’ll see all over in Airflow speak. They’re defined as “[t]he connection information to external systems” which could mean usernames, passwords, ports, etc. for when you want to connect to things like databases, AWS, Google Cloud, various data lakes or warehouses. Anything that requires information to connect to, you’ll be able to put that information in a Connection.

With airflow webserver running, go to the UI, find the Admin dropdown on the top navbar, and click Connections. Like example DAGs, you’ll see many default Connections, which are really great to see what information is needed for those connections, and also to see what connections are available and what platforms you can move data to and from.

Take the values for the database we’re using here — the (local)host, schema (meaning database name), login (username), password, port — and put that into the form shown below. At the top, you’ll see Conn Id, and in that input create a name for the connection. This name is clearly important, and you’ll see that we use that in order to say which Connection we want.

[Screenshot: the form for adding a Connection in the Airflow UI]

When you save this, you can go to the Airflow database, find the connection table, and see the values you entered in that form. You’ll also probably see that your password is there in plain text. For this post, I’m not going to talk about encrypting it, but you’re able to do that, and should, of course.

One more thing to look at is in the source code, the Connection model, form, and view. It’s a flask app! And great to see the source code to get a much better understanding for something like adding information for a connection.
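As a rough sketch of what those form fields add up to: Airflow can also read a Connection from an environment variable named AIRFLOW_CONN_ followed by the upper-cased Conn Id, with the value written as a URI. The snippet below builds such a URI using only the standard library. The Conn Id bigishdata_postgres, the password, and the helper name connection_uri are made up for illustration; swap in your own values.

```python
import os
from urllib.parse import quote, urlsplit


def connection_uri(login, password, host, port, schema, conn_type="postgres"):
    """Build a connection URI from the same fields the Connection form asks for.

    connection_uri is a hypothetical helper, not part of Airflow.
    """
    # Percent-encode the credentials so characters like '/' or '@' survive.
    return "{}://{}:{}@{}:{}/{}".format(
        conn_type, quote(login, safe=""), quote(password, safe=""), host, port, schema
    )


# Illustrative values only: the password and Conn Id are assumptions.
uri = connection_uri("bigishdatauser", "s3cret/pw", "localhost", 5432, "bigishdata")

# Environment variable for a Conn Id of "bigishdata_postgres".
os.environ["AIRFLOW_CONN_BIGISHDATA_POSTGRES"] = uri

# Split the URI back apart to confirm the pieces line up with the form fields.
parts = urlsplit(uri)
print(parts.hostname, parts.port, parts.path.lstrip("/"))
```

Either way, the Conn Id is the handle the rest of your code uses to ask for this Connection.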

Hooks

In order to use the information in a Connection, we use what is called a Hook. A Hook takes the information in the Connection, and hooks you up with the service that you created the Connection with. Another nicely named term.
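To make that concrete, here is a minimal sketch of a Hook writing to and reading from the dts table. It assumes the Postgres hook that ships with Airflow (the import path differs between Airflow 1.10 and 2.x, so both are tried) and a Conn Id of bigishdata_postgres, which is a made-up name; use whatever you typed into the Conn Id input. The function names write_dt and read_dts are also just for illustration.

```python
# SQL for the dts table created earlier; the %s placeholders are filled in
# by the database driver, not by string formatting.
INSERT_SQL = "insert into dts (run_time, execution_time) values (%s, %s)"
SELECT_SQL = "select id, run_time, execution_time from dts order by id"


def _postgres_hook(conn_id):
    """Return a PostgresHook for conn_id, importing Airflow only when called."""
    try:
        # Airflow 2.x with the Postgres provider package installed.
        from airflow.providers.postgres.hooks.postgres import PostgresHook
    except ImportError:
        # Airflow 1.10.x location.
        from airflow.hooks.postgres_hook import PostgresHook
    return PostgresHook(postgres_conn_id=conn_id)


def write_dt(run_time, execution_time, conn_id="bigishdata_postgres"):
    """Insert one row into dts through the Connection named conn_id."""
    hook = _postgres_hook(conn_id)
    hook.run(INSERT_SQL, parameters=(run_time, execution_time))


def read_dts(conn_id="bigishdata_postgres"):
    """Return every row in dts as a list of tuples."""
    hook = _postgres_hook(conn_id)
    return hook.get_records(SELECT_SQL)
```

Notice that no host, username, or password appears anywhere in this code. The Hook looks those up from the Connection by its Conn Id.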

Predicting PGA Tour Scoring Average from Statistics Using Linear Regression

First off, I admit, that’s probably the most boring title for a blog post ever. It gets a negative value on the clickbait scale that is generally unseen in the modern, “every click equals dollars” era that we live in. On the other hand, it tells you exactly what this article is about — predicting scoring average using stats.

In this article, I’ll go through getting the data from the database, cleaning that data for use, and then running a linear regression in order to generate coefficients for each of the stats to generate scoring average predictions. Oh, and some analysis and commentary at the end!

Shameless shoutout to my other blog, Golf on the Mind. Check it out and subscribe to the newsletter / twitter / instagram if you’re into golf at all. Or ignore, and keep reading for some code!

Here's a pic of a golf course to get you in the mood.

Getting the data

Last time if you remember, I spent all this effort taking the csv stat files, and putting the information into a database. Start there if you haven’t read that post yet. It’ll show how I grabbed the stats and formatted them.

Now that you’re back in the present, we need to create a query that gets the stats for the players for a specific year. An example row in a CSV file of the data would be something like:

player_id, player_name, stat_1_value, stat_2_value, … , stat_n_value

for stats 1 to n, where n (the number of stats) and the stats themselves (driving distance, greens in regulation, etc.) vary depending on inputs.

Now let me say, I am not an expert in writing sql queries. And since people on the internet loooove to dole out hate in comments sections, I’m just going to say that there’s probably a better way of writing this query. Feel free to let me know and I can throw an edit in here, but this query works just fine.

select players.id,
  players.name,
  max(case when stat_lines.stat_id=330 then stat_lines.raw else null end) as putting_average,
  max(case when stat_lines.stat_id=157 then stat_lines.raw else null end) as driving_distance,
  max(case when stat_lines.stat_id=250 then stat_lines.raw else null end) as gir,
  max(case when stat_lines.stat_id=156 then stat_lines.raw else null end) as driving_accuracy,
  max(case when stat_lines.stat_id=382 then stat_lines.raw else null end) as scoring_average
from players
  join stat_lines on stat_lines.player_id = players.id
  join stats on stat_lines.stat_id=stats.id
where stat_lines.year=2012 and (stats.id=157 or stats.id=330 or stats.id=382 or stats.id=250 or stats.id=156) and stat_lines.raw is not null
group by players.name,players.id;

High level overview time! We’re selecting player id, and player name, along with their stats for putting average, driving distance, greens in regulation, driving accuracy and scoring average for the year 2012. In order to get the right stats, we need to know the stat id for the stats.
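If the max(case when ...) trick looks odd, here is a small sketch of it running against an in-memory SQLite database instead of the real one. The player names and stat values are invented, the tables only have the columns the query touches, and the query is trimmed to two stats (157 for driving distance, 382 for scoring average) with the stats join left out. Each case expression returns the raw value for one stat id and null otherwise, and max() then collapses each player’s rows into one row per player.

```python
import sqlite3

# In-memory stand-in for the real database; all data below is made up.
conn = sqlite3.connect(":memory:")
conn.executescript("""
create table players (id integer primary key, name text);
create table stat_lines (player_id integer, stat_id integer, year integer, raw real);
insert into players values (1, 'Player A'), (2, 'Player B');
insert into stat_lines values
  (1, 157, 2012, 295.1), (1, 382, 2012, 70.2),
  (2, 157, 2012, 288.4), (2, 382, 2012, 71.0),
  (1, 157, 2011, 290.0);
""")

# Trimmed version of the query: one output column per stat id.
rows = conn.execute("""
select players.id,
  players.name,
  max(case when stat_lines.stat_id=157 then stat_lines.raw else null end) as driving_distance,
  max(case when stat_lines.stat_id=382 then stat_lines.raw else null end) as scoring_average
from players
  join stat_lines on stat_lines.player_id = players.id
where stat_lines.year=2012 and stat_lines.stat_id in (157, 382) and stat_lines.raw is not null
group by players.name, players.id
order by players.id
""").fetchall()

# One tuple per player: (id, name, driving_distance, scoring_average).
for row in rows:
    print(row)
```

The 2011 row for Player A is filtered out by the year condition, so each player ends up with exactly one row of 2012 stats.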

One more thing. This query is funky, and I probably could have designed the schema differently to make this prettier. For example, I could have just gone with one table, stat_lines, with fields for player_name and stat_name (along with all the current fields) and then the sql would be very simple. But there are other applications to keep in mind. What if you wanted to display all stats by a player? Or all of a player’s stats for a certain year? With the way I have the schema set up, those queries are simple and logical. For this specific case, I’ll deal with the complexity.

Loading the Data

That query above is great, but it’s not going to cut it if I have to specify the year and the stat ids in that string every time I run the script. Gotta be dynamic here.
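One way this could look, as a sketch rather than the final version: a function that takes the year and a mapping of stat id to column name and builds the query text. The name build_stats_query and the choice to pass the year as a %s placeholder (the style psycopg2 uses) are assumptions for illustration. Column aliases can’t be passed as query parameters, so they get checked before being dropped into the string.

```python
def build_stats_query(year, stat_columns):
    """Build the per-player stats query for one year.

    stat_columns maps stat id -> column alias, e.g. {157: "driving_distance"}.
    Returns (sql, params); the year is left as a %s placeholder for the driver.
    """
    if not stat_columns:
        raise ValueError("need at least one stat")

    select_lines = ["select players.id,", "  players.name,"]
    cases = []
    for stat_id, alias in stat_columns.items():
        # Aliases end up in the SQL text, so only allow plain identifiers.
        if not alias.isidentifier():
            raise ValueError("bad column alias: {!r}".format(alias))
        cases.append(
            "  max(case when stat_lines.stat_id={} then stat_lines.raw else null end) as {}".format(
                int(stat_id), alias
            )
        )
    id_list = ", ".join(str(int(stat_id)) for stat_id in stat_columns)

    lines = select_lines + [",\n".join(cases)] + [
        "from players",
        "  join stat_lines on stat_lines.player_id = players.id",
        "where stat_lines.year=%s and stat_lines.stat_id in ({}) "
        "and stat_lines.raw is not null".format(id_list),
        "group by players.name,players.id;",
    ]
    return "\n".join(lines), (int(year),)


sql, params = build_stats_query(2012, {157: "driving_distance", 382: "scoring_average"})
print(sql)
print(params)
```

With something like this, changing the year or swapping which stats go into the regression is a change to the arguments, not to the sql string.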
