MQTT(Message Queue Telemetry Transfort)란?
publish, subscribe 기반의 메시징 프로토콜이다. TCP/IP 프로토콜 위에서 동작하며 네트워크 대역폭이 제한되는 원격 위치와의 연결을 위해서 설계되었다. 무선 네크워크에 연결된 단순한 기기들간의 간단한 소통을 위해 만들어진 만큼 리소스를 매우 적게 요구하므로 IoT(Internet of Things, 사물인터넷) 에서 가볍에 많이 쓰이는 프로토콜이다.
MQTT라는 프로토콜을 구현한 소프트웨어가 바로 메시지 브로커 mosquitto 이다. MQTT 를 깔고 나면 프로그램의 이름이 "mosquitto"인 것을 확인할 수 있다.
회사에 Message Queue 를 이용하는 Listener 서버가 있는데 이와 관련된 부분을 구현하다가 이와 관련한 부분을 포스팅하려 한다.
@KafkaListener 는 kafka 서버에서 특정 토픽에 대한 메시지를 받을 때마다 어떤 메서드를 실행하도록 설정하는 애너테이션이다.
ActiveMQ도 마찬가지로 @JmsListener 를 붙여서 ActiveMQ 서버에서 메시지를 수신할 때마다 실행되는 메서드에 적용할 수 있다. 회사에서 Message Broker를 세 종류 (Kafka, ActiveMQ, MQTT) 를 사용 중인데 yml 파일에서 어떤 Message Queue 를 쓸 것인지 정하면 이 에 따라서, 셋 중 하나가 실행되도록 설정하려한다.
1. @KafkaListener
2. @JmsListener
3. (MQTT subscribe 하는 메서드)
그런데 MQTT 에서는 다른 Message Queue 와 다르게 자동으로 Listener 설정하는 애너테이션이 없어서 리스너 역할을 하는 메서드를 직접 만들려고 한다.
Dependency 추가 (Java 8 기준)
- org.eclipse.paho.client.mqttv3: 순수 MQTT Client 라이브러리. low level로 직접 publish / subscribe 가능.
- 이 라이브러리만 쓴다면 MQTT 콜백 메서드를 써서 messageArrived() 메서드를 오버라이드하면 subscribe 기능을 구현할 수 있다.
- spring-integration-mqtt: Spring Framework 의 @ServiceActivator를 이용해서 MQTT 서버에서 특정 Topic에 대한 메시지를 subscribe 할 때마다 특정 메서드가 실행되도록 설정한다.
- 자동으로 publish / subscribe 설정 가능. MQTT 를 추상화해서 쉽게 다룰 수 있다. (Channel, Adaptor 를 제공.)
<hide/>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.5.15</version>
</dependency>
Config 클래스
mqttClientFactory: MQTT broker 와 연결하기 위한 클라이언트를 생성하고 접속 정보를 설정한다. 이 때 "tcp"프로토콜 사용.
inbound: MQTT broker로부터 메시지를 받아오는 Adaptor 역할이다. 이걸 mqttInputChannel 로 전달한다.
mqttInputChannel: MQTT 메시지가 전달될 통로 이름이다. 메시지가 이 채널을 통해서 흐른다.
<hide/>
@Configuration
@RequiredArgsConstructor
public class MqttListenerConfig {
@Value("${spring.mqtt.server}")
private String MQTT_SERVER;
@Value("${spring.mqtt.qos}")
private String MQTT_QOS;
private final ApplicationProperty property;
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{MQTT_SERVER});
// options.setUserName("");
// options.setPassword("".toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
public MqttPahoMessageDrivenChannelAdapter inbound() {
String clientId = "MQTTListener-" + UUID.randomUUID();
String topic = property.getQueueName();
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(), topic);
adapter.setOutputChannel(mqttInputChannel());
adapter.setQos(MQTT_QOS != null ? Integer.valueOf(MQTT_QOS) : 0);
return adapter;
}
}
Service 클래스
어떤 메서드에 @ServiceActivator를 붙이면 앞에서 살펴본 mqttInputChannel 에서 데이터를 받을 때마다 해당 메서드를 실행한다.
주의) 사용 중인 middleware 가 MQTT 가 아닌 경우(yml 파일에 middleware 종류에 대한 설정이 있음)에도 MQTT 서버에서 메시지를 받으면 이 메서드는 반드시 실행된다. 이 때 실행되지 않도록 반드시 조건문을 다음과 같이 넣어줘야한다.
(@JmsListener, @KafkaListener 도 마찬가지이다. yml 파일에 middleware로 설정한 값에 따라서 특정 Message Queue 만 동작하도록@ConditionalOnProperty를 적용하려했으나 아예 작동이 안되서 다음과 같은 if 문으로 return 하도록 설정했다. )
<hide/>
@ServiceActivator(inputChannel = "mqttInputChannel")
public void handleMqttMessage(String message) {
if(!"mqtt".equals(queue.getMiddleware())) {
return;
}
// 받은 메시지를 처리한다.
}
'개발 일지' 카테고리의 다른 글
[12월 5주차] Vue.js 3 상위 컴포넌트 ↔ 하위 컴포넌트 데이터 전달 방법 (1) | 2023.12.31 |
---|---|
OkHttp vs RestTemplate (0) | 2023.03.18 |