프로그래밍 PROGRAMMING/아키텍쳐

Event Sourcing 아키텍처 패턴 적용하기

매운할라피뇽 2026. 3. 23. 09:30
반응형
Event Sourcing 아키텍처 패턴 적용

Event Sourcing은 애플리케이션의 상태를 직접 저장하는 대신, 상태를 변경시킨 모든 이벤트를 순서대로 저장하는 아키텍처 패턴입니다. 현재 상태는 이 이벤트 시퀀스를 재생(replay)하여 언제든지 재구성할 수 있으며, 이를 통해 완전한 감사 로그, 시간 여행 디버깅, 그리고 유연한 읽기 모델 구성이 가능해집니다. 이 글에서는 Event Sourcing의 핵심 개념을 이해하고, Java 기반 예제를 통해 실제 프로젝트에 적용하는 방법을 단계별로 살펴봅니다.


Event Sourcing의 핵심 개념

전통적인 CRUD 방식에서는 레코드를 업데이트하면 이전 상태가 사라집니다. 예를 들어 주문 상태가 PENDING → PAID → SHIPPED로 바뀌면, 최종 상태인 SHIPPED만 데이터베이스에 남습니다.

Event Sourcing에서는 이 흐름 자체를 저장합니다.

  • OrderPlaced 이벤트 → 주문 생성
  • PaymentReceived 이벤트 → 결제 완료
  • OrderShipped 이벤트 → 배송 시작

이벤트는 불변(immutable) 하며, 과거를 수정하지 않고 새로운 이벤트를 추가하는 방식으로만 상태를 변경합니다. 이 특성이 Event Sourcing을 감사(audit)와 이력 추적에 강력하게 만드는 핵심입니다.
주요 구성 요소:

  1. Event — 과거에 발생한 사실을 나타내는 불변 객체
  2. Event Store — 이벤트를 순서대로 저장하는 저장소
  3. Aggregate — 이벤트를 적용하여 현재 상태를 재구성하는 도메인 객체
  4. Projection — 이벤트 스트림을 특정 읽기 모델로 변환한 결과

도메인 이벤트와 Aggregate 설계

도메인 이벤트를 정의하는 것이 설계의 출발점입니다. 이벤트 이름은 항상 과거형 동사로 작성하여 "이미 발생한 사실"임을 명확히 합니다.

// 이벤트 마커 인터페이스
public interface DomainEvent {
    String aggregateId();
    Instant occurredAt();
}

// 주문 생성 이벤트
public record OrderPlaced(
    String aggregateId,
    String customerId,
    List<String> productIds,
    Instant occurredAt
) implements DomainEvent {}

// 결제 완료 이벤트
public record PaymentReceived(
    String aggregateId,
    BigDecimal amount,
    Instant occurredAt
) implements DomainEvent {}

다음은 이벤트를 누적하여 상태를 재구성하는 Order Aggregate입니다. 현재 상태는 파생된 결과물이므로, 비즈니스 로직은 이벤트를 발행하는 메서드에, 상태 변경은 apply() 메서드에 분리합니다.

public class Order {
    private String id;
    private String customerId;
    private OrderStatus status;
    private final List<DomainEvent> uncommittedEvents = new ArrayList<>();

    // 정적 팩토리: 새 주문 생성 (이벤트 발행)
    public static Order place(String orderId, String customerId, List<String> productIds) {
        Order order = new Order();
        order.applyAndRecord(new OrderPlaced(orderId, customerId, productIds, Instant.now()));
        return order;
    }

    // 결제 처리 (이벤트 발행)
    public void receivePayment(BigDecimal amount) {
        if (this.status != OrderStatus.PENDING) {
            throw new IllegalStateException("결제 가능한 상태가 아닙니다: " + this.status);
        }
        applyAndRecord(new PaymentReceived(this.id, amount, Instant.now()));
    }

    // 이벤트 저장소에서 상태를 재구성할 때 사용
    public static Order reconstitute(List<DomainEvent> events) {
        Order order = new Order();
        events.forEach(order::apply);
        return order;
    }

    private void applyAndRecord(DomainEvent event) {
        apply(event);
        uncommittedEvents.add(event);
    }

    // 이벤트 타입별 상태 적용 (패턴 매칭 활용)
    private void apply(DomainEvent event) {
        switch (event) {
            case OrderPlaced e -> {
                this.id = e.aggregateId();
                this.customerId = e.customerId();
                this.status = OrderStatus.PENDING;
            }
            case PaymentReceived e -> this.status = OrderStatus.PAID;
            default -> { /* 알 수 없는 이벤트는 무시 */ }
        }
    }

    public List<DomainEvent> getUncommittedEvents() {
        return Collections.unmodifiableList(uncommittedEvents);
    }
}

Event Store 구현과 Projection

Event Store는 이벤트 스트림을 aggregateId 기준으로 조회하고 저장하는 역할을 합니다. 운영 환경에서는 EventStoreDB 또는 PostgreSQL의 JSONB 컬럼을 활용하는 방식이 널리 사용됩니다. 아래는 인메모리 기반의 간단한 구현 예시입니다.

@Repository
public class InMemoryEventStore {
    // aggregateId → 이벤트 리스트
    private final Map<String, List<DomainEvent>> store = new ConcurrentHashMap<>();

    public void append(String aggregateId, List<DomainEvent> events) {
        store.computeIfAbsent(aggregateId, k -> new ArrayList<>()).addAll(events);
    }

    public List<DomainEvent> load(String aggregateId) {
        return store.getOrDefault(aggregateId, List.of());
    }
}

// Order 저장소: 이벤트를 저장하고 재구성
@Service
public class OrderRepository {
    private final InMemoryEventStore eventStore;

    public void save(Order order) {
        List<DomainEvent> events = order.getUncommittedEvents();
        if (!events.isEmpty()) {
            eventStore.append(events.get(0).aggregateId(), events);
        }
    }

    public Order findById(String orderId) {
        List<DomainEvent> events = eventStore.load(orderId);
        if (events.isEmpty()) throw new EntityNotFoundException(orderId);
        return Order.reconstitute(events);
    }
}

Projection은 이벤트 스트림을 읽기 전용 모델로 변환합니다. 예를 들어 주문 요약 뷰를 만들기 위해 이벤트 핸들러를 별도로 구성할 수 있습니다.

@Component
public class OrderSummaryProjection {
    private final Map<String, OrderSummary> summaries = new ConcurrentHashMap<>();

    @EventHandler
    public void on(OrderPlaced event) {
        summaries.put(event.aggregateId(),
            new OrderSummary(event.aggregateId(), event.customerId(), "PENDING"));
    }

    @EventHandler
    public void on(PaymentReceived event) {
        summaries.computeIfPresent(event.aggregateId(), (id, summary) ->
            summary.withStatus("PAID"));
    }

    public Optional<OrderSummary> findById(String id) {
        return Optional.ofNullable(summaries.get(id));
    }
}

실행 흐름을 정리하면 다음과 같습니다.

[사용자 요청]
    ↓ Command
[Order.place()] → OrderPlaced 이벤트 발행
    ↓ save()
[Event Store] → 이벤트 영속화
    ↓ 비동기 전파
[Projection] → 읽기 모델(OrderSummary) 업데이트
    ↓ 조회
[Query Handler] → Projection에서 즉시 응답

맺음말

Event Sourcing은 도입 초기 학습 곡선이 있지만, 이벤트 이력이 중요한 도메인(금융, 물류, 커머스 등)에서 그 가치가 두드러집니다. 핵심 설계 원칙을 정리하면 다음과 같습니다.

  • 이벤트는 불변: 한번 저장된 이벤트는 수정하지 않습니다.
  • 상태는 파생: Aggregate의 현재 상태는 이벤트 재생의 결과입니다.
  • 읽기와 쓰기 분리: Projection을 통해 쿼리에 최적화된 별도 모델을 유지합니다.

Event Sourcing은 CQRS(Command Query Responsibility Segregation) 패턴과 함께 사용될 때 시너지가 극대화됩니다. 또한 이벤트 기반 아키텍처(EDA)와 결합하면 서비스 간 느슨한 결합(loose coupling)과 높은 확장성을 동시에 달성할 수 있습니다. 처음 적용할 때는 단일 Bounded Context의 핵심 도메인에 국한하여 시작한 뒤, 점진적으로 범위를 넓혀가는 전략이 현실적입니다.

반응형