Data Modeling with Postgres
Overview
In this project, I apply data modeling with Postgres and build an ETL pipeline using Python.
Use Case: A startup wants to analyze the data they've been collecting on songs and user activity on their new music streaming app. The data is currently collected as JSON files, and the analytics team is particularly interested in understanding what songs users are listening to.
Song Dataset
The songs dataset is a subset of the [Million Song Dataset](http://millionsongdataset.com/).
Sample record:
{"num_songs": 1, "artist_id": "ARJIE2Y1187B994AB7", "artist_latitude": null, "artist_longitude": null, "artist_location": "", "artist_name": "Line Renaud", "song_id": "SOUPIRU12A6D4FA1E1", "title": "Der Kleine Dompfaff", "duration": 152.92036, "year": 0}
Log Dataset
The logs dataset is generated by the [Event Simulator](https://github.com/Interana/eventsim).
Sample record:
{"artist": null, "auth": "Logged In", "firstName": "Walter", "gender": "M", "itemInSession": 0, "lastName": "Frye", "length": null, "level": "free", "location": "San Francisco-Oakland-Hayward, CA", "method": "GET","page": "Home", "registration": 1540919166796.0, "sessionId": 38, "song": null, "status": 200, "ts": 1541105830796, "userAgent": "\"Mozilla\/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit\/537.36 (KHTML, like Gecko) Chrome\/36.0.1985.143 Safari\/537.36\"", "userId": "39"}
Schema
Fact Table
songplays - records in the log data associated with song plays, i.e. records with page set to NextSong
songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
Dimension Tables
users - users in the app
user_id, first_name, last_name, gender, level
songs - songs in the music database
song_id, title, artist_id, year, duration
artists - artists in the music database
artist_id, name, location, latitude, longitude
time - timestamps of records in songplays broken down into specific units
start_time, hour, day, week, month, year, weekday
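With this star schema, the analytics team's core question (what songs are users listening to?) becomes a simple join on the fact table. A sketch of such a query, assuming the database was created with the credentials used in create_tables.py; the query itself is illustrative and not part of the project files:

```python
import psycopg2

conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=postgres password=admin")
cur = conn.cursor()

# top 5 most played songs: count songplay facts per song title
cur.execute("""
    SELECT s.title, COUNT(*) AS plays
    FROM songplays sp
    JOIN songs s ON sp.song_id = s.song_id
    GROUP BY s.title
    ORDER BY plays DESC
    LIMIT 5
""")
for title, plays in cur.fetchall():
    print(title, plays)

conn.close()
```

Note that with the small data subset used in this project, only a few log records match a song in the songs table, so the result set may be tiny.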
Project Files
sql_queries.py -> contains the SQL queries for dropping and creating the fact and dimension tables, as well as the insert query templates.
# DROP TABLES

songplay_table_drop = "DROP TABLE IF EXISTS songplays"
user_table_drop = "DROP TABLE IF EXISTS users"
song_table_drop = "DROP TABLE IF EXISTS songs"
artist_table_drop = "DROP TABLE IF EXISTS artists"
time_table_drop = "DROP TABLE IF EXISTS time"

# CREATE TABLES

songplay_table_create = ("""
CREATE TABLE IF NOT EXISTS songplays (
    songplay_id SERIAL CONSTRAINT songplay_pk PRIMARY KEY,
    start_time TIMESTAMP REFERENCES time (start_time),
    user_id INT REFERENCES users (user_id),
    level VARCHAR NOT NULL,
    song_id VARCHAR REFERENCES songs (song_id),
    artist_id VARCHAR REFERENCES artists (artist_id),
    session_id INT NOT NULL,
    location VARCHAR,
    user_agent TEXT
)
""")

user_table_create = ("""
CREATE TABLE IF NOT EXISTS users (
    user_id INT CONSTRAINT users_pk PRIMARY KEY,
    first_name VARCHAR,
    last_name VARCHAR,
    gender CHAR(1),
    level VARCHAR NOT NULL
)
""")

song_table_create = ("""
CREATE TABLE IF NOT EXISTS songs (
    song_id VARCHAR CONSTRAINT songs_pk PRIMARY KEY,
    title VARCHAR,
    artist_id VARCHAR REFERENCES artists (artist_id),
    year INT CHECK (year >= 0),
    duration FLOAT
)
""")

artist_table_create = ("""
CREATE TABLE IF NOT EXISTS artists (
    artist_id VARCHAR CONSTRAINT artist_pk PRIMARY KEY,
    name VARCHAR,
    location VARCHAR,
    latitude DECIMAL(9,6),
    longitude DECIMAL(9,6)
)
""")

time_table_create = ("""
CREATE TABLE IF NOT EXISTS time (
    start_time TIMESTAMP CONSTRAINT time_pk PRIMARY KEY,
    hour INT NOT NULL CHECK (hour >= 0),
    day INT NOT NULL CHECK (day >= 0),
    week INT NOT NULL CHECK (week >= 0),
    month INT NOT NULL CHECK (month >= 0),
    year INT NOT NULL CHECK (year >= 0),
    weekday VARCHAR NOT NULL
)
""")

# INSERT RECORDS

songplay_table_insert = ("""
INSERT INTO songplays VALUES (DEFAULT, %s, %s, %s, %s, %s, %s, %s, %s)
""")

# Update the user's level on conflict
user_table_insert = ("""
INSERT INTO users (user_id, first_name, last_name, gender, level)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (user_id) DO UPDATE SET level = EXCLUDED.level
""")

song_table_insert = ("""
INSERT INTO songs (song_id, title, artist_id, year, duration)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (song_id) DO NOTHING
""")

# Artist location, latitude and longitude might change and need to be updated.
artist_table_insert = ("""
INSERT INTO artists (artist_id, name, location, latitude, longitude)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (artist_id) DO UPDATE SET
    location = EXCLUDED.location,
    latitude = EXCLUDED.latitude,
    longitude = EXCLUDED.longitude
""")

time_table_insert = ("""
INSERT INTO time VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (start_time) DO NOTHING
""")

# FIND SONGS

song_select = ("""
SELECT song_id, artists.artist_id
FROM songs
JOIN artists ON songs.artist_id = artists.artist_id
WHERE songs.title = %s
  AND artists.name = %s
  AND songs.duration = %s
""")

# QUERY LISTS

create_table_queries = [user_table_create, artist_table_create, song_table_create, time_table_create, songplay_table_create]
drop_table_queries = [songplay_table_drop, user_table_drop, song_table_drop, artist_table_drop, time_table_drop]
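The users insert is written as an upsert so that a user's level always reflects their latest event. A minimal illustration of that conflict behaviour, assuming create_tables.py has already been run (the second level value is made up for the example):

```python
import psycopg2
from sql_queries import user_table_insert

conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=postgres password=admin")
cur = conn.cursor()

# first insert creates the user (values taken from the sample log record)
cur.execute(user_table_insert, (39, "Walter", "Frye", "M", "free"))
# re-inserting the same user_id does not duplicate the row; only level is updated
cur.execute(user_table_insert, (39, "Walter", "Frye", "M", "paid"))
conn.commit()

cur.execute("SELECT level FROM users WHERE user_id = 39")
print(cur.fetchone())  # ('paid',)

conn.close()
```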
create_tables.py -> contains the code for setting up the database. Running this file creates the sparkifydb database and the fact and dimension tables.
import psycopg2
from sql_queries import create_table_queries, drop_table_queries


def create_database():
    """
    Establishes a database connection and returns the connection and cursor references.
    :return: (cur, conn) a cursor and connection reference
    """
    # connect to default database
    # conn = psycopg2.connect("host=127.0.0.1 dbname=studentdb user=student password=student")
    conn = psycopg2.connect("host=127.0.0.1 dbname=studentdb user=postgres password=admin")
    conn.set_session(autocommit=True)
    cur = conn.cursor()

    # create sparkify database with UTF8 encoding
    cur.execute("DROP DATABASE IF EXISTS sparkifydb")
    cur.execute("CREATE DATABASE sparkifydb WITH ENCODING 'utf8' TEMPLATE template0")

    # close connection to default database
    conn.close()

    # connect to sparkify database
    conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=postgres password=admin")
    cur = conn.cursor()

    return cur, conn


def drop_tables(cur, conn):
    """
    Runs all the drop table queries defined in sql_queries.py.
    :param cur: cursor to the database
    :param conn: database connection reference
    """
    for query in drop_table_queries:
        cur.execute(query)
        conn.commit()


def create_tables(cur, conn):
    """
    Runs all the create table queries defined in sql_queries.py.
    :param cur: cursor to the database
    :param conn: database connection reference
    """
    for query in create_table_queries:
        cur.execute(query)
        conn.commit()


def main():
    """
    Driver function: creates the database, drops any existing tables and creates new ones.
    """
    cur, conn = create_database()
    drop_tables(cur, conn)
    print("Tables dropped successfully!")
    create_tables(cur, conn)
    print("Tables created successfully!")
    conn.close()


if __name__ == "__main__":
    main()
etl.py -> reads and processes the files in song_data and log_data and loads them into the tables.
import os
import glob
import psycopg2
import pandas as pd
from sql_queries import *
def process_song_file(cur, filepath):
    """
    Process a song file and insert the song and artist records into the Postgres database.
    :param cur: cursor reference
    :param filepath: complete file path for the file to load
    """
    # open song file
    df = pd.DataFrame([pd.read_json(filepath, typ='series', convert_dates=False)])

    for value in df.values:
        num_songs, artist_id, artist_latitude, artist_longitude, artist_location, artist_name, song_id, title, duration, year = value

        # insert artist record
        artist_data = (artist_id, artist_name, artist_location, artist_latitude, artist_longitude)
        cur.execute(artist_table_insert, artist_data)

        # insert song record
        song_data = (song_id, title, artist_id, year, duration)
        cur.execute(song_table_insert, song_data)

    print(f"Records inserted for file {filepath}")


def process_log_file(cur, filepath):
    """
    Process an event log file and insert the time, user and songplay records into the Postgres database.
    :param cur: cursor reference
    :param filepath: complete file path for the file to load
    """
    # open log file
    df = pd.read_json(filepath, lines=True)

    # filter by NextSong action
    df = df[df['page'] == "NextSong"].copy()

    # convert timestamp column from epoch milliseconds to datetime
    df['ts'] = pd.to_datetime(df['ts'], unit='ms')
    t = df['ts']

    # insert time data records
    column_labels = ["timestamp", "hour", "day", "week", "month", "year", "weekday"]
    time_data = []
    for data in t:
        time_data.append([data, data.hour, data.day, data.weekofyear, data.month, data.year, data.day_name()])

    time_df = pd.DataFrame.from_records(data=time_data, columns=column_labels)
    for i, row in time_df.iterrows():
        cur.execute(time_table_insert, list(row))

    # load user table
    user_df = df[['userId', 'firstName', 'lastName', 'gender', 'level']]

    # insert user records
    for i, row in user_df.iterrows():
        cur.execute(user_table_insert, list(row))

    # insert songplay records
    for index, row in df.iterrows():
        # get song_id and artist_id from the song and artist tables
        cur.execute(song_select, (row.song, row.artist, row.length))
        results = cur.fetchone()

        if results:
            songid, artistid = results
        else:
            songid, artistid = None, None

        # insert songplay record
        songplay_data = (row.ts, row.userId, row.level, songid, artistid, row.sessionId, row.location, row.userAgent)
        cur.execute(songplay_table_insert, songplay_data)


def process_data(cur, conn, filepath, func):
    """
    Driver function to load data from song and event log files into the Postgres database.
    :param cur: a database cursor reference
    :param conn: database connection reference
    :param filepath: parent directory where the files exist
    :param func: function to call for each file
    """
    # get all files matching extension from directory
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root, '*.json'))
        for f in files:
            all_files.append(os.path.abspath(f))

    # get total number of files found
    num_files = len(all_files)
    print('{} files found in {}'.format(num_files, filepath))

    # iterate over files and process
    for i, datafile in enumerate(all_files, 1):
        func(cur, datafile)
        conn.commit()
        print('{}/{} files processed.'.format(i, num_files))


def main():
    """
    Driver function for loading songs and log data into the Postgres database.
    """
    conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=postgres password=admin")
    cur = conn.cursor()

    process_data(cur, conn, filepath='data/song_data', func=process_song_file)
    process_data(cur, conn, filepath='data/log_data', func=process_log_file)

    conn.close()


if __name__ == "__main__":
    main()
    print("\n\nFinished processing!!!\n\n")
main.py -> the entry point for the program
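main.py is not reproduced here; a minimal sketch of what it amounts to, assuming it simply chains the table setup and the ETL run (the actual file may differ):

```python
# main.py - illustrative sketch only
import create_tables
import etl


def main():
    # recreate the database and tables, then run the ETL pipeline
    create_tables.main()
    etl.main()


if __name__ == "__main__":
    main()
```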
Project run: Steps
Run the driver program main.py as below.
python main.py
The create_tables.py and etl.py files can also be run independently as below:
python create_tables.py
python etl.py
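After the run, the load can be sanity-checked with a quick row count (an illustrative check, not part of the project files):

```python
import psycopg2

conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=postgres password=admin")
cur = conn.cursor()

# number of songplay records loaded from the log files
cur.execute("SELECT COUNT(*) FROM songplays")
print("songplays rows:", cur.fetchone()[0])

conn.close()
```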