End-to-End Data Pipeline Creation Using AWS, Apache Airflow, PostgreSQL and Snowflake
This is an end-to-end project in which I cover all the steps: data extraction, data ingestion, and data analysis, as well as workflow orchestration (management).
The project flow is as follows:
The project starts with extracting data from the News API (newsapi.org) using Python code; the data is stored in Parquet format on my local machine, i.e., the EC2 instance. This data is then loaded to Amazon S3 through the AWS CLI by running the aws s3 mv command, so the Parquet files now sit in S3. To transfer these files to Snowflake, an external stage (which acts as a staging area pointing to the data in S3) is created, and from this external stage the data is copied into Snowflake through the snowflake_conn connection, where various data analyses are performed.
Steps:
I will be running Airflow and the entire deployment on my AWS EC2 instance with Ubuntu OS.
First, I will set up an AWS EC2 instance, create an IAM role (ECtoS3) with permission to access S3 storage (the AmazonS3FullAccess policy), and then set up a session through PuTTY to interact with this EC2 instance.
In the PuTTY session, execute the following commands (to set up the environment needed for Airflow):
- sudo apt update
- sudo apt install python3-pip
- sudo apt install sqlite3 //for storing metadata of airflow dags
- pip3 install --upgrade awscli //for executing AWS S3 mv commands
- sudo pip3 install virtualenv //create a virtual env to isolate our project
- virtualenv airflow_env
- source airflow_env/bin/activate
- pip install "apache-airflow[celery]==2.7.1" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.1/constraints-3.8.txt"
- pip install pandas
- pip install apache-airflow-providers-snowflake==5.1.2
- pip install snowflake-connector-python==2.7.9
- pip install snowflake-sqlalchemy==1.2.6
- pip install pyarrow
- pip install fastparquet //to work with parquet files
- airflow db init //initialise the metadata database
- sudo apt-get install postgresql postgresql-contrib // install PostgreSQL to store metadata such as user details
- pip install psycopg2-binary // driver required by the postgresql+psycopg2 connection string used below
- sudo -i -u postgres // create a database and user for Airflow
- psql
- CREATE DATABASE airflow;
- CREATE USER airflow WITH PASSWORD 'airflow';
- GRANT ALL PRIVILEGES ON DATABASE airflow TO airflow;
- exit
- exit
- ls
- cd airflow
- sed -i 's#sqlite:////home/ubuntu/airflow/airflow.db#postgresql+psycopg2://airflow:airflow@localhost/airflow#g' airflow.cfg // point the metadata database at Postgres in airflow.cfg
- sed -i 's#SequentialExecutor#LocalExecutor#g' airflow.cfg // switch from SequentialExecutor to LocalExecutor
- airflow db init
- airflow users create -u airflow -f airflow -l airflow -r Admin -e airflow@gmail.com
- enter the password when prompted (admin@1)
- mkdir /home/ubuntu/dags // directory to store all DAGs and Python code
- cd airflow
- vi airflow.cfg
- change the following properties in airflow.cfg:
- dags_folder = /home/ubuntu/dags // location to store dags and code
- load_examples = False
1. Building the workflow on Airflow
The workflow I am going to create looks like this:
The following code defines the DAG and the workflow.
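Below is a minimal sketch of such a DAG, not the exact project code: the bucket name, table name, stage name, and the news_extract module are assumptions for illustration.

```python
# Sketch of the pipeline DAG: extract -> move to S3 -> copy into Snowflake.
# Bucket, table, stage, and module names are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

# Hypothetical module holding the extraction function shown in the Data Extraction step
from news_extract import extract_news_data

with DAG(
    dag_id="news_pipeline",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:

    # 1. Pull articles from NewsAPI and write them locally as Parquet
    extract = PythonOperator(
        task_id="extract_news",
        python_callable=extract_news_data,
    )

    # 2. Move the Parquet files from the EC2 instance to S3 with the AWS CLI
    move_to_s3 = BashOperator(
        task_id="move_to_s3",
        bash_command=(
            "aws s3 mv /home/ubuntu/dags/data/ s3://my-news-bucket/ "
            "--recursive --exclude '*' --include '*.parquet'"
        ),
    )

    # 3. Copy the staged files from the external stage into the Snowflake table
    copy_into_snowflake = SnowflakeOperator(
        task_id="copy_into_snowflake",
        snowflake_conn_id="snowflake_conn",
        sql="""
            COPY INTO news_db.public.news_articles
            FROM @news_db.public.news_stage
            FILE_FORMAT = (TYPE = PARQUET)
            MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
        """,
    )

    extract >> move_to_s3 >> copy_into_snowflake
```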
2. Data Extraction: The first step in the workflow is data extraction. The required data is extracted from NewsAPI (newsapi.org); I will set up an account and get an API key to connect to this service.
The code for data extraction:
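As a sketch of this step (the search query and the output path are assumptions; the API key comes from the NewsAPI account created above):

```python
# Sketch of the NewsAPI extraction step: pull articles and write them locally as Parquet.
# The API key, search query, and output path are placeholders.
from datetime import datetime

import pandas as pd
import requests

API_KEY = "YOUR_NEWSAPI_KEY"           # key obtained from newsapi.org
OUTPUT_DIR = "/home/ubuntu/dags/data"  # local folder later moved to S3 via aws s3 mv


def extract_news_data():
    response = requests.get(
        "https://newsapi.org/v2/everything",
        params={"q": "data engineering", "language": "en", "apiKey": API_KEY},
        timeout=30,
    )
    response.raise_for_status()
    articles = response.json().get("articles", [])

    # Keep a few useful fields and write the result to a timestamped Parquet file
    df = pd.DataFrame(
        [
            {
                "source": (a.get("source") or {}).get("name"),
                "author": a.get("author"),
                "title": a.get("title"),
                "published_at": a.get("publishedAt"),
                "url": a.get("url"),
            }
            for a in articles
        ]
    )
    file_path = f"{OUTPUT_DIR}/news_{datetime.now():%Y%m%d_%H%M%S}.parquet"
    df.to_parquet(file_path, index=False)
    return file_path


if __name__ == "__main__":
    print(extract_news_data())
```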
Now that I have the environment set up and the code in place, let me start the server:
airflow db init // a success message here means everything is configured correctly
airflow webserver
As seen above, the webserver has started. Now let me run the Airflow scheduler by opening another PuTTY session:
source airflow_env/bin/activate // activate the virtual environment in the new session
airflow scheduler
Now I open a web browser and visit {localhost_ip_address}:8080.
I am in the Airflow webserver UI.
I can see my DAG, but the connection to Snowflake is not yet established. To establish it, I will first create a Snowflake account and set up the workspace (the database, the target table, and an external stage pointing to the S3 bucket).
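A minimal sketch of that workspace setup is below, run here through the Python connector installed earlier; the database, table, and stage names, the bucket URL, and the credentials are placeholders.

```python
# One-time Snowflake workspace setup: database, target table, and external stage over S3.
# Account, credentials, object names, and the S3 URL are placeholders.
import snowflake.connector

conn = snowflake.connector.connect(
    account="YOUR_ACCOUNT",   # e.g. xy12345.us-east-1
    user="YOUR_USER",
    password="YOUR_PASSWORD",
)
cur = conn.cursor()

# Database that will hold the news data
cur.execute("CREATE DATABASE IF NOT EXISTS news_db")

# Target table matching the columns written by the extraction step
cur.execute("""
    CREATE TABLE IF NOT EXISTS news_db.public.news_articles (
        source STRING,
        author STRING,
        title STRING,
        published_at STRING,
        url STRING
    )
""")

# External stage pointing at the S3 bucket that receives the Parquet files
cur.execute("""
    CREATE STAGE IF NOT EXISTS news_db.public.news_stage
    URL = 's3://my-news-bucket/'
    CREDENTIALS = (AWS_KEY_ID = 'YOUR_KEY' AWS_SECRET_KEY = 'YOUR_SECRET')
    FILE_FORMAT = (TYPE = PARQUET)
""")

cur.close()
conn.close()
```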
Now we will set up the Airflow connection, snowflake_conn.
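Once the connection is saved, one quick way to sanity-check it from the EC2 box is through the Snowflake provider's hook (a sketch, assuming the connection id snowflake_conn used above):

```python
# Quick connectivity check for the snowflake_conn Airflow connection.
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

hook = SnowflakeHook(snowflake_conn_id="snowflake_conn")
# Runs a trivial query through the connection and prints the Snowflake version
print(hook.get_first("SELECT CURRENT_VERSION()"))
```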
Before executing the DAG, we don't have any files in S3 or any data in Snowflake.
Now I will run (debug) this workflow.
After all the DAG runs complete, we can see the data in S3.
Now when I run the Snowflake queries, they return results.
In Snowflake, we can run SQL queries to analyse the data that was copied from S3 into the Snowflake database.
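For example, a simple analysis query could look like the following; the table and column names follow the hypothetical schema used in the sketches above.

```python
# Example analysis query run through the same snowflake_conn connection:
# count articles per source (table/column names follow the hypothetical schema above).
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

hook = SnowflakeHook(snowflake_conn_id="snowflake_conn")
rows = hook.get_records("""
    SELECT source, COUNT(*) AS article_count
    FROM news_db.public.news_articles
    GROUP BY source
    ORDER BY article_count DESC
    LIMIT 10
""")
for source, article_count in rows:
    print(source, article_count)
```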
Thank you for your time. I have explained the outline and the process of running the project. Feel free to comment with any doubts or suggestions for improvement.