How to make a DB transaction, a REST client call, and a Kafka send behave as one transaction?

Summary

Issue: syncShop combines database writes, a REST client call, and a Kafka send, but only the database writes are transactional. If the database transaction rolls back after a message has been sent, the message stays published and any side effect of the REST call stays in place, leaving the systems inconsistent.

Root Cause

  • No shared transactional boundary: @Transactional rolls back database operations on failure, but external calls (REST client, Kafka) are not enlisted in that transaction and are not undone with it.
  • Parallel stream processing: parallelStream() runs work on fork-join worker threads, while Spring binds the transaction to the calling thread. Repository calls made on worker threads therefore run outside the transaction and are not rolled back with it.
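
Why worker threads fall outside the transaction can be shown without Spring. The sketch below is only a stand-in: CURRENT_TX is a hypothetical ThreadLocal that mimics a thread-bound transaction context, and ThreadBoundTxDemo is an illustrative name, not part of the original code. It opens a fake transaction on the calling thread and asks a second thread whether it can see it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadBoundTxDemo {
    // Hypothetical stand-in for a transaction context that is bound to one thread.
    static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    /** True if the thread that calls this method can see an open transaction. */
    static boolean inTransaction() {
        return CURRENT_TX.get() != null;
    }

    /** Opens a fake transaction on the caller, then checks visibility from a worker thread. */
    static String probe() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CURRENT_TX.set("tx-1"); // "begin" on the calling thread only
        try {
            boolean caller = inTransaction();
            boolean onWorker = CompletableFuture
                    .supplyAsync(ThreadBoundTxDemo::inTransaction, worker)
                    .join();
            return "caller=" + caller + " worker=" + onWorker;
        } finally {
            CURRENT_TX.remove(); // "end" of the fake transaction
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(probe()); // prints "caller=true worker=false"
    }
}
```

Tasks that parallelStream() hands to its worker threads are in the same position as the worker here: they do not inherit the caller's thread-local state, so any work they do is not part of the caller's transaction.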

Why This Happens in Real Systems

  • Distributed systems complexity: The database, the REST service, and Kafka each manage their own state and have no common commit protocol, so a rollback in one does not undo work in the others.
  • Asynchronous nature of Kafka: A Kafka send returns a future rather than a result, so a failed send may only surface after the method has returned and the database transaction has committed.
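
The timing problem with asynchronous sends can be sketched in plain Java. This is an illustration under stated assumptions, not Kafka code: AsyncSendDemo and its methods are hypothetical names, brokerAck stands in for the future a producer would return, and the events list records the order in which things happen.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class AsyncSendDemo {
    final List<String> events = new ArrayList<>();

    /** Fire-and-forget: registers a callback and returns before the broker has answered. */
    void syncWithoutWaiting(CompletableFuture<Void> brokerAck) {
        events.add("begin");
        brokerAck.whenComplete((ok, err) ->
                events.add(err == null ? "send acked" : "send failed"));
        events.add("commit"); // commits without knowing the outcome of the send
    }

    /** Blocking variant: a failed send throws before the commit point is reached. */
    void syncAndWait(CompletableFuture<Void> brokerAck) {
        events.add("begin");
        try {
            brokerAck.join(); // throws CompletionException if the send failed
        } catch (CompletionException e) {
            events.add("rollback");
            throw e;
        }
        events.add("commit");
    }

    public static void main(String[] args) {
        AsyncSendDemo demo = new AsyncSendDemo();
        CompletableFuture<Void> ack = new CompletableFuture<>();
        demo.syncWithoutWaiting(ack);
        ack.completeExceptionally(new RuntimeException("broker down"));
        System.out.println(demo.events); // prints [begin, commit, send failed]
    }
}
```

Blocking on the future closes only one of the two gaps. It lets a failed send abort the transaction, but a send that succeeded is still not recalled if the commit itself fails afterwards.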

Real-World Impact

  • Data inconsistency: Consumers of product_details can receive a message for a product whose synced flag was never committed, so the topic and the database disagree.
  • System unreliability: Inconsistent state can cause downstream systems to process incorrect information, affecting business operations.

Example or Code

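@Transactional
public void syncShop(Shop shop) {
    shopRepository.lockShop(shop.getId());
    var products = shop.getProducts();
    // Sequential forEach keeps every repository call on the thread that owns the transaction.
    products.forEach(product -> {
        try {
            var seller = product.getSeller();
            var sellerDetails = restClient.getLegalDetails(seller.getInn());
            var dto = new ProductDto();
            dto.setProduct(product);
            dto.setSellerDetails(sellerDetails);
            // Block until the broker acknowledges the send, so a failed send aborts the transaction.
            // completable() exists on the ListenableFuture returned by Spring Kafka 2.x; in 3.x send() already returns a CompletableFuture.
            // Caveat: a message that was sent is not recalled if the commit fails later.
            kafkaTemplate.send("product_details", "product", dto).completable().join();
            product.setSynced(true);
            productRepository.save(product);
        } catch (Exception e) {
            log.error("Error syncing product", e);
            throw e; // Propagate the exception so the transaction rolls back
        }
    });
    shop.setSynced(true);
}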

How Senior Engineers Fix It

  • Use outbox pattern: Persist outgoing messages (Kafka, REST) to the database within the transaction. Process them asynchronously after the transaction commits.
  • Idempotent operations: Ensure REST client calls and Kafka messages are idempotent to handle retries safely.
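  • Saga pattern: Where an external operation cannot be deferred until after the commit, pair it with a compensating action (for example, a follow-up event that cancels the earlier one) that runs if the database save fails.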
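
The outbox and idempotency items above can be sketched together in plain Java. Everything here is an in-memory stand-in chosen for illustration: OutboxSketch, OutboxRow, the messageId format, and the crashBeforeMarking flag are assumptions, not part of the original code. In a real system the product write and the outbox row would share one database transaction, and a separate relay would publish the rows to Kafka.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class OutboxSketch {
    /** A message staged in the database instead of being sent to the broker directly. */
    static final class OutboxRow {
        final String messageId;
        final String payload;
        boolean published;

        OutboxRow(String messageId, String payload) {
            this.messageId = messageId;
            this.payload = payload;
        }
    }

    // In-memory stand-ins for the product table, the outbox table,
    // and the message ids a downstream consumer has already handled.
    final Set<String> syncedProducts = new LinkedHashSet<>();
    final List<OutboxRow> outbox = new ArrayList<>();
    final Set<String> consumedIds = new LinkedHashSet<>();
    int deliveries = 0; // times the fake broker handed a message to the consumer
    int effects = 0;    // times the consumer actually acted on a message

    /** Simulates one local transaction: both writes happen, or neither does. */
    void syncProduct(String productId, boolean failBeforeCommit) {
        if (failBeforeCommit) {
            // Nothing was written and nothing was sent, so there is nothing to undo.
            throw new IllegalStateException("save failed for " + productId);
        }
        syncedProducts.add(productId);
        outbox.add(new OutboxRow("product-synced:" + productId, productId));
    }

    /** Relay step, run after commit. A crash before marking causes a redelivery on retry. */
    void relay(boolean crashBeforeMarking) {
        for (OutboxRow row : outbox) {
            if (row.published) continue;
            deliver(row);
            if (crashBeforeMarking) return; // row stays unpublished and is retried later
            row.published = true;
        }
    }

    /** Idempotent consumer: a repeated messageId is received but has no further effect. */
    private void deliver(OutboxRow row) {
        deliveries++;
        if (consumedIds.add(row.messageId)) {
            effects++;
        }
    }

    public static void main(String[] args) {
        OutboxSketch sketch = new OutboxSketch();
        sketch.syncProduct("p1", false);
        sketch.relay(true);  // delivers, then "crashes" before marking the row
        sketch.relay(false); // retry delivers the same row again
        System.out.println(sketch.deliveries + " deliveries, " + sketch.effects + " effect");
    }
}
```

The relay gives at-least-once delivery, which is why the consumer side deduplicates on messageId: a retry after a crash repeats the delivery but not its effect. A failed save leaves no outbox row behind, so no message is ever published for it.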

Why Juniors Miss It

  • Assumption of transactional scope: Juniors often assume @Transactional covers all operations, not realizing external calls are not included.
  • Overlooking failure scenarios: Lack of experience in distributed systems leads to inadequate error handling and rollback strategies.
  • Parallel processing pitfalls: Misuse of parallelStream() without considering its impact on transaction management and error propagation.
