Event Model & Demo Domain: Schema, Inference, and Synthetic Data
This report defines the event schemas used throughout the overlay, describes how we handle common distributed data challenges, and presents the pharmacy chain demo domain with its synthetic data generator plan.
---
1. Event Schemas
Our pipeline uses three representations of change data:
1.1 RawChangeEvent (source format)
This is the format emitted by the CDC ingestion component. For MySQL with Debezium, it matches Debezium’s `Envelope` structure:
json
{
"source": {
"version": "2.5.0.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1672539120000,
"snapshot": "false",
"db": "pharmacy",
"table": "inventory",
"server_id": 0,
"gtid": null,
"file": "binlog.000003",
"pos": 12345,
"row": 0,
"ts_us": 1672539120000000
},
"op": "u", // c=insert, u=update, d=delete, r=read (snapshot)
"ts_ms": 1672539120000,
"before": { "branch_id": 1, "product_id": 100, "quantity_on_hand": 50 },
"after": { "branch_id": 1, "product_id": 100, "quantity_on_hand": 48 }
}
We accept this format on the `/ingest` endpoint. It may arrive in batches (array). Transaction boundaries are indicated by `source.gtid` or by grouping within a batch; Debezium can also send a “transaction” wrapper event.
---
1.2 CanonicalChangeEvent (normalized)
Immediately after ingestion, we transform each raw event into a stable, database‑agnostic representation:
go
type CanonicalChangeEvent struct {
EventID string // UUID v5 derived from source+txid+seq
Source SourceRef // database, table, primary key map
Operation string // INSERT, UPDATE, DELETE
Before map[string]interface{} // nil for insert
After map[string]interface{} // nil for delete
Timestamp time.Time // UTC from source
TransactionID string // source transaction identifier
SchemaVersion string // optional: e.g., "inventory:2"
Metadata map[string]string // optional debug info
}
Normalization does not reinterpret the data; it only renames fields, ensures consistent types, and generates `EventID`. The `Source.PrimaryKey` map captures the PK columns/values regardless of source naming. This canonical form insulates the rest of the system from Debezium specifics and leaves room for future sources (SQL Server, Postgres).
---
1.3 BusinessEvent (semantic)
Business events answer “what happened” in domain terms. They are inferred from one or more canonical change events using rules.
go
type BusinessEvent struct {
Type string // e.g., "inventory.adjustment"
EntityType string // e.g., "product"
EntityID string // e.g., product SKU or numeric ID
Timestamp time.Time
Payload map[string]interface{} // specifics
Confidence float64 // 0.0–1.0
SourceEventIDs []string // links to underlying CanonicalChangeEvent(s)
}
Examples:
- `inventory.adjustment`: triggered by an `inventory` table update where `quantity` changes and `adjustment_reason` is set. Payload: `{ "branch_id": 1, "old_qty": 50, "new_qty": 48, "reason": "SALE" }`.
- `sale.completed`: inferred from a group of events in one transaction: insert into `sales` plus multiple inserts into `sales_items`. Payload: `{ "sale_id": 123, "branch_id": 1, "total": 150.00, "items": [...] }`.
- `user.role_changed`: update on `users` table where `role_id` column changed. Payload: `{ "user_id": 42, "old_role": "cashier", "new_role": "manager" }`.
The inference engine is rule‑based for MVP; future work could use a lightweight ML model to detect patterns not covered by explicit rules.
---
2. Cross-Cutting Concerns
2.1 Schema Evolution
Source tables may change (add/drop columns, rename). Our canonical design uses `map[string]interface{}` for `Before` and `After`, which naturally tolerates varying columns. `SchemaVersion` (e.g., `"inventory:3"`) can be used to track the version of a table’s schema according to our overlay. When we detect a DDL event (Debezium `op: "c"` on a `*_schema` table? Actually Debezium emits separate DDL events), we increment that table’s version. Downstream inference rules should reference stable keys (PKs, business IDs) and guard against missing fields with defaults.
An alternative is to maintain a schema registry in our storage; for MVP we can skip formal versioning and just rely on map flexibility.
2.2 Transaction Boundaries
To correctly infer multi‑table events (like a sale spanning `sales` and `sales_items`), we need to know which canonical events belong to the same database transaction. The CDC source provides a transaction identifier per event:
- Debezium: `source.gtid` for MySQL GTID, or `source.file`+`source.pos` for binlog positions, grouped by `source.transaction_id` if using the `t` envelope.
This guarantees atomicity: either all events from a transaction are ingested and processed, or none.
2.3 Idempotency and Deduplication
Each canonical event gets a deterministic UUID (`EventID`) derived from the source transaction and row identity. When storing, we use `INSERT OR IGNORE` (SQLite) or `ON CONFLICT (event_id) DO NOTHING` (Postgres). This makes ingestion naturally idempotent: retrying the same event is safe.
We also store the raw transaction position to detect duplicates across restarts.
2.4 Ordering and Consistency
Within a single transaction, the source order is preserved by Debezium (events appear in the order of row modifications). Across transactions, global ordering is approximately chronological via `Timestamp`. We do not rely on strict global ordering for business logic; instead we rely on transaction grouping.
If ordering matters for a specific use case (e.g., “first sale of the day”), we can query by `Timestamp` and then by `transaction_id` to break ties.
2.5 Multi‑Table Event Correlation
Business events often involve multiple tables. Our inference engine will:
1. Collect all canonical events sharing the same `TransactionID`.
2. Group them by table and operation.
3. Apply pattern‑matching rules:
- Sale pattern: at least one insert into `sales` (operation=`INSERT`) plus one or more inserts into `sales_items`. Verify that the `sale_id` in `sales_items` matches the `sale_id` from the `sales` row (or use the PK of `sales`). Then produce one `sale.completed` BusinessEvent that aggregates items.
- Inventory adjustment may appear as a single row update in `inventory` with a non‑null `adjustment_reason`.
- User role change: update on `users` where `role_id` changed; optionally join with `roles` table to capture role name (using cached mapping).
- Purchase order receipt: insert into `po_items` accompanied by an update on `purchase_orders` status to “RECEIVED”.
These rules are implemented as Go functions that take the transaction group and emit zero or more BusinessEvents.
---
3. Pharmacy Demo Domain
3.1 Entity Model (Simplified)
We’ll model a typical pharmacy chain with the following core entities:
- Branch: physical locations; each has a `branch_id`.
- Product: items for sale; `product_id`, `sku`, `name`, `category` (OTC, prescription, cosmetic), `unit_price`, `is_controlled` (boolean).
- Inventory: per‑branch stock levels; composite PK (`branch_id`, `product_id`); columns: `quantity_on_hand`, `last_counted_date`.
- Sale: header for a customer transaction; `sale_id` (auto increment), `branch_id`, `sale_time`, `total_amount`, `payment_method` (cash, card, mobile), `cashier_user_id`.
- SaleItem: line items; `sale_id`, `product_id`, `quantity`, `unit_price`, `line_total`.
- InventoryAdjustment: manual corrections; `adjust_id`, `branch_id`, `product_id`, `old_qty`, `new_qty`, `reason_code` (`SALE`, `DAMAGE`, `THEFT`, `COUNT`, `SUPPLIER_ERROR`), `adjusted_by_user_id`, `adjusted_at`, `notes`.
- PurchaseOrder: orders to suppliers; `po_id`, `supplier_id`, `order_date`, `status` (PENDING, RECEIVED, CANCELLED).
- PurchaseOrderItem: `po_id`, `product_id`, `quantity_ordered`, `unit_cost`, `quantity_received`.
- Supplier: `supplier_id`, `name`, `contact_info`.
- User & Role: `user_id`, `username`, `role_id`, `branch_id` (for branch‑restricted roles), `is_active`.
We also include standard audit columns (`created_at`, `updated_at`, `created_by`, `updated_by`) on all master tables (Branch, Product, Inventory, Sale, etc.) to support “who changed what” queries.
All tables use InnoDB with foreign keys where appropriate (e.g., `inventory.branch_id → branch.branch_id`). The `inventory_adjustments` table does not have a foreign key to `sale` because adjustments can be independent; linking to sales is done via inference if the reason is `SALE` (the sale process itself decrements inventory via a transaction that includes both sale and adjustment events, so we can correlate by transaction).
---
3.2 SQL DDL (MySQL)
Below is a concise DDL sufficient for the MVP demo. It includes indexes for performance.
sql
CREATE DATABASE IF NOT EXISTS pharmacy;
USE pharmacy;
CREATE TABLE branches (
branch_id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
address TEXT,
city VARCHAR(50),
region VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
created_by INT,
updated_by INT
) ENGINE=InnoDB;
CREATE TABLE products (
product_id INT AUTO_INCREMENT PRIMARY KEY,
sku VARCHAR(50) UNIQUE NOT NULL,
name VARCHAR(200) NOT NULL,
category ENUM('OTC','PRESCRIPTION','COSMETIC','SUPPLEMENT') NOT NULL,
unit_price DECIMAL(10,2) NOT NULL,
is_controlled BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
created_by INT,
updated_by INT
) ENGINE=InnoDB;
CREATE TABLE inventory (
branch_id INT NOT NULL,
product_id INT NOT NULL,
quantity_on_hand INT NOT NULL DEFAULT 0,
last_counted_date DATE,
PRIMARY KEY (branch_id, product_id),
FOREIGN KEY (branch_id) REFERENCES branches(branch_id),
FOREIGN KEY (product_id) REFERENCES products(product_id)
) ENGINE=InnoDB;
CREATE TABLE users (
user_id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
role_id INT NOT NULL,
branch_id INT, -- NULL for corporate/regional roles
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
created_by INT,
updated_by INT,
FOREIGN KEY (role_id) REFERENCES roles(role_id),
FOREIGN KEY (branch_id) REFERENCES branches(branch_id)
) ENGINE=InnoDB;
CREATE TABLE roles (
role_id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(50) NOT NULL UNIQUE
) ENGINE=InnoDB;
CREATE TABLE sales (
sale_id INT AUTO_INCREMENT PRIMARY KEY,
branch_id INT NOT NULL,
sale_time DATETIME NOT NULL,
total_amount DECIMAL(10,2) NOT NULL,
payment_method ENUM('CASH','CARD','MOBILE') NOT NULL,
cashier_user_id INT NOT NULL,
FOREIGN KEY (branch_id) REFERENCES branches(branch_id),
FOREIGN KEY (cashier_user_id) REFERENCES users(user_id)
) ENGINE=InnoDB;
CREATE TABLE sale_items (
sale_id INT NOT NULL,
product_id INT NOT NULL,
quantity INT NOT NULL,
unit_price DECIMAL(10,2) NOT NULL,
line_total DECIMAL(10,2) AS (quantity * unit_price) STORED,
PRIMARY KEY (sale_id, product_id),
FOREIGN KEY (sale_id) REFERENCES sales(sale_id),
FOREIGN KEY (product_id) REFERENCES products(product_id)
) ENGINE=InnoDB;
CREATE TABLE inventory_adjustments (
adjust_id INT AUTO_INCREMENT PRIMARY KEY,
branch_id INT NOT NULL,
product_id INT NOT NULL,
old_qty INT NOT NULL,
new_qty INT NOT NULL,
reason_code ENUM('SALE','DAMAGE','THEFT','COUNT','SUPPLIER_ERROR','OTHER') NOT NULL,
adjusted_by_user_id INT NOT NULL,
adjusted_at DATETIME NOT NULL,
notes TEXT,
FOREIGN KEY (branch_id) REFERENCES branches(branch_id),
FOREIGN KEY (product_id) REFERENCES products(product_id),
FOREIGN KEY (adjusted_by_user_id) REFERENCES users(user_id)
) ENGINE=InnoDB;
CREATE TABLE suppliers (
supplier_id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
contact_name VARCHAR(100),
contact_phone VARCHAR(20),
contact_email VARCHAR(100)
) ENGINE=InnoDB;
CREATE TABLE purchase_orders (
po_id INT AUTO_INCREMENT PRIMARY KEY,
supplier_id INT NOT NULL,
order_date DATE NOT NULL,
status ENUM('PENDING','RECEIVED','CANCELLED') DEFAULT 'PENDING',
created_by INT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (supplier_id) REFERENCES suppliers(supplier_id)
) ENGINE=InnoDB;
CREATE TABLE po_items (
po_id INT NOT NULL,
product_id INT NOT NULL,
quantity_ordered INT NOT NULL,
unit_cost DECIMAL(10,2) NOT NULL,
quantity_received INT DEFAULT 0,
PRIMARY KEY (po_id, product_id),
FOREIGN KEY (po_id) REFERENCES purchase_orders(po_id),
FOREIGN KEY (product_id) REFERENCES products(product_id)
) ENGINE=InnoDB;
Indexes beyond PKs/FKs:
- `idx_inventory_branch_product (branch_id, product_id)` is covered by PK.
- `idx_sales_time (sale_time)` for time‑series queries.
- `idx_adjustments_branch_time (branch_id, adjusted_at)` for per‑branch anomaly scans.
---
3.3 Synthetic Data Generator Plan
We’ll produce a self‑contained generator (Go program) that writes directly to the MySQL database (using `go-sql-driver/mysql`). The generator runs in two phases:
1. Schema & Static Data:
- Execute the DDL above.
- Insert ~20 branches (named “Branch 1”, “Branch 2”, … in major Peruvian cities: Lima, Arequipa, Trujillo, Chiclayo, etc.).
- Insert ~500 products with realistic names (using a fake data library with pharmacy‑like names) and SKUs.
- Insert roles: `admin`, `manager`, `cashier`, `warehouse`.
- Insert users: 1–2 admins, 1 manager per branch, 3–5 cashiers per branch, warehouse staff.
- Insert suppliers: 10–15 local pharma suppliers.
2. Time Series Simulation:
- Choose a date range (e.g., last 90 days).
- For each day:
- Sales bursts: simulate peak hours (10–13, 17–20). Poisson process for number of transactions per branch. Each transaction picks random items (3–8), computes totals, generates `sales` + `sale_items`. This automatically triggers `inventory` decrements if we model that by updating inventory within the same transaction (in a real system the sale would deduct inventory via triggers or application logic). We'll mimic that by issuing an `UPDATE inventory SET quantity_on_hand = quantity_on_hand - :qty WHERE branch_id=:branch AND product_id=:product`. That update will be captured as an inventory adjustment with `reason_code='SALE'`.
- Purchase orders: random days have new POs; later same day or next day receive items (increase `inventory`).
- Manual adjustments: small percentage (1–2%) of inventory rows get manual adjustments for `DAMAGE`, `COUNT`, `THEFT`. We’ll inject anomalies: a few days with spikes in `THEFT` after hours; large negative adjustments without corresponding sales; sudden `quantity_on_hand` = 0 for hot products.
- User role changes: occasional promotions/demotions.
- All writes should be grouped in transactions where logical; e.g., a sale transaction includes inserts into `sales`, `sale_items`, and inventory updates. This ensures Debezium groups them under one `transaction_id`, enabling the inference engine to reconstruct sales.
The generator will be deterministic given a random seed, making demos reproducible.
Pseudocode:
func main() {
db = connect_mysql()
ensure_schema(db)
insert_static_data(db)
start_date = now() - 90 days
for day := start_date; day <= now(); day = day + 1 day {
begin_transaction(db) // for each day we may commit per generated event
for each branch {
simulate_sales(db, branch, day)
simulate_purchase_orders(db, branch, day)
simulate_inventory_adjustments(db, branch, day)
simulate_user_activity(db, branch, day)
}
commit()
if anomaly_week(day) {
inject_anomalies(db, day)
}
}
}
Implementation details: We’ll use `github.com/brianvoe/gofakeit` for names, addresses, and random choices. For performance, we’ll use bulk inserts where possible.
---
4. Event Taxonomy (BusinessEvent Types)
The MVP will recognize the following business event types:
- `sale.completed` – a full customer transaction.
- `inventory.adjustment` – any change to `inventory.quantity_on_hand`. Sub‑types: `SALE`, `DAMAGE`, `THEFT`, `COUNT`, `SUPPLIER_ERROR`, `OTHER`. The `reason_code` is in payload.
- `purchase_order.received` – when a PO status becomes `RECEIVED` and `po_items.quantity_received` increases inventory.
- `user.role_changed` – update on `users.role_id`.
- `user.created` / `user.deactivated` – insert/update on `users.is_active`.
- `product.price_changed` – update on `products.unit_price`.
- `supplier.added` – insert into `suppliers`.
This taxonomy will be expanded later, but it’s sufficient for pilot storytelling.
---
5. Conclusion
The event model cleanly separates raw ingestion, canonical representation, and business semantics. The pharmacy domain provides a rich but understandable setting for the MVP demo. Synthetic data will generate realistic daily operations plus injected anomalies for the AI layer to surface.
Next report (Report 4) will detail the “Audit Intelligence” features: daily summaries, anomaly detection, suspicious activity, variance, and traceability—tying these to the event model and AI layer.
---
Word count: ~1,300