1、引入依赖
在 pom.xml 中添加Paho客户端依赖:
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
2. 配置文件
在 application.yml 中添加EMQX连接配置:
mqtt:
host: tcp://localhost:1883 # EMQX连接地址
client-id: service-backend-1 # 客户端ID(需唯一)
username: admin # EMQX认证用户名
password: public # EMQX认证密码
topic:
upstream: /vehicle/+/position # 订阅的上行Topic(车机上报)
downstream: /cloud/{vin}/command # 下发的Topic模板
qos: 1 # 默认服务质量
timeout: 30 # 连接超时(秒)
keepalive: 60 # 心跳间隔(秒)
3. 配置类与核心客户端
@Configuration
@Slf4j
public class MqttPahoConfig {
@Value("${mqtt.host}")
private String host;
@Value("${mqtt.client-id}")
private String clientId;
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.timeout}")
private int timeout;
@Value("${mqtt.keepalive}")
private int keepalive;
@Bean
public MqttClient mqttClient() throws MqttException {
MqttClient client = new MqttClient(host, clientId);
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
options.setAutomaticReconnect(true); // 自动重连(车联网必备)
options.setCleanSession(true); // 是否清除会话
client.connect(options);
log.info("MQTT客户端连接成功,clientId: {}", clientId);
// 设置回调处理器
client.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
log.info("MQTT连接完成,重连状态:{}", reconnect);
// 连接成功后订阅Topic(可在此处订阅)
}
@Override
public void connectionLost(Throwable cause) {
log.error("MQTT连接丢失,原因:", cause);
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// 消息到达处理(交由业务Service处理)
String payload = new String(message.getPayload());
log.info("收到消息,topic: {}, payload: {}", topic, payload);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
log.debug("消息发送完成,messageId: {}", token.getMessageId());
}
});
return client;
}
}
4. 消息发送与接收服务
@Service
@Slf4j
public class MqttPahoService {
@Autowired
private MqttClient mqttClient;
/**
* 发送消息到指定Topic
*/
public void publish(String topic, String payload, int qos) throws MqttException {
MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
message.setQos(qos);
mqttClient.publish(topic, message);
log.debug("消息发布成功,topic: {}, qos: {}", topic, qos);
}
/**
* 订阅Topic(可在连接成功后调用)
*/
public void subscribe(String topic, int qos) throws MqttException {
mqttClient.subscribe(topic, qos);
log.info("订阅Topic成功,topic: {}, qos: {}", topic, qos);
}
/**
* 取消订阅
*/
public void unsubscribe(String topic) throws MqttException {
mqttClient.unsubscribe(topic);
}
}