ETC

Linux에서 Mosquitto로 MQTT 통신을 해보자! | Mosquitto 사용

keepbang 2021. 12. 3. 17:46

MQTT란 장치 간에 저전력으로 메시지를 전송하는 경량의 구독, 발행 네트워크 프로토콜을 말한다.

네트워크 프로토콜이라고 하지만 일반적으로 TCP/IP를 통해 실행이 된다.

MQTT를 사용하기 위해서는 여러 가지 용어를 알아두면 유용하다!!

 


👉 Broker(브로커) : MQTT 서버이며 장치(이하 client)와 장치(client)를 연결해 주는 역할을 한다. Mosquitto가 Broker이다.

👉 publish(발행) : client는 Broker로 message를 발행할 수 있다. 이때 해당 message를 구분하기 위한 키를 넣어서 보내는데 이걸 Topic이라고 한다. 발행을 하는 client를 publisher(발행자)라고 한다.

👉 subscribe(구독) : client가 broker를 통해 특정 Topic으로 publish된 message를 받을 수 있다. 구독하는 client를 subscriber(구독자)라고 한다.

👉 Topic : 발행(publish), 구독(subscribe) 할 때 Topic으로 message를 구분한다. 예를 들어 /user/msg라는 topic으로 hello라는 message를 publish 하면 /user/msg를 subscribe 하고 있는 subscribers들은 hello라는 message를 받을 수 있다.

👉 bridge : 서버 이중화를 할 때 유용한 기능으로 mqtt broker 간의 동기화를 하는 기능이다. mosquitto를 설치하면 mosquitto.conf 파일 안에 bridge에 대한 속성들을 살펴볼 수 있다. bridge에 대한 기능은 다음 포스팅 때 올리고자 한다.

👉 Will message : mqtt 브로커와 client간의 연결이 비정상적으로 종료되었을 경우에 Will message가 publish된다. 다른 client 서버에서 will message topic을 sibscribe할 수 있으며 publish된 message를 처리할 수 있다.

 

👉 Qos : 서비스의 퀄리티를 말하며 Qos 레벨에 따라 메시지의 생명주기가 결정된다. 하지만 Qos 레벨이 높을수록 서버에서 처리할 내용이 많아지므로 속도가 느려지는 단점이 있기 때문에 사용에 주의해야 한다.

 

Introduction to MQTT QoS (Quality of Service)

MQTT protocol specifies the quality of service, which guarantees the reliability of message delivery under different network environments. The design of QoS is the focus of the MQTT protocol. As a protocol specifically designed for IoT scenarios, MQTT's op

www.emqx.com


MQTT에 대한 간단한 설명

대충 그림으로 설명하자면 이런 형태가 될 거 같다...

초록색 client가 publisher, 발행자가 되는 것이고

파란색 client가 subscriber, 구독자가 된다.

그림에는 한쪽 방향으로 화살표가 되어 있지만 반대 방행으로 구독과 발행도 가능하다.

 

 

 

 

 

 

 

 

 

 

 

특정 topic으로 publish, subscribe

초록색 client에서 topic을 client3으로 보냈기 때문에 client3 topic으로 subscribe 하고 있는 파란색 Client3에게 메시지를 전송하게 된다.


실제 Linux 서버에서 사용해 보기

 

먼저 mqtt broker가 될 mosquitto를 설치해야 하는데 설치방법은 인터넷에 자세히 나와있기 때문에....

링크를 남겨둔다...!!

 

CentOS 7 - Mosquitto (MQTT) 설치 및 사용법

이번에는 WAS인 Mosquitto 서버를 설치해보도록 하겠습니다. Mosquitto는 MQTT 프로토콜 버전 5.0, 3.1.1 및 3.1을 구현하는 공개 소스(EPL / EDL License) 메세지 브로커 입니다. 설치 전에 MQTT 프로토콜에 대해..

gsk121.tistory.com

yum으로 설치하면 간편하게 설치할 수 있다.

인터넷이 안 되는 환경을 위해 수동으로 설치할 수도 있지만 인터넷이 안되는 환경에서는 의존성 패키지 설치가 엄청 빡쌔다...

수동으로 설치할 때 필요한 패키지들이다.

gcc*, penssl, openssl-devel, pcre, pcre-devel, zlib, zlib-devel, glibc, glibc-devel

이것 외에도 아래에 것들도 필요할 수도 있다.

libuuid, libuuid-devel, libxslt

yum으로 설치하면 패키지 설치도 간단하게 할 수 있지만.... 인터넷이 안 되는 linux 서버에서 설치를 한다고 하면 rpm으로 패키지 파일들을 가져가서 직접 다 설치해 줘야 한다. 설치할 때 순서 같은 게 중요하니 꼭!!! 인터넷 안되는 서버에서 테스트를 하고 간다.

기본적으로 mosquitto를 수동 설치할 때는 mosquitto 계정을 만들어서 그 계정에서 설치를 진행한다. sudo 권한이 필요하니 root 계정으로 sudo 권한을 주고 진행하자

 

 

Download

Mosquitto Enhancements These projects can be used to add extra features to Mosquitto. Management Center: A web UI for managing Mosquitto instances. In particular, this offers a convenient UI for managing clients, groups and roles as in the new Dynamic Secu

mosquitto.org

linux에 설치한다고 하면 위 링크에서 tar.gz 파일을 다운로드하면 된다.

파일을 원하는 위치에 넣고 tar 명령어로 풀면 mosquitto 디렉터리가 나온다.

mosquitto 디렉터리에서 make 명령어와 몇 가지 설정을 해주면 mosquitto 설치는 완료된다.

 

--mosquitto 설치 경로에서
# make
# sudo make install(수동으로 설치한다면 여기서 의존성 에러가 나올수 있다)

 

/etc/ld.so.conf 파일에 아래 내용 추가
include ld.so.conf.d/*.conf
include /usr/local/lib
/usr/lib
/usr/local/lib

=============== 수정 후 ==================
# sudo /sbin/ldconfig
# sudo ln -s /mosquitto설치 경로/lib/libmosquitto.so.1 /usr/lib/libmosquitto.so.1

 

.bash_profile에 MOSQUITTO_HOME 설정
PATH=$PATH:$HOME/.local/bin:$HOME/bin:$MOSQUITTO_HOME/src:$MOSQUITTO_HOME/client
export MOSQUITTO_HOME=mosquitto설치경로
export PATH

=============== 수정 후 ==================

# source .bash_profile

 

 

인터넷에 검색해보면 여기서 mosquitto_sub, mosquitto_pub이라는 명령어로 테스트를 한다. 테스트하는 방법은 많이 나와있으므로 여기서는 mosquitto를 실제 기동 시키거나 config 파일에 대해 설명하고자 한다.

 

mosquitto 실행

# mosquitto -p 1883 > $LOG_DIRECTORY_PATH/mosquitto.log 2>&1 &

위 명령어로 실행시키면 특정 port와 log 파일 위치를 지정해서 실행시킬 수 있다.

port와 로그파일 경로 및 로그 level은 conf 파일을 통해 변경할 수 있다.

mosquitto를 설치한 디렉터리를 가보면 mosquitto.conf라는 config 파일이 있다. 내부를 보면 여러 주석들로 설명이 가득하다. 자세한 설명은 영어지만 아래 링크에 나와있다.

 

 

mosquitto.conf man page

Name mosquitto.conf — the configuration file for mosquitto Synopsis mosquitto.conf Description mosquitto.conf is the configuration file for mosquitto. This file can reside anywhere as long as m

mosquitto.org

port : mosquitto posrt를 설정한다(default:1883)
bind_address : 특정 ip의 접속만 허용한다.
bind_interface : 특정 네트워크 이름으로 접속을 허용한다.
log_dest : log 파일 경로를 설정한다. file로 설정할 때에는 "log_dest file 경로/log 파일 이름"으로 설정하면 된다.
log_type : 보여줄 log type을 설정한다. "log_type all"로 설정하면 모든 로그를 볼 수 있다.
connection_messages : mosquitto에 client가 connection 될 때 message를 log로 보여준다.
log_timestamp : log message를 보여줄 때 timestamp 값을 추가한다.

 


JAVA(maven)로 Test하기

 

mqtt를 사용하기 위해서 vertx-mqtt를 사용하였다.

 

......
  <dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-core</artifactId>
    <version>4.0.2</version>
  </dependency>

  <dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-mqtt</artifactId>
    <version>4.0.2</version>
  </dependency>
......

 

class 파일을 2개 만들고 하나는 publisher, 하나는 subscriber로 사용한다.

(아래 코드는 테스트를 위한 코드이므로 참고용으로만 봐주세요:~)

 

subscriber
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.messages.MqttConnAckMessage;
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.MqttSubAckMessage;

import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Calendar;

public class SubscribeTest {
	private Integer port;
	private String host;
	private Integer keepAliveTimeSeconds;
	private Integer workerPoolSize;
	private String topic = "/USER/MSG/client3";
	private SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS");


	public SubscribeTest() {
		this.port = 1883;
		this.host = "192.168.0.3";
		this.keepAliveTimeSeconds = 60;
		this.workerPoolSize = 40;
	}
	
	public static void main(String[] args) throws Exception {
		SubscribeTest process = new SubscribeTest();
		process.start();
	}

	public void start() {
		VertxOptions vertxOptions = new VertxOptions();
		vertxOptions.setBlockedThreadCheckInterval(200);
		vertxOptions.setMaxEventLoopExecuteTime(1000);
		vertxOptions.setWorkerPoolSize(workerPoolSize);

		Vertx vertx = Vertx.vertx(vertxOptions);

		MqttClientOptions options = new MqttClientOptions();
		options.setClientId(sdf.format(Calendar.getInstance().getTime()));
		options.setKeepAliveTimeSeconds(this.keepAliveTimeSeconds);
		options.setMaxMessageSize(268435456);

		MqttClient client = MqttClient.create(vertx, options);
		
		client.closeHandler(new Handler<Void>() {

			@Override
			public void handle(Void event) {
				System.out.println("Session Close!!");
			}
			
		});

		client.publishHandler(new Handler<MqttPublishMessage>() {

			@Override
			public void handle(MqttPublishMessage publish) {
				String topic = publish.topicName();
				String message = publish.payload().toString(Charset.defaultCharset());
				System.out.println(topic);
				System.out.println(message);

				System.out.println("Just received message on [" + publish.topicName() + "] payload ["
						+ publish.payload().toString(Charset.defaultCharset()) + "] with QoS [" + publish.qosLevel()
						+ "]");
			}

		});

		client.subscribeCompletionHandler(new Handler<MqttSubAckMessage>() {

			@Override
			public void handle(MqttSubAckMessage h) {
				System.out.println("Receive SUBACK from server with granted QoS : " + h.grantedQoSLevels());
			}

		});

		client.connect(this.port, this.host, new Handler<AsyncResult<MqttConnAckMessage>>() {

			@Override
			public void handle(AsyncResult<MqttConnAckMessage> ch) {
				if (ch.succeeded()) {
					System.out.println("Connected to a server");
					client.subscribe(topic, 0);
				} else {
					System.out.println("Failed to connect to a server");
					System.out.println(ch.cause());
				}
			}

		});
	}
}

맨 아래에 connect 한 부분을 보면 subscribe 부분이 있다.

topic은 /USER/MSG/client3로 하였다.

 

publisher
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.messages.MqttConnAckMessage;
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.MqttSubAckMessage;

import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Calendar;

public class PublisherTest {
	private Integer port;
	private String host;
	private Integer keepAliveTimeSeconds;
	private Integer workerPoolSize;
	private SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS");


	public PublisherTest() {
		this.port = 21883;
		this.host = "192.168.0.240";
		this.keepAliveTimeSeconds = 60;
		this.workerPoolSize = 40;
	}
	
	public static void main(String[] args) throws Exception {
		PublisherTest process = new PublisherTest();
		process.start();
	}

	public void start() {
		VertxOptions vertxOptions = new VertxOptions();
		vertxOptions.setBlockedThreadCheckInterval(200);
		vertxOptions.setMaxEventLoopExecuteTime(1000);
		vertxOptions.setWorkerPoolSize(workerPoolSize);

		Vertx vertx = Vertx.vertx(vertxOptions);

		MqttClientOptions options = new MqttClientOptions();
		options.setClientId(sdf.format(Calendar.getInstance().getTime()));
		options.setKeepAliveTimeSeconds(this.keepAliveTimeSeconds);
		options.setMaxMessageSize(268435456);

		MqttClient client = MqttClient.create(vertx, options);
		
		client.closeHandler(new Handler<Void>() {

			@Override
			public void handle(Void event) {
				System.out.println("Session Close!!");
			}
			
		});

		client.connect(this.port, this.host, new Handler<AsyncResult<MqttConnAckMessage>>() {

			@Override
			public void handle(AsyncResult<MqttConnAckMessage> ch) {
				if (ch.succeeded()) {
					System.out.println("Connected to a server");
					client.publish("/USER/MSG/client3", Buffer.buffer("Hello"), MqttQoS.AT_MOST_ONCE, false, false,
							new Handler<AsyncResult<Integer>>() {

								@Override
								public void handle(AsyncResult<Integer> s) {
									System.out.println("Publish sent to a server");

								}

							});
				} else {
					System.out.println("Failed to connect to a server");
					System.out.println(ch.cause());
				}
			}

		});
	}
}

여기도 마찬가지로 connect를 하고 publish로 message를 보냈다.

실행 순서는 당연히 subscribe을 하고 publish를 실행한다.

 

publisher 로그
.....
15:39:53.871 [vert.x-eventloop-thread-0] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 16
15:39:53.871 [vert.x-eventloop-thread-0] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
Connected to a server
Publish sent to a server

 

subscriber 로그
.....
15:39:35.596 [vert.x-eventloop-thread-0] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
Connected to a server
Receive SUBACK from server with granted QoS : [0]
/USER/MSG/client3
Hello
Just received message on [/USER/MSG/client3] payload [Hello] with QoS [AT_MOST_ONCE]

message를 정상적으로 받아오는 걸 확인할 수 있다.