package com.data.emqx; /** * @author licd * @className MqttSubscribeSample * @description com.data.emqx * @date 2024/12/16 14:37 */ import com.alibaba.fastjson.JSON; import com.data.emqx.domain.LoadCarDataSoure; import com.data.emqx.service.ILoaderCarDataSourceService; import org.eclipse.paho.client.mqttv3.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; @Component public class MqttSubscribeSample { private static final Logger logger = (Logger) LoggerFactory.getLogger(MqttSubscribeSample.class); @Value("${emqx.username}") private String USERNAME; @Value("${emqx.password}") private String PASSWORD; @Value("${emqx.broker}") private String broker; @Value("${emqx.clientid}") private String clientId; @Value("${emqx.subscriptiontopic}") private String subscriptionTopic; @Value("${emqx.qos}") private String qos; @Autowired private ILoaderCarDataSourceService loaderCarDataSourceService; public void getEmqsData() { try { MqttClient sampleClient = new MqttClient(broker, clientId); // MQTT 连接选项 MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setUserName(USERNAME); connOpts.setPassword(PASSWORD.toCharArray()); // 保留会话 connOpts.setCleanSession(false); // 自动连接 connOpts.setAutomaticReconnect(true); connOpts.setConnectionTimeout(10); connOpts.setKeepAliveInterval(20); // 建立连接 sampleClient.connect(connOpts); // 订阅主题 sampleClient.subscribe(subscriptionTopic); // 设置回调 sampleClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { // 连接丢失后的回调 System.out.println("连接断开,可以做重连"); } @Override public void messageArrived(String topic, MqttMessage message) { String pubTopic=""; String json = new String(message.getPayload()); try { // 接收到消息的回调 logger.info("emqx接收到的数据:" + json); LoadCarDataSoure loadCarDataSoure = JSON.parseObject(json, LoadCarDataSoure.class); pubTopic = loadCarDataSoure.getDeviceId(); loaderCarDataSourceService.addLoaderCarDataSource(message.getPayload()); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // 消息发布所需参数 MqttMessage message1 = new MqttMessage(("received success,received time:"+sdf.format(System.currentTimeMillis())).getBytes()); message.setQos(0); // 发布消息 sampleClient.publish(pubTopic, message1); }catch (Exception e){ logger.error("emqx接收到的数据:" + json); logger.error("消息接收异常:"+e.getMessage()); try { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // 消息发布所需参数 MqttMessage message1 = new MqttMessage(("received fail,received time:"+sdf.format(System.currentTimeMillis())).getBytes()); message.setQos(Integer.valueOf(qos)); // 发布消息 sampleClient.publish(pubTopic, message1); }catch (Exception e2){ logger.error("消息响应异常:"+e2.getMessage()); } } } @Override public void deliveryComplete(IMqttDeliveryToken token) { // 消息发送成功的回调 /** * 消息发布完成且收到ack确认后的回调 * QoS0:消息被网络发出后触发一次 * QoS1:当收到broker的PUBACK消息后触发 * QoS2:当收到broer的PUBCOMP消息后触发 */ System.out.println("deliveryComplete---------"+ token.isComplete()); } }); } catch (MqttException me) { logger.error("reason " + me.getReasonCode()); logger.error("msg " + me.getMessage()); logger.error("loc " + me.getLocalizedMessage()); logger.error("cause " + me.getCause()); logger.error("excep " + me); me.printStackTrace(); } } }