In high-volume backend services, processing points reversal / cancellation events often hits the same wall: how to catch up with backlogs without overwhelming external dependencies or corrupting ledger state.
The Solution: Batch + Bounded Parallelism + DB-level Idempotency
1) Batch Consumption
Instead of one-by-one processing, we consume messages in controlled chunks.
@KafkaListener(containerFactory = "batchListenerFactory")
public void consume(List<ConsumerRecord<String, String>> records) {
    // The listener receives the whole poll as one chunk instead of single records
    List<Event> events = parse(records);
    cancellationService.process(events);
}
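For reference, a minimal sketch of what a batchListenerFactory bean could look like with Spring Kafka (the wiring below is an assumption, not the original configuration); the key point is enabling batch mode so the listener receives the whole poll as a list:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> batchListenerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // Deliver the whole poll as a List; chunk size is capped by the consumer's max.poll.records
    factory.setBatchListener(true);
    return factory;
}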
2) Bounded Parallel External Calls
Within a batch, fetch balance data in parallel.
The thread pool size (e.g., corePoolSize) should be tuned based on the downstream API's rate limits and latency, not just your own CPU cores. I/O-bound tasks usually require a larger pool than CPU-bound tasks.
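As a sketch of the bounded fan-out itself (poolSize and the balanceClient.fetchBalance call are illustrative names, not taken from the original code), each event becomes one future on a fixed-size pool, which the snippet below then joins:
// Fixed-size pool caps parallelism no matter how large the batch is
ExecutorService pool = Executors.newFixedThreadPool(poolSize);

List<CompletableFuture<Balance>> futures = events.stream()
    .map(e -> CompletableFuture.supplyAsync(
            () -> balanceClient.fetchBalance(e.getCustomerId()), pool))
    .collect(Collectors.toList());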
// Parallel fetch and collect into a HashMap for O(1) lookup
Map<String, Balance> balanceMap = futures.stream()
    .map(CompletableFuture::join)
    .collect(Collectors.toMap(
        Balance::getCustomerId,
        b -> b,
        (existing, replacement) -> replacement
    ));
3) Efficient Batch Insert with Idempotency
Instead of executing 100 INSERT statements, we combine them into one. We leverage PostgreSQL's ON CONFLICT DO NOTHING to handle duplicates at the storage layer.
-- Example SQL generated by the service
INSERT INTO ledger_entries (customer_id, amount, entry_type, source_id)
VALUES
    ('cust_01', -100, 'REVERSAL', 'tx_999'),
    ('cust_02', -50,  'REVERSAL', 'tx_888')
ON CONFLICT (customer_id, source_id) WHERE entry_type = 'REVERSAL'
DO NOTHING;
4) Partial Unique Index (The Safety Net)
We only want to dedupe reversal events, leaving other transaction types flexible.
CREATE UNIQUE INDEX uq_reversal_by_customer_source
ON ledger_entries (customer_id, source_id)
WHERE entry_type = 'REVERSAL';
5) Publish Only for New Rows
To prevent downstream duplicate events, we only publish messages for rows that were actually inserted.
On PostgreSQL this is straightforward with INSERT ... RETURNING; MySQL users might rely on ROW_COUNT() or a separate SELECT check if batch-level granular reporting is required.
// Only publish if the ID exists in the set of successfully inserted records
Set<UUID> insertedIds = repository.batchInsert(ledgerEntries);
events.stream()
    .filter(e -> insertedIds.contains(e.getId()))
    .forEach(e -> producer.send(buildMsg(e)));
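For completeness, a rough sketch of how batchInsert could be implemented with Spring's JdbcTemplate (an assumption on my part; it also assumes each ledger entry carries the originating event's UUID as its id, supplied by the application, so the returned ids can be matched back to events):
public Set<UUID> batchInsert(List<LedgerEntry> entries) {
    // One placeholder group per row
    String values = entries.stream()
        .map(e -> "(?, ?, ?, ?, ?)")
        .collect(Collectors.joining(", "));

    String sql = "INSERT INTO ledger_entries (id, customer_id, amount, entry_type, source_id) "
        + "VALUES " + values
        + " ON CONFLICT (customer_id, source_id) WHERE entry_type = 'REVERSAL' DO NOTHING"
        + " RETURNING id";

    List<Object> params = new ArrayList<>();
    for (LedgerEntry e : entries) {
        params.add(e.getId());
        params.add(e.getCustomerId());
        params.add(e.getAmount());
        params.add(e.getEntryType());
        params.add(e.getSourceId());
    }

    // RETURNING emits only the rows that were actually inserted by this statement
    return new HashSet<>(jdbcTemplate.query(
        sql, (rs, rowNum) -> rs.getObject("id", UUID.class), params.toArray()));
}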
6) End-to-End Batching: The Kafka Producer
As covered above, only events for newly created records are published after the database insert; to maintain high throughput, the Kafka producer itself should also be tuned for batching.
Set a small linger.ms (e.g., 5-10 ms) and a larger batch.size (e.g., 32 KB) to allow the producer to buffer multiple records into a single network request (and compress them, if compression.type is enabled).
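A minimal producer configuration along these lines, with illustrative values rather than the original ones:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);            // wait briefly so batches can fill
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);   // 32 KB per partition batch
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // compress whole batches
KafkaProducer<String, String> producer = new KafkaProducer<>(props);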
// Example: Only publish for IDs successfully inserted in this batch
Set<UUID> insertedIds = repository.batchInsert(ledgerEntries);
events.stream()
    .filter(e -> insertedIds.contains(e.getId()))
    .forEach(e -> producer.send(new ProducerRecord<>(topic, e.getKey(), e.payload())));
Handling Partial Failures
In a batch of 100, what if 1 event fails while 99 are valid?
- Individual Try-Catch: Wrap external calls. If one fails, exclude it from the DB batch (see the sketch after this list).
- Partial DB Inserts: ON CONFLICT DO NOTHING allows valid rows to persist even if others are duplicates.
- Dead Letter Queue (DLQ): Route only the truly unprocessable events to a DLQ.
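To make the first and third bullets concrete, a sketch of per-event isolation (shown sequentially for clarity; balanceClient, withBalance, toLedgerEntries, dlqProducer and dlqTopic are illustrative names, and in practice the DLQ step would come only after retries are exhausted):
List<Event> processable = new ArrayList<>();
for (Event event : events) {
    try {
        // Isolate the external call per event so one failure cannot poison the batch
        Balance balance = balanceClient.fetchBalance(event.getCustomerId());
        processable.add(event.withBalance(balance));
    } catch (Exception ex) {
        // Truly unprocessable: route to the DLQ instead of failing the whole batch
        dlqProducer.send(new ProducerRecord<>(dlqTopic, event.getKey(), event.payload()));
    }
}
// Only events that survived the external call reach the DB batch
repository.batchInsert(toLedgerEntries(processable));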
Why This Matters
- Higher throughput during catch-up / spikes
- No duplicate deductions
- Lower DB overhead
- Safer retry behavior
- Downstream systems protected via backpressure
Summary & Practical Notes
- Backpressure: Use a Caller-Runs rejection policy so the submitting thread slows down when the pool is saturated (see the sketch after this list).
- Metrics: Monitor the ratio of "Inserted" vs "Duplicate Ignored" to detect retry storms.
- Config: Keep pool sizes and batch sizes configurable for real-time tuning.
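As a closing sketch of the backpressure note (poolSize and queueCapacity are illustrative and, per the last bullet, should come from configuration):
// Bounded queue + CallerRunsPolicy: when the pool and queue are full, the submitting
// (consumer) thread runs the task itself, which naturally slows down intake
ExecutorService fetchPool = new ThreadPoolExecutor(
    poolSize, poolSize,
    0L, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<>(queueCapacity),
    new ThreadPoolExecutor.CallerRunsPolicy());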