Azure Spark Cluster Data Engineering Project

India Visa Analysis

In this project I will set up the Spark master-worker architecture in Docker containers on Azure. I will then perform end-to-end data processing and visualization of visa numbers issued in India using PySpark and Plotly. The workflow takes a raw CSV as input, runs PySpark and Spark SQL jobs submitted to the Spark cluster, and visualizes the output.

Requirements:

  1. Microsoft Azure
  2. Docker: The Spark master-worker architecture is set up in a Docker container on Azure.
  3. Python Libraries
    • PySpark
    • pandas
    • Plotly Express
    • pycountry
    • pycountry_convert
    • fuzzywuzzy

Usage

  1. Data Input: Place your CSV file named visa_number_in_india.csv in the input directory.
  2. Run the Script: Execute the provided Python script.
  3. Visualizations: After execution, you'll find the visualizations saved as HTML files in the output directory.
  4. Cleaned Data: The cleaned data will also be saved as a CSV file in the output directory.

Process

  • Data Ingestion: The script ingests the CSV file containing the visa numbers in India.
  • Data Cleaning: The script standardizes column names, drops null columns, and corrects country names using fuzzy matching.
  • Data Transformation: The data is further enriched by adding continent information for each country.
  • Data Visualization: The cleaned and transformed data is visualized using Plotly Express to provide insights into visa trends in India.
  • System Architecture: The Spark master-worker architecture is set up in a Docker container on Azure.

Steps

 First, create a Virtual Machine in Azure named spark-cluster and set up an SSH connection to it using the key pair generated for the VM.
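
To connect, assuming the key downloaded when creating the VM is spark-clusters_key.pem (the name used by the upload and download scripts later) and ip_address stands for the VM's public IP, the commands look roughly like this:
    chmod 400 spark-clusters_key.pem
    ssh -i spark-clusters_key.pem azureuser@ip_address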

The file structure of the project looks like this:
    /input/visa_number_in_india.csv
    /jobs/visualisation.py
    /output/
    /docker-compose.yml
    /upload_files.sh
    /download_files.sh

Now I will install Docker on this Azure virtual machine.
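
One quick way to install it (a sketch; any standard Docker installation that includes the compose plugin works) is Docker's convenience script:
    curl -fsSL https://get.docker.com -o get-docker.sh
    sudo sh get-docker.sh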

In docker-compose.yml I will specify all the services and their configurations:

    version: '3'

    services:
      spark-master:
        build:
          context: .
          dockerfile: Dockerfile.spark
        command: bin/spark-class org.apache.spark.deploy.master.Master
        volumes:
          - ./jobs:/opt/bitnami/spark/jobs
          - ./input:/opt/bitnami/spark/input
          - ./output:/opt/bitnami/spark/output
          - ./requirements.txt:/requirements.txt
        ports:
          - "9090:8080"
          - "7077:7077"
        networks:
          ruchith-project:

      spark-worker: &worker
        build:
          context: .
          dockerfile: Dockerfile.spark
        command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
        volumes:
          - ./jobs:/opt/bitnami/spark/jobs
          - ./input:/opt/bitnami/spark/input
          - ./output:/opt/bitnami/spark/output
          - ./requirements.txt:/requirements.txt
        depends_on:
          - spark-master
        environment:
          SPARK_MODE: worker
          SPARK_WORKER_CORES: 2
          SPARK_WORKER_MEMORY: 1g
          SPARK_MASTER_URL: spark://spark-master:7077
        networks:
          ruchith-project:

      spark-worker-2:
        <<: *worker

      spark-worker-3:
        <<: *worker

      spark-worker-4:
        <<: *worker

    networks:
      ruchith-project:

I will create an upload_files.sh script with an scp command to copy the project files to spark-cluster:
    upload_files.sh
        scp -i spark-clusters_key.pem -r ./* azureuser@ip_address:/home/azureuser

and a download_files.sh script to copy the output data from spark-cluster back into the local output folder:
    download_files.sh
        scp -i spark-clusters_key.pem -rp azureuser@ip_address:/home/azureuser/output ./

Then I will create a Dockerfile.spark to specify the image build steps, so that every container in the cluster comes up with the required Python packages installed:
    Dockerfile.spark
        FROM bitnami/spark:latest

        COPY requirements.txt .

        USER root

        RUN apt-get clean && \
            apt-get update && \
            apt-get install -y python3-pip && \
            pip3 install --no-cache-dir -r ./requirements.txt
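
The Dockerfile installs the Python dependencies from requirements.txt, which is not shown above; based on the libraries listed under Requirements, it would look roughly like this (PySpark itself already ships with the bitnami/spark image, so it is not pinned here):
    requirements.txt
        pandas
        plotly
        pycountry
        pycountry-convert
        fuzzywuzzy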
    
When I run:
    sudo docker compose up -d
all the services and containers will be started and running.
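
To confirm everything came up, I can list the containers and open the Spark master web UI, which docker-compose maps from port 8080 inside the container to port 9090 on the VM (assuming the Azure network security group allows inbound traffic on that port):
    sudo docker compose ps
    # Spark master UI: http://ip_address:9090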

Now I will write my visualisation.py code.
    
    visualisation.py

import plotly.express as px
import pycountry
import pycountry_convert as pcc
from fuzzywuzzy import process
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName('End to end processing').getOrCreate()

df = spark.read.csv('input/visa_number_in_india.csv', header=True, inferSchema=True)

# standardize or clean the columns
new_column_names = [col_name.replace(' ', '_')
                    .replace('/', '')
                    .replace('.', '')
                    .replace(',', '')
                    for col_name in df.columns]
df = df.toDF(*new_column_names)

# drop rows where every column is null
df = df.dropna(how='all')

# keep only the columns needed for the analysis
df = df.select('year', 'country', 'number_of_issued_numerical')


def correct_country_name(name, threshold=70):
    # fuzzy-match a raw country name against pycountry's official names
    countries = [country.name for country in pycountry.countries]

    corrected_name, score = process.extractOne(name, countries)

    if score >= threshold:
        return corrected_name

    # no changes
    return name


def get_continent_name(country_name):
    # map a country name to its continent, e.g. 'India' -> 'Asia'
    try:
        country_code = pcc.country_name_to_country_alpha2(country_name, cn_name_format='default')
        continent_code = pcc.country_alpha2_to_continent_code(country_code)
        return pcc.convert_continent_code_to_continent_name(continent_code)
    except Exception:
        # country could not be mapped to a continent
        return None


# apply the fuzzy correction to the country column as a UDF
correct_country_name_udf = udf(correct_country_name, StringType())
df = df.withColumn('country', correct_country_name_udf(df['country']))

# manual overrides for names the fuzzy matcher does not resolve correctly
country_corrections = {
    'Andra': 'Russia',
    'Antigua Berbuda': 'Antigua and Barbuda',
    'Barrane': 'Bahrain',
    'Brush': 'Bhutan',
    'Komoro': 'Comoros',
    'Benan': 'Benin',
    'Kiribass': 'Kiribati',
    'Gaiana': 'Guyana',
    'Court Jiboire': "Côte d'Ivoire",
    'Lesot': 'Lesotho',
    'Macau travel certificate': 'Macao',
    'Moldoba': 'Moldova',
    'Naure': 'Nauru',
    'Nigail': 'Niger',
    'Palao': 'Palau',
    'St. Christopher Navis': 'Saint Kitts and Nevis',
    'Santa Principa': 'Sao Tome and Principe',
    'Saechel': 'Seychelles',
    'Slinum': 'Saint Helena',
    'Swaji Land': 'Eswatini',
    'Torque menistan': 'Turkmenistan',
    'Tsubaru': 'Zimbabwe',
    'Kosovo': 'Kosovo'
}

df = df.replace(country_corrections, subset='country')

# enrich each row with the continent of its (corrected) country
continent_udf = udf(get_continent_name, StringType())
df = df.withColumn('continent', continent_udf(df['country']))

# register the DataFrame so it can be queried as global_temp.india_visa
df.createGlobalTempView('india_visa')

# VISUALISATION
df_cont = spark.sql("""
    SELECT year, continent, sum(number_of_issued_numerical) visa_issued
    FROM global_temp.india_visa
    WHERE continent IS NOT NULL
    GROUP BY year, continent
""")

df_cont = df_cont.toPandas()

fig = px.bar(df_cont, x='year', y='visa_issued', color='continent', barmode='group')

fig.update_layout(title_text="Number of visa issued in India between 2006 and 2020",
                  xaxis_title='Year',
                  yaxis_title='Number of visa issued',
                  legend_title='Continent')

fig.write_html('output/visa_number_in_india_continent_2006_2020.html')

# top 10 countries with the most issued visa in 2020
df_country = spark.sql("""
    SELECT country, sum(number_of_issued_numerical) visa_issued
    FROM global_temp.india_visa
    WHERE country NOT IN ('total', 'others')
    AND country IS NOT NULL
    AND year = 2020
    GROUP BY country
    ORDER BY visa_issued DESC
    LIMIT 10
""")

df_country = df_country.toPandas()

fig = px.bar(df_country, x='country', y='visa_issued', color='country')

fig.update_layout(title_text="Top 10 countries with most issued visa in 2020",
                  xaxis_title='Country',
                  yaxis_title='Number of visa issued',
                  legend_title='Country')

fig.write_html('output/visa_number_in_india_by_country_2020.html')

# display the output on the map
df_country_year_map = spark.sql("""
    SELECT year, country, sum(number_of_issued_numerical) visa_issued
    FROM global_temp.india_visa
    WHERE country NOT IN ('total', 'others')
    AND country IS NOT NULL
    GROUP BY year, country
    ORDER BY year ASC
""")

df_country_year_map = df_country_year_map.toPandas()

fig = px.choropleth(df_country_year_map, locations='country',
                    color='visa_issued',
                    hover_name='country',
                    animation_frame='year',
                    range_color=[0, 100000],
                    color_continuous_scale=px.colors.sequential.Plasma,
                    locationmode='country names',
                    title='Yearly visa issued by countries'
                    )

fig.write_html('output/visa_number_in_india_year_map.html')

df.write.csv("output/visa_number_in_india_cleaned.csv", header=True, mode='overwrite')

spark.stop()
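
With the cluster running, the job can be submitted from the master container. A minimal sketch, using the compose service name and the master URL defined in docker-compose.yml:

    sudo docker compose exec spark-master spark-submit --master spark://spark-master:7077 jobs/visualisation.py

The script writes its HTML files and the cleaned CSV to output/, which is volume-mounted, so download_files.sh can then copy them back to the local machine.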
        
When all the code runs successfully on the Docker containers, the output files appear in my output directory, and when I open one of the HTML files I can see the bar graph rendered in a web page.