India Visa Analysis
In this project I will set up the Spark master-worker architecture in Docker containers on Azure. I will then perform end-to-end data processing and visualization of visa numbers in India using PySpark and Plotly: I take the input data, write jobs in PySpark and Spark SQL, submit them to the Spark cluster for processing, and visualize the output.
Requirements:
- Microsoft Azure
- Docker: The Spark master-worker architecture is set up in Docker containers on Azure.
- Python Libraries:
- PySpark
- pandas
- Plotly Express
- pycountry
- pycountry_convert
- fuzzywuzzy
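The Spark containers install these libraries from a requirements.txt file (mounted in docker-compose.yml and installed in Dockerfile.spark below). The actual file is not shown in this write-up; a plausible minimal version would be:
requirements.txt
pandas
plotly
pycountry
pycountry-convert
fuzzywuzzy
# optionally python-Levenshtein, which fuzzywuzzy uses for faster matching
# pyspark is not listed because the bitnami/spark base image already provides it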
Usage
- Data Input: Place your CSV file named visa_number_in_india.csv in the input directory.
- Run the Script: Execute the provided Python script.
- Visualizations: After execution, you'll find the visualizations saved as HTML files in the output directory.
- Cleaned Data: The cleaned data will also be saved as a CSV file in the output directory.
Process
- Data Ingestion: The script ingests the CSV file containing the visa numbers in India.
- Data Cleaning: The script standardizes column names, drops null columns, and corrects country names using fuzzy matching.
- Data Transformation: The data is further enriched by adding continent information for each country.
- Data Visualization: The cleaned and transformed data is visualized using Plotly Express to provide insights into visa trends in India.
- System Architecture: The Spark master-worker architecture is set up in Docker containers on Azure.
Steps
First, create a virtual machine in Azure named spark-cluster and connect to it over SSH using its key pair.
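A connection from a local terminal might look like the following sketch, assuming the key file is spark-clusters_key.pem (the same key the scp scripts below use) and ip_address stands for the VM's public IP:
ssh -i spark-clusters_key.pem azureuser@ip_address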
The file structure of the project looks like this:
/input/visa_number_in_india.csv
/jobs/visualisation.py
/output/
/docker-compose.yml
/Dockerfile.spark
/requirements.txt
/upload_files.sh
/download_files.sh
Now I will install Docker on this Azure virtual machine.
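One common way to do this (shown here as a sketch; any supported installation method works) is Docker's official convenience script:
curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh
# optional: let azureuser run docker without sudo (takes effect after re-login)
sudo usermod -aG docker azureuser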
In docker-compose.yml I will specify all the services and their configurations:
version: '3'
services:
  spark-master:
    build:
      context: .
      dockerfile: Dockerfile.spark
    command: bin/spark-class org.apache.spark.deploy.master.Master
    volumes:
      - ./jobs:/opt/bitnami/spark/jobs
      - ./input:/opt/bitnami/spark/input
      - ./output:/opt/bitnami/spark/output
      - ./requirements.txt:/requirements.txt
    ports:
      - "9090:8080"
      - "7077:7077"
    networks:
      ruchith-project:
  spark-worker: &worker
    build:
      context: .
      dockerfile: Dockerfile.spark
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
    volumes:
      - ./jobs:/opt/bitnami/spark/jobs
      - ./input:/opt/bitnami/spark/input
      - ./output:/opt/bitnami/spark/output
      - ./requirements.txt:/requirements.txt
    depends_on:
      - spark-master
    environment:
      SPARK_MODE: worker
      SPARK_WORKER_CORES: 2
      SPARK_WORKER_MEMORY: 1g
      SPARK_MASTER_URL: spark://spark-master:7077
    networks:
      ruchith-project:
  spark-worker-2:
    <<: *worker
  spark-worker-3:
    <<: *worker
  spark-worker-4:
    <<: *worker
networks:
  ruchith-project:
I will create an upload_files.sh script with an scp command to copy the input files to spark-cluster,
upload_files.sh
scp -i spark-clusters_key.pem -r ./* azureuser@ip_address:/home/azureuser
and a download_files.sh script to copy the output data from spark-cluster back to the local output folder.
download_files.sh
scp -i spark-clusters_key.pem -rp azureuser@ip_address:/home/azureuser/output ./
Then I will create a Dockerfile.spark to specify how the Spark image is built: it extends the Bitnami Spark image and installs the Python dependencies from requirements.txt.
Dockerfile.spark
FROM bitnami/spark:latest
COPY requirements.txt .
USER root
RUN apt-get clean && \
apt-get update && \
apt-get install -y python3-pip && \
pip3 install --no-cache-dir -r ./requirements.txt
When I run
sudo docker compose up -d
all the services and containers will start up and be running.
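The PySpark job can then be submitted to the cluster from inside the master container. A sketch, assuming spark-submit is on the container's PATH and the working directory is /opt/bitnami/spark (where the jobs, input, and output folders are mounted); use whatever name docker ps reports for the master container:
sudo docker ps   # find the master container's name
sudo docker exec -it <master-container-name> spark-submit --master spark://spark-master:7077 jobs/visualisation.py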
Now I will start with my visualisation.py code:
visualisation.py
import plotly.express as px
import pycountry
import pycountry_convert as pcc
from fuzzywuzzy import process
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
spark = SparkSession.builder.appName('End to end processing').getOrCreate()
df = spark.read.csv('input/visa_number_in_india.csv', header=True, inferSchema=True)
# standardize or clean the columns
new_column_names = [col_name.replace(' ', '_')
.replace('/', '')
.replace('.', '')
.replace(',', '')
for col_name in df.columns]
df = df.toDF(*new_column_names)
# drop rows where all values are null
df = df.dropna(how='all')
df = df.select('year', 'country', 'number_of_issued_numerical')
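# fuzzy-match a raw country name against the official pycountry names,
# keeping the original value when no match clears the threshold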
def correct_country_name(name, threshold=70):
countries = [country.name for country in pycountry.countries]
corrected_name, score = process.extractOne(name, countries)
if score >= threshold:
return corrected_name
# no changes
return name
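# map a country name to its continent name via pycountry_convert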
def get_continent_name(country_name):
try:
country_code = pcc.country_name_to_country_alpha2(country_name, cn_name_format='default')
continent_code = pcc.country_alpha2_to_continent_code(country_code)
return pcc.convert_continent_code_to_continent_name(continent_code)
except:
return None
correct_country_name_udf = udf(correct_country_name, StringType())
df = df.withColumn('country', correct_country_name_udf(df['country']))
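# manual fixes for garbled names that fuzzy matching does not resolve correctly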
country_corrections = {
'Andra': 'Russia',
'Antigua Berbuda': 'Antigua and Barbuda',
'Barrane': 'Bahrain',
'Brush': 'Bhutan',
'Komoro': 'Comoros',
'Benan': 'Benin',
'Kiribass': 'Kiribati',
'Gaiana': 'Guyana',
'Court Jiboire': "Côte d'Ivoire",
'Lesot': 'Lesotho',
'Macau travel certificate': 'Macao',
'Moldoba': 'Moldova',
'Naure': 'Nauru',
'Nigail': 'Niger',
'Palao': 'Palau',
'St. Christopher Navis': 'Saint Kitts and Nevis',
'Santa Principa': 'Sao Tome and Principe',
'Saechel': 'Seychelles',
'Slinum': 'Saint Helena',
'Swaji Land': 'Eswatini',
'Torque menistan': 'Turkmenistan',
'Tsubaru': 'Zimbabwe',
'Kosovo': 'Kosovo'
}
df = df.replace(country_corrections, subset='country')
continent_udf = udf(get_continent_name, StringType())
df = df.withColumn('continent', continent_udf(df['country']))
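# register the cleaned DataFrame as a global temp view so it can be queried with Spark SQL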
df.createGlobalTempView('india_visa')
# VISUALISATION
df_cont = spark.sql("""
SELECT year, continent, sum(number_of_issued_numerical) visa_issued
FROM global_temp.india_visa
WHERE continent IS NOT NULL
GROUP BY year, continent
""")
df_cont = df_cont.toPandas()
fig = px.bar(df_cont, x='year', y='visa_issued', color='continent', barmode='group')
fig.update_layout(title_text="Number of visa issued in India between 2006 and 2020",
xaxis_title='Year',
yaxis_title='Number of visa issued',
legend_title='Continent')
fig.write_html('output/visa_number_in_india_continent_2006_2020.html')
# top 10 countries with the most visas issued in 2020
df_country = spark.sql("""
SELECT country, sum(number_of_issued_numerical) visa_issued
FROM global_temp.india_visa
WHERE country NOT IN ('total', 'others')
AND country IS NOT NULL
AND year = 2020
GROUP BY country
ORDER BY visa_issued DESC
LIMIT 10
""")
df_country = df_country.toPandas()
fig = px.bar(df_country, x='country', y='visa_issued', color='country')
fig.update_layout(title_text="Top 10 countries with most issued visa in 2020",
xaxis_title='Country',
yaxis_title='Number of visa issued',
legend_title='Country')
fig.write_html('output/visa_number_in_india_by_country_2020.html')
# display the output on the map
df_country_year_map = spark.sql("""
SELECT year, country, sum(number_of_issued_numerical) visa_issued
FROM global_temp.india_visa
WHERE country NOT IN ('total', 'others')
  AND country IS NOT NULL
GROUP BY year, country
ORDER BY year ASC
""")
df_country_year_map = df_country_year_map.toPandas()
fig = px.choropleth(df_country_year_map, locations='country',
color='visa_issued',
hover_name='country',
animation_frame='year',
                    range_color=[0, 100000],
color_continuous_scale=px.colors.sequential.Plasma,
locationmode='country names',
                    title='Yearly visas issued by country'
)
fig.write_html('output/visa_number_in_india_year_map.html')
df.write.csv("output/visa_number_in_india_cleaned.csv", header=True, mode='overwrite')
spark.stop()
When all the code runs successfully on the Docker containers, I can see the output files in my output directory, and when I open them I can see the bar graphs as HTML web pages.