TIL - Redis 및 Kafka 사용 시 DB와의 분산트랜잭션 문제 해결하기

2026. 5. 2. 22:50·내배캠

Redis 사용 시 분산트랜잭션 문제

문제상황1)

Redis는 스프링 프레임워크의 트랜잭션에 영향을 받지 않는다.
따라서, 서비스레이어의 하나의 메서드 안에서 DB에 저장을 하고,
Redis에도 저장을 해야할 때 분산트랜잭션 문제가 발생할 수 있다.

해결법1)

이 경우엔 ApplicationEventPublisher를 활용하는 방법이 있다.

ApplicationEventPublisher란?
스프링 내부에서 이벤트를 처리하는 라이브러리이다.

import org.springframework.context.ApplicationEventPublisher;

그래서 스프링 내부에서 발행하고 수신되는 이벤트를 관리한다.
이 Redis 사용중 분산트랜잭션 문제는 DB에 저장하고나서, 이 스프링 내부 이벤트를 발행하고
커밋이 완료된 후에 수신해서 Redis에 데이터를 저장하면 된다.

구현 예시)

1. 참가자들의 랭킹을 기록하고 나서 스프링 내부 이벤트를 발행

// 대회 종료 이벤트 수신 시 최종 순위를 DB에 저장하고, 커밋 후 Redis를 동기화하는 메서드
    @Override
    @Transactional
    public void finalizeRankingsWithValuations(
            UUID competitionId,
            List<TotalAssetCalcuatedEvent.ParticipantTotalAsset> valuations
    ) {
        if (valuations.isEmpty()) {
            return;
        }

        long totalCount = valuations.size();

        // 1. valuations를 자산 내림차순 정렬 후 DB에 직접 저장 (Redis 거치지 않음)
        // Redis는 진행 중 전광판용이고, 최종 결과의 원천은 자산 서비스가 준 확정 데이터
        List<TotalAssetCalcuatedEvent.ParticipantTotalAsset> sorted = valuations.stream()
                .sorted((a, b) -> Double.compare(b.totalAsset(), a.totalAsset()))
                .toList();

        // 참가자마다 랭킹을 기록하고 isFinalized = true로 저장
        for (int i = 0; i < sorted.size(); i++) {
            var v = sorted.get(i);
            // 랭킹 데이터를 생성
            Ranking ranking = Ranking.createRanking(competitionId, v.userId());
            // 최중 순위를 기록하고, isFinalized = true로 변경
            ranking.finalize(RankTier.from((long) i + 1, totalCount)); // 1-based rank
            rankingRepository.save(ranking);
        }

        // 2. DB 커밋 완료 후 Redis를 최종값으로 동기화 (AFTER_COMMIT 리스너가 처리)
        applicationEventPublisher.publishEvent(new RankingFinalizedEvent(competitionId, valuations));
    }

finalizeRankingsWithValuations 메서드 마지막 줄에서, 스프링 내부 이벤트를 발행하고 있다.
applicationEventPublisher.publishEvent()의 파라미터에 들어가는 값이 어떤 이벤트인지를 구분한다.
따라서 리스너에서도 해당 파라미터를 받는다면 이 트랜잭션이 커밋되면 그 리스너가 실행된다.

스프링 내부 이벤트도 하나의 이벤트를 여러 리스너가 동시에 받을 수 있다.
( 파라미터만 같은걸 받는다면 )

 

2. 커밋이 완료될 때 까지 기다렸다가 커밋이 완료되면 Redis 저장 메서드 실행

package io.antcamp.rankingservice.application.event;

import io.antcamp.rankingservice.domain.repository.RankingRedisRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

@Slf4j
@Component
@RequiredArgsConstructor
public class RankingEventListener {

    private final RankingRedisRepository rankingRedisRepository;

    /**
     * DB 커밋 완료 후 Redis를 최종 순위값으로 동기화한다.
     * DB가 원천 데이터이고, Redis는 DB 상태를 따라가는 팔로워.
     */
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void onRankingFinalized(RankingFinalizedEvent event) {
        log.info("[Redis] 최종 랭킹 동기화 시작. competitionId={}, count={}",
                event.competitionId(), event.valuations().size());

        event.valuations().forEach(v ->
                rankingRedisRepository.upsertScore(event.competitionId(), v.userId(), v.totalAsset())
        );

        log.info("[Redis] 최종 랭킹 동기화 완료. competitionId={}", event.competitionId());
    }
}

지금 finalizeRankingsWithValuations 메서드에서 발행한 이벤트의 파라미터랑 동일하게
RankingFinalizedEvent를 받는 메서드를 정의하고
@TransactionalEventListener() 를 붙인다.

그리고 속성값으로 phase = TransactionalPhase.AFTER_COMMIT 를 붙여서
해당 트랜잭션이 끝날 떄 실행되게 설정하면 된다.

문제상황2)

랭킹을 조회하고 db에 랭킹을 저장하는 과정에서
막상 트랜잭션이 처리되는 db에 저장하는 로직은 redis 조회 이후에 있어서
DB의 점유시간이 길어지는 문제이다.

따라서 Redis의 조회 메서드를 트랜잭션 밖으로 뺴야하는 상황이다.

// 대회 종료 시 최종 랭킹을 DB에 저장하는 메서드(DB에 관리자가 수동으로 저장할 때 사용)
@Override
@Transactional
public void finalizeRankings(UUID competitionId) {
    long totalCount = rankingRedisRepository.getTotalCount(competitionId);
    if (totalCount == 0) {
        return;
    }

    // RankingEntry는 유저id, 총자산, 순위로 구성, 전체 참가자의 랭킹 목록 조회
    List<RankingRedisRepository.RankingEntry> all =
            rankingRedisRepository.getTopRankings(competitionId, 0, totalCount);

    // 참가자마다 최종 순위를 기록하고 isFinalized = true로 저장
    all.forEach(entry -> {
        Ranking ranking = Ranking.createRanking(competitionId, entry.userId());
        ranking.finalize(RankTier.from(entry.rank(), totalCount));
        rankingRepository.save(ranking);
    });
}

 

해결법1) DB에 저장할 떄만 트랜잭션이 설정되게 바꾼다.

TransactionTemplate을 사용해서 해당 메서드가 실행될 떄만 DB를 점유하도록 설정하는 방법입니다.

 

코드 예시)

@RequiredArgsConstructor
public class RankingServiceImpl implements RankingService {

    private final TransactionTemplate transactionTemplate; // 주입 추가

    // @Transactional 제거
    public void finalizeRankings(UUID competitionId) {
        // 1. Redis I/O — 트랜잭션 밖 (커넥션 점유 없음)
        long totalCount = rankingRedisRepository.getTotalCount(competitionId);
        if (totalCount == 0) return;
        List<RankingRedisRepository.RankingEntry> all =
                rankingRedisRepository.getTopRankings(competitionId, 0, totalCount);

        // 2. DB 저장만 트랜잭션으로 감쌈
        transactionTemplate.executeWithoutResult(status ->
            all.forEach(entry -> {
                Ranking ranking = Ranking.createRanking(competitionId, entry.userId());
                ranking.finalize(RankTier.from(entry.rank(), totalCount));
                rankingRepository.save(ranking);
            })
        );
    }
}

Kafka 사용 시 분산트랜잭션 문제

마찬가지로 DB에 저장하고 카프카로 이벤트를 발행해야 하는 상황에서,
서비스레이어의 한 메서드 안에서 이벤트 발행메서드 까지 호출한다면,
이벤트는 발행됐는데 커밋이 실패하여 DB저장은 롤백될 수 있습니다.

따라서 스프링 내부 이벤트를 통해 DB저장이 커밋되고 나서 이벤트를 발행하도록 할 수 있습니다.

예시 코드)

1. 대회 신청후 DB에 참여내역을 저장하고 스프링 내부 이벤트를 발행

Service
@RequiredArgsConstructor
public class CompetitionParticipantServiceImpl implements CompetitionParticipantService {

    private final CompetitionRepository competitionRepository;
    private final CompetitionParticipantRepository competitionParticipantRepository;
    private final ApplicationEventPublisher applicationEventPublisher;

    @Transactional
    public void registerCompetition(JoinCompetitionCommand command) {
        // 1. 대회 조회 (Competition 비관적 락 먼저 획득 - 같은 대회 신청 요청을 직렬화)
        Competition competition = competitionRepository.findByIdWithLock(command.competitionId())
                .orElseThrow(() -> new BusinessException(ErrorCode.INVALID_INPUT));

        // 2. 락 획득 후 중복 신청 체크 (이 시점엔 앞선 트랜잭션이 이미 commit된 상태)
        competitionParticipantRepository.findByUserIdAndCompetitionId(command.userId(), command.competitionId())
                .ifPresent(p -> {
                    throw new BusinessException(ErrorCode.INVALID_INPUT);
                });

        competition.register();
        competitionRepository.save(competition);

        // 3. 대회 참여자 저장
        CompetitionParticipant participant = CompetitionParticipant.create(
                command.userId(),
                command.nickname(),
                command.competitionId()
        );
        competitionParticipantRepository.save(participant);

        // 4. Spring 내부 이벤트 발행 → DB 커밋 완료 후 리스너가 Kafka로 전달
        applicationEventPublisher.publishEvent(new CompetitionRegisteredEvent(
                competition.getCompetitionId(),
                competition.getName(),
                competition.getType().name(),
                competition.getFirstSeed(),
                command.userId()
        ));
    }
}

 

2. 스프링 내부 이벤트를 수신하는 리스너 클래스

package io.antcamp.competitionservice.application.event;

import io.antcamp.competitionservice.domain.event.CompetitionCancelledEvent;
import io.antcamp.competitionservice.domain.event.CompetitionRegisteredEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

/**
 * 대회 참가자 관련 이벤트를 수신하여 Kafka로 발행하는 리스너.
 * DB 커밋 완료 후에만 Kafka 발행을 보장한다.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class CompetitionParticipantEventListener {

    private final CompetitionEventProducer competitionEventProducer;

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void onCompetitionRegistered(CompetitionRegisteredEvent event) {
        competitionEventProducer.publishCompetitionRegistered(event);
    }

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void onCompetitionCancelled(CompetitionCancelledEvent event) {
        competitionEventProducer.publishCompetitionCancelled(event);
    }
}

처리해야하는 스프링 내부 이벤트를 정의하고 해당 이벤트를 감지하면 카프카 이벤트가 발행됩니다.

 

 

'내배캠' 카테고리의 다른 글

TIL - 코드리뷰 피드백 정리  (0) 2026.05.07
TIL - 대회/랭킹 도메인 흐름 정리  (0) 2026.05.06
전자기기로 이해하는 헥사고날 아키텍처  (1) 2026.04.30
TIL - 공통 모듈의 ErrorCode, 어떻게 관리해야 할까  (0) 2026.04.26
TIL - 생성자에 private과 protected, 언제 어떻게 쓸까  (0) 2026.04.24
'내배캠' 카테고리의 다른 글
  • TIL - 코드리뷰 피드백 정리
  • TIL - 대회/랭킹 도메인 흐름 정리
  • 전자기기로 이해하는 헥사고날 아키텍처
  • TIL - 공통 모듈의 ErrorCode, 어떻게 관리해야 할까
MvA
MvA
백엔드 개발자 김재현입니다. 주로 공부하면서 느낀점을 기록합니다.
  • MvA
    Man vs Ai
    MvA
  • 전체
    오늘
    어제
    • 분류 전체보기 (94)
      • Java (6)
      • Python (8)
        • 딥러닝 (1)
        • 머신러닝 (7)
      • JavaScript (2)
      • 내배캠 (60)
      • 개인 프로젝트 (11)
      • 책 후기 (5)
      • 기타 (1)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    내일배움캠프
    딥러닝
    아키텍처
    배포
    Riot API
    머신러닝
    TiL
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.4
MvA
TIL - Redis 및 Kafka 사용 시 DB와의 분산트랜잭션 문제 해결하기
상단으로

티스토리툴바