부트캠프/Dev

스프링으로 RabbitMQ를 사용해보자

nameless1004 2024. 9. 27. 15:41

세팅

gradle

    implementation 'org.springframework.boot:spring-boot-starter-amqp'
    testImplementation 'org.springframework.amqp:spring-rabbit-test'

위 두 라이브러리를 추가해주자.

properties

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

프로퍼티즈도 위와 같이 설정해주자

Config

Config 설정을 따로 해주자.

@Configuration
public class RabbitMqConfig {

    @Value("${rabbitmq.queue.name}")
    private String queuename;

    @Value("${rabbitmq.exchange.name}")
    private String exchangeName;

    @Value("${rabbitmq.routing.key}")
    private String routingKey;

    @Bean
    public Queue queue() {
        return new Queue(queuename, true);
    }

    @Bean
    public DirectExchange exchange() {
        return new DirectExchange(exchangeName);
    }

    @Bean
    public Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }

    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter()); // 메시지 컨버터 설정
        return template;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jsonMessageConverter());
        return factory;
    }
}

도커

docker pull rabbitmq // 도커 이미지 다운로드
docker run -d -p 15672:15672 -p 5672:5672 --name rabbitmq rabbitmq // 이미지 실행
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_management // 관리자 페이지 접속 가능하게

도커 허브에서 pull을 해준 후 http://localhost:15672로 들어가면 관리자 페이지로 들어갈 수 있다. 안들어가지면 위에 접속 가능하게 해주는 명령어를 입력해주자. 기본 아이디 비밀번호는 guest이다.

이로써 기본 세팅은 끝났다. 이제 계속 구현해보자.

스프링

먼저 엔티티와 레포지토리를 만들어 놓자.

@Entity
@Getter
@NoArgsConstructor
@Slf4j
public class Ticket {

    @Id @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String name;

    private Long quantity;

    public Ticket(Create dto) {
        this.name = dto.name();
        this.quantity = dto.quantity();
    }

    public void decrease(Long quantity){
        long remain = this.quantity - quantity;
        if(remain < 0) {
            throw new IllegalArgumentException("남은 수량 부족");
        }

        this.quantity = remain;
        log.info("남은 수량: {}",this.quantity);
    }

    public void updateQuantity(Long quantity){
        this.quantity = quantity;
    }

    public void increase(Long quantity) {
        this.quantity += quantity;
    }
}
public interface TicketRepository extends JpaRepository<Ticket, Long> {
}

그 다음 프로듀서와 컨슈머를 구현해보자.

프로듀서

@Service
@RequiredArgsConstructor
public class TicketProducer {

    private final RabbitTemplate rabbitTemplate;

    @Value("${rabbitmq.exchange.name}")
    private String exchangeName;

    @Value("${rabbitmq.routing.key}")
    private String routingKey;

    public  void sendTicketRequest(TicketDto dto) {
        rabbitTemplate.convertAndSend(exchangeName, routingKey, dto);
    }
}


컨슈머

@Service
@Getter
@RequiredArgsConstructor
@Transactional
@Slf4j
public class TicketConsumer {

    private final TicketRepository ticketRepository;
    
    @RabbitListener(queues = "ticketingQueue", concurrency = "1")
    public void processTicketRequest(TicketDto ticketDto) {

        try {
            log.info("메세지 처리 중");
            Ticket ticket = ticketRepository.findById(ticketDto.getTicketId())
                .orElseThrow(() -> new IllegalArgumentException("Ticket not found"));
            log.info("Remain : {}", ticket.getQuantity());
            if(ticket.getQuantity() - ticketDto.getQuantity() < 0) {
                log.info("수량 부족 !!!!!");
                return;
            }
            ticket.decrease(ticketDto.getQuantity());
            ticketRepository.save(ticket);
        } catch (Exception e) {
            throw new AmqpRejectAndDontRequeueException(e);
        }
    }
}

스프링에서 큐에 넣는 것은 rabbitTemplate를 사용해서 넣으면 된다. 그리고 컨슈머쪽에서는 RabbitListener 어노테이션을 사용해서 메세지를 받을 수 있다. @SpringBootApplication 있는 부분에 꼭 @EnableRabbit을 달아주자.

컨슈머 부분에서 try - catch 부분에 throw new AmqpRejectAndDontRequeueException 부분이 있는데 이 부분은 실패할 시 무한 Re-try에 걸려서 처리해준 부분이다. 어노테이션 부분에 concurrency가 있는데 이 부분은 동시에 몇개를 처리할건 이다. 만약 concurrency = "2" 이렇게 한다면 즉, 2이상을 입력하면 동시성 처리를 해줘야한다.

이어서 테스트 컨트롤러 부분을 작성해보자.

@RestController
@RequiredArgsConstructor
public class TicketController {

    private final TicketService ticketService;
    private final TicketProducer producer;

    @PostMapping("/publish")
    public ResponseEntity<Void> setup(@RequestBody Create request) {
        ticketService.publishTicket(request);
        return ResponseEntity.ok().build();
    }

    @PatchMapping("/increase")
    public ResponseEntity<Void> increase(@RequestBody Add request) {
        ticketService.increateQuantity(request);
        return ResponseEntity.ok().build();
    }

    @GetMapping("/test")
    public void test(@RequestBody Test request) {
        ExecutorService executorService = Executors.newFixedThreadPool(request.concurrent().intValue());

        for (int i = 0; i < request.concurrent(); i++) {
            executorService.submit(() -> {
                producer.sendTicketRequest(new TicketDto(request.id(), request.quantity()));
            });
        }
    }

}

서비스 부분

@Service
@RequiredArgsConstructor
@Transactional
public class TicketService {

    private final TicketRepository ticketRepository;

    public void publishTicket(Create request) {
        Ticket ticket = new Ticket(request);
        ticketRepository.save(ticket);
    }

    public void increateQuantity(Add request) {
        Ticket ticket = ticketRepository.findById(request.id())
            .orElseThrow(() -> new IllegalArgumentException("존재하지 않는 쿠폰입니다."));
        ticket.increase(request.quantity());
    }

}

 

RequestDto들

public sealed interface TicketerRequest permits Create, Add, Test
{
    record Create(String name, Long quantity) implements TicketerRequest{}
    record Add(Long id, Long quantity) implements TicketerRequest {}
    record Test(Long id, Long concurrent, Long quantity) implements TicketerRequest {}
}

다 구현했으니 이제 Postman으로 테스트를 해보자.

수량을 100개로 해놓은 다음. 80명이 동시에 티켓팅을 시켜보자.

잘 처리되는 것을 볼 수 있다! 그러면 수량이 지금 20개가 남았는데 300명이 동시에 신청하면 어떻게 될까?

남은 수량은 0개가 된 이후부터는 잘 처리가 되고 있는 것 같다!

나중에 프로젝트 할 때 티켓팅이 있다면 좀 더 깊이 공부해서 사용해도 괜찮을 것 같다. 티켓팅 외에도 파일 업로드 후 변환 작업 같은 곳들에도 쓸 수 있을 것 같다.