package com.data.emqx; /** * @author licd * @className MqttSubscribeSample * @description com.data.emqx * @date 2024/12/16 14:37 */ import com.data.emqx.service.ILoaderCarDataSourceService; import lombok.Synchronized; import org.eclipse.paho.client.mqttv3.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date; @Component public class MqttSubscribeSample { private static final Logger logger = (Logger) LoggerFactory.getLogger(MqttSubscribeSample.class); @Value("${emqx.username}") private String USERNAME; @Value("${emqx.password}") private String PASSWORD; @Value("${emqx.broker}") private String broker; @Value("${emqx.clientid}") private String clientId; @Value("${emqx.subscriptiontopic}") private String subscriptionTopic; @Value("${emqx.qos}") private String qos; @Value("${emqx.pubTopic}") private String pubTopic; @Autowired private ILoaderCarDataSourceService loaderCarDataSourceService; public void getEmqsData() { try { MqttClient sampleClient = new MqttClient(broker, clientId); // MQTT 连接选项 MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setUserName(USERNAME); connOpts.setPassword(PASSWORD.toCharArray()); // 保留会话 connOpts.setCleanSession(true); // 建立连接 sampleClient.connect(connOpts); // 订阅主题 sampleClient.subscribe(subscriptionTopic); /*try{ loaderCarDataSourceService.addLoaderCarDataSource(null); }catch (Exception e){ logger.error("消息接收失败:"+e.getMessage()); }*/ // 设置回调 sampleClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { // 连接丢失后的回调 System.out.println("连接断开,可以做重连"); } @Override public void messageArrived(String topic, MqttMessage message) { try { // 接收到消息的回调 loaderCarDataSourceService.addLoaderCarDataSource(message.getPayload()); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // 消息发布所需参数 MqttMessage message1 = new MqttMessage(("received success,received time:"+sdf.format(System.currentTimeMillis())).getBytes()); message.setQos(Integer.valueOf(qos)); // 发布消息 sampleClient.publish(pubTopic, message1); }catch (Exception e){ logger.error("消息接收异常:"+e.getMessage()); try { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // 消息发布所需参数 MqttMessage message1 = new MqttMessage(("received fail,received time:"+sdf.format(System.currentTimeMillis())).getBytes()); message.setQos(Integer.valueOf(qos)); // 发布消息 sampleClient.publish(pubTopic, message1); }catch (Exception e2){ logger.error("消息响应异常:"+e2.getMessage()); } } } @Override public void deliveryComplete(IMqttDeliveryToken token) { // 消息发送成功的回调 /** * 消息发布完成且收到ack确认后的回调 * QoS0:消息被网络发出后触发一次 * QoS1:当收到broker的PUBACK消息后触发 * QoS2:当收到broer的PUBCOMP消息后触发 */ System.out.println("deliveryComplete---------"+ token.isComplete()); } }); } catch (MqttException me) { logger.error("reason " + me.getReasonCode()); logger.error("msg " + me.getMessage()); logger.error("loc " + me.getLocalizedMessage()); logger.error("cause " + me.getCause()); logger.error("excep " + me); me.printStackTrace(); } /* String HOST = "tcp://124.71.134.146:1883"; String TOPIC = "emqx_topic"; int qos =2; String clientid = "emqx_clientid"; String userName = "test"; String passWord = "test"; try { // host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存 MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence()); // MQTT的连接设置 MqttConnectOptions options = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 options.setCleanSession(true); // 设置连接的用户名 options.setUserName(userName); // 设置连接的密码 options.setPassword(passWord.toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 options.setKeepAliveInterval(20); // 自动重连 options.setAutomaticReconnect(true); // 设置回调函数 client.setCallback(new MqttCallback() { public void connectionLost(Throwable cause) { // 连接丢失后,一般在这里面进行重连 System.out.println("连接断开,可以做重连"); } public void messageArrived(String topic, MqttMessage message) throws Exception { *//** * 订阅到消息后的回调 * 该方法由mqtt客户端同步调用,在此方法未正确返回之前,不会发送ack确认消息到broker * 一旦该方法向外抛出了异常客户端将异常关闭,当再次连接时;所有QoSl,QoS2且客户端未进行ack确认的消息都将由 * broker服务器再次发送到客户端 *//* System.out.println("messageId:" +message.getId()); System.out.println("接收消息主题:" + topic); System.out.println("接收消息Qos:" + message.getQos()); System.out.println("接收消息内容:" + new String(message.getPayload())); System.out.println(); } public void deliveryComplete(IMqttDeliveryToken token) { *//** * 消息发布完成且收到ack确认后的回调 * QoS0:消息被网络发出后触发一次 * QoS1:当收到broker的PUBACK消息后触发 * QoS2:当收到broer的PUBCOMP消息后触发 *//* System.out.println("deliveryComplete---------"+ token.isComplete()); } }); // 建立连接 client.connect(options); //订阅消息 client.subscribe(TOPIC, qos); } catch (Exception e) { e.printStackTrace(); }*/ } }