Data Modeling with Postgres

Overview

    In this project, I apply data modeling with Postgres and build an ETL pipeline using Python.

Use Case: A startup wants to analyze the data they've been collecting on songs and user activity on their new music streaming app. Currently, they collect data in JSON format, and the analytics team is particularly interested in understanding what songs users are listening to.

Song Dataset

    The songs dataset is a subset of the [Million Song Dataset](http://millionsongdataset.com/).

Sample Record :

{"num_songs": 1, "artist_id": "ARJIE2Y1187B994AB7", "artist_latitude": null, "artist_longitude": null, "artist_location": "", "artist_name": "Line Renaud", "song_id": "SOUPIRU12A6D4FA1E1", "title": "Der Kleine Dompfaff", "duration": 152.92036, "year": 0}

Log Dataset

    The logs dataset is generated by the [Event Simulator](https://github.com/Interana/eventsim).

    Sample Record :

{"artist": null, "auth": "Logged In", "firstName": "Walter", "gender": "M", "itemInSession": 0, "lastName": "Frye", "length": null, "level": "free", "location": "San Francisco-Oakland-Hayward, CA", "method": "GET","page": "Home", "registration": 1540919166796.0, "sessionId": 38, "song": null, "status": 200, "ts": 1541105830796, "userAgent": "\"Mozilla\/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit\/537.36 (KHTML, like Gecko) Chrome\/36.0.1985.143 Safari\/537.36\"", "userId": "39"}

Schema

    Fact Table 

        songplays - records in the log data associated with song plays, i.e. records with page NextSong

            songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

    Dimension Tables

        users - users in the app

            user_id, first_name, last_name, gender, level

        songs - songs in the music database

            song_id, title, artist_id, year, duration

        artists - artists in the music database

            artist_id, name, location, latitude, longitude

        time - timestamps of records in songplays broken down into specific units

            start_time, hour, day, week, month, year, weekday
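
    With this star schema, analytical questions become simple joins against the songplays fact table. A hypothetical example query (not part of the project files) that counts plays per hour for paid users:

SELECT t.hour, COUNT(*) AS plays
FROM songplays sp
JOIN time t ON sp.start_time = t.start_time
WHERE sp.level = 'paid'
GROUP BY t.hour
ORDER BY plays DESC;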

Project Files

    sql_queries.py -> contains the SQL queries for dropping and creating the fact and dimension tables, the insert query templates, and the song-select query.

# DROP TABLES

songplay_table_drop = "DROP TABLE IF EXISTS songplays"
user_table_drop = "DROP TABLE IF EXISTS users"
song_table_drop = "DROP TABLE IF EXISTS songs"
artist_table_drop = "DROP TABLE IF EXISTS artists"
time_table_drop = "DROP TABLE IF EXISTS time"

# CREATE TABLES

songplay_table_create = ("""CREATE TABLE IF NOT EXISTS songplays(
songplay_id SERIAL CONSTRAINT songplay_pk PRIMARY KEY,
start_time TIMESTAMP REFERENCES time (start_time),
user_id INT REFERENCES users (user_id),
level VARCHAR NOT NULL,
song_id VARCHAR REFERENCES songs (song_id),
artist_id VARCHAR REFERENCES artists (artist_id),
session_id INT NOT NULL, 
location VARCHAR,
user_agent TEXT
)""")

user_table_create = ("""CREATE TABLE IF NOT EXISTS users(
user_id INT CONSTRAINT users_pk PRIMARY KEY,
first_name VARCHAR,
last_name VARCHAR,
gender CHAR(1),
level VARCHAR NOT NULL
)""")

song_table_create = ("""CREATE TABLE IF NOT EXISTS songs(
song_id VARCHAR CONSTRAINT songs_pk PRIMARY KEY,
title VARCHAR,
artist_id VARCHAR REFERENCES artists (artist_id),
year INT CHECK (year >= 0),
duration FLOAT
)""")

artist_table_create = ("""CREATE TABLE IF NOT EXISTS artists(
artist_id VARCHAR CONSTRAINT artist_pk PRIMARY KEY,
name VARCHAR,
location VARCHAR,
latitude DECIMAL(9,6),
longitude DECIMAL(9,6)
)""")

time_table_create = ("""CREATE TABLE IF NOT EXISTS time(
start_time TIMESTAMP CONSTRAINT time_pk PRIMARY KEY,
hour INT NOT NULL CHECK (hour >= 0),
day INT NOT NULL CHECK (day >= 0),
week INT NOT NULL CHECK (week >= 0),
month INT NOT NULL CHECK (month >= 0),
year INT NOT NULL CHECK (year >= 0),
weekday VARCHAR NOT NULL
)""")

# INSERT RECORDS

songplay_table_insert = ("""INSERT INTO songplays (start_time, user_id, level, song_id, artist_id, session_id, location, user_agent)
                            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
""")


# Updating the user level on conflict
user_table_insert = ("""INSERT INTO users (user_id, first_name, last_name, gender, level) VALUES (%s, %s, %s, %s, %s) 
                        ON CONFLICT (user_id) DO UPDATE SET 
                        level = EXCLUDED.level 
""")

song_table_insert = ("""INSERT INTO songs (song_id, title, artist_id, year, duration) VALUES (%s, %s, %s, %s, %s) 
                        ON CONFLICT (song_id) DO NOTHING                        
""")


# Artist location, latitude and longitude might change and need to be updated.
artist_table_insert = ("""INSERT INTO artists (artist_id, name, location, latitude, longitude) VALUES (%s, %s, %s, %s, %s) 
                          ON CONFLICT (artist_id) DO UPDATE SET
                          location = EXCLUDED.location,
                          latitude = EXCLUDED.latitude,
                          longitude = EXCLUDED.longitude
""")

time_table_insert = ("""INSERT INTO time (start_time, hour, day, week, month, year, weekday)
                        VALUES (%s, %s, %s, %s, %s, %s, %s)
                        ON CONFLICT (start_time) DO NOTHING
""")

# FIND SONGS

song_select = ("""
    SELECT song_id, artists.artist_id
    FROM songs JOIN artists ON songs.artist_id = artists.artist_id
    WHERE songs.title = %s
    AND artists.name = %s
    AND songs.duration = %s
""")

# QUERY LISTS

create_table_queries = [user_table_create, artist_table_create, song_table_create, time_table_create, songplay_table_create]
drop_table_queries = [songplay_table_drop, user_table_drop, song_table_drop, artist_table_drop, time_table_drop]

    create_tables.py -> contains the code for setting up the database. Running this file creates the sparkifydb database and the fact and dimension tables.

import psycopg2
from sql_queries import create_table_queries, drop_table_queries

def create_database():
    """
    Establishes the database connection and returns the connection and cursor references.
    :return: (cur, conn) - a cursor and connection reference
    """
    # connect to default database
    #conn = psycopg2.connect("host=127.0.0.1 dbname=studentdb user=student password=student")
    conn = psycopg2.connect("host=127.0.0.1 dbname=studentdb user=postgres password=admin")
    conn.set_session(autocommit=True)
    cur = conn.cursor()
    
    # create sparkify database with UTF8 encoding
    cur.execute("DROP DATABASE IF EXISTS sparkifydb")
    cur.execute("CREATE DATABASE sparkifydb WITH ENCODING 'utf8' TEMPLATE template0")
    # close connection to default database
    conn.close()    
    
    # connect to sparkify database
    conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=postgres password=admin")
    cur = conn.cursor()
    
    return cur, conn

def drop_tables(cur, conn):
    """
    Runs all the drop table queries defined in sql_queries.py.
    :param cur: cursor to the database
    :param conn: database connection reference
    """
    for query in drop_table_queries:
        cur.execute(query)
        conn.commit()

def create_tables(cur, conn):
    """
    Runs all the create table queries defined in sql_queries.py.
    :param cur: cursor to the database
    :param conn: database connection reference
    """
    for query in create_table_queries:
        cur.execute(query)
        conn.commit()

def main():
    """
    Driver main function.
    """
    cur, conn = create_database()
    
    drop_tables(cur, conn)
    print("Table dropped successfully!!")
    create_tables(cur, conn)
    print("Table created successfully!!")
    conn.close()

if __name__ == "__main__":
    main()

    etl.py -> reads and processes the song_data and log_data files and loads them into the tables.

import os
import glob
import psycopg2
import pandas as pd
from sql_queries import *

def process_song_file(cur, filepath):
    """
    Process songs files and insert records into the Postgres database.
    :param cur: cursor reference
    :param filepath: complete file path for the file to load
    """
    # open song file
    df = pd.DataFrame([pd.read_json(filepath, typ='series', convert_dates=False)])
    for value in df.values:
        num_songs, artist_id, artist_latitude, artist_longitude, artist_location, artist_name, song_id, title, duration, year = value
        # insert artist record
        artist_data = (artist_id, artist_name, artist_location, artist_latitude, artist_longitude)
        cur.execute(artist_table_insert, artist_data)
        # insert song record
        song_data = (song_id, title, artist_id, year, duration)
        cur.execute(song_table_insert, song_data)
    
    print(f"Records inserted for file {filepath}")

def process_log_file(cur, filepath):
    """
    Process Event log files and insert records into the Postgres database.
    :param cur: cursor reference
    :param filepath: complete file path for the file to load
    """
    # open log file
    df = pd.read_json(filepath, lines=True)
    # filter by NextSong action and convert the ts column (epoch milliseconds) to datetime
    df = df[df['page'] == "NextSong"].astype({'ts': 'datetime64[ms]'})
    # timestamp series used to build the time table records
    t = pd.Series(df['ts'], index=df.index)
    
    # insert time data records
    column_labels = ["start_time", "hour", "day", "week", "month", "year", "weekday"]
    time_data = []
    for data in t:
        # isocalendar()[1] gives the ISO week number (Timestamp.weekofyear was removed in newer pandas)
        time_data.append([data, data.hour, data.day, data.isocalendar()[1], data.month, data.year, data.day_name()])
    time_df = pd.DataFrame.from_records(data=time_data, columns=column_labels)
    for i, row in time_df.iterrows():
        cur.execute(time_table_insert, list(row))
    # load user table
    user_df = df[['userId','firstName','lastName','gender','level']]
    # insert user records
    for i, row in user_df.iterrows():
        cur.execute(user_table_insert, row)
    # insert songplay records
    for index, row in df.iterrows():
        
        # get songid and artistid from song and artist tables
        cur.execute(song_select, (row.song, row.artist, row.length))
        results = cur.fetchone()
        
        if results:
            songid, artistid = results
        else:
            songid, artistid = None, None
        # insert songplay record
        songplay_data = ( row.ts, row.userId, row.level, songid, artistid, row.sessionId, row.location, row.userAgent)
        cur.execute(songplay_table_insert, songplay_data)

def process_data(cur, conn, filepath, func):
    """
    Driver function to load data from songs and event log files into Postgres database.
    :param cur: a database cursor reference
    :param conn: database connection reference
    :param filepath: parent directory where the files exist
    :param func: function to call
    """
    # get all files matching extension from directory
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.json'))
        for f in files :
            all_files.append(os.path.abspath(f))
    # get total number of files found
    num_files = len(all_files)
    print('{} files found in {}'.format(num_files, filepath))
    # iterate over files and process
    for i, datafile in enumerate(all_files, 1):
        func(cur, datafile)
        conn.commit()
        print('{}/{} files processed.'.format(i, num_files))

def main():
    """
    Driver function for loading songs and log data into Postgres database
    """
    conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=postgres password=admin")
    cur = conn.cursor()
    process_data(cur, conn, filepath='data/song_data', func=process_song_file)
    process_data(cur, conn, filepath='data/log_data', func=process_log_file)
    conn.close()

if __name__ == "__main__":
    main()
    print("\n\nFinished processing!!!\n\n")

    main.py -> the entry point for the program (a sketch is given below)
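
    main.py is not listed above; a minimal sketch of what it could look like, assuming it simply chains the two scripts shown earlier (both of which expose a main() function):

# main.py - assumed driver sketch: rebuild the database and tables, then run the ETL
import create_tables
import etl

def main():
    create_tables.main()  # drops/recreates sparkifydb and its tables
    etl.main()            # loads song_data and log_data into the tables

if __name__ == "__main__":
    main()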

Project Run Steps

    Run the driver program main.py as below:

        python main.py

    The create_tables.py and etl.py files can also be run independently as below (run create_tables.py first, since etl.py expects the tables to already exist):

        python create_tables.py 

        python etl.py 

To validate the loaded data, connect to the sparkifydb Postgres database with any client (for example psql) and query the tables; a quick check is sketched below.
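
A minimal validation sketch using psycopg2, assuming the same connection parameters as in the scripts above:

import psycopg2

# connect to the sparkify database created by create_tables.py
conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=postgres password=admin")
cur = conn.cursor()

# every table should be non-empty after etl.py has run
for table in ["songplays", "users", "songs", "artists", "time"]:
    cur.execute("SELECT COUNT(*) FROM " + table)
    print(table, cur.fetchone()[0])

conn.close()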