> Hello World !!!

     

@syaku

Spring RabbitMQ and retry and Dead Letter Queue

 

Github: https://github.com/syakuis/spring-rabbitmq

RabbitMQ for Docker

version: '3.7'
services:
  rabbitmq:
    image: rabbitmq:management
    container_name: spring-rabbimq
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: "admin"
      RABBITMQ_DEFAULT_PASS: "1234"

RabbitMQ UI Admin : http://localhost:15672

Spring boot Exchange와 Queue 자동 생성 이해

스프링 부트가 구동될때 exchange와 queue 설정에 따라 자동으로 생성되지 않는 다.

자동 생성 시점은 메시지가 발행이 될때와 메시지를 구독하기 위해 RabbitMQ 서버에 연결될때 생성된다.

아래 설정은 자동 생성 여부를 설정하기 위한 옵션이고 기본 값은 true 이다.

spring.rabbitmq.dynamic=true

메시지 발행을 담당하는 서비스가 구동될때 exchange나 queue가 없더라도 오류가 발생되지 않는 다.

메시지가 발행될때 자동으로 생성되기 때문에 검증하지 않는 것 같다.

하지만 메시지 구독을 담당하는 서비스가 구동될때 exchange나 queue가 없으면 오류가 발생된다.

exchange와 queue를 자동 생성하려면 아래 3가지를 참고한다.

1) 메시지 구독 서비스는 자동 생성을 끄고 메시지 발행 서비스에서만 자동 생성한다.

메시지 구독시 아래와 같이 queue name 만 설정하면 된다.

@RabbitListener("queue.consumer-server.push")

2) 발행과 구독 서비스 모두 자동 생성하려면 양쪽 동일한 설정을 구성해야 한다.
한쪽이라도 다르면 오류가 발생되니 주의한다. 메시지 구독 서비스에서 exchange와 queue 설정은 @RabbitListener 어노테이션 혹은 SimpleRabbitListenerContainerFactory 클래스에서 설정할 수 있다.

예시)

@RabbitListener(
    bindings = @QueueBinding(
        exchange = @Exchange(value = NotifyMessageProperties.EXCHANGE_NAME, type = ExchangeTypes.FANOUT),
        value = @Queue(value = NotifyMessageProperties.QUEUE_NAME
            , arguments = @Argument(name = "x-dead-letter-exchange", value = NotifyMessageProperties.DEAD_LETTER_EXCHANGE_NAME))
    )
        , containerFactory = "listenerContainer"
)

3) 자동 생성을 모두 끄고 직접 생성한다.

Retry and Dead Letter Queue Strategy

구독한 메시지 처리시 예외가 발생하면 반복적으로 구독하기 때문에 다음 메시지를 구독할 수 없는 문제가 발생한다. 그래서 메시지 구독시 예외가 발생하면 몇번의 재시도 후 메시지를 버리지 않고 임시 큐에 보관하는 전략 구성할 수 있다. 임시 큐에 쌓인 메시지는 직접 처리해야 하므로 알림을 받도록 개발해주면 좋다.

그리고 메시지 구독을 담당하는 서비스와 임시 큐를 구독하는 서비스를 분리하여 운영 서비스에 영향이 가지 않도록 하는 것도 좋을 것 같다.

구독이 최종적으로 실패하면 서비스 재시작 후 다시 구독을 시작한다.

발행 서비스 Exchange와 Queue 설정

일반적인 exchange와 queue 생성

@Slf4j
@RequiredArgsConstructor
@Configuration
public class NotifyRabbitConfiguration {
    private final MessageConverter messageConverter;

    @Bean
    FanoutExchange notifyExchange() {
        return ExchangeBuilder
            .fanoutExchange(NotifyMessageProperties.EXCHANGE_NAME)
            .build();
    }

    @Bean
    Queue notifyQueue() {
        return QueueBuilder.durable(NotifyMessageProperties.QUEUE_NAME)
            .deadLetterExchange(NotifyMessageProperties.DEAD_LETTER_EXCHANGE_NAME)
            .build();
    }

    @Bean
    Binding notifyBinding() {
        return BindingBuilder.bind(notifyQueue()).to(notifyExchange());
    }

    @Bean
    AmqpTemplate notifyRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter);

        rabbitTemplate.setExchange(NotifyMessageProperties.EXCHANGE_NAME);
        return rabbitTemplate;
    }
}

실패시 임시로 보관하기 위한 exchange와 queue

@Slf4j
@RequiredArgsConstructor
@Configuration
public class DeadLetterNotifyRabbitConfiguration {

    @Bean
    FanoutExchange deadLetterNotifyExchange() {
        return ExchangeBuilder
            .fanoutExchange(NotifyMessageProperties.DEAD_LETTER_EXCHANGE_NAME)
            .build();
    }

    @Bean
    Queue deadLetterNotifyQueue() {
        return QueueBuilder
            .durable(NotifyMessageProperties.DEAD_LETTER_QUEUE_NAME)
            .build();
    }

    @Bean
    Binding deadLetterNotifyBinding() {
        return BindingBuilder.bind(deadLetterNotifyQueue()).to(deadLetterNotifyExchange());
    }
}

구독 서비스의 Exchange와 Queue 설정

재시도 설정

@RequiredArgsConstructor
@Configuration
public class NotifyRabbitConfiguration {
    @Bean
    SimpleRabbitListenerContainerFactory listenerContainer(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory container = new SimpleRabbitListenerContainerFactory();
        container.setConnectionFactory(connectionFactory);
        MessageRecoverer messageRecoverer = new RejectAndDontRequeueRecoverer();
        container.setAdviceChain(RetryInterceptorBuilder.stateless()
            .maxAttempts(3)
            .backOffOptions(Duration.ofSeconds(3L).toMillis(), 2, Duration.ofSeconds(10L).toMillis())
            .recoverer(messageRecoverer)
            .build());

        return container;
    }

    @Bean
    SimpleRabbitListenerContainerFactory deadListenerContainer(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory container = new SimpleRabbitListenerContainerFactory();
        container.setConnectionFactory(connectionFactory);
        return container;
    }
}

만약 프로퍼티로 한다면 아래와 같다. RejectAndDontRequeueRecoverer 클래스는 기본적으로 설정된다.

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true
          initial-interval: 3s
          max-attempts: 3
          max-interval: 10s
          multiplier: 2

재시도 옵션 설명

initial-interval : 최초 재시도 간격 시간
max-interval : 재시도 간격 시간
max-attempts : 재시도 최대 횟수
multiplier : 재시도 간격 시간을 점진적으로 높인다. initial-interval * multiplier 계산되며 max-interval 값보다 높으면 max-interval 값을 사용한다.

구독하기 위한 선언

// 일반적인 구독
@RabbitListener(
      bindings = @QueueBinding(
        exchange = @Exchange(value = NotifyMessageProperties.EXCHANGE_NAME, type = ExchangeTypes.FANOUT),
        value = @Queue(value = NotifyMessageProperties.QUEUE_NAME
            , arguments = @Argument(name = "x-dead-letter-exchange", value = NotifyMessageProperties.DEAD_LETTER_EXCHANGE_NAME))
    )
    , containerFactory = "listenerContainer"
)

// 보관된 메시지 구독, 메시지가 유실되지 않도록 ackMode를 사용한다.
@RabbitListener(
    bindings = @QueueBinding(
        exchange = @Exchange(value = NotifyMessageProperties.DEAD_LETTER_EXCHANGE_NAME, type = ExchangeTypes.FANOUT),
        value = @Queue(value = NotifyMessageProperties.DEAD_LETTER_QUEUE_NAME)
    )
    , ackMode = "MANUAL"
    , containerFactory = "listenerContainer"
)

참고