End-to-End Data Pipeline Project (News Data)

End-to-End Data Pipeline creation using AWS, Apache Airflow, PostgreSQL and Snowflake

This is an end-to-end project in which I will cover all steps, from data extraction and data ingestion to data analysis, as well as workflow orchestration (management).

The project flow is as follows:

    The project starts with extracting data from the News API using Python code; the data is stored in Parquet format on my local machine, i.e., the EC2 instance. This data is then loaded to Amazon S3 through the AWS CLI by running an s3 mv command. Now the data is in S3 in Parquet format. To transfer these files to Snowflake, an external stage acts as a staging area pointing to the data in S3; from this external stage the data is copied into Snowflake through the snowflake_conn connection, and various data analysis is done in Snowflake.

Steps:

         I will be running Airflow and the whole deployment on my AWS EC2 instance with Ubuntu OS.
        So first I will set up the AWS EC2 instance, create a role (ECtoS3) with permission to access S3 storage (AmazonS3FullAccess), and then set up a session through PuTTY to interact with this EC2 instance.
        
        In PuTTY, execute the following commands (to set up the environment needed for Airflow):
  • sudo apt update
  • sudo apt install python3-pip
  • sudo apt install sqlite3             //for storing metadata of airflow dags
  • pip3 install --upgrade awscli    //for executing AWS S3 mv commands
  • sudo pip3 install virtualenv      //create a virtual env to isolate our project
  • virtualenv airflow_env
  • source airflow_env/bin/activate
        Now I will execute these commands to set up Airflow:
  • pip install "apache-airflow[celery]==2.7.1" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.1/constraints-3.8.txt"
  • pip install pandas 
  • pip install apache-airflow-providers-snowflake==5.1.2
  • pip install snowflake-connector-python==2.7.9
  • pip install snowflake-sqlalchemy==1.2.6
  • pip install pyarrow
  • pip install fastparquet     //to work with parquet files
  • airflow db init                   //initialise the Airflow metadata database
  • sudo apt-get install postgresql postgresql-contrib // basically to store Airflow metadata like user details
  • sudo -i -u postgres  // switch to the postgres user to create a user and database for Airflow
  • psql
  • CREATE DATABASE airflow;
  • CREATE USER airflow WITH PASSWORD 'airflow';
  • GRANT ALL PRIVILEGES ON DATABASE airflow TO airflow;

  • exit
  • exit

  • ls
  • cd airflow
  • sed -i 's#sqlite:////home/ubuntu/airflow/airflow.db#postgresql+psycopg2://airflow:airflow@localhost/airflow#g' airflow.cfg
  • sed -i 's#SequentialExecutor#LocalExecutor#g' airflow.cfg    // point airflow.cfg at the Postgres database and switch to the LocalExecutor
  • airflow db init          
  • airflow users create -u airflow -f airflow -l airflow -r Admin -e airflow@gmail.com
    • password--admin@1
  • mkdir /home/ubuntu/dags         // folder to store all DAGs and Python code
  • cd airflow
  • vi airflow.cfg 
    • change some properties in airflow.cfg
      • dags_folder = /home/ubuntu/dags      // location to store dags and code
      • load_examples = False
        The dags folder will contain the code for data extraction (data_extraction.py) and the DAG/workflow definition (airflow_workflow.py), both of which I will discuss below. With this environment setup done, the next step is building the pipeline orchestration.

        1. Building the workflow on Airflow

         The workflow I am going to create looks like this:
       

        The following code defines the DAG and the workflow.
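
        The DAG wires three tasks in sequence: a PythonOperator that runs the extraction, a BashOperator that moves the Parquet files to S3 with aws s3 mv, and a SnowflakeOperator that copies from the external stage into a Snowflake table. Below is a minimal sketch of such a DAG; the bucket, stage, table and function names are placeholder assumptions, not necessarily the exact ones used in the project.

```python
# airflow_workflow.py -- a minimal sketch; bucket, stage, table and file paths
# below are placeholder assumptions, not the project's exact names.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

# data_extraction.py lives in the same dags folder, so it can be imported directly
from data_extraction import extract_news_data

with DAG(
    dag_id="news_data_pipeline",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:

    # 1. Call NewsAPI and write a Parquet file on the EC2 instance
    extract = PythonOperator(
        task_id="extract_news_data",
        python_callable=extract_news_data,
    )

    # 2. Move the Parquet files from the EC2 instance to S3 with the AWS CLI
    move_to_s3 = BashOperator(
        task_id="move_to_s3",
        bash_command="aws s3 mv /home/ubuntu/news_data/ s3://<your-bucket>/news_data/ --recursive",
    )

    # 3. Copy from the external stage (which points at the S3 data) into Snowflake
    copy_to_snowflake = SnowflakeOperator(
        task_id="copy_to_snowflake",
        snowflake_conn_id="snowflake_conn",
        sql=(
            "COPY INTO news_data FROM @news_stage "
            "FILE_FORMAT = (TYPE = PARQUET) "
            "MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE"
        ),
    )

    extract >> move_to_s3 >> copy_to_snowflake
```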

     

       2.  Data Extraction: The first step in the workflow is data extraction. The required data is extracted from NewsAPI (newsapi.org). I will set up an account and get an API key to connect to this service.

    The code for data extraction:
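
    Below is a minimal sketch of what data_extraction.py could look like. It assumes the requests library is available (it is not installed explicitly above), reads the API key from an environment variable, and keeps an illustrative subset of the fields NewsAPI returns.

```python
# data_extraction.py -- a minimal sketch; it assumes the requests library is
# available and that NEWSAPI_KEY is set in the environment. The columns kept
# here are an illustrative subset of what NewsAPI returns.
import os
from datetime import datetime

import pandas as pd
import requests

URL = "https://newsapi.org/v2/top-headlines"   # see the newsapi.org docs for other endpoints


def extract_news_data(output_dir: str = "/home/ubuntu/news_data") -> str:
    """Fetch articles from NewsAPI and write them locally as a Parquet file."""
    api_key = os.environ["NEWSAPI_KEY"]        # API key from the newsapi.org account
    response = requests.get(URL, params={"country": "us", "apiKey": api_key}, timeout=30)
    response.raise_for_status()
    articles = response.json().get("articles", [])

    df = pd.DataFrame(
        [
            {
                "source": (a.get("source") or {}).get("name"),
                "author": a.get("author"),
                "title": a.get("title"),
                "description": a.get("description"),
                "url": a.get("url"),
                "published_at": a.get("publishedAt"),
            }
            for a in articles
        ]
    )

    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, f"news_{datetime.utcnow():%Y%m%d_%H%M%S}.parquet")
    df.to_parquet(path, index=False)           # uses the pyarrow/fastparquet installed above
    return path


if __name__ == "__main__":
    print(extract_news_data())
```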
                                                                    

Now that I have the environment set up and the code in place, let me start the server:

        airflow db init  // a success message means everything is working fine
        airflow webserver


    As I can see above, the server has started. Now let me run the Airflow scheduler by opening another PuTTY session.

    commands to start it:
        source airflow_env/bin/activate
        airflow scheduler
        
                                                                

Now when I open the web browser and visit {localhost_ip_address}:8080, I am in the Airflow webserver UI.

The DAG I created looks like this:

Now I can see my DAG, but the connection with Snowflake is not yet established. To establish a connection, I will first create a Snowflake account and set up the workspace.
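
The workspace setup amounts to a database, a schema, a Parquet file format, an external stage pointing at the S3 location, and a target table. Below is a sketch of this one-time setup using the snowflake-connector-python package installed earlier; the account, credentials, bucket and object names are placeholders, and the stage needs its own S3 credentials (or a storage integration), since the EC2 role does not carry over to Snowflake.

```python
# snowflake_setup.py -- one-time workspace setup sketch using snowflake-connector-python.
# Account, credentials, bucket and AWS keys are placeholders; in practice a storage
# integration is the recommended way to let the stage read from S3.
import snowflake.connector

conn = snowflake.connector.connect(
    user="<user>",
    password="<password>",
    account="<account_identifier>",
    warehouse="COMPUTE_WH",          # assuming the default warehouse of a new account
)

statements = [
    "CREATE DATABASE IF NOT EXISTS news_db",
    "CREATE SCHEMA IF NOT EXISTS news_db.public",
    "USE SCHEMA news_db.public",
    # Parquet file format matching the files written by the extraction step
    "CREATE FILE FORMAT IF NOT EXISTS parquet_format TYPE = PARQUET",
    # External stage pointing at the S3 prefix used by the DAG's aws s3 mv task
    """CREATE STAGE IF NOT EXISTS news_stage
         URL = 's3://<your-bucket>/news_data/'
         CREDENTIALS = (AWS_KEY_ID = '<aws_key>' AWS_SECRET_KEY = '<aws_secret>')
         FILE_FORMAT = parquet_format""",
    # Target table for the COPY INTO task in the DAG
    """CREATE TABLE IF NOT EXISTS news_data (
         source STRING, author STRING, title STRING,
         description STRING, url STRING, published_at STRING)""",
]

cur = conn.cursor()
try:
    for stmt in statements:
        cur.execute(stmt)
finally:
    cur.close()
    conn.close()
```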
                                        
Now we will set up a connection - snowflake_conn
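
The connection is created in the Airflow UI under Admin → Connections, with the Snowflake account, user, password, warehouse, database and schema. As a quick sanity check that snowflake_conn works, the provider's SnowflakeHook can be used from a Python shell inside the virtualenv, for example:

```python
# Quick check that the snowflake_conn connection is usable; SnowflakeHook comes
# from the apache-airflow-providers-snowflake package installed earlier.
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

hook = SnowflakeHook(snowflake_conn_id="snowflake_conn")
print(hook.get_first("SELECT CURRENT_ACCOUNT(), CURRENT_WAREHOUSE()"))
```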



Before executing the DAG, we don't have any files in S3 or any data in Snowflake.


Now I will run (and debug) this workflow.

Here I can see the status of all the DAG's tasks: success (green) and queued (blue).

After the DAG run completes, we can see the data in S3.
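
The same can also be checked from Python with boto3 (not part of the setup above, so this assumes pip install boto3; the bucket name and prefix are placeholders):

```python
# Optional verification from Python that the Parquet files landed in S3.
# boto3 is an extra dependency here (pip install boto3); bucket/prefix are placeholders.
import boto3

s3 = boto3.client("s3")
response = s3.list_objects_v2(Bucket="<your-bucket>", Prefix="news_data/")
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])
```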


Now when I run the Snowflake queries, they return output.



In Snowflake we can now run SQL queries to analyse the data that was copied from S3 into the Snowflake database.
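
For example, a simple aggregation over the loaded table can be run with snowflake-connector-python; the table and column names below follow the assumed schema from the setup sketch, not necessarily the project's exact one.

```python
# An illustrative analysis query run with snowflake-connector-python; the table and
# column names follow the assumed schema in the setup sketch above.
import snowflake.connector

conn = snowflake.connector.connect(
    user="<user>",
    password="<password>",
    account="<account_identifier>",
    warehouse="COMPUTE_WH",
    database="news_db",
    schema="public",
)

cur = conn.cursor()
try:
    cur.execute(
        """
        SELECT source, COUNT(*) AS articles
        FROM news_data
        GROUP BY source
        ORDER BY articles DESC
        LIMIT 10
        """
    )
    for source, articles in cur.fetchall():
        print(source, articles)
finally:
    cur.close()
    conn.close()
```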

Thank you for your time. I have explained the outline and the process of running the project. Feel free to comment with any doubts or suggestions for improvement.