Spring RabbitMQ and retry and Dead Letter Queue
RabbitMQ for Docker
version: '3.7'
services:
rabbitmq:
image: rabbitmq:management
container_name: spring-rabbimq
ports:
- "5672:5672"
- "15672:15672"
environment:
RABBITMQ_DEFAULT_USER: "admin"
RABBITMQ_DEFAULT_PASS: "1234"
RabbitMQ UI Admin : http://localhost:15672
Spring boot Exchange와 Queue 자동 생성 이해
스프링 부트가 구동될때 exchange와 queue 설정에 따라 자동으로 생성되지 않는 다.
자동 생성 시점은 메시지가 발행이 될때와 메시지를 구독하기 위해 RabbitMQ 서버에 연결될때 생성된다.
아래 설정은 자동 생성 여부를 설정하기 위한 옵션이고 기본 값은 true
이다.
spring.rabbitmq.dynamic=true
메시지 발행을 담당하는 서비스가 구동될때 exchange나 queue가 없더라도 오류가 발생되지 않는 다.
메시지가 발행될때 자동으로 생성되기 때문에 검증하지 않는 것 같다.
하지만 메시지 구독을 담당하는 서비스가 구동될때 exchange나 queue가 없으면 오류가 발생된다.
exchange와 queue를 자동 생성하려면 아래 3가지를 참고한다.
1) 메시지 구독 서비스는 자동 생성을 끄고 메시지 발행 서비스에서만 자동 생성한다.
메시지 구독시 아래와 같이 queue name 만 설정하면 된다.
@RabbitListener("queue.consumer-server.push")
2) 발행과 구독 서비스 모두 자동 생성하려면 양쪽 동일한 설정을 구성해야 한다.
한쪽이라도 다르면 오류가 발생되니 주의한다. 메시지 구독 서비스에서 exchange와 queue 설정은 @RabbitListener
어노테이션 혹은 SimpleRabbitListenerContainerFactory
클래스에서 설정할 수 있다.
예시)
@RabbitListener(
bindings = @QueueBinding(
exchange = @Exchange(value = NotifyMessageProperties.EXCHANGE_NAME, type = ExchangeTypes.FANOUT),
value = @Queue(value = NotifyMessageProperties.QUEUE_NAME
, arguments = @Argument(name = "x-dead-letter-exchange", value = NotifyMessageProperties.DEAD_LETTER_EXCHANGE_NAME))
)
, containerFactory = "listenerContainer"
)
3) 자동 생성을 모두 끄고 직접 생성한다.
Retry and Dead Letter Queue Strategy
구독한 메시지 처리시 예외가 발생하면 반복적으로 구독하기 때문에 다음 메시지를 구독할 수 없는 문제가 발생한다. 그래서 메시지 구독시 예외가 발생하면 몇번의 재시도 후 메시지를 버리지 않고 임시 큐에 보관하는 전략 구성할 수 있다. 임시 큐에 쌓인 메시지는 직접 처리해야 하므로 알림을 받도록 개발해주면 좋다.
그리고 메시지 구독을 담당하는 서비스와 임시 큐를 구독하는 서비스를 분리하여 운영 서비스에 영향이 가지 않도록 하는 것도 좋을 것 같다.
구독이 최종적으로 실패하면 서비스 재시작 후 다시 구독을 시작한다.
발행 서비스 Exchange와 Queue 설정
일반적인 exchange와 queue 생성
@Slf4j
@RequiredArgsConstructor
@Configuration
public class NotifyRabbitConfiguration {
private final MessageConverter messageConverter;
@Bean
FanoutExchange notifyExchange() {
return ExchangeBuilder
.fanoutExchange(NotifyMessageProperties.EXCHANGE_NAME)
.build();
}
@Bean
Queue notifyQueue() {
return QueueBuilder.durable(NotifyMessageProperties.QUEUE_NAME)
.deadLetterExchange(NotifyMessageProperties.DEAD_LETTER_EXCHANGE_NAME)
.build();
}
@Bean
Binding notifyBinding() {
return BindingBuilder.bind(notifyQueue()).to(notifyExchange());
}
@Bean
AmqpTemplate notifyRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter);
rabbitTemplate.setExchange(NotifyMessageProperties.EXCHANGE_NAME);
return rabbitTemplate;
}
}
실패시 임시로 보관하기 위한 exchange와 queue
@Slf4j
@RequiredArgsConstructor
@Configuration
public class DeadLetterNotifyRabbitConfiguration {
@Bean
FanoutExchange deadLetterNotifyExchange() {
return ExchangeBuilder
.fanoutExchange(NotifyMessageProperties.DEAD_LETTER_EXCHANGE_NAME)
.build();
}
@Bean
Queue deadLetterNotifyQueue() {
return QueueBuilder
.durable(NotifyMessageProperties.DEAD_LETTER_QUEUE_NAME)
.build();
}
@Bean
Binding deadLetterNotifyBinding() {
return BindingBuilder.bind(deadLetterNotifyQueue()).to(deadLetterNotifyExchange());
}
}
구독 서비스의 Exchange와 Queue 설정
재시도 설정
@RequiredArgsConstructor
@Configuration
public class NotifyRabbitConfiguration {
@Bean
SimpleRabbitListenerContainerFactory listenerContainer(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory container = new SimpleRabbitListenerContainerFactory();
container.setConnectionFactory(connectionFactory);
MessageRecoverer messageRecoverer = new RejectAndDontRequeueRecoverer();
container.setAdviceChain(RetryInterceptorBuilder.stateless()
.maxAttempts(3)
.backOffOptions(Duration.ofSeconds(3L).toMillis(), 2, Duration.ofSeconds(10L).toMillis())
.recoverer(messageRecoverer)
.build());
return container;
}
@Bean
SimpleRabbitListenerContainerFactory deadListenerContainer(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory container = new SimpleRabbitListenerContainerFactory();
container.setConnectionFactory(connectionFactory);
return container;
}
}
만약 프로퍼티로 한다면 아래와 같다. RejectAndDontRequeueRecoverer
클래스는 기본적으로 설정된다.
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true
initial-interval: 3s
max-attempts: 3
max-interval: 10s
multiplier: 2
재시도 옵션 설명
initial-interval : 최초 재시도 간격 시간
max-interval : 재시도 간격 시간
max-attempts : 재시도 최대 횟수
multiplier : 재시도 간격 시간을 점진적으로 높인다. initial-interval * multiplier 계산되며 max-interval 값보다 높으면 max-interval 값을 사용한다.
구독하기 위한 선언
// 일반적인 구독
@RabbitListener(
bindings = @QueueBinding(
exchange = @Exchange(value = NotifyMessageProperties.EXCHANGE_NAME, type = ExchangeTypes.FANOUT),
value = @Queue(value = NotifyMessageProperties.QUEUE_NAME
, arguments = @Argument(name = "x-dead-letter-exchange", value = NotifyMessageProperties.DEAD_LETTER_EXCHANGE_NAME))
)
, containerFactory = "listenerContainer"
)
// 보관된 메시지 구독, 메시지가 유실되지 않도록 ackMode를 사용한다.
@RabbitListener(
bindings = @QueueBinding(
exchange = @Exchange(value = NotifyMessageProperties.DEAD_LETTER_EXCHANGE_NAME, type = ExchangeTypes.FANOUT),
value = @Queue(value = NotifyMessageProperties.DEAD_LETTER_QUEUE_NAME)
)
, ackMode = "MANUAL"
, containerFactory = "listenerContainer"
)
참고
- 기본 리스너 생성 클래스 -
AbstractRabbitListenerContainerFactoryConfigurer.configure
- ack 전략 - https://stackoverflow.com/questions/38728668/spring-rabbitmq-using-manual-channel-acknowledgement-on-a-service-with-rabbit/38730821
- https://www.techgeeknext.com/spring-boot/spring-boot-rabbitmq-retry-and-error-handling-example
- https://www.baeldung.com/spring-amqp-exponential-backoff
- HA - https://stackoverflow.com/questions/28207327/how-load-balancer-works-in-rabbitmq
'Tech' 카테고리의 다른 글
Spring Security OAuth - Authorization Server (0) | 2021.11.11 |
---|---|
Spring Security OAuth - Password 인증 방식 (0) | 2021.11.11 |
프로젝트 코드 템플릿 - Project Code Templates (0) | 2021.10.28 |
OAuth 2.0 Simplified (0) | 2021.10.18 |