Real Time Analytics
Project Architecture:
Project Components:
- Apache Kafka: A distributed streaming platform that excels in publishing and subscribing to streams of records, storing these records, and processing them as they occur.
- Apache Flink: A framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink is known for its high throughput and low-latency streaming capabilities.
- PostgreSQL (Postgres): A powerful, open-source object-relational database system known for its reliability, feature robustness, and performance.
- Elasticsearch: A distributed, RESTful search and analytics engine capable of addressing a growing number of use cases.
- Kibana: A data visualization dashboard for Elasticsearch. It provides visualization capabilities on top of the content indexed on an Elasticsearch cluster.
Project Flow:
- Data Ingestion: Data is ingested in real-time through Apache Kafka, which acts as the initial entry point for streaming data.
- Stream Processing: Apache Flink processes the streaming data, performing tasks such as aggregations, enrichments, or complex computations.
- Data Storage and Indexing: Processed data is then stored in PostgreSQL for transactional data needs and simultaneously indexed in Elasticsearch for real-time analytics.
- Visualization: Kibana is used to visualize and analyze data stored in Elasticsearch.
Starting with the project setup:
Set up the system architecture on Docker
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    healthcheck:
      test: [ "CMD", "bash", "-c", "echo 'ruok' | nc localhost 2181" ]
      interval: 10s
      timeout: 5s
      retries: 5
  broker:
    image: confluentinc/cp-kafka:7.4.0
    hostname: broker
    container_name: broker
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
    healthcheck:
      test: [ "CMD", "bash", "-c", 'nc -z localhost 9092' ]
      interval: 10s
      timeout: 5s
      retries: 5
  postgres:
    image: postgres:latest
    container_name: postgres
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: postgres
    healthcheck:
      test: [ "CMD", "pg_isready", "-U", "postgres" ]
      interval: 10s
      timeout: 5s
      retries: 5
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.1
    container_name: es-container
    environment:
      - xpack.security.enabled=false
      - discovery.type=single-node
    ports:
      - 9200:9200
  kibana:
    container_name: kb-container
    image: docker.elastic.co/kibana/kibana:8.11.1
    environment:
      - ELASTICSEARCH_HOSTS=http://es-container:9200
    depends_on:
      - elasticsearch
    ports:
      - 5601:5601
Now, when we run docker compose up -d, all the services come up and start running.
Setup Apache Flink
Generating e-commerce sales transactions with Python and publishing them to the Kafka cluster
import json
import random
import time
from datetime import datetime

from faker import Faker
from confluent_kafka import SerializingProducer

fake = Faker()


def generate_sales_transactions():
    user = fake.simple_profile()
    return {
        "transactionId": fake.uuid4(),
        "productId": random.choice(['product1', 'product2', 'product3', 'product4', 'product5', 'product6']),
        "productName": random.choice(['laptop', 'mobile', 'tablet', 'watch', 'headphone', 'speaker']),
        'productCategory': random.choice(['electronic', 'fashion', 'grocery', 'home', 'beauty', 'sports']),
        'productPrice': round(random.uniform(10, 1000), 2),
        'productQuantity': random.randint(1, 10),
        'productBrand': random.choice(['apple', 'samsung', 'oneplus', 'mi', 'boat', 'sony']),
        'currency': random.choice(['USD', 'GBP']),
        'customerId': user['username'],
        'transactionDate': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f%z'),
        "paymentMethod": random.choice(['credit_card', 'debit_card', 'online_transfer'])
    }


def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")


def main():
    topic = 'financial_transactions'
    producer = SerializingProducer({'bootstrap.servers': 'localhost:9092'})

    curr_time = datetime.now()
    while (datetime.now() - curr_time).seconds < 120:
        try:
            transaction = generate_sales_transactions()
            transaction['totalAmount'] = transaction['productPrice'] * transaction['productQuantity']
            print(transaction)

            producer.produce(topic,
                             key=transaction['transactionId'],
                             value=json.dumps(transaction),
                             on_delivery=delivery_report)
            producer.poll(0)

            # wait for 5 seconds before sending the next transaction
            time.sleep(5)
        except BufferError:
            print("Buffer full! Waiting...")
            time.sleep(1)
        except Exception as e:
            print(e)


if __name__ == "__main__":
    main()
Set up Flink project
Flink Kafka Connection
We connect to Kafka from Apache Flink using the KafkaSource connector (org.apache.flink.connector.kafka.source.KafkaSource):
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String topic = "financial_transactions";
KafkaSource<Transaction> source = KafkaSource.<Transaction>builder()
.setBootstrapServers("localhost:9092")
.setTopics(topic)
.setGroupId("flink-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new JSONValueDeserializationSchema())
.build();
DataStream<Transaction> transactionStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka source");
The JSONValueDeserializationSchema class is a custom deserializer that converts the records consumed from Kafka into our Transaction DTO (Data Transfer Object). We then turn the source into a DataStream with env.fromSource.
Transaction DTO
Here is the layout of the Transaction object:
package Dto;
import lombok.Data;
import java.sql.Timestamp;
@Data
public class Transaction {
private String transactionId;
private String productId;
private String productName;
private String productCategory;
private double productPrice;
private int productQuantity;
private String productBrand;
private double totalAmount;
private String currency;
private String customerId;
private Timestamp transactionDate;
private String paymentMethod;
}
JSONValueDeserializationSchema
package Deserializer;

import Dto.Transaction;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;

public class JSONValueDeserializationSchema implements DeserializationSchema<Transaction> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void open(InitializationContext context) throws Exception {
        DeserializationSchema.super.open(context);
    }

    @Override
    public Transaction deserialize(byte[] bytes) throws IOException {
        return objectMapper.readValue(bytes, Transaction.class);
    }

    @Override
    public boolean isEndOfStream(Transaction transaction) {
        return false;
    }

    @Override
    public TypeInformation<Transaction> getProducedType() {
        return TypeInformation.of(Transaction.class);
    }
}
To see the data consumed from Kafka in Flink, we use:
transactionStream.print();
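Note that nothing actually runs until the job is submitted to the execution environment; that step isn't shown above, so here is a minimal sketch (the job name is arbitrary):
// Submit the job graph; the source, transformations and sinks defined above only start now.
env.execute("Real Time Analytics Job");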
Flink-Postgres Interaction
We start by creating the tables that will be used in the project on Postgres. We also want to ensure exactly-once processing so that data is not duplicated in our DB.
Here is the configuration of the JdbcExecutionOptions and JdbcConnectionOptions:
JdbcExecutionOptions execOptions = new JdbcExecutionOptions.Builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build();
JdbcConnectionOptions connOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(jdbcUrl)
.withDriverName("org.postgresql.Driver")
.withUsername(username)
.withPassword(password)
.build();
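One note on exactly-once: JdbcSink.sink provides at-least-once guarantees, so deduplication in practice comes from the idempotent upserts shown below together with Flink checkpointing, which keeps the Kafka offsets consistent with the job's progress. Enabling checkpointing is not shown in the original; a minimal sketch with an arbitrary 5-second interval:
// requires: import org.apache.flink.streaming.api.CheckpointingMode;
// Periodic checkpoints make offsets and operator state recoverable; combined with
// the ON CONFLICT upserts below, restarts do not produce duplicated rows.
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);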
Transactions Table
//create transactions table
transactionStream.addSink(JdbcSink.sink(
"CREATE TABLE IF NOT EXISTS transactions (" +
"transaction_id VARCHAR(255) PRIMARY KEY, " +
"product_id VARCHAR(255), " +
"product_name VARCHAR(255), " +
"product_category VARCHAR(255), " +
"product_price DOUBLE PRECISION, " +
"product_quantity INTEGER, " +
"product_brand VARCHAR(255), " +
"total_amount DOUBLE PRECISION, " +
"currency VARCHAR(255), " +
"customer_id VARCHAR(255), " +
"transaction_date TIMESTAMP, " +
"payment_method VARCHAR(255) " +
")",
(JdbcStatementBuilder<Transaction>) (preparedStatement, transaction) -> {
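// statement builder intentionally left empty: the SQL above is plain DDL
// (CREATE TABLE IF NOT EXISTS) with no parameters to bind per record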
},
execOptions,
connOptions
)).name("Create Transactions Table Sink");
Sales Per Category Table
//create sales_per_category table sink
transactionStream.addSink(JdbcSink.sink(
"CREATE TABLE IF NOT EXISTS sales_per_category (" +
"transaction_date DATE, " +
"category VARCHAR(255), " +
"total_sales DOUBLE PRECISION, " +
"PRIMARY KEY (transaction_date, category)" +
")",
(JdbcStatementBuilder<Transaction>) (preparedStatement, transaction) -> {
},
execOptions,
connOptions
)).name("Create Sales Per Category Table");
Sales Per Day Table
//create sales_per_day table sink
transactionStream.addSink(JdbcSink.sink(
"CREATE TABLE IF NOT EXISTS sales_per_day (" +
"transaction_date DATE PRIMARY KEY, " +
"total_sales DOUBLE PRECISION " +
")",
(JdbcStatementBuilder<Transaction>) (preparedStatement, transaction) -> {
},
execOptions,
connOptions
)).name("Create Sales Per Day Table");
Inserting Data into Postgres from Flink DataStream
It is important at this point to note the syntax used to insert data into Postgres. Recall that we configured the Kafka source to start from the beginning of the topic with .setStartingOffsets(OffsetsInitializer.earliest()), so the same records may be processed more than once and the inserts have to handle that.
Additionally, we need to perform some aggregations for Sales Per Category, Sales Per Day, and Sales Per Month before we eventually insert them into the Postgres DB.
Transactions Data Stream
transactionStream.addSink(JdbcSink.sink(
"INSERT INTO transactions(transaction_id, product_id, product_name,
product_category, product_price, " +
"product_quantity, product_brand, total_amount, currency,
customer_id, transaction_date, payment_method) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " +
"ON CONFLICT (transaction_id) DO UPDATE SET " +
"product_id = EXCLUDED.product_id, " +
"product_name = EXCLUDED.product_name, " +
"product_category = EXCLUDED.product_category, " +
"product_price = EXCLUDED.product_price, " +
"product_quantity = EXCLUDED.product_quantity, " +
"product_brand = EXCLUDED.product_brand, " +
"total_amount = EXCLUDED.total_amount, " +
"currency = EXCLUDED.currency, " +
"customer_id = EXCLUDED.customer_id, " +
"transaction_date = EXCLUDED.transaction_date, " +
"payment_method = EXCLUDED.payment_method " +
"WHERE transactions.transaction_id = EXCLUDED.transaction_id",
(JdbcStatementBuilder<Transaction>) (preparedStatement, transaction) -> {
preparedStatement.setString(1, transaction.getTransactionId());
preparedStatement.setString(2, transaction.getProductId());
preparedStatement.setString(3, transaction.getProductName());
preparedStatement.setString(4, transaction.getProductCategory());
preparedStatement.setDouble(5, transaction.getProductPrice());
preparedStatement.setInt(6, transaction.getProductQuantity());
preparedStatement.setString(7, transaction.getProductBrand());
preparedStatement.setDouble(8, transaction.getTotalAmount());
preparedStatement.setString(9, transaction.getCurrency());
preparedStatement.setString(10, transaction.getCustomerId());
preparedStatement.setTimestamp(11, transaction.getTransactionDate());
preparedStatement.setString(12, transaction.getPaymentMethod());
},
execOptions,
connOptions
)).name("Insert into transactions table sink");
Sales Per Category Data Stream
With sales per category, we need to aggregate the data before inserting it, so we chain map, keyBy, and reduce operations on the data stream to perform the aggregation.
transactionStream.map(
transaction -> {
Date transactionDate = new Date(System.currentTimeMillis());
String category = transaction.getProductCategory();
double totalSales = transaction.getTotalAmount();
return new SalesPerCategory(transactionDate, category, totalSales);
}
).keyBy(SalesPerCategory::getCategory)
.reduce((salesPerCategory, t1) -> {
salesPerCategory.setTotalSales(salesPerCategory.getTotalSales() + t1.getTotalSales());
return salesPerCategory;
})
.addSink(JdbcSink.sink(
"INSERT INTO sales_per_category(transaction_date,
category, total_sales) " +
"VALUES (?, ?, ?) " +
"ON CONFLICT (transaction_date, category) DO UPDATE SET " +
"total_sales = EXCLUDED.total_sales " +
"WHERE sales_per_category.category = EXCLUDED.category " +
"AND sales_per_category.transaction_date = EXCLUDED.transaction_date",
(JdbcStatementBuilder<SalesPerCategory>) (preparedStatement, salesPerCategory) -> {
preparedStatement.setDate(1, new Date(System.currentTimeMillis()));
preparedStatement.setString(2, salesPerCategory.getCategory());
preparedStatement.setDouble(3, salesPerCategory.getTotalSales());
},
execOptions,
connOptions
)).name("Insert into sales per category table");
Sales Per Day Data Stream
transactionStream.map(
transaction -> {
Date transactionDate = new Date(System.currentTimeMillis());
double totalSales = transaction.getTotalAmount();
return new SalesPerDay(transactionDate, totalSales);
}
).keyBy(SalesPerDay::getTransactionDate)
.reduce((salesPerDay, t1) -> {
salesPerDay.setTotalSales(salesPerDay.getTotalSales() + t1.getTotalSales());
return salesPerDay;
}).addSink(JdbcSink.sink(
"INSERT INTO sales_per_day(transaction_date, total_sales) " +
"VALUES (?,?) " +
"ON CONFLICT (transaction_date) DO UPDATE SET " +
"total_sales = EXCLUDED.total_sales " +
"WHERE sales_per_day.transaction_date = EXCLUDED.transaction_date",
(JdbcStatementBuilder<SalesPerDay>) (preparedStatement, salesPerDay) -> {
preparedStatement.setDate(1, new Date(System.currentTimeMillis()));
preparedStatement.setDouble(2, salesPerDay.getTotalSales());
},
execOptions,
connOptions
)).name("Insert into sales per day table");
Sales Per Month Data Stream
transactionStream.map(
transaction -> {
Date transactionDate = new Date(System.currentTimeMillis());
int year = transactionDate.toLocalDate().getYear();
int month = transactionDate.toLocalDate().getMonth().getValue();
double totalSales = transaction.getTotalAmount();
return new SalesPerMonth(year, month, totalSales);
}
).keyBy(SalesPerMonth::getMonth)
.reduce((salesPerMonth, t1) -> {
salesPerMonth.setTotalSales(salesPerMonth.getTotalSales() + t1.getTotalSales());
return salesPerMonth;
}).addSink(JdbcSink.sink(
"INSERT INTO sales_per_month(year, month, total_sales) " +
"VALUES (?,?,?) " +
"ON CONFLICT (year, month) DO UPDATE SET " +
"total_sales = EXCLUDED.total_sales " +
"WHERE sales_per_month.year = EXCLUDED.year " +
"AND sales_per_month.month = EXCLUDED.month ",
(JdbcStatementBuilder<SalesPerMonth>) (preparedStatement, salesPerMonth) -> {
preparedStatement.setInt(1, salesPerMonth.getYear());
preparedStatement.setInt(2, salesPerMonth.getMonth());
preparedStatement.setDouble(3, salesPerMonth.getTotalSales());
},
execOptions,
connOptions
)).name("Insert into sales per month table");
Indexing Data on Elasticsearch
To index data on Elasticsearch with Flink, we use the sinkTo function to create a sink from our data stream into Elasticsearch.
transactionStream.sinkTo(
new Elasticsearch7SinkBuilder<Transaction>()
.setHosts(new HttpHost("localhost", 9200, "http"))
.setEmitter((transaction, runtimeContext, requestIndexer) -> {
    String json = convertTransactionToJson(transaction);
IndexRequest indexRequest = Requests.indexRequest()
.index("transactions")
.id(transaction.getTransactionId())
.source(json, XContentType.JSON);
requestIndexer.add(indexRequest);
})
.build()
).name("Elasticsearch Sink");
This uses the emitter to index the data into the transactions index on Elasticsearch; before indexing, we convert each record to a JSON string using the convertTransactionToJson function.
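The convertTransactionToJson helper is not shown in the original; a minimal sketch using Jackson (the same library used in the deserializer), where the class and package names are placeholders:
package utils;

import Dto.Transaction;
import com.fasterxml.jackson.databind.ObjectMapper;

// Placeholder utility: serializes a Transaction to the JSON string indexed in Elasticsearch.
public class JsonUtil {
    private static final ObjectMapper objectMapper = new ObjectMapper();

    public static String convertTransactionToJson(Transaction transaction) {
        try {
            return objectMapper.writeValueAsString(transaction);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize transaction to JSON", e);
        }
    }
}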
After indexing, here's the structure of the transactions index on Elasticsearch. You can see it by running GET transactions in Kibana DevTools.
Transactions index structure
Here is how the inserted records look on Elasticsearch; you can query them by running GET transactions/_search.
Reindexing Data on Elasticsearch
Real-Time Dashboard with Elasticsearch and Kibana
To create our dashboard, we first need to create a data view in Kibana on top of the Elasticsearch index.