Apache Flink Application: Real Time Analytics

    In this project, I will be building an Apache Flink application for real-time sales analytics. Docker Compose is used to orchestrate the necessary infrastructure components, including Apache Kafka, Elasticsearch, and Postgres. The application processes financial transaction data from Kafka, performs aggregations, and stores the results in both Postgres and Elasticsearch for further analysis.

Project Architecture:

Project Components:

  • Apache Kafka: A distributed streaming platform that excels in publishing and subscribing to streams of records, storing these records, and processing them as they occur.
  • Apache Flink: A framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink is known for its high throughput and low-latency streaming capabilities.
  • PostgreSQL (Postgres): A powerful, open-source object-relational database system known for its reliability, feature robustness, and performance.
  • Elasticsearch: A distributed, RESTful search and analytics engine capable of addressing a growing number of use cases.
  • Kibana: A data visualization dashboard for Elasticsearch. It provides visualization capabilities on top of the content indexed on an Elasticsearch cluster.

Project Flow:

  1. Data Ingestion: Data is ingested in real-time through Apache Kafka, which acts as the initial entry point for streaming data.
  2. Stream Processing: Apache Flink processes the streaming data, performing tasks such as aggregations, enrichments, or complex computations.
  3. Data Storage and Indexing: Processed data is then stored in PostgreSQL for transactional data needs and simultaneously indexed in Elasticsearch for real-time analytics.
  4. Visualization: Kibana is used to visualize and analyze data stored in Elasticsearch.

Starting with project setup:

Set up the system architecture on Docker

    To begin, ensure you have Docker and Docker Compose installed on your system. Docker Compose will allow us to define and run multi-container Docker applications, which is ideal for setting up our stack.

docker-compose.yml

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    healthcheck:
      test: [ "CMD", "bash", "-c", "echo 'ruok' | nc localhost 2181" ]
      interval: 10s
      timeout: 5s
      retries: 5

  broker:
    image: confluentinc/cp-kafka:7.4.0
    hostname: broker
    container_name: broker
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
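      # host clients (like the Python producer) connect via localhost:9092; other containers reach the broker at broker:29092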
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
    healthcheck:
      test: [ "CMD", "bash", "-c", 'nc -z localhost 9092' ]
      interval: 10s
      timeout: 5s
      retries: 5

  postgres:
    image: postgres:latest
    container_name: postgres
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: postgres
    healthcheck:
      test: [ "CMD", "pg_isready", "-U", "postgres" ]
      interval: 10s
      timeout: 5s
      retries: 5

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.1
    container_name: es-container
    environment:
      - xpack.security.enabled=false
      - discovery.type=single-node
    ports:
      - 9200:9200

  kibana:
    container_name: kb-container
    image: docker.elastic.co/kibana/kibana:8.11.1
    environment:
      - ELASTICSEARCH_HOSTS=http://es-container:9200
    depends_on:
      - elasticsearch
    ports:
      - 5601:5601

Now, when I run docker compose up -d, all the services come up and start running.

Set up Apache Flink

    Download and set up Apache Flink.

    Start Apache Flink
    
    To start a local cluster, run the bash script that comes with Flink:
    
    $ ./bin/start-cluster.sh

    On the web UI at localhost:8081, I can view the Flink dashboard and see that the cluster is up and running.

Generating E-commerce sales transactions using Python and publishing them to the Kafka cluster

The main.py file contains the code that generates the sales transactions:

import json
import random
import time

from faker import Faker
from confluent_kafka import SerializingProducer
from datetime import datetime

fake = Faker()

def generate_sales_transactions():
    user = fake.simple_profile()

    return {
        "transactionId": fake.uuid4(),
        "productId": random.choice(['product1', 'product2', 'product3', 'product4', 'product5', 'product6']),
        "productName": random.choice(['laptop', 'mobile', 'tablet', 'watch', 'headphone', 'speaker']),
        'productCategory': random.choice(['electronic', 'fashion', 'grocery', 'home', 'beauty', 'sports']),
        'productPrice': round(random.uniform(10, 1000), 2),
        'productQuantity': random.randint(1, 10),
        'productBrand': random.choice(['apple', 'samsung', 'oneplus', 'mi', 'boat', 'sony']),
        'currency': random.choice(['USD', 'GBP']),
        'customerId': user['username'],
        'transactionDate': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f%z'),
        "paymentMethod": random.choice(['credit_card', 'debit_card', 'online_transfer'])
    }

def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f"Message delivered to {msg.topic} [{msg.partition()}]")
def main():
    topic = 'financial_transactions'
    producer= SerializingProducer({
        'bootstrap.servers': 'localhost:9092'
    })

    curr_time = datetime.now()

    while (datetime.now() - curr_time).seconds < 120:
        try:
            transaction = generate_sales_transactions()
            transaction['totalAmount'] = transaction['productPrice'] * transaction['productQuantity']

            print(transaction)

            producer.produce(topic,
                             key=transaction['transactionId'],
                             value=json.dumps(transaction),
                             on_delivery=delivery_report
                             )
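            # serve delivery report callbacks for previously produced messages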
            producer.poll(0)

            #wait for 5 seconds before sending the next transaction
            time.sleep(5)
        except BufferError:
            print("Buffer full! Waiting...")
            time.sleep(1)
        except Exception as e:
            print(e)

    # make sure all buffered messages are delivered before the program exits
    producer.flush()

if __name__ == "__main__":
    main()

Set up Flink project 

    I set up the Apache Flink project using the Java quickstart in an IDE like IntelliJ IDEA, and added all the required dependencies (the Kafka, JDBC, and Elasticsearch connectors, the Postgres JDBC driver, Jackson, and Lombok, as seen in the imports below).

Flink Kafka Connection

    We connect to Kafka from Apache Flink using the KafkaSource connector (org.apache.flink.connector.kafka.source.KafkaSource):

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String topic = "financial_transactions";
KafkaSource<Transaction> source = KafkaSource.<Transaction>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics(topic)
        .setGroupId("flink-group")
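        // start reading from the earliest available offsets so the full topic is replayed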
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new JSONValueDeserializationSchema())
        .build();
DataStream<Transaction> transactionStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka source");

The JSONValueDeserializationSchema class is the deserializer that converts the raw bytes consumed from Kafka into our Transaction DTO (Data Transfer Object). We then turn the source into a DataStream with env.fromSource.

Transaction DTO

Here is the layout of the Transaction object;

package Dto;
import lombok.Data;
import java.sql.Timestamp;
@Data
public class Transaction {
    private String transactionId;
    private String productId;
    private String productName;
    private String productCategory;
    private double productPrice;
    private int productQuantity;
    private String productBrand;
    private double totalAmount;
    private String currency;
    private String customerId;
    private Timestamp transactionDate;
    private String paymentMethod;
}

JSONValueDeserializationSchema


package Deserializer;

import Dto.Transaction;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;

public class JSONValueDeserializationSchema implements DeserializationSchema<Transaction> {

    private final ObjectMapper objectMapper = new ObjectMapper();
    @Override
    public void open(InitializationContext context) throws Exception {
        DeserializationSchema.super.open(context);
    }

    @Override
    public Transaction deserialize(byte[] bytes) throws IOException {
        return objectMapper.readValue(bytes, Transaction.class);
    }

    @Override
    public boolean isEndOfStream(Transaction transaction) {
        return false;
    }

    @Override
    public TypeInformation<Transaction> getProducedType() {
        return TypeInformation.of(Transaction.class);
    }
}

 To see the data consumed from Kafka in Flink, we use:

transactionStream.print();
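
Note that nothing actually runs until the job graph is submitted to the cluster, so the program must end with an execute call (the job name below is just an illustrative choice):

env.execute("Ecommerce Realtime Streaming");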

Flink-Postgres Interaction

    We start by creating the tables that will be used in the project on Postgres. We also want to avoid duplicating data in our DB when records are reprocessed, so the insert statements later on use upserts (ON CONFLICT ... DO UPDATE) to keep the writes idempotent.

    Here is the configuration of the JdbcExecutionOptions and JdbcConnectionOptions:

JdbcExecutionOptions execOptions = new JdbcExecutionOptions.Builder()
                .withBatchSize(1000)
                .withBatchIntervalMs(200)
                .withMaxRetries(5)
                .build();
JdbcConnectionOptions connOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl(jdbcUrl)
                .withDriverName("org.postgresql.Driver")
                .withUsername(username)
                .withPassword(password)
                .build();
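
The jdbcUrl, username, and password variables are not shown above; assuming the credentials defined in the docker-compose file, they would look something like this:

String jdbcUrl = "jdbc:postgresql://localhost:5432/postgres";
String username = "postgres";
String password = "postgres";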

Transactions Table

    
//create transactions table
transactionStream.addSink(JdbcSink.sink(
"CREATE TABLE IF NOT EXISTS transactions (" +
"transaction_id VARCHAR(255) PRIMARY KEY, " +
"product_id VARCHAR(255), " +
"product_name VARCHAR(255), " +
"product_category VARCHAR(255), " +
"product_price DOUBLE PRECISION, " +
"product_quantity INTEGER, " +
"product_brand VARCHAR(255), " +
"total_amount DOUBLE PRECISION, " +
"currency VARCHAR(255), " +
"customer_id VARCHAR(255), " +
"transaction_date TIMESTAMP, " +
"payment_method VARCHAR(255) " +
")",
(JdbcStatementBuilder<Transaction>) (preparedStatement, transaction) -> {
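// intentionally left empty: the statement above is plain DDL with no parameters to bind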

},
execOptions,
connOptions
)).name("Create Transactions Table Sink");

Sales Per Category Table


//create sales_per_category table sink
transactionStream.addSink(JdbcSink.sink(
"CREATE TABLE IF NOT EXISTS sales_per_category (" +
"transaction_date DATE, " +
"category VARCHAR(255), " +
"total_sales DOUBLE PRECISION, " +
"PRIMARY KEY (transaction_date, category)" +
")",
(JdbcStatementBuilder<Transaction>) (preparedStatement, transaction) -> {

},
execOptions,
connOptions
)).name("Create Sales Per Category Table");

Sales Per Day Table


//create sales_per_day table sink
transactionStream.addSink(JdbcSink.sink(
"CREATE TABLE IF NOT EXISTS sales_per_day (" +
"transaction_date DATE PRIMARY KEY, " +
"total_sales DOUBLE PRECISION " +
")",
(JdbcStatementBuilder<Transaction>) (preparedStatement, transaction) -> {

},
execOptions,
connOptions
)).name("Create Sales Per Day Table");
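
Sales Per Month Table

The sales_per_month table is written to by the monthly aggregation sink further down, but its creation is not shown in the snippets above; here is a sketch that follows the same pattern, with the columns inferred from the insert statement used later:

//create sales_per_month table sink
transactionStream.addSink(JdbcSink.sink(
"CREATE TABLE IF NOT EXISTS sales_per_month (" +
"year INTEGER, " +
"month INTEGER, " +
"total_sales DOUBLE PRECISION, " +
"PRIMARY KEY (year, month)" +
")",
(JdbcStatementBuilder<Transaction>) (preparedStatement, transaction) -> {

},
execOptions,
connOptions
)).name("Create Sales Per Month Table");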

Inserting Data into Postgres from Flink DataStream

    It is important at this point to note the syntax used to insert data into Postgres. Recall that we configured our Kafka source to start from the beginning of the topic with .setStartingOffsets(OffsetsInitializer.earliest()), so the same records may be processed more than once, and we need to handle that reprocessing here.

Additionally, we also need to perform aggregations for Sales Per Category, Sales Per Day, and Sales Per Month before we eventually insert them into the Postgres DB.

Transactions Data Stream


transactionStream.addSink(JdbcSink.sink(
"INSERT INTO transactions(transaction_id, product_id, product_name,
 
product_category, product_price, " +
"product_quantity, product_brand, total_amount, currency,
 
customer_id, transaction_date, payment_method) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " +
"ON CONFLICT (transaction_id) DO UPDATE SET " +
"product_id = EXCLUDED.product_id, " +
"product_name = EXCLUDED.product_name, " +
"product_category = EXCLUDED.product_category, " +
"product_price = EXCLUDED.product_price, " +
"product_quantity = EXCLUDED.product_quantity, " +
"product_brand = EXCLUDED.product_brand, " +
"total_amount = EXCLUDED.total_amount, " +
"currency = EXCLUDED.currency, " +
"customer_id = EXCLUDED.customer_id, " +
"transaction_date = EXCLUDED.transaction_date, " +
"payment_method = EXCLUDED.payment_method " +
"WHERE transactions.transaction_id = EXCLUDED.transaction_id",
(JdbcStatementBuilder<Transaction>) (preparedStatement, transaction) -> {
preparedStatement.setString(1, transaction.getTransactionId());
preparedStatement.setString(2, transaction.getProductId());
preparedStatement.setString(3, transaction.getProductName());
preparedStatement.setString(4, transaction.getProductCategory());
preparedStatement.setDouble(5, transaction.getProductPrice());
preparedStatement.setInt(6, transaction.getProductQuantity());
preparedStatement.setString(7, transaction.getProductBrand());
preparedStatement.setDouble(8, transaction.getTotalAmount());
preparedStatement.setString(9, transaction.getCurrency());
preparedStatement.setString(10, transaction.getCustomerId());
preparedStatement.setTimestamp(11, transaction.getTransactionDate());
preparedStatement.setString(12, transaction.getPaymentMethod());
},
execOptions,
connOptions
)).name("Insert into transactions table sink");

Sales Per Category Data Stream

With sales per category, we need to aggregate the data before insertion, so we use a map, keyBy, and reduce on the data stream to perform the aggregation.
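
The SalesPerCategory DTO used here is not listed in the snippets, and neither are SalesPerDay and SalesPerMonth; a minimal sketch of SalesPerCategory, based on how it is constructed and accessed below and assuming the same Lombok setup as the Transaction DTO, might look like this (the other two follow the same shape):

package Dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.sql.Date;

@Data
@AllArgsConstructor
public class SalesPerCategory {
    private Date transactionDate;
    private String category;
    private double totalSales;
}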

transactionStream.map(
transaction -> {
Date transactionDate = new Date(System.currentTimeMillis());
String category = transaction.getProductCategory();
double totalSales = transaction.getTotalAmount();
return new SalesPerCategory(transactionDate, category, totalSales);
}
).keyBy(SalesPerCategory::getCategory)
.reduce((salesPerCategory, t1) -> {
salesPerCategory.setTotalSales(salesPerCategory.getTotalSales() + t1.getTotalSales());
return salesPerCategory;
})
.addSink(JdbcSink.sink(
"INSERT INTO sales_per_category(transaction_date,

 

category, total_sales) " +
"VALUES (?, ?, ?) " +
"ON CONFLICT (transaction_date, category) DO UPDATE SET " +
"total_sales = EXCLUDED.total_sales " +
"WHERE sales_per_category.category = EXCLUDED.category " +
"AND sales_per_category.transaction_date = EXCLUDED.transaction_date",
(JdbcStatementBuilder<SalesPerCategory>) (preparedStatement, salesPerCategory) -> {
preparedStatement.setDate(1, new Date(System.currentTimeMillis()));
preparedStatement.setString(2, salesPerCategory.getCategory());
preparedStatement.setDouble(3, salesPerCategory.getTotalSales());
},
execOptions,
connOptions
)).name("Insert into sales per category table");

Sales Per Day Data Stream


transactionStream.map(
transaction -> {
Date transactionDate = new Date(System.currentTimeMillis());
double totalSales = transaction.getTotalAmount();
return new SalesPerDay(transactionDate, totalSales);
}
).keyBy(SalesPerDay::getTransactionDate)
.reduce((salesPerDay, t1) -> {
salesPerDay.setTotalSales(salesPerDay.getTotalSales() + t1.getTotalSales());
return salesPerDay;
}).addSink(JdbcSink.sink(
"INSERT INTO sales_per_day(transaction_date, total_sales) " +
"VALUES (?,?) " +
"ON CONFLICT (transaction_date) DO UPDATE SET " +
"total_sales = EXCLUDED.total_sales " +
"WHERE sales_per_day.transaction_date = EXCLUDED.transaction_date",
(JdbcStatementBuilder<SalesPerDay>) (preparedStatement, salesPerDay) -> {
preparedStatement.setDate(1, new Date(System.currentTimeMillis()));
preparedStatement.setDouble(2, salesPerDay.getTotalSales());
},
execOptions,
connOptions
)).name("Insert into sales per day table");

Sales Per Month Data Stream


transactionStream.map(
transaction -> {
Date transactionDate = new Date(System.currentTimeMillis());
int year = transactionDate.toLocalDate().getYear();
int month = transactionDate.toLocalDate().getMonth().getValue();
double totalSales = transaction.getTotalAmount();
return new SalesPerMonth(year, month, totalSales);
}
).keyBy(SalesPerMonth::getMonth)
.reduce((salesPerMonth, t1) -> {
salesPerMonth.setTotalSales(salesPerMonth.getTotalSales() + t1.getTotalSales());
return salesPerMonth;
}).addSink(JdbcSink.sink(
"INSERT INTO sales_per_month(year, month, total_sales) " +
"VALUES (?,?,?) " +
"ON CONFLICT (year, month) DO UPDATE SET " +
"total_sales = EXCLUDED.total_sales " +
"WHERE sales_per_month.year = EXCLUDED.year " +
"AND sales_per_month.month = EXCLUDED.month ",
(JdbcStatementBuilder<SalesPerMonth>) (preparedStatement, salesPerMonth) -> {
preparedStatement.setInt(1, salesPerMonth.getYear());
preparedStatement.setInt(2, salesPerMonth.getMonth());
preparedStatement.setDouble(3, salesPerMonth.getTotalSales());
},
execOptions,
connOptions
)).name("Insert into sales per month table");

 

Indexing Data on Elasticsearch

To index data in Elasticsearch from Flink, we use the sinkTo function to attach an Elasticsearch sink to our data stream.

transactionStream.sinkTo(
new Elasticsearch7SinkBuilder<Transaction>()
.setHosts(new HttpHost("localhost", 9200, "http"))
.setEmitter((transaction, runtimeContext, requestIndexer) -> {

String json = convertTransactionToJson(transaction);
IndexRequest indexRequest = Requests.indexRequest()
.index("transactions")
.id(transaction.getTransactionId())
.source(json, XContentType.JSON);
requestIndexer.add(indexRequest);
})
.build()
).name("Elasticsearch Sink");

This uses the emitter to index the data in Elasticsearch in the transactions index; before indexing, we convert each record to a JSON string using the convertTransactionToJson function.
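
The convertTransactionToJson helper is not shown in the snippets; a minimal sketch using Jackson (the same library used in the deserializer) could look like this:

public static String convertTransactionToJson(Transaction transaction) {
    try {
        // serialize the Transaction DTO into a JSON string for the index request
        return new ObjectMapper().writeValueAsString(transaction);
    } catch (JsonProcessingException e) {
        throw new RuntimeException("Failed to convert transaction to JSON", e);
    }
}

With Jackson's defaults, the Timestamp field is written as epoch milliseconds, which matches the numeric transactionDate we see in the index below.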

After indexing, here is the structure of the transactions index in Elasticsearch. You can view it by running GET transactions in Kibana DevTools.


Transactions index structure

Here is how the inserted records look in Elasticsearch; you can query them by running GET transactions/_search.

Reindexing Data on Elasticsearch

If you look at the transactionDate field in the transactions index, it is represented as a number (epoch milliseconds). To get a readable transaction date, we need to reindex into a different index. To reindex data in Elasticsearch, we use the _reindex API.

However, simply calling toString() on the date does not give us much control over the format, so we need a more robust way to format the data.
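
For illustration, a reindex request run from Kibana DevTools could use a Painless script to rewrite the epoch-millis value as a formatted string; the destination index name and the date pattern below are just example choices:

POST _reindex
{
  "source": { "index": "transactions" },
  "dest": { "index": "transactions_formatted" },
  "script": {
    "lang": "painless",
    "source": "SimpleDateFormat formatter = new SimpleDateFormat('yyyy-MM-dd HH:mm:ss'); ctx._source.transactionDate = formatter.format(new Date(ctx._source.transactionDate));"
  }
}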

Realtime Dashboard with Elasticsearch and Kibana

To create our dashboard, we first need to create a data view in Kibana on top of the Elasticsearch index.

Creating a Donut Chart



Count of Records

Product Brand Chart

Final Dashboard

Once the charts have been arranged on the dashboard, the next step is to set the refresh interval via the calendar icon in the top-right corner of the screen. We use 20 seconds as our refresh interval.

Thanks for visiting, feel free to comment