Realtime Data Streaming Project

    In this project I will build a real-time, end-to-end data streaming pipeline covering every phase from data ingestion to processing and, finally, storage. I will use tools such as Apache Airflow, Python, Apache Kafka, Apache Zookeeper, Apache Spark, and Cassandra, with everything containerized using Docker. I will discuss how these technologies work together to ingest, process, and store data.

Project flow                  

Project Architecture Overview

  • Data Ingestion: Raw data is ingested into the system using Kafka. The data can come from various sources like IoT devices, user activity logs, etc.
  • Data Processing: Airflow schedules Spark jobs to process the raw data. The processed data can be aggregated, filtered, or transformed based on the business logic.
  • Data Storage: The processed data is stored in either Cassandra for NoSQL needs or PostgreSQL for relational data storage.
  • Orchestration: Docker containers encapsulate each component of the architecture, ensuring isolation and ease of deployment.

Project Components

  • Data Source: We use the randomuser.me API to generate random user data for our pipeline.
  • Apache Airflow: Responsible for orchestrating the pipeline and storing fetched data in a PostgreSQL database.
  • Apache Kafka and Zookeeper: Used for streaming data from PostgreSQL to the processing engine.
  • Control Center and Schema Registry: Helps in monitoring and schema management of our Kafka streams.
  • Apache Spark: For data processing with its master and worker nodes.
  • Cassandra: Where the processed data will be stored.

The technologies used

  • Docker: A containerization tool that allows for isolated, consistent, and easily deployable applications.
  • Apache Airflow: An open-source platform used to programmatically author, schedule, and monitor workflows.
  • Apache Kafka: A distributed streaming platform designed for fault tolerance, high throughput, and scalability.
  • Apache Spark: A unified analytics engine for big data processing, with built-in modules for streaming, SQL, machine learning, and graph processing.
  • Apache Cassandra: A NoSQL database ideal for handling large amounts of data across many commodity servers, without any single point of failure.
  • PostgreSQL: An open-source relational database that focuses on extensibility and SQL compliance.
  • Python: The primary language used throughout the pipeline, for the extraction scripts, the Airflow DAGs, and the Spark job.

Project Execution

    First, I write Python code to extract data from the API; I ran this in a Python virtual environment.

        The Python code for data extraction is as follows; it is called later from the DAG.
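        A minimal sketch of what that extraction script could look like (the helper names get_data and format_data and the exact fields kept are my own illustration, not necessarily the original code):

            import json
            import requests

            API_URL = "https://randomuser.me/api/"

            def get_data():
                # Fetch one random user from the randomuser.me API
                res = requests.get(API_URL)
                res.raise_for_status()
                return res.json()['results'][0]

            def format_data(res):
                # Keep only the fields we care about and flatten the nested structure
                location = res['location']
                return {
                    'first_name': res['name']['first'],
                    'last_name': res['name']['last'],
                    'gender': res['gender'],
                    'address': f"{location['street']['number']} {location['street']['name']}, "
                               f"{location['city']}, {location['country']}",
                    'email': res['email'],
                    'username': res['login']['username'],
                    'dob': res['dob']['date'],
                    'phone': res['phone'],
                }

            if __name__ == "__main__":
                print(json.dumps(format_data(get_data()), indent=2))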

        


I can see that extraction is working fine

Now I need to set up Airflow, Kafka, Spark, Postgres, and the other services in Docker containers. This can be done by specifying each component and its dependencies in docker-compose.yml

    Airflow and Postgres - standalone

    Control Center depends on the Schema Registry; it listens to events on the registry, which helps visualize the data flowing through Kafka, which in turn is coordinated by Zookeeper

    Spark - Standalone

    docker-compose.yml

        Zookeeper

        Kafka
        Schema Registry - ensures data verification and schema versioning for your Kafka producers and consumers
        Control Center

    Now I run the command
                      docker compose up -d


In Docker Desktop I can see the services running.

After all these services are running successfully, I open localhost:9091 and can see Control Center with all the services.
Now I need to create a Kafka topic; for that I will install a package:

    pip install kafka-python
    Add this piece of code to kafka_streaming.py, where our DAGs are defined:
            
                import json
                from kafka import KafkaProducer

                def stream_data():
                    # ... fetch and format the API response into `data` ...
                    producer = KafkaProducer(bootstrap_servers=['broker:29092'], max_block_ms=5000)
                    producer.send('user_created', json.dumps(data).encode('utf-8'))

When I run this code (python dags/kafka_streaming.py),
    in Control Center I can see that the topic has been created.
    
Now I have Airflow and Kafka; all I need to do is transfer the data, which Airflow stores in Postgres, from Airflow to Kafka.

Before doing this, let me add the webserver properties to my docker-compose.yml:

     Webserver 
Now I will create an entry point for the project
        file structure
                ./dags/..
                ./scripts/entrypoint.sh         // this path is added to the webserver properties in docker-compose.yml above
            
       entrypoint.sh
        
Since the data is taken from Postgres, I need to set it up.
    After installing, I update the Postgres properties in docker-compose.yml:

     Postgres
After this I need a scheduler to schedule the workflow, which also has to be added to docker-compose.yml.

    
 Scheduler
            
Since all the necessary Python packages have to be installed inside the containers, I will mount a requirements.txt in docker-compose.yml:

        in scheduler section
            volumes:
                  - ./dags:/opt/airflow/dags
                  - ./script/entrypoint.sh:/opt/airflow/script/entrypoint.sh
                  - ./requirements.txt:/opt/airflow/requirements.txt
            ......
            command: bash -c "pip install -r ./requirements.txt && airflow db upgrade && airflow scheduler"

So this requirements.txt will have all the necessary packages.

Next, when I do docker compose up -d, all these services will come up and start running in Docker Desktop.

The scheduler also starts installing and running Airflow; it is the entry point.
Now Airflow is up and running:
        visit localhost:8080
            
Here the DAG is successfully created

The Grid view is working fine and the stream_data_api task is initialised.
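For context, a minimal version of the DAG wrapping the streaming function might be declared in kafka_streaming.py roughly like this (a sketch only: the dag_id, owner, schedule, and start date are assumptions; only the task id stream_data_api comes from the grid above, and stream_data is the function defined earlier in the same file):

        from datetime import datetime

        from airflow import DAG
        from airflow.operators.python import PythonOperator

        default_args = {
            'owner': 'airflow',                  # assumption
            'start_date': datetime(2023, 1, 1),  # assumption
        }

        with DAG('user_automation',              # dag_id is an assumption
                 default_args=default_args,
                 schedule_interval='@daily',
                 catchup=False) as dag:

            streaming_task = PythonOperator(
                task_id='stream_data_api',       # the task visible in the Airflow grid
                python_callable=stream_data,     # defined earlier in kafka_streaming.py
            )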

In the extraction part earlier, we extracted only selected fields from the random user API; now we need all of the API data to be ingested directly into Kafka.

    The code is as follows:
             Kafka_stream.py
                      here the data is formatted to make it clean and more readable


This code will read streaming data from the API continuously for the required time.
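A sketch of what that loop could look like inside Kafka_stream.py (the one-minute cutoff mirrors the run described next; format_data is the cleaning helper from the extraction sketch above, and the logging/error handling is my own addition):

        import json
        import logging
        import time

        import requests
        from kafka import KafkaProducer

        def stream_data():
            producer = KafkaProducer(bootstrap_servers=['broker:29092'], max_block_ms=5000)
            end_time = time.time() + 60                       # stream for roughly one minute

            while time.time() < end_time:
                try:
                    res = requests.get("https://randomuser.me/api/").json()['results'][0]
                    data = format_data(res)                   # clean / flatten the raw record
                    producer.send('user_created', json.dumps(data).encode('utf-8'))
                except Exception as e:
                    logging.error(f"An error occurred: {e}")
                    continue

            producer.flush()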

Here I can see the streaming data is written continuously for a minute.

So far I have built the Airflow DAG, set up Postgres and the scheduler, and data is being written to Kafka. In the next steps I need to process this data using Spark and store it in Cassandra.

I will set up the Spark and Cassandra configurations/properties in docker-compose.yml:

     Spark - here I have set up a master and a worker
    Cassandra

    Now all configurations are set, and when I run docker compose up -d, both Spark (master and worker) and Cassandra will start running.

We can check that all the services are running with the command docker ps.

All the services are up and running. Now, for Spark streaming, I will create spark_stream.py.

            folder structure
                    ./dags/
                    ./scripts/
                    ./spark_stream.py
I need the Spark Python dependencies: pip install pyspark

The code in spark_stream.py
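The original file is not reproduced here, but a minimal sketch of the idea might look like this (the topic name user_created comes from earlier; the keyspace spark_streams, table created_users, connector versions, and host/port values are assumptions, and the keyspace and table are assumed to already exist in Cassandra):

        from pyspark.sql import SparkSession
        from pyspark.sql.functions import from_json, col
        from pyspark.sql.types import StructType, StructField, StringType

        # Schema matching the cleaned user records pushed to Kafka
        user_schema = StructType([
            StructField("first_name", StringType()),
            StructField("last_name", StringType()),
            StructField("gender", StringType()),
            StructField("address", StringType()),
            StructField("email", StringType()),
            StructField("username", StringType()),
            StructField("dob", StringType()),
            StructField("phone", StringType()),
        ])

        spark = (SparkSession.builder
                 .appName("SparkDataStreaming")
                 .config("spark.jars.packages",
                         "com.datastax.spark:spark-cassandra-connector_2.12:3.4.1,"
                         "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1")
                 .config("spark.cassandra.connection.host", "localhost")
                 .getOrCreate())

        # Read the raw JSON messages from the Kafka topic
        kafka_df = (spark.readStream
                    .format("kafka")
                    .option("kafka.bootstrap.servers", "localhost:9092")
                    .option("subscribe", "user_created")
                    .option("startingOffsets", "earliest")
                    .load())

        # Parse the message value into columns
        users_df = (kafka_df
                    .selectExpr("CAST(value AS STRING)")
                    .select(from_json(col("value"), user_schema).alias("data"))
                    .select("data.*"))

        def write_to_cassandra(batch_df, batch_id):
            # Append each micro-batch to the Cassandra table (keyspace/table assumed to exist)
            (batch_df.write
             .format("org.apache.spark.sql.cassandra")
             .options(keyspace="spark_streams", table="created_users")
             .mode("append")
             .save())

        query = (users_df.writeStream
                 .foreachBatch(write_to_cassandra)
                 .option("checkpointLocation", "/tmp/checkpoint")
                 .start())

        query.awaitTermination()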
I will submit this code to the Spark cluster:
    run docker compose up -d                // to make sure all services are up
    then
    spark-submit --master spark://localhost:7077 spark_stream.py

After running this command, the Spark job initialises. The Airflow DAG extracts data from the API and inserts it into Kafka; from there, the data is streamed by Spark into Cassandra. This whole process is orchestrated by the Airflow DAG.

    Airflow localhost:8080
Here the DAG has run successfully.

When I log in to Cassandra, I can see the data that was inserted.
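One quick way to check this from Python (a sketch, assuming the cassandra-driver package and the same keyspace/table names as in the Spark sketch above):

        from cassandra.cluster import Cluster

        # Connect to the Cassandra container exposed on localhost
        cluster = Cluster(['localhost'], port=9042)
        session = cluster.connect('spark_streams')    # keyspace name is an assumption

        rows = session.execute("SELECT first_name, last_name, email FROM created_users LIMIT 10;")
        for row in rows:
            print(row.first_name, row.last_name, row.email)

        cluster.shutdown()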


Thank you for your time. Feel free to comment