Real-Time Sales Order Processing with Spring Boot, RabbitMQ, Spring Cloud Data Flow, PostgreSQL, Debezium, and GemFire

This document outlines a workflow for processing sales orders in real-time, leveraging a combination of Spring Boot, RabbitMQ, Spring Cloud Data Flow, PostgreSQL, Debezium, and GemFire.

Architecture Overview

The system follows these steps:

  1. Sales Order Generation (Spring Boot): A Spring Boot application generates sales order data.
  2. Message Queuing (RabbitMQ): These sales orders are published as JSON payloads to a RabbitMQ queue.
  3. Data Ingestion (Spring Cloud Data Flow): Spring Cloud Data Flow is used to create a pipeline that reads messages from the RabbitMQ queue and inserts them into a PostgreSQL table (salesorders).
  4. Data Transformation (PostgreSQL): PostgreSQL triggers and stored procedures are used to automatically process new entries in the salesorders table. This involves parsing the JSON payload and populating a denormalized table (salesorders_read) with individual columns.
  5. Change Data Capture (Debezium & Spring Cloud Data Flow): Spring Cloud Data Flow, utilizing a Debezium source connector, captures change data events from the salesorders_read table in PostgreSQL.
  6. Real-Time Data Sink (GemFire): Changes captured by Debezium are then written in real-time to an “orders” region in GemFire.

(Architecture diagram: real-time sales order processing)

Prerequisites

Before you begin, ensure you have the following installed and running:

  • Docker: For containerizing the infrastructure components.
  • Java Development Kit (JDK): For running Spring Boot and Spring Cloud Data Flow.
  • Maven: For building Spring Boot applications.
  • Spring Cloud Data Flow Server: Set up and running. You can follow the official Spring Cloud Data Flow documentation for installation.

Setting Up the Infrastructure

  1. Start PostgreSQL with WAL Replication:

    docker run  -d --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:2.3.3.Final
    
  2. Create a Docker Network for GemFire:

    docker network create gf-network
    
  3. Start GemFire Locator:

    docker run -e 'ACCEPT_TERMS=y' -d --name gf-locator --network=gf-network -p 10334:10334 -p 1099:1099 -p 7070:7070 gemfire/gemfire:9.15.6 gfsh start locator --name=locator1 --jmx-manager-hostname-for-clients=127.0.0.1 --hostname-for-clients=127.0.0.1
    
  4. Start GemFire Server:

    docker run -e 'ACCEPT_TERMS=y' -d --name gf-server1 --network=gf-network -p 40404:40404 gemfire/gemfire:9.15.6 gfsh start server --name=server1 --locators=gf-locator\[10334\] --hostname-for-clients=127.0.0.1
    
  5. Run RabbitMQ in Docker:
     docker network create rmq-network
    
     docker run -d --hostname my-rabbit --name rabbitmq --network rmq-network -p 5552:5552 -p 15672:15672 -p 5672:5672 rabbitmq:3-management
    
    
  6. Enable the RabbitMQ Stream Plugins:
     docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream 
     docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream_management
    
  7. Sample output from RabbitMQ after enabling the stream plugins:
     rabbitmq_stream_management
     Enabling plugins on node rabbit@d212bb73a912:
     rabbitmq_stream
     rabbitmq_stream_management
     The following plugins have been configured:
       rabbitmq_management
       rabbitmq_management_agent
       rabbitmq_prometheus
       rabbitmq_stream
       rabbitmq_stream_management
       rabbitmq_web_dispatch
     Applying plugin configuration to rabbit@d212bb73a912...
     The following plugins have been enabled:
       rabbitmq_stream
       rabbitmq_stream_management
    
     started 2 plugins.
    

Database Schema and Logic (PostgreSQL)

Execute the following SQL scripts in your PostgreSQL database (postgres database created by the Docker container):

  • DDL for the salesorders and salesorders_read tables
  • A trigger and stored procedure that load data from salesorders into salesorders_read
CREATE SEQUENCE IF NOT EXISTS public.salesorders_order_id_seq;

CREATE TABLE IF NOT EXISTS public.salesorders (
    order_id integer NOT NULL DEFAULT nextval('salesorders_order_id_seq'::regclass),
    payload character varying(1000) COLLATE pg_catalog."default",
    CONSTRAINT salesorders_pkey PRIMARY KEY (order_id)
);

CREATE OR REPLACE FUNCTION public.sales_order_trigger_func()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
    -- Call the stored procedure to process the order data
    CALL public.process_sales_order(NEW.order_id, NEW.payload);
    RETURN NEW;
END;
$$;

CREATE OR REPLACE TRIGGER sales_order_update_trigger
    AFTER INSERT OR UPDATE
    ON public.salesorders
    FOR EACH ROW
    EXECUTE FUNCTION public.sales_order_trigger_func();

CREATE TABLE IF NOT EXISTS public.salesorders_read
(
    order_id integer,
    product character varying(255) COLLATE pg_catalog."default",
    price numeric(10,2),
    quantity integer,
    ship_to character varying(50) COLLATE pg_catalog."default",
    payment_method character varying(50) COLLATE pg_catalog."default",
    order_date date,
    address character varying(50) COLLATE pg_catalog."default",
    store_name character varying(50) COLLATE pg_catalog."default",
    store_address character varying(50) COLLATE pg_catalog."default",
    sales_rep_name character varying(50) COLLATE pg_catalog."default",
    CONSTRAINT salesorders_read_pkey PRIMARY KEY (order_id)
);

CREATE OR REPLACE PROCEDURE public.process_sales_order(order_id INT, payload TEXT)
LANGUAGE plpgsql
AS $$
DECLARE
    product VARCHAR(255);
    price NUMERIC(10, 2);
    quantity INT;
    ship_to VARCHAR(50);
    payment_method VARCHAR(50);
    order_date DATE;
    address VARCHAR(50);
    store_name VARCHAR(50);
    store_address VARCHAR(50);
    sales_rep_name VARCHAR(50);
BEGIN
    -- Extract values using regular expressions (assuming payload format: product='...', price=..., ...)
    product := REGEXP_REPLACE(payload, '.*product=''([^'']*)''.*', '\1');
    price := REGEXP_REPLACE(payload, '.*price=([\d\.]+).*', '\1')::NUMERIC(10, 2);
    quantity := REGEXP_REPLACE(payload, '.*quantity=(\d+).*', '\1')::INT;
    ship_to := REGEXP_REPLACE(payload, '.*shipTo=''([^'']*)''.*', '\1');
    payment_method := REGEXP_REPLACE(payload, '.*paymentMethod=''([^'']*)''.*', '\1');
    order_date := REGEXP_REPLACE(payload, '.*orderDate=([\d\-]+).*', '\1')::DATE;
    address := REGEXP_REPLACE(payload, '.*address=''([^'']*)''.*', '\1');
    store_name := REGEXP_REPLACE(payload, '.*storeName=''([^'']*)''.*', '\1');
    store_address := REGEXP_REPLACE(payload, '.*storeAddress=''([^'']*)''.*', '\1');
    sales_rep_name := REGEXP_REPLACE(payload, '.*salesRepName=''([^'']*)''.*', '\1');

    -- Insert or update the salesorders_read table
    INSERT INTO public.salesorders_read (
        order_id, product, price, quantity, ship_to, payment_method,
        order_date, address, store_name, store_address, sales_rep_name
    ) VALUES (
        order_id, product, price, quantity, ship_to, payment_method,
        order_date, address, store_name, store_address, sales_rep_name
    )
    ON CONFLICT (order_id) DO UPDATE SET
        product = EXCLUDED.product,
        price = EXCLUDED.price,
        quantity = EXCLUDED.quantity,
        ship_to = EXCLUDED.ship_to,
        payment_method = EXCLUDED.payment_method,
        order_date = EXCLUDED.order_date,
        address = EXCLUDED.address,
        store_name = EXCLUDED.store_name,
        store_address = EXCLUDED.store_address,
        sales_rep_name = EXCLUDED.sales_rep_name;

EXCEPTION
    WHEN others THEN
        RAISE NOTICE 'Error processing order %: %', order_id, SQLERRM;
END;
$$;

CREATE TABLE IF NOT EXISTS public.salesorders_fraud
(
    order_id integer,
    product character varying(255) COLLATE pg_catalog."default",
    price numeric(10,2),
    quantity integer,
    ship_to character varying(50) COLLATE pg_catalog."default",
    payment_method character varying(50) COLLATE pg_catalog."default",
    order_date date,
    address character varying(50) COLLATE pg_catalog."default",
    store_name character varying(50) COLLATE pg_catalog."default",
    store_address character varying(50) COLLATE pg_catalog."default",
    sales_rep_name character varying(50) COLLATE pg_catalog."default",
    CONSTRAINT salesorders_fraud_pkey PRIMARY KEY (order_id)
);

Save the above SQL to bootstrap.sql, then load it into the database:

docker exec -i postgres psql -U postgres postgres < bootstrap.sql
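Optional sanity check: once bootstrap.sql is loaded, you can verify the trigger and stored procedure by hand before wiring up any pipelines. The sample payload below is hypothetical but follows the key=value format that the regular expressions in process_sales_order expect:

-- Insert a raw payload; the AFTER INSERT trigger calls process_sales_order
INSERT INTO public.salesorders (payload) VALUES (
  'product=''Laptop'', price=1200.50, quantity=1, shipTo=''Home'', paymentMethod=''Credit Card'', orderDate=2025-03-27, address=''123 Main St'', storeName=''Tech Store'', storeAddress=''456 Oak Ave'', salesRepName=''John Doe'''
);

-- The parsed, denormalized row should now appear here
SELECT order_id, product, price, quantity, order_date FROM public.salesorders_read;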

Spring Boot Application

You’ll need a Spring Boot application that generates sales order data in a specific format (matching the regular expressions in the PostgreSQL stored procedure) and sends it as JSON to a RabbitMQ queue.

Demo Application you can leverage: Sales Order Generator

  1. Clone the repository:
    git clone https://github.com/cfkubo/spring-boot-random-data-generator
    cd spring-boot-random-data-generator
    
  2. Build the project:
    mvn clean install
    
  3. Configure RabbitMQ: Update the application.properties file with your RabbitMQ connection details (a sample snippet follows this list).

  4. Run the application:
    mvn spring-boot:run
    
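A minimal application.properties sketch for the RabbitMQ connection, assuming the default credentials of the rabbitmq:3-management image; the demo application may define additional, app-specific properties (for example, the queue name) that are not shown here:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest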

Usage

The application will generate random sales orders and send them to RabbitMQ. The application also logs the details of each generated order.

(Screenshot: RabbitMQ)

Example Sales Order Payload (as a String that will be embedded in JSON):

product='Laptop', price=1200.50, quantity=1, shipTo='Home', paymentMethod='Credit Card', orderDate=2025-03-27, address='123 Main St', storeName='Tech Store', storeAddress='456 Oak Ave', salesRepName='John Doe'

The Spring Boot application flow:

  • Establish a connection to RabbitMQ.
  • Define a queue (e.g., salesOrderQuorumQueue).
  • Generate sales order data periodically or on demand.
  • Convert the sales order data into a JSON payload (where the above string is likely a value of a field).
  • Publish the JSON payload to the RabbitMQ queue.
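The sketch below illustrates that flow but is not the actual generator code: the class name, method name, and schedule are hypothetical, it assumes a RabbitTemplate bean and @EnableScheduling are configured, and it publishes the raw payload string for simplicity, whereas the demo application embeds the string in a JSON document.

import java.time.LocalDate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

// Hypothetical publisher; the real generator application may be structured differently.
@Component
public class SalesOrderPublisher {

    private final RabbitTemplate rabbitTemplate;

    public SalesOrderPublisher(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @Scheduled(fixedRate = 1000)
    public void publishRandomOrder() {
        // The payload must match the regular expressions in process_sales_order
        String payload = String.format(
            "product='%s', price=%.2f, quantity=%d, shipTo='%s', paymentMethod='%s', "
            + "orderDate=%s, address='%s', storeName='%s', storeAddress='%s', salesRepName='%s'",
            "Laptop", 1200.50, 1, "Home", "Credit Card",
            LocalDate.now(), "123 Main St", "Tech Store", "456 Oak Ave", "John Doe");

        // Routes via the default exchange to the queue consumed by the rabbit source
        rabbitTemplate.convertAndSend("salesOrderQuorumQueue", payload);
    }
}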

Spring Cloud Data Flow (SCDF)

Spring Cloud Data Flow provides a dashboard and a shell for deploying data pipelines. You can use the versions below or download newer versions if needed.

Download SCDF Server, Skipper and Shell

wget https://repo.maven.apache.org/maven2/org/springframework/cloud/spring-cloud-dataflow-server/2.11.5/spring-cloud-dataflow-server-2.11.5.jar

wget https://repo.maven.apache.org/maven2/org/springframework/cloud/spring-cloud-dataflow-shell/2.11.5/spring-cloud-dataflow-shell-2.11.5.jar

wget https://repo.maven.apache.org/maven2/org/springframework/cloud/spring-cloud-skipper-server/2.11.5/spring-cloud-skipper-server-2.11.5.jar

Run SCDF Skipper

java -jar spring-cloud-skipper-server-2.11.5.jar

Run SCDF Server

java -jar spring-cloud-dataflow-server-2.11.5.jar

Run SCDF Shell

java -jar spring-cloud-dataflow-shell-2.11.5.jar

Access SCDF Server Dashboard

http://localhost:9393/dashboard

Import the source and sink applications

(Screenshots: importing and registering applications in the SCDF dashboard)

You will define two Spring Cloud Data Flow pipelines:

RabbitMQ to PostgreSQL

This pipeline reads from the RabbitMQ queue and writes to the salesorders table in PostgreSQL.

  • rabbit (Source): Configured to connect to your RabbitMQ instance and consume messages from the salesOrderQuorumQueue.
  • jdbc (Sink): Configured to connect to your PostgreSQL database and insert the received JSON payload into the public.salesorders table’s payload column.
  • Register the necessary Spring Cloud Stream applications (RabbitMQ Binder and JDBC Sink) in your Spring Cloud Data Flow server.
insert-to-pg=rabbit --queues=salesOrderQuorumQueue --port=5672 --publisher-confirm-type=CORRELATED  | jdbc --password=postgres --username=postgres --url="jdbc:postgresql://localhost:5432/postgres" --table-name="public.salesorders"

(Screenshot: the RabbitMQ-to-PostgreSQL stream in the SCDF dashboard)

PostgreSQL CDC to GemFire

This pipeline captures changes from the salesorders_read table in PostgreSQL and writes them to GemFire.

  • cdc-debezium (Source): A Debezium source connector configured to:
      • Connect to the PostgreSQL database.
      • Monitor the public.salesorders_read table for changes.
      • Use the specified database credentials and connection details.
      • Enable flattening of the Debezium message payload.
  • geode (Sink): A GemFire sink connector configured to:
      • Connect to the GemFire locator at localhost:10334.
      • Write data to the “orders” region.
      • Use the order_id field from the Debezium payload as the key in the GemFire region.
      • Send the entire payload as a JSON value.
  • Register the necessary Spring Cloud Stream applications (Debezium Source and GemFire Sink) in your Spring Cloud Data Flow server.
cdc-fraud-geode=cdc-debezium --cdc.name=postgres-connector --cdc.config.database.dbname=postgres --connector=postgres --cdc.config.database.server.name=my-app-connector --cdc.config.database.user=postgres --cdc.config.database.password=postgres --cdc.config.database.hostname=localhost --cdc.config.database.port=5432 --cdc.flattening.enabled="true" --cdc.config.schema.include.list=public --cdc.config.table.include.list="public.salesorders_read" | geode --host-addresses=localhost:10334 --region-name=orders --key-expression="payload.getField('order_id')" --json="true"

(Screenshot: the PostgreSQL-CDC-to-GemFire stream in the SCDF dashboard)

Configuring GemFire

Connect to the GemFire Locator using gfsh:

docker exec -it gf-locator gfsh
connect

Create the “orders” Region:

create region --name=orders --type=PARTITION

Running the Workflow

  • Start the Spring Boot Sales Order Generator application. This will begin publishing sales order messages to the RabbitMQ queue.
  • Deploy the “RabbitMQ to PostgreSQL” Spring Cloud Data Flow stream. This will start consuming messages and inserting data into the salesorders table.
  • Observe PostgreSQL: As data is inserted into salesorders, the sales_order_update_trigger will fire, calling the process_sales_order stored procedure. This will parse the payload and populate the salesorders_read table.
  • Deploy the “PostgreSQL CDC to GemFire” Spring Cloud Data Flow stream. This will connect to PostgreSQL via Debezium and start capturing changes in the salesorders_read table.
  • Observe GemFire: As changes occur in salesorders_read, Debezium will emit change events, and the GemFire sink will write these events to the “orders” region in GemFire.

Verifying the Data in GemFire

You can use gfsh to query the “orders” region in GemFire:

Connect to the GemFire Locator:

docker exec -it gf-locator gfsh
connect

Query the “orders” Region:

query --query="select * from /orders"

You should see the sales order data (as JSON) that originated from your Spring Boot application, flowed through RabbitMQ and PostgreSQL, and was captured by Debezium and written to GemFire.
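You can also run the same check from application code. The sketch below is a minimal client, assuming the GemFire 9.x Java client library (which exposes the org.apache.geode packages) is on the classpath; it connects through the locator started earlier and runs the same OQL query:

import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.query.SelectResults;

public class OrdersRegionCheck {
    public static void main(String[] args) throws Exception {
        // Connect through the locator exposed on localhost:10334
        ClientCache cache = new ClientCacheFactory()
                .addPoolLocator("localhost", 10334)
                .create();
        try {
            // Same OQL query used in gfsh
            SelectResults<?> results = (SelectResults<?>) cache.getQueryService()
                    .newQuery("select * from /orders")
                    .execute();
            results.forEach(System.out::println);
        } finally {
            cache.close();
        }
    }
}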

Conclusion

This workflow demonstrates a powerful and scalable approach to real-time data processing. By combining Spring Boot for application logic, RabbitMQ for asynchronous communication, Spring Cloud Data Flow for building data pipelines, PostgreSQL for persistent storage and data transformation, Debezium for change data capture, and GemFire for a high-performance in-memory data grid, you can build responsive and data-driven applications. This architecture allows for decoupling of services, efficient data transformation, and real-time data availability for downstream consumers.

This post is licensed under CC BY 4.0 by the author.