Spring 事件驱动架构:构建松耦合的微服务系统

张开发
2026/4/16 2:34:44 15 分钟阅读

分享文章

Spring 事件驱动架构:构建松耦合的微服务系统
Spring 事件驱动架构构建松耦合的微服务系统别叫我大神叫我 Alex 就好。一、引言大家好我是 Alex。事件驱动架构EDA是构建松耦合、可扩展系统的有效方式。Spring Framework 提供了强大的事件机制而 Spring Cloud Stream 则让我们能够轻松构建基于消息的事件驱动系统。今天我想和大家分享一下如何在 Spring 中实现事件驱动架构。二、Spring 事件机制基础1. 同步事件处理Spring 的 ApplicationEvent 是最基础的事件机制// 定义事件 public class OrderCreatedEvent extends ApplicationEvent { private final Long orderId; private final Long customerId; private final BigDecimal amount; public OrderCreatedEvent(Object source, Long orderId, Long customerId, BigDecimal amount) { super(source); this.orderId orderId; this.customerId customerId; this.amount amount; } // getters... } // 发布事件 Service public class OrderService { Autowired private ApplicationEventPublisher eventPublisher; Transactional public Order createOrder(OrderRequest request) { Order order saveOrder(request); // 发布事件 eventPublisher.publishEvent(new OrderCreatedEvent( this, order.getId(), order.getCustomerId(), order.getAmount() )); return order; } } // 监听事件 Component public class OrderEventListener { EventListener public void handleOrderCreated(OrderCreatedEvent event) { // 发送通知邮件 emailService.sendOrderConfirmation(event.getOrderId()); } EventListener Async // 异步处理 public void handleOrderCreatedAsync(OrderCreatedEvent event) { // 更新统计数据 analyticsService.trackOrder(event); } }2. 异步事件处理配置Configuration EnableAsync public class AsyncEventConfig implements AsyncConfigurer { Override Bean(name eventAsyncExecutor) public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(20); executor.setQueueCapacity(1000); executor.setThreadNamePrefix(event-); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (ex, method, params) - { log.error(Async event handler error in method: {}, method.getName(), ex); }; } }三、Spring Cloud Stream 事件驱动1. 基础配置spring: cloud: stream: bindings: order-created-out: destination: order-created-topic content-type: application/json order-created-in: destination: order-created-topic group: order-service-group consumer: max-attempts: 3 back-off-initial-interval: 1000 back-off-max-interval: 10000 kafka: binder: brokers: localhost:9092 configuration: acks: all retries: 32. 事件发布与订阅// 定义事件接口 public interface OrderEventChannels { String ORDER_CREATED_OUT order-created-out; String ORDER_CREATED_IN order-created-in; Output(ORDER_CREATED_OUT) MessageChannel orderCreatedOut(); Input(ORDER_CREATED_IN) SubscribableChannel orderCreatedIn(); } // 事件发布器 Service EnableBinding(OrderEventChannels.class) public class OrderEventPublisher { Autowired private OrderEventChannels channels; public void publishOrderCreated(Order order) { OrderCreatedEvent event OrderCreatedEvent.builder() .orderId(order.getId()) .customerId(order.getCustomerId()) .amount(order.getAmount()) .createdAt(Instant.now()) .build(); channels.orderCreatedOut().send(MessageBuilder .withPayload(event) .setHeader(event-type, OrderCreated) .setHeader(version, 1.0) .build()); } } // 事件处理器 Component EnableBinding(OrderEventChannels.class) public class OrderEventHandler { StreamListener(OrderEventChannels.ORDER_CREATED_IN) public void handleOrderCreated(Payload OrderCreatedEvent event, Header(event-type) String eventType) { log.info(Received order created event: {}, event.getOrderId()); // 处理库存扣减 inventoryService.reserve(event.getOrderId(), event.getItems()); // 发送通知 notificationService.notifyCustomer(event.getCustomerId(), 订单已创建); } }四、事件溯源与 CQRS1. 事件存储Entity Table(name event_store) public class DomainEvent { Id GeneratedValue(strategy GenerationType.IDENTITY) private Long id; private String aggregateId; private String aggregateType; private Long version; Column(name event_type) private String eventType; Column(name event_data, columnDefinition TEXT) private String eventData; Column(name occurred_on) private Instant occurredOn; // getters and setters } Repository public interface EventStoreRepository extends JpaRepositoryDomainEvent, Long { ListDomainEvent findByAggregateIdOrderByVersionAsc(String aggregateId); ListDomainEvent findByAggregateIdAndVersionGreaterThan(String aggregateId, Long version); }2. 聚合根实现public abstract class EventSourcedAggregate { private String id; private Long version 0L; private ListDomainEvent uncommittedEvents new ArrayList(); public void apply(DomainEvent event) { handleEvent(event); uncommittedEvents.add(event); version; } protected abstract void handleEvent(DomainEvent event); public ListDomainEvent getUncommittedEvents() { return new ArrayList(uncommittedEvents); } public void markCommitted() { uncommittedEvents.clear(); } } public class OrderAggregate extends EventSourcedAggregate { private OrderStatus status; private ListOrderItem items; private BigDecimal totalAmount; public static OrderAggregate create(String orderId, Long customerId, ListOrderItem items) { OrderAggregate aggregate new OrderAggregate(); aggregate.apply(new OrderCreatedEvent(orderId, customerId, items)); return aggregate; } public void pay(Payment payment) { if (status ! OrderStatus.PENDING) { throw new IllegalStateException(Order is not pending); } apply(new OrderPaidEvent(getId(), payment)); } Override protected void handleEvent(DomainEvent event) { if (event instanceof OrderCreatedEvent) { handleOrderCreated((OrderCreatedEvent) event); } else if (event instanceof OrderPaidEvent) { handleOrderPaid((OrderPaidEvent) event); } } private void handleOrderCreated(OrderCreatedEvent event) { this.status OrderStatus.PENDING; this.items event.getItems(); this.totalAmount calculateTotal(event.getItems()); } private void handleOrderPaid(OrderPaidEvent event) { this.status OrderStatus.PAID; } }五、Saga 模式实现1. Saga 编排器Component public class OrderSagaOrchestrator { Autowired private SagaEventPublisher eventPublisher; Autowired private SagaStateRepository sagaStateRepository; public void startOrderSaga(Order order) { String sagaId UUID.randomUUID().toString(); SagaState sagaState SagaState.builder() .sagaId(sagaId) .orderId(order.getId()) .status(SagaStatus.STARTED) .currentStep(0) .build(); sagaStateRepository.save(sagaState); // 第一步创建订单 eventPublisher.publish(new CreateOrderCommand(sagaId, order)); } EventListener public void onOrderCreated(OrderCreatedEvent event) { Saga

更多文章