From 92fc79fe71c88b2f8574b0fbf78ad779d0dcb999 Mon Sep 17 00:00:00 2001 From: zxf <1532322479@qq.com> Date: Mon, 25 May 2026 16:00:09 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=20=E4=BF=AE=E6=94=B9emqx=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E6=8E=A5=E6=94=B6=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bs-loader/pom.xml | 2 - .../main/java/com/data/emqx/EmqxTimer.java | 8 + .../java/com/data/emqx/MqttDeviceClient.java | 806 ++++++++++++++++++ .../com/data/emqx/MqttSubscribeSample.java | 117 +-- .../com/data/emqx/domain/DeviceAlarm.java | 36 + .../com/data/emqx/domain/DeviceAlarmUwb.java | 40 + .../com/data/emqx/domain/DeviceHeart.java | 36 + .../data/emqx/domain/DeviceParamWrite.java | 88 ++ .../data/emqx/domain/DeviceWeightRfid.java | 38 + .../data/emqx/domain/LoadCarDataSoure.java | 10 + .../com/data/emqx/domain/OtaProgress.java | 79 ++ .../data/emqx/mapper/DeviceDataMapper.java | 51 ++ .../emqx/mapper/DeviceParamWriteMapper.java | 34 + .../data/emqx/mapper/OtaProgressMapper.java | 21 + .../data/emqx/service/IDeviceDataService.java | 99 +++ .../service/impl/DeviceDataServiceImpl.java | 402 +++++++++ .../com/data/emqx/test/DeviceDataTest.java | 113 +++ .../src/main/resources/application-dev.yml | 33 +- .../src/main/resources/application-prod.yml | 26 +- bs-loader/src/main/resources/log4j.properties | 2 +- .../mybatis/emqx/DeviceDataMapper.xml | 118 +++ .../mybatis/emqx/DeviceParamWriteMapper.xml | 91 ++ .../mybatis/emqx/LoadCarDataSoureMapper.xml | 2 + .../mybatis/emqx/OtaProgressMapper.xml | 51 ++ 24 files changed, 2191 insertions(+), 112 deletions(-) create mode 100644 bs-loader/src/main/java/com/data/emqx/MqttDeviceClient.java create mode 100644 bs-loader/src/main/java/com/data/emqx/domain/DeviceAlarm.java create mode 100644 bs-loader/src/main/java/com/data/emqx/domain/DeviceAlarmUwb.java create mode 100644 bs-loader/src/main/java/com/data/emqx/domain/DeviceHeart.java create mode 100644 bs-loader/src/main/java/com/data/emqx/domain/DeviceParamWrite.java create mode 100644 bs-loader/src/main/java/com/data/emqx/domain/DeviceWeightRfid.java create mode 100644 bs-loader/src/main/java/com/data/emqx/domain/OtaProgress.java create mode 100644 bs-loader/src/main/java/com/data/emqx/mapper/DeviceDataMapper.java create mode 100644 bs-loader/src/main/java/com/data/emqx/mapper/DeviceParamWriteMapper.java create mode 100644 bs-loader/src/main/java/com/data/emqx/mapper/OtaProgressMapper.java create mode 100644 bs-loader/src/main/java/com/data/emqx/service/IDeviceDataService.java create mode 100644 bs-loader/src/main/java/com/data/emqx/service/impl/DeviceDataServiceImpl.java create mode 100644 bs-loader/src/main/java/com/data/emqx/test/DeviceDataTest.java create mode 100644 bs-loader/src/main/resources/mybatis/emqx/DeviceDataMapper.xml create mode 100644 bs-loader/src/main/resources/mybatis/emqx/DeviceParamWriteMapper.xml create mode 100644 bs-loader/src/main/resources/mybatis/emqx/OtaProgressMapper.xml diff --git a/bs-loader/pom.xml b/bs-loader/pom.xml index d4ad6b8..9d79550 100644 --- a/bs-loader/pom.xml +++ b/bs-loader/pom.xml @@ -108,8 +108,6 @@ org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.5 - system - ${project.basedir}/.mvn/wrapper/org.eclipse.paho.client.mqttv3-1.2.5.jar diff --git a/bs-loader/src/main/java/com/data/emqx/EmqxTimer.java b/bs-loader/src/main/java/com/data/emqx/EmqxTimer.java index d024364..23497ff 100644 --- a/bs-loader/src/main/java/com/data/emqx/EmqxTimer.java +++ b/bs-loader/src/main/java/com/data/emqx/EmqxTimer.java @@ -1,3 +1,11 @@ +/* + * @Author: zxf 1532322479@qq.com + * @Date: 2026-03-16 16:59:08 + * @LastEditors: zxf 1532322479@qq.com + * @LastEditTime: 2026-05-04 23:13:10 + * @FilePath: \bs-jxc-test1\bs-jxc\bs-loader\src\main\java\com\data\emqx\EmqxTimer.java + * @Description: 这是默认设置,请设置`customMade`, 打开koroFileHeader查看配置 进行设置: https://github.com/OBKoro1/koro1FileHeader/wiki/%E9%85%8D%E7%BD%AE + */ package com.data.emqx; import org.springframework.beans.factory.annotation.Autowired; diff --git a/bs-loader/src/main/java/com/data/emqx/MqttDeviceClient.java b/bs-loader/src/main/java/com/data/emqx/MqttDeviceClient.java new file mode 100644 index 0000000..0b8be9f --- /dev/null +++ b/bs-loader/src/main/java/com/data/emqx/MqttDeviceClient.java @@ -0,0 +1,806 @@ +package com.data.emqx; + +/** + * @author licd + * @className MqttDeviceClient + * @description 集疏运终端设备MQTT协议接收客户端 + * @date 2026/03/17 + */ + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.data.emqx.service.IDeviceDataService; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Component +public class MqttDeviceClient { + + private static final Logger logger = LoggerFactory.getLogger(MqttDeviceClient.class); + + // Broker配置 + @Value("${device.mqtt.broker:tcp://127.0.0.1:1883}") + private String broker; + + @Value("${device.mqtt.username:emqx_public}") + private String username; + + @Value("${device.mqtt.password:emqx_public}") + private String password; + + @Value("${device.mqtt.clientId:server_client_001}") + private String clientId; + + // V2.0.1 Topic配置 - 动态层级结构 + // 设备端发布(上行): WgtRfid/device/${did} + // 服务器下发(下行): WgtRfid/server/${did} + private static final String TOPIC_DEVICE_UPLINK = "WgtRfid/device/#"; + private static final String TOPIC_SERVER_DOWNLINK_PREFIX = "WgtRfid/server/"; + + // 兼容旧版 Topic(V1.0.1) + private static final String TOPIC_RFID_WGT = "Topic_RFID_WGT"; + private static final String TOPIC_HEART = "Topic_Heart"; + private static final String TOPIC_ALARM = "Topic_ALARM"; + + // V2.0.1 QoS配置 - 统一为QoS 0 + private static final int QOS = 0; + + // V2.0.1 消息类型 + private static final String MSG_TYPE_WEIGHT_RFID = "weight_rfid"; + private static final String MSG_TYPE_ALARM = "alarm"; + private static final String MSG_TYPE_HEART = "heart"; + private static final String MSG_TYPE_OTA_PROGRESS = "ota_progress"; + private static final String MSG_TYPE_PARAM_READ = "param_read"; + private static final String MSG_TYPE_PARAM_WRITE = "param_write"; + + private MqttClient mqttClient; + + // 异步ACK发送线程池(避免在messageArrived回调中直接调用publish导致死锁) + private final ExecutorService ackSenderExecutor = Executors.newFixedThreadPool(2); + + @Autowired + private IDeviceDataService deviceDataService; + + /** + * 初始化MQTT客户端连接 + */ + @PostConstruct + public void init() { + // 测试数据保存功能 +// testDataSave(); + connect(); + + } + + /** + * 测试数据保存功能 + */ + private void testDataSave() { + try { + logger.info("=== 开始测试数据保存功能 ==="); + + // 测试保存称重&标签数据 + if (deviceDataService != null) { + String deviceId = "test_device_001"; + Long seqNum = 123456789L; + Long sendTime = System.currentTimeMillis() / 1000; + Integer weight = 1000; + String rfid = "e280111122223333,b001111122223333"; + + deviceDataService.saveWeightRfidData(deviceId, seqNum, sendTime, weight, rfid); + logger.info("称重&标签数据保存测试成功"); + + // 测试保存告警数据 + Long alarmSeqNum = 987654321L; + String uwb = "ID:1234,dist:2000; ID:5678,dist:1500"; + com.alibaba.fastjson.JSONArray uwbArray = new com.alibaba.fastjson.JSONArray(); + com.alibaba.fastjson.JSONObject uwbObj1 = new com.alibaba.fastjson.JSONObject(); + uwbObj1.put("ID", "1234"); + uwbObj1.put("Dist", "2000 cm"); + uwbArray.add(uwbObj1); + com.alibaba.fastjson.JSONObject uwbObj2 = new com.alibaba.fastjson.JSONObject(); + uwbObj2.put("ID", "5678"); + uwbObj2.put("Dist", "1500 cm"); + uwbArray.add(uwbObj2); + + deviceDataService.saveAlarmData(deviceId, alarmSeqNum, sendTime, uwb, uwbArray); + logger.info("告警数据保存测试成功"); + + // 测试保存心跳数据 + Double sysVol = 12.8; + Integer netRSSI = -75; + + deviceDataService.saveHeartData(deviceId, sysVol, netRSSI, sendTime); + logger.info("心跳数据保存测试成功"); + + // V2.0.1 测试保存OTA升级进度数据 + String taskID = "ota_task_001"; + Integer progress = 50; + String otaStatus = "InProgress"; + deviceDataService.saveOtaProgressData(deviceId, taskID, progress, otaStatus); + logger.info("OTA升级进度数据保存测试成功"); + + // V2.0.1 测试保存设备参数写入响应 + Long paramSeqNum = 111222333L; + String paramStatus = "OK"; + String failReason = null; + deviceDataService.saveParamWriteData(deviceId, paramSeqNum, paramStatus, failReason); + logger.info("设备参数写入响应保存测试成功"); + + // V2.0.1 测试保存设备参数期望值(用于设备上线后自动拉平配置) + // 需要通过 DeviceDataServiceImpl 直接调用 + if (deviceDataService instanceof com.data.emqx.service.impl.DeviceDataServiceImpl) { + com.data.emqx.service.impl.DeviceDataServiceImpl impl = + (com.data.emqx.service.impl.DeviceDataServiceImpl) deviceDataService; + + // 保存几个设备参数期望值 + impl.saveParamExpectation(deviceId, "Rs485_PollMs", "1000"); + impl.saveParamExpectation(deviceId, "UWB_AlarmMs", "5000"); + impl.saveParamExpectation(deviceId, "VoiceAlarmMs", "3000"); + impl.saveParamExpectation(deviceId, "UWB_IgnoreCm", "50"); + impl.saveParamExpectation(deviceId, "UWB_AlarmCm", "200"); + logger.info("设备参数期望值保存测试成功"); + } + + } else { + logger.error("deviceDataService 为 null,无法测试数据保存功能"); + } + + logger.info("=== 数据保存功能测试完成 ==="); + + // V2.0.1 测试设备上线参数同步功能 + testDeviceParamSync("test_device_001"); + } catch (Exception e) { + logger.error("测试数据保存功能异常: {}", e.getMessage(), e); + } + } + + /** + * 连接MQTT服务器 + */ + public void connect() { + try { + // 创建MQTT客户端 + mqttClient = new MqttClient(broker, clientId, new MemoryPersistence()); + + // 配置连接选项 + MqttConnectOptions connOpts = new MqttConnectOptions(); + connOpts.setUserName(username); + connOpts.setPassword(password.toCharArray()); + connOpts.setCleanSession(true); + connOpts.setAutomaticReconnect(true); + connOpts.setConnectionTimeout(30); + connOpts.setKeepAliveInterval(60); + connOpts.setMaxInflight(100); + + // 设置断线重连回调 + mqttClient.setCallback(new MqttCallbackExtended() { + @Override + public void connectComplete(boolean reconnect, String serverURI) { + if (reconnect) { + logger.info("MQTT连接已自动重连到: {}", serverURI); + subscribeTopics(); + } else { + logger.info("MQTT连接已建立: {}", serverURI); + } + } + + @Override + public void connectionLost(Throwable cause) { + logger.error("MQTT连接断开: {}", cause.getMessage(), cause); + } + + @Override + public void messageArrived(String topic, MqttMessage message) { + processMessage(topic, message); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + logger.debug("消息发送完成"); + } + }); + + // 建立连接 + logger.info("正在连接MQTT Broker: {}", broker); + mqttClient.connect(connOpts); + + // 订阅Topic + subscribeTopics(); + + logger.info("MQTT客户端初始化完成"); + + } catch (MqttException e) { + logger.error("MQTT连接失败: {}", e.getMessage(), e); + } + } + + /** + * 订阅Topic(V2.0.1 动态层级结构) + * 设备端发布到: WgtRfid/device/${did} + */ + private void subscribeTopics() { + try { + // V2.0.1 使用通配符订阅所有设备的动态Topic + IMqttToken token = mqttClient.subscribeWithResponse(TOPIC_DEVICE_UPLINK, QOS); + token.waitForCompletion(); + if (token.isComplete()) { + logger.info("订阅成功 - Topic: {}, QoS: {} (V2.0.1动态Topic)", TOPIC_DEVICE_UPLINK, QOS); + } else { + logger.error("订阅失败 - Topic: {}", TOPIC_DEVICE_UPLINK); + } + + // 兼容旧版 Topic(V1.0.1) + subscribeTopic(TOPIC_RFID_WGT); + subscribeTopic(TOPIC_HEART); + subscribeTopic(TOPIC_ALARM); + + } catch (MqttException e) { + logger.error("订阅Topic异常: {}", e.getMessage(), e); + } + } + + /** + * 订阅单个Topic + */ + private void subscribeTopic(String topic) { + try { + IMqttToken token = mqttClient.subscribeWithResponse(topic, QOS); + token.waitForCompletion(); + if (token.isComplete()) { + logger.info("订阅成功 - Topic: {}, QoS: {} (旧版兼容)", topic, QOS); + } else { + logger.error("订阅失败 - Topic: {}", topic); + } + } catch (MqttException e) { + logger.error("订阅Topic异常: {}, Error: {}", topic, e.getMessage()); + } + } + + /** + * 处理接收到的消息(兼容V1.0.1和V2.0.1) + * V2.0.1 Topic格式: WgtRfid/device/${did} - deviceId从Topic路径提取 + * V1.0.1 Topic格式: Topic_RFID_WGT/Topic_HEART/Topic_ALARM - deviceId从JSON提取 + */ + private void processMessage(String topic, MqttMessage message) { + // 调试:确认消息被接收 + String debugMsg = "===== processMessage 方法被调用 ===== Topic: " + topic + ", QoS: " + message.getQos(); + logger.info(debugMsg); + System.out.println(debugMsg); + + String payload = new String(message.getPayload()); + logger.info("收到消息 - Topic: {}, Payload: {}", topic, payload); + + try { + // 解析JSON(先解析,因为旧版需要从JSON提取deviceId) + JSONObject jsonObject = JSON.parseObject(payload); + + // V2.0.1: deviceId从Topic路径提取 + // V1.0.1: deviceId从JSON提取(兼容旧版) + String deviceId = extractDeviceIdFromTopic(topic); + boolean isOldProtocol = false; + + // 如果从Topic无法提取deviceId,尝试从JSON提取(旧版兼容) + if (deviceId == null || deviceId.isEmpty()) { + deviceId = jsonObject.getString("deviceID"); + isOldProtocol = true; + } + + // 调试:显示提取的deviceId + String deviceIdMsg = "从" + (isOldProtocol ? "JSON" : "Topic") + "提取的DeviceID: " + (deviceId != null ? deviceId : "null") + " (协议版本: " + (isOldProtocol ? "V1.0.1" : "V2.0.1") + ")"; + logger.info(deviceIdMsg); + System.out.println(deviceIdMsg); + + if (deviceId == null || deviceId.isEmpty()) { + logger.error("无法从Topic或JSON提取deviceId: {}", topic); + return; + } + + // 获取消息类型 + String messageType = jsonObject.getString("messageType"); + + // 旧版兼容:如果没有messageType字段,根据Topic推断消息类型 + if (messageType == null || messageType.isEmpty()) { + messageType = inferMessageTypeFromTopic(topic); + } + + if (messageType == null) { + logger.error("消息缺少必要字段: messageType"); + return; + } + + // 根据messageType处理不同类型的消息(传递协议版本) + switch (messageType) { + case MSG_TYPE_WEIGHT_RFID: + processWeightRfidData(deviceId, jsonObject, isOldProtocol); + break; + case MSG_TYPE_ALARM: + processAlarmData(deviceId, jsonObject, isOldProtocol); + break; + case MSG_TYPE_HEART: + processHeartData(deviceId, jsonObject); + break; + case MSG_TYPE_OTA_PROGRESS: + processOtaProgressData(deviceId, jsonObject); + break; + case MSG_TYPE_PARAM_READ: + processParamReadData(deviceId, jsonObject); + break; + case MSG_TYPE_PARAM_WRITE: + processParamWriteData(deviceId, jsonObject); + break; + default: + logger.warn("未知的消息类型: {}", messageType); + } + + } catch (Exception e) { + logger.error("消息处理异常: {}", e.getMessage(), e); + } + } + + /** + * 从Topic路径提取deviceId + * V2.0.1 Topic格式: WgtRfid/device/${did} + */ + private String extractDeviceIdFromTopic(String topic) { + if (topic == null || topic.isEmpty()) { + return null; + } + String prefix = "WgtRfid/device/"; + if (topic.startsWith(prefix)) { + return topic.substring(prefix.length()); + } + return null; + } + + /** + * 从Topic推断消息类型(旧版兼容) + * V1.0.1 的Topic直接表示消息类型 + */ + private String inferMessageTypeFromTopic(String topic) { + if (topic == null || topic.isEmpty()) { + return null; + } + switch (topic) { + case TOPIC_RFID_WGT: + return MSG_TYPE_WEIGHT_RFID; + case TOPIC_HEART: + return MSG_TYPE_HEART; + case TOPIC_ALARM: + return MSG_TYPE_ALARM; + default: + return null; + } + } + + /** + * 处理称重&标签数据 + */ + private void processWeightRfidData(String deviceId, JSONObject jsonObject, boolean isOldProtocol) { + try { + // 调试:确认方法被调用 + String debugMsg = "===== processWeightRfidData 方法被调用 ===== DeviceID: " + deviceId + ", Payload: " + jsonObject.toJSONString() + ", Protocol: " + (isOldProtocol ? "V1.0.1" : "V2.0.1"); + logger.info(debugMsg); + System.out.println(debugMsg); + + logger.info("处理称重&标签数据 - DeviceID: {}", deviceId); + + Long seqNum = jsonObject.getLong("seqNum"); + if (seqNum == null) { + logger.error("非法消息:seqNum 缺失,丢弃消息且不发送ACK。Payload: {}", jsonObject.toJSONString()); + return; + } + + // 检查是否已存在相同数据 + if (deviceDataService != null && deviceDataService.existsWeightRfidData(deviceId, seqNum)) { + logger.info("称重数据已存在,直接返回ACK - DeviceID: {}, SeqNum: {}", deviceId, seqNum); + // 异步发送ACK(避免在messageArrived回调中直接调用publish导致死锁) + final Long finalSeqNum1 = seqNum; + final boolean finalIsOldProtocol1 = isOldProtocol; + ackSenderExecutor.execute(() -> sendAck(deviceId, finalSeqNum1, "weight_rfid", finalIsOldProtocol1)); + return; + } + + Long sendTime = jsonObject.getLong("sendTime"); + Integer weight = jsonObject.getInteger("weight"); + if (weight == null) { + logger.warn("警告:weight 字段缺失,使用默认值0。DeviceID: {}", deviceId); + weight = 0; + } + + String rfidStr = null; + JSONArray rfidArray = jsonObject.getJSONArray("RFID"); + if (rfidArray != null && !rfidArray.isEmpty()) { + List rfidList = rfidArray.toJavaList(String.class); + rfidStr = String.join(",", rfidList); + } else { + rfidStr = ""; + } + + logger.info("称重数据 - SeqNum: {}, Weight: {}, RFID: {}", seqNum, weight, rfidStr); + + if (deviceDataService != null) { + deviceDataService.saveWeightRfidData(deviceId, seqNum, sendTime, weight, rfidStr); + } + + // 异步发送ACK(避免在messageArrived回调中直接调用publish导致死锁) + final Long finalSeqNum2 = seqNum; + final boolean finalIsOldProtocol2 = isOldProtocol; + ackSenderExecutor.execute(() -> sendAck(deviceId, finalSeqNum2, "weight_rfid", finalIsOldProtocol2)); + + } catch (Exception e) { + logger.error("处理称重&标签数据异常: {}", e.getMessage(), e); + } + } + + /** + * 处理告警数据 + */ + private void processAlarmData(String deviceId, JSONObject jsonObject, boolean isOldProtocol) { + try { + logger.info("处理告警数据 - DeviceID: {}", deviceId); + + Long seqNum = jsonObject.getLong("seqNum"); + if (seqNum == null) { + logger.error("非法消息:seqNum 缺失,丢弃消息且不发送ACK。Payload: {}", jsonObject.toJSONString()); + return; + } + + // 检查是否已存在相同数据 + if (deviceDataService != null && deviceDataService.existsAlarmData(deviceId, seqNum)) { + logger.info("告警数据已存在,直接返回ACK - DeviceID: {}, SeqNum: {}", deviceId, seqNum); + // 异步发送ACK(避免在messageArrived回调中直接调用publish导致死锁) + final Long finalSeqNum3 = seqNum; + final boolean finalIsOldProtocol3 = isOldProtocol; + ackSenderExecutor.execute(() -> sendAck(deviceId, finalSeqNum3, "alarm", finalIsOldProtocol3)); + return; + } + + Long sendTime = jsonObject.getLong("sendTime"); + + StringBuilder uwbInfo = new StringBuilder(); + JSONArray uwbArray = jsonObject.getJSONArray("UWB"); + if (uwbArray != null && !uwbArray.isEmpty()) { + for (int i = 0; i < uwbArray.size(); i++) { + JSONObject uwbObj = uwbArray.getJSONObject(i); + if (uwbObj == null) continue; + + Integer id = uwbObj.getInteger("ID"); + Integer dist = uwbObj.getInteger("dist"); + + if (id == null || dist == null) { + logger.warn("UWB子项数据不完整,跳过该项。Item: {}", uwbObj.toJSONString()); + continue; + } + + if (uwbInfo.length() > 0) { + uwbInfo.append("; "); + } + uwbInfo.append("ID:").append(id).append(",dist:").append(dist); + } + } + + logger.info("告警数据 - SeqNum: {}, SendTime: {}, UWB: {}", seqNum, sendTime, uwbInfo); + + if (deviceDataService != null) { + deviceDataService.saveAlarmData(deviceId, seqNum, sendTime, uwbInfo.toString(), uwbArray); + } + + // 异步发送ACK(避免在messageArrived回调中直接调用publish导致死锁) + final Long finalSeqNum4 = seqNum; + final boolean finalIsOldProtocol4 = isOldProtocol; + ackSenderExecutor.execute(() -> sendAck(deviceId, finalSeqNum4, "alarm", finalIsOldProtocol4)); + + } catch (Exception e) { + logger.error("处理告警数据异常: {}", e.getMessage(), e); + } + } + + /** + * 处理心跳数据 + */ + private void processHeartData(String deviceId, JSONObject jsonObject) { + try { + logger.info("处理心跳数据 - DeviceID: {}", deviceId); + + Double sysVol = jsonObject.getDouble("sysVol"); + Integer netRSSI = jsonObject.getInteger("netRSSI"); + Long sendTime = jsonObject.getLong("sendTime"); + + // 检查是否已存在相同数据 + if (deviceDataService != null && deviceDataService.existsHeartData(deviceId, sendTime)) { + logger.info("心跳数据已存在,直接返回ACK - DeviceID: {}, SendTime: {}", deviceId, sendTime); + // 心跳数据没有seqNum,不需要发送ACK + return; + } + + logger.info("心跳数据 - sysVol: {}, netRSSI: {}, sendTime: {}", + sysVol, netRSSI, sendTime); + + if (deviceDataService != null) { + deviceDataService.saveHeartData(deviceId, sysVol, netRSSI, sendTime); + } + + } catch (Exception e) { + logger.error("处理心跳数据异常: {}", e.getMessage(), e); + } + } + + /** + * V2.0.1 处理OTA升级进度数据 + * 设备上报格式:{"messageType": "ota_progress", "taskID": "xxx", "progress": 0-100, "status": "OK/Fail"} + */ + private void processOtaProgressData(String deviceId, JSONObject jsonObject) { + try { + logger.info("处理OTA升级进度数据 - DeviceID: {}", deviceId); + + String taskID = jsonObject.getString("taskID"); + Integer progress = jsonObject.getInteger("progress"); + String status = jsonObject.getString("status"); + + logger.info("OTA进度数据 - DeviceID: {}, TaskID: {}, Progress: {}%, Status: {}", + deviceId, taskID, progress, status); + + if (deviceDataService != null) { + deviceDataService.saveOtaProgressData(deviceId, taskID, progress, status); + } + + } catch (Exception e) { + logger.error("处理OTA升级进度数据异常: {}", e.getMessage(), e); + } + } + + /** + * V2.0.1 处理设备参数读取响应数据 + * 设备上报格式:{"messageType": "param_read", "seqNum": xxx, "params": {"Rs485_PollMs": xxx, ...}} + */ + private void processParamReadData(String deviceId, JSONObject jsonObject) { + try { + logger.info("处理设备参数读取响应 - DeviceID: {}", deviceId); + + Long seqNum = jsonObject.getLong("seqNum"); + JSONObject params = jsonObject.getJSONObject("params"); + + logger.info("参数读取响应 - DeviceID: {}, SeqNum: {}, Params: {}", + deviceId, seqNum, params != null ? params.toJSONString() : "null"); + + if (deviceDataService != null && params != null) { + deviceDataService.saveParamReadData(deviceId, seqNum, params); + } + + if (seqNum != null) { + // 异步发送ACK(避免在messageArrived回调中直接调用publish导致死锁) + final Long finalSeqNum5 = seqNum; + ackSenderExecutor.execute(() -> sendAck(deviceId, finalSeqNum5, MSG_TYPE_PARAM_READ)); + } + + } catch (Exception e) { + logger.error("处理设备参数读取响应异常: {}", e.getMessage(), e); + } + } + + /** + * V2.0.1 处理设备参数写入响应数据 + * 设备上报格式:{"messageType": "param_write", "seqNum": xxx, "status": "OK/Fail", "failReason": "xxx"} + */ + private void processParamWriteData(String deviceId, JSONObject jsonObject) { + try { + logger.info("处理设备参数写入响应 - DeviceID: {}", deviceId); + + Long seqNum = jsonObject.getLong("seqNum"); + String status = jsonObject.getString("status"); + String failReason = jsonObject.getString("failReason"); + + logger.info("参数写入响应 - DeviceID: {}, SeqNum: {}, Status: {}, FailReason: {}", + deviceId, seqNum, status, failReason); + + if (deviceDataService != null) { + deviceDataService.saveParamWriteData(deviceId, seqNum, status, failReason); + } + + if (seqNum != null) { + // 异步发送ACK(避免在messageArrived回调中直接调用publish导致死锁) + final Long finalSeqNum6 = seqNum; + ackSenderExecutor.execute(() -> sendAck(deviceId, finalSeqNum6, MSG_TYPE_PARAM_WRITE)); + } + + } catch (Exception e) { + logger.error("处理设备参数写入响应异常: {}", e.getMessage(), e); + } + } + + /** + * 发送ACK回复(兼容V1.0.1和V2.0.1协议) + * ACK格式:{"messageType": "xxx", "seqNum": xxx, "ackTime": UTC0时间戳(Uint32)} + * V2.0.1: 下行Topic格式: WgtRfid/server/${deviceId} + * V1.0.1: 下行Topic格式: ${deviceId}(直接使用设备ID) + */ + private void sendAck(String deviceId, Long seqNum, String messageType) { + // 默认使用V2.0.1协议 + sendAck(deviceId, seqNum, messageType, false); + } + + /** + * 发送ACK回复(兼容V1.0.1和V2.0.1协议) + * @param deviceId 设备ID + * @param seqNum 序列号 + * @param messageType 消息类型 + * @param isOldProtocol 是否为旧版协议(V1.0.1) + */ + private void sendAck(String deviceId, Long seqNum, String messageType, boolean isOldProtocol) { + String startMsg = "开始发送ACK - DeviceID: " + deviceId + ", Type: " + messageType + ", SeqNum: " + seqNum + ", Protocol: " + (isOldProtocol ? "V1.0.1" : "V2.0.1"); + logger.info(startMsg); + System.out.println(startMsg); + + if (seqNum == null) { + String errorMsg = "尝试发送ACK但seqNum为空,拒绝发送!DeviceID: " + deviceId; + logger.error(errorMsg); + System.out.println(errorMsg); + return; + } + + if (mqttClient == null) { + String errorMsg = "MQTT客户端未初始化,无法发送ACK!DeviceID: " + deviceId; + logger.error(errorMsg); + System.out.println(errorMsg); + return; + } + + if (!mqttClient.isConnected()) { + String errorMsg = "MQTT客户端未连接,无法发送ACK!DeviceID: " + deviceId; + logger.error(errorMsg); + System.out.println(errorMsg); + return; + } + + try { + JSONObject ackJson = new JSONObject(); + ackJson.put("messageType", messageType); + ackJson.put("seqNum", seqNum); + + long utcSeconds = System.currentTimeMillis() / 1000; + ackJson.put("ackTime", utcSeconds); + + String ackPayload = ackJson.toJSONString(); + + // 确定下行Topic + String downlinkTopic; + if (isOldProtocol) { + // V1.0.1: 直接使用deviceId作为Topic + downlinkTopic = deviceId; + } else { + // V2.0.1: 使用动态下行Topic + downlinkTopic = TOPIC_SERVER_DOWNLINK_PREFIX + deviceId; + } + + String prepareMsg = "准备发送ACK消息 - Topic: " + downlinkTopic + ", Payload: " + ackPayload; + logger.info(prepareMsg); + System.out.println(prepareMsg); + + MqttMessage ackMessage = new MqttMessage(ackPayload.getBytes()); + ackMessage.setQos(QOS); + String createMsg = "ACK消息创建成功 - QoS: " + QOS; + logger.info(createMsg); + System.out.println(createMsg); + + mqttClient.publish(downlinkTopic, ackMessage); + String successMsg = "ACK已成功发送 - DeviceID: " + deviceId + ", Type: " + messageType + ", SeqNum: " + seqNum + ", ackTime: " + utcSeconds + ", Topic: " + downlinkTopic; + logger.info(successMsg); + System.out.println(successMsg); + + } catch (MqttException e) { + String errorMsg = "发送ACK失败 - DeviceID: " + deviceId + ", SeqNum: " + seqNum + ", Error: " + e.getMessage(); + logger.error(errorMsg, e); + System.out.println(errorMsg); + } catch (Exception e) { + String errorMsg = "发送ACK时发生未知异常 - DeviceID: " + deviceId + ", SeqNum: " + seqNum + ", Error: " + e.getMessage(); + logger.error(errorMsg, e); + System.out.println(errorMsg); + } + } + + /** + * 断开连接 + */ + public void disconnect() { + try { + if (mqttClient != null && mqttClient.isConnected()) { + mqttClient.disconnect(); + mqttClient.close(); + logger.info("MQTT连接已断开"); + } + } catch (MqttException e) { + logger.error("断开MQTT连接异常: {}", e.getMessage(), e); + } + } + + /** + * V2.0.1 设备上线参数同步测试方法 + * 模拟设备上线后,服务器查询待同步的参数并发送给设备 + * 这是"设备期望值"功能的核心使用场景 + */ + private void testDeviceParamSync(String deviceId) { + try { + logger.info("=== 开始设备上线参数同步测试 ==="); + logger.info("设备ID: {}", deviceId); + + // 查询设备待同步的参数(状态为Pending的参数) + if (deviceDataService instanceof com.data.emqx.service.impl.DeviceDataServiceImpl) { + com.data.emqx.service.impl.DeviceDataServiceImpl impl = + (com.data.emqx.service.impl.DeviceDataServiceImpl) deviceDataService; + + // 获取 DeviceParamWriteMapper + // 这里为了测试,我们直接模拟查询待同步参数 + logger.info("查询设备待同步的参数..."); + + // 模拟查询结果(实际应该调用 deviceParamWriteMapper.selectPendingParamsByDeviceId) + // 这里打印一些测试信息 + logger.info("设备 {} 的待同步参数列表:", deviceId); + logger.info(" - Rs485_PollMs: 1000 (Pending)"); + logger.info(" - UWB_AlarmMs: 5000 (Pending)"); + logger.info(" - VoiceAlarmMs: 3000 (Pending)"); + logger.info(" - UWB_IgnoreCm: 50 (Pending)"); + logger.info(" - UWB_AlarmCm: 200 (Pending)"); + + // 模拟发送参数同步指令到设备 + sendParamSyncCommand(deviceId); + + logger.info("=== 设备上线参数同步测试完成 ==="); + } + + } catch (Exception e) { + logger.error("设备上线参数同步测试异常: {}", e.getMessage(), e); + } + } + + /** + * 发送参数同步指令到设备 + * V2.0.1: 下行Topic格式: WgtRfid/server/${deviceId} + */ + private void sendParamSyncCommand(String deviceId) { + try { + if (mqttClient == null || !mqttClient.isConnected()) { + logger.warn("MQTT客户端未连接,无法发送参数同步指令"); + return; + } + + // 构建参数同步指令 + JSONObject paramSyncJson = new JSONObject(); + paramSyncJson.put("messageType", "param_write"); + paramSyncJson.put("seqNum", System.currentTimeMillis()); + + // 参数列表 + JSONObject params = new JSONObject(); + params.put("Rs485_PollMs", 1000); + params.put("UWB_AlarmMs", 5000); + params.put("VoiceAlarmMs", 3000); + params.put("UWB_IgnoreCm", 50); + params.put("UWB_AlarmCm", 200); + paramSyncJson.put("params", params); + + String payload = paramSyncJson.toJSONString(); + String downlinkTopic = TOPIC_SERVER_DOWNLINK_PREFIX + deviceId; + + MqttMessage message = new MqttMessage(payload.getBytes()); + message.setQos(QOS); + + mqttClient.publish(downlinkTopic, message); + logger.info("参数同步指令已发送 - DeviceID: {}, Topic: {}, Payload: {}", + deviceId, downlinkTopic, payload); + + } catch (Exception e) { + logger.error("发送参数同步指令异常: {}", e.getMessage(), e); + } + } +} diff --git a/bs-loader/src/main/java/com/data/emqx/MqttSubscribeSample.java b/bs-loader/src/main/java/com/data/emqx/MqttSubscribeSample.java index 3563afb..8b751a8 100644 --- a/bs-loader/src/main/java/com/data/emqx/MqttSubscribeSample.java +++ b/bs-loader/src/main/java/com/data/emqx/MqttSubscribeSample.java @@ -7,8 +7,9 @@ package com.data.emqx; * @date 2024/12/16 14:37 */ +import com.alibaba.fastjson.JSON; +import com.data.emqx.domain.LoadCarDataSoure; import com.data.emqx.service.ILoaderCarDataSourceService; -import lombok.Synchronized; import org.eclipse.paho.client.mqttv3.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,7 +18,6 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; -import java.util.Date; @Component public class MqttSubscribeSample { @@ -37,8 +37,6 @@ public class MqttSubscribeSample { @Value("${emqx.qos}") private String qos; - @Value("${emqx.pubTopic}") - private String pubTopic; @Autowired private ILoaderCarDataSourceService loaderCarDataSourceService; @@ -46,25 +44,24 @@ public class MqttSubscribeSample { public void getEmqsData() { - try { + try { MqttClient sampleClient = new MqttClient(broker, clientId); - // MQTT 连接选项 + // MQTT 连接选项 MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setUserName(USERNAME); connOpts.setPassword(PASSWORD.toCharArray()); - // 保留会话 - connOpts.setCleanSession(true); + // 保留会话 + connOpts.setCleanSession(false); + // 自动连接 + connOpts.setAutomaticReconnect(true); + connOpts.setConnectionTimeout(10); + connOpts.setKeepAliveInterval(20); // 建立连接 sampleClient.connect(connOpts); - // 订阅主题 + // 订阅主题 sampleClient.subscribe(subscriptionTopic); - /*try{ - loaderCarDataSourceService.addLoaderCarDataSource(null); - }catch (Exception e){ - logger.error("消息接收失败:"+e.getMessage()); - }*/ - // 设置回调 + // 设置回调 sampleClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { @@ -74,16 +71,22 @@ public class MqttSubscribeSample { @Override public void messageArrived(String topic, MqttMessage message) { + String pubTopic=""; + String json = new String(message.getPayload()); try { // 接收到消息的回调 + logger.info("emqx接收到的数据:" + json); + LoadCarDataSoure loadCarDataSoure = JSON.parseObject(json, LoadCarDataSoure.class); + pubTopic = loadCarDataSoure.getDeviceId(); loaderCarDataSourceService.addLoaderCarDataSource(message.getPayload()); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // 消息发布所需参数 MqttMessage message1 = new MqttMessage(("received success,received time:"+sdf.format(System.currentTimeMillis())).getBytes()); - message.setQos(Integer.valueOf(qos)); + message.setQos(0); // 发布消息 sampleClient.publish(pubTopic, message1); }catch (Exception e){ + logger.error("emqx接收到的数据:" + json); logger.error("消息接收异常:"+e.getMessage()); try { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @@ -114,85 +117,13 @@ public class MqttSubscribeSample { } catch (MqttException me) { - logger.error("reason " + me.getReasonCode()); - logger.error("msg " + me.getMessage()); - logger.error("loc " + me.getLocalizedMessage()); - logger.error("cause " + me.getCause()); - logger.error("excep " + me); + logger.error("reason " + me.getReasonCode()); + logger.error("msg " + me.getMessage()); + logger.error("loc " + me.getLocalizedMessage()); + logger.error("cause " + me.getCause()); + logger.error("excep " + me); me.printStackTrace(); } - - - - - - /* String HOST = "tcp://124.71.134.146:1883"; - String TOPIC = "emqx_topic"; - int qos =2; - String clientid = "emqx_clientid"; - String userName = "test"; - String passWord = "test"; - try { - // host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存 - MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence()); - - // MQTT的连接设置 - MqttConnectOptions options = new MqttConnectOptions(); - // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 - options.setCleanSession(true); - // 设置连接的用户名 - options.setUserName(userName); - // 设置连接的密码 - options.setPassword(passWord.toCharArray()); - // 设置超时时间 单位为秒 - options.setConnectionTimeout(10); - // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 - options.setKeepAliveInterval(20); - // 自动重连 - options.setAutomaticReconnect(true); - - // 设置回调函数 - client.setCallback(new MqttCallback() { - - public void connectionLost(Throwable cause) { - // 连接丢失后,一般在这里面进行重连 - System.out.println("连接断开,可以做重连"); - } - - public void messageArrived(String topic, MqttMessage message) throws Exception { - *//** - * 订阅到消息后的回调 - * 该方法由mqtt客户端同步调用,在此方法未正确返回之前,不会发送ack确认消息到broker - * 一旦该方法向外抛出了异常客户端将异常关闭,当再次连接时;所有QoSl,QoS2且客户端未进行ack确认的消息都将由 - * broker服务器再次发送到客户端 - *//* - System.out.println("messageId:" +message.getId()); - System.out.println("接收消息主题:" + topic); - System.out.println("接收消息Qos:" + message.getQos()); - System.out.println("接收消息内容:" + new String(message.getPayload())); - System.out.println(); - } - - public void deliveryComplete(IMqttDeliveryToken token) { - *//** - * 消息发布完成且收到ack确认后的回调 - * QoS0:消息被网络发出后触发一次 - * QoS1:当收到broker的PUBACK消息后触发 - * QoS2:当收到broer的PUBCOMP消息后触发 - *//* - System.out.println("deliveryComplete---------"+ token.isComplete()); - } - - }); - - // 建立连接 - client.connect(options); - - //订阅消息 - client.subscribe(TOPIC, qos); - } catch (Exception e) { - e.printStackTrace(); - }*/ } } diff --git a/bs-loader/src/main/java/com/data/emqx/domain/DeviceAlarm.java b/bs-loader/src/main/java/com/data/emqx/domain/DeviceAlarm.java new file mode 100644 index 0000000..23a02c2 --- /dev/null +++ b/bs-loader/src/main/java/com/data/emqx/domain/DeviceAlarm.java @@ -0,0 +1,36 @@ +package com.data.emqx.domain; + +import com.fasterxml.jackson.annotation.JsonFormat; +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + +/** + * @author licd + * @className DeviceAlarm + * @description 设备告警数据 + * @date 2026/03/23 + */ +@Data +public class DeviceAlarm implements Serializable { + + //主键 + private String id; + //设备ID号 + private String deviceId; + //序列号 + private Long seqNum; + //发送时间(UTC0时间戳) + private Long sendTime; + //UWB异常数据 + private String uwb; + //是否已处理:1:已处理 + private int flag; + //创建时间 + @JsonFormat(locale = "zh",timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss") + private Date createTime; + //更新时间 + @JsonFormat(locale = "zh",timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss") + private Date updateTime; +} diff --git a/bs-loader/src/main/java/com/data/emqx/domain/DeviceAlarmUwb.java b/bs-loader/src/main/java/com/data/emqx/domain/DeviceAlarmUwb.java new file mode 100644 index 0000000..1b023bb --- /dev/null +++ b/bs-loader/src/main/java/com/data/emqx/domain/DeviceAlarmUwb.java @@ -0,0 +1,40 @@ +/* + * @Author: zxf 1532322479@qq.com + * @Date: 2026-03-24 10:42:27 + * @LastEditors: zxf 1532322479@qq.com + * @LastEditTime: 2026-03-24 10:51:18 + * @FilePath: \bs-jxc-test1\bs-jxc\bs-loader\src\main\java\com\data\emqx\domain\DeviceAlarmUwb.java + * @Description: 这是默认设置,请设置`customMade`, 打开koroFileHeader查看配置 进行设置: https://github.com/OBKoro1/koro1FileHeader/wiki/%E9%85%8D%E7%BD%AE + */ +package com.data.emqx.domain; + +import com.fasterxml.jackson.annotation.JsonFormat; +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + +/** + * @author licd + * @className DeviceAlarmUwb + * @description 设备告警UWB数据 + * @date 2026/03/24 + */ +@Data +public class DeviceAlarmUwb implements Serializable { + + //主键 + private String id; + //告警ID(关联emqx_device_alarm表的id) + private String alarmId; + //UWB节点ID + private String uwbId; + //距离 + private String dist; + //创建时间 + @JsonFormat(locale = "zh",timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss") + private Date createTime; + //更新时间 + @JsonFormat(locale = "zh",timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss") + private Date updateTime; +} diff --git a/bs-loader/src/main/java/com/data/emqx/domain/DeviceHeart.java b/bs-loader/src/main/java/com/data/emqx/domain/DeviceHeart.java new file mode 100644 index 0000000..dc2340a --- /dev/null +++ b/bs-loader/src/main/java/com/data/emqx/domain/DeviceHeart.java @@ -0,0 +1,36 @@ +package com.data.emqx.domain; + +import com.fasterxml.jackson.annotation.JsonFormat; +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + +/** + * @author licd + * @className DeviceHeart + * @description 设备心跳数据 + * @date 2026/03/23 + */ +@Data +public class DeviceHeart implements Serializable { + + //主键 + private String id; + //设备ID号 + private String deviceId; + //系统电压 + private Double sysVol; + //网络信号强度 + private Integer netRSSI; + //发送时间(UTC0时间戳) + private Long sendTime; + //是否已处理:1:已处理 + private int flag; + //创建时间 + @JsonFormat(locale = "zh",timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss") + private Date createTime; + //更新时间 + @JsonFormat(locale = "zh",timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss") + private Date updateTime; +} diff --git a/bs-loader/src/main/java/com/data/emqx/domain/DeviceParamWrite.java b/bs-loader/src/main/java/com/data/emqx/domain/DeviceParamWrite.java new file mode 100644 index 0000000..0b1535a --- /dev/null +++ b/bs-loader/src/main/java/com/data/emqx/domain/DeviceParamWrite.java @@ -0,0 +1,88 @@ +package com.data.emqx.domain; + +import java.util.Date; + +public class DeviceParamWrite { + + private String id; + private String deviceId; + private Long seqNum; + private String paramKey; + private String paramValue; + private String status; + private String failReason; + private Date createTime; + private Date updateTime; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getDeviceId() { + return deviceId; + } + + public void setDeviceId(String deviceId) { + this.deviceId = deviceId; + } + + public Long getSeqNum() { + return seqNum; + } + + public void setSeqNum(Long seqNum) { + this.seqNum = seqNum; + } + + public String getParamKey() { + return paramKey; + } + + public void setParamKey(String paramKey) { + this.paramKey = paramKey; + } + + public String getParamValue() { + return paramValue; + } + + public void setParamValue(String paramValue) { + this.paramValue = paramValue; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public String getFailReason() { + return failReason; + } + + public void setFailReason(String failReason) { + this.failReason = failReason; + } + + public Date getCreateTime() { + return createTime; + } + + public void setCreateTime(Date createTime) { + this.createTime = createTime; + } + + public Date getUpdateTime() { + return updateTime; + } + + public void setUpdateTime(Date updateTime) { + this.updateTime = updateTime; + } +} diff --git a/bs-loader/src/main/java/com/data/emqx/domain/DeviceWeightRfid.java b/bs-loader/src/main/java/com/data/emqx/domain/DeviceWeightRfid.java new file mode 100644 index 0000000..44b5820 --- /dev/null +++ b/bs-loader/src/main/java/com/data/emqx/domain/DeviceWeightRfid.java @@ -0,0 +1,38 @@ +package com.data.emqx.domain; + +import com.fasterxml.jackson.annotation.JsonFormat; +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + +/** + * @author licd + * @className DeviceWeightRfid + * @description 设备称重&标签数据 + * @date 2026/03/23 + */ +@Data +public class DeviceWeightRfid implements Serializable { + + //主键 + private String id; + //设备ID号 + private String deviceId; + //序列号 + private Long seqNum; + //发送时间(UTC0时间戳) + private Long sendTime; + //重量(kg) + private Integer weight; + //RFID标签(逗号分隔) + private String rfid; + //是否已处理:1:已处理 + private int flag; + //创建时间 + @JsonFormat(locale = "zh",timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss") + private Date createTime; + //更新时间 + @JsonFormat(locale = "zh",timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss") + private Date updateTime; +} diff --git a/bs-loader/src/main/java/com/data/emqx/domain/LoadCarDataSoure.java b/bs-loader/src/main/java/com/data/emqx/domain/LoadCarDataSoure.java index 14af14b..352a048 100644 --- a/bs-loader/src/main/java/com/data/emqx/domain/LoadCarDataSoure.java +++ b/bs-loader/src/main/java/com/data/emqx/domain/LoadCarDataSoure.java @@ -44,6 +44,8 @@ public class LoadCarDataSoure implements Serializable { private int flag; private String tag; + //关联的emqx_load_car表的ID + private String loadCarId; //创建时间 @JsonFormat(locale = "zh",timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss") private Date createTime; @@ -175,4 +177,12 @@ public class LoadCarDataSoure implements Serializable { public void setTag(String tag) { this.tag = tag; } + + public String getLoadCarId() { + return loadCarId; + } + + public void setLoadCarId(String loadCarId) { + this.loadCarId = loadCarId; + } } diff --git a/bs-loader/src/main/java/com/data/emqx/domain/OtaProgress.java b/bs-loader/src/main/java/com/data/emqx/domain/OtaProgress.java new file mode 100644 index 0000000..5cad394 --- /dev/null +++ b/bs-loader/src/main/java/com/data/emqx/domain/OtaProgress.java @@ -0,0 +1,79 @@ +package com.data.emqx.domain; + +import java.util.Date; + +public class OtaProgress { + + private String id; + private String deviceId; + private String taskId; + private Integer progress; + private String status; + private String failReason; + private Date createTime; + private Date updateTime; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getDeviceId() { + return deviceId; + } + + public void setDeviceId(String deviceId) { + this.deviceId = deviceId; + } + + public String getTaskId() { + return taskId; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } + + public Integer getProgress() { + return progress; + } + + public void setProgress(Integer progress) { + this.progress = progress; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public String getFailReason() { + return failReason; + } + + public void setFailReason(String failReason) { + this.failReason = failReason; + } + + public Date getCreateTime() { + return createTime; + } + + public void setCreateTime(Date createTime) { + this.createTime = createTime; + } + + public Date getUpdateTime() { + return updateTime; + } + + public void setUpdateTime(Date updateTime) { + this.updateTime = updateTime; + } +} diff --git a/bs-loader/src/main/java/com/data/emqx/mapper/DeviceDataMapper.java b/bs-loader/src/main/java/com/data/emqx/mapper/DeviceDataMapper.java new file mode 100644 index 0000000..b268406 --- /dev/null +++ b/bs-loader/src/main/java/com/data/emqx/mapper/DeviceDataMapper.java @@ -0,0 +1,51 @@ +package com.data.emqx.mapper; + +import com.data.emqx.domain.DeviceAlarm; +import com.data.emqx.domain.DeviceAlarmUwb; +import com.data.emqx.domain.DeviceHeart; +import com.data.emqx.domain.DeviceWeightRfid; + +/** + * @author licd + * @className DeviceDataMapper + * @description 设备数据Mapper + * @date 2026/03/23 + */ +public interface DeviceDataMapper { + + /** + * 统计称重&标签数据数量 + */ + int countWeightRfidByDeviceIdAndSeqNum(String deviceId, Long seqNum); + + /** + * 插入称重&标签数据 + */ + void insertWeightRfid(DeviceWeightRfid deviceWeightRfid); + + /** + * 统计告警数据数量 + */ + int countAlarmByDeviceIdAndSeqNum(String deviceId, Long seqNum); + + /** + * 插入告警数据 + */ + void insertAlarm(DeviceAlarm deviceAlarm); + + /** + * 统计心跳数据数量 + */ + int countHeartByDeviceIdAndSendTime(String deviceId, Long sendTime); + + /** + * 插入心跳数据 + */ + void insertHeart(DeviceHeart deviceHeart); + + /** + * 插入告警UWB数据 + */ + void insertAlarmUwb(DeviceAlarmUwb deviceAlarmUwb); + +} diff --git a/bs-loader/src/main/java/com/data/emqx/mapper/DeviceParamWriteMapper.java b/bs-loader/src/main/java/com/data/emqx/mapper/DeviceParamWriteMapper.java new file mode 100644 index 0000000..a3e69bb --- /dev/null +++ b/bs-loader/src/main/java/com/data/emqx/mapper/DeviceParamWriteMapper.java @@ -0,0 +1,34 @@ +package com.data.emqx.mapper; + +import com.data.emqx.domain.DeviceParamWrite; +import org.apache.ibatis.annotations.Param; + +import java.util.List; + +public interface DeviceParamWriteMapper { + + /** + * 插入设备参数写入记录 + */ + void insertParamWrite(DeviceParamWrite paramWrite); + + /** + * 更新参数写入状态 + */ + void updateStatusByDeviceIdAndParamKey(DeviceParamWrite paramWrite); + + /** + * 根据设备ID和参数名查询参数 + */ + DeviceParamWrite selectByDeviceIdAndParamKey(@Param("deviceId") String deviceId, @Param("paramKey") String paramKey); + + /** + * 查询设备所有待同步的参数(状态为Pending) + */ + List selectPendingParamsByDeviceId(String deviceId); + + /** + * 根据设备ID查询所有参数记录 + */ + List selectByDeviceId(String deviceId); +} diff --git a/bs-loader/src/main/java/com/data/emqx/mapper/OtaProgressMapper.java b/bs-loader/src/main/java/com/data/emqx/mapper/OtaProgressMapper.java new file mode 100644 index 0000000..0103ed0 --- /dev/null +++ b/bs-loader/src/main/java/com/data/emqx/mapper/OtaProgressMapper.java @@ -0,0 +1,21 @@ +package com.data.emqx.mapper; + +import com.data.emqx.domain.OtaProgress; + +public interface OtaProgressMapper { + + /** + * 插入OTA升级进度记录 + */ + void insertOtaProgress(OtaProgress otaProgress); + + /** + * 根据设备ID和任务ID更新进度 + */ + void updateProgressByDeviceIdAndTaskId(OtaProgress otaProgress); + + /** + * 根据设备ID和任务ID查询进度 + */ + OtaProgress selectByDeviceIdAndTaskId(String deviceId, String taskId); +} diff --git a/bs-loader/src/main/java/com/data/emqx/service/IDeviceDataService.java b/bs-loader/src/main/java/com/data/emqx/service/IDeviceDataService.java new file mode 100644 index 0000000..701587c --- /dev/null +++ b/bs-loader/src/main/java/com/data/emqx/service/IDeviceDataService.java @@ -0,0 +1,99 @@ +package com.data.emqx.service; + +/** + * @author licd + * @className IDeviceDataService + * @description 设备数据处理服务接口 + * @date 2026/03/17 + */ + +public interface IDeviceDataService { + + /** + * 检查是否已存在相同的称重&标签数据 + * + * @param deviceId 设备ID + * @param seqNum 序列号 + * @return true表示已存在,false表示不存在 + */ + boolean existsWeightRfidData(String deviceId, Long seqNum); + + /** + * 保存称重&标签数据 + * + * @param deviceId 设备ID + * @param seqNum 序列号 + * @param sendTime 发送时间(UTC0时间戳) + * @param weight 重量 + * @param rfid RFID标签(逗号分隔的字符串) + */ + void saveWeightRfidData(String deviceId, Long seqNum, Long sendTime, Integer weight, String rfid); + + /** + * 检查是否已存在相同的告警数据 + * + * @param deviceId 设备ID + * @param seqNum 序列号 + * @return true表示已存在,false表示不存在 + */ + boolean existsAlarmData(String deviceId, Long seqNum); + + /** + * 保存告警数据 + * + * @param deviceId 设备ID + * @param seqNum 序列号 + * @param sendTime 发送时间(UTC0时间戳) + * @param uwb UWB异常数据(格式化字符串) + * @param uwbArray UWB原始数据数组 + */ + void saveAlarmData(String deviceId, Long seqNum, Long sendTime, String uwb, com.alibaba.fastjson.JSONArray uwbArray); + + /** + * 检查是否已存在相同的心跳数据 + * + * @param deviceId 设备ID + * @param sendTime 发送时间(UTC0时间戳) + * @return true表示已存在,false表示不存在 + */ + boolean existsHeartData(String deviceId, Long sendTime); + + /** + * 保存心跳数据 + * + * @param deviceId 设备ID + * @param sysVol 系统电压(Float类型) + * @param netRSSI 网络信号强度 + * @param sendTime 发送时间(UTC0时间戳) + */ + void saveHeartData(String deviceId, Double sysVol, Integer netRSSI, Long sendTime); + + /** + * V2.0.1 保存OTA升级进度数据 + * + * @param deviceId 设备ID + * @param taskID 升级任务ID + * @param progress 升级进度(0-100) + * @param status 升级状态(OK/Fail) + */ + void saveOtaProgressData(String deviceId, String taskID, Integer progress, String status); + + /** + * V2.0.1 保存设备参数读取响应数据 + * + * @param deviceId 设备ID + * @param seqNum 序列号 + * @param params 参数JSON对象 + */ + void saveParamReadData(String deviceId, Long seqNum, com.alibaba.fastjson.JSONObject params); + + /** + * V2.0.1 保存设备参数写入响应数据 + * + * @param deviceId 设备ID + * @param seqNum 序列号 + * @param status 写入状态(OK/Fail) + * @param failReason 失败原因(可选) + */ + void saveParamWriteData(String deviceId, Long seqNum, String status, String failReason); +} diff --git a/bs-loader/src/main/java/com/data/emqx/service/impl/DeviceDataServiceImpl.java b/bs-loader/src/main/java/com/data/emqx/service/impl/DeviceDataServiceImpl.java new file mode 100644 index 0000000..913e894 --- /dev/null +++ b/bs-loader/src/main/java/com/data/emqx/service/impl/DeviceDataServiceImpl.java @@ -0,0 +1,402 @@ +package com.data.emqx.service.impl; + +/** + * @author licd + * @className DeviceDataServiceImpl + * @description 设备数据处理服务实现 + * @date 2026/03/17 + */ + +import com.data.emqx.domain.DeviceAlarm; +import com.data.emqx.domain.DeviceAlarmUwb; +import com.data.emqx.domain.DeviceHeart; +import com.data.emqx.domain.DeviceParamWrite; +import com.data.emqx.domain.DeviceWeightRfid; +import com.data.emqx.domain.LoadCarDataSoure; +import com.data.emqx.domain.OtaProgress; +import com.data.emqx.mapper.DeviceDataMapper; +import com.data.emqx.mapper.DeviceParamWriteMapper; +import com.data.emqx.mapper.LoadCarDataSoureMapper; +import com.data.emqx.mapper.OtaProgressMapper; +import com.data.emqx.service.IDeviceDataService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.TimeZone; +import java.util.UUID; + +@Service +public class DeviceDataServiceImpl implements IDeviceDataService { + + private static final Logger logger = LoggerFactory.getLogger(DeviceDataServiceImpl.class); + + @Autowired + private DeviceDataMapper deviceDataMapper; + + @Autowired + private LoadCarDataSoureMapper loadCarDataSoureMapper; + + @Autowired + private OtaProgressMapper otaProgressMapper; + + @Autowired + private DeviceParamWriteMapper deviceParamWriteMapper; + + private final SimpleDateFormat utcFormat; + private final SimpleDateFormat timestampFormat; + + public DeviceDataServiceImpl() { + utcFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + utcFormat.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai")); // 使用东八区时区 + + timestampFormat = new SimpleDateFormat("yyyyMMddHHmmssSSS"); + timestampFormat.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai")); // 使用东八区时区 + } + + /** + * 检查是否已存在相同的称重&标签数据 + */ + @Override + public boolean existsWeightRfidData(String deviceId, Long seqNum) { + try { + int count = deviceDataMapper.countWeightRfidByDeviceIdAndSeqNum(deviceId, seqNum); + return count > 0; + } catch (Exception e) { + logger.error("检查称重数据是否存在异常: {}", e.getMessage(), e); + return false; + } + } + + /** + * 保存称重&标签数据 + */ + @Override + public void saveWeightRfidData(String deviceId, Long seqNum, Long sendTime, Integer weight, String rfid) { + try { + Date sendDate = new Date(sendTime * 1000); + String formattedTime = utcFormat.format(sendDate); + String timestampTime = timestampFormat.format(sendDate); + + logger.info("保存称重数据 - DeviceID: {}, SeqNum: {}, SendTime(UTC0): {}, Weight: {}kg, RFID: {}", + deviceId, seqNum, formattedTime, weight, rfid); + + // 创建 LoadCarDataSoure 对象用于保存数据 + LoadCarDataSoure loadCarDataSoure = new LoadCarDataSoure(); + String loadCarId = UUID.randomUUID().toString(); + loadCarDataSoure.setId(loadCarId); + loadCarDataSoure.setDeviceId(deviceId); + loadCarDataSoure.setDeviceStatus("OK"); // 设备状态,默认正常 + loadCarDataSoure.setSendTime(timestampTime); // 时间格式转换为 yyyyMMddHHmmssSSS 格式 + loadCarDataSoure.setWeight(weight != null ? weight.doubleValue() : 0.0); + loadCarDataSoure.setWeightStatus(1); // 重量状态,1表示有变化 + loadCarDataSoure.setWeightTime(timestampTime); // 称重时间,使用 yyyyMMddHHmmssSSS 格式 + loadCarDataSoure.setFlag(0); // 是否已处理,0表示未处理 + loadCarDataSoure.setTag("weight"); // 标签,用于标识数据类型 + loadCarDataSoure.setCreateTime(new Date()); + loadCarDataSoure.setUpdateTime(new Date()); + + // 保存到 emqx_load_car 表 + loadCarDataSoureMapper.addLoadCarDataSourceMapper(loadCarDataSoure); + logger.info("称重数据保存到 emqx_load_car 成功 - DeviceID: {}, LoadCarID: {}", deviceId, loadCarId); + + // 处理 RFID 数据,保存到 emqx_unload_car 表 + if (rfid != null && !rfid.isEmpty()) { + String[] rfidArray = rfid.split(","); + LoadCarDataSoure unloadCarDataSoure = new LoadCarDataSoure(); + unloadCarDataSoure.setId(UUID.randomUUID().toString()); + unloadCarDataSoure.setDeviceId(deviceId); + unloadCarDataSoure.setDeviceStatus("OK"); + unloadCarDataSoure.setSendTime(timestampTime); // 时间格式转换为 yyyyMMddHHmmssSSS 格式 + unloadCarDataSoure.setRfidNum(rfidArray.length); + unloadCarDataSoure.setRfidNo(rfid); + unloadCarDataSoure.setRfidTime(timestampTime); // RFID 时间,使用 yyyyMMddHHmmssSSS 格式 + unloadCarDataSoure.setFlag(0); + unloadCarDataSoure.setTag("rfid"); + unloadCarDataSoure.setLoadCarId(loadCarId); // 关联 emqx_load_car 表的 ID + unloadCarDataSoure.setCreateTime(new Date()); + unloadCarDataSoure.setUpdateTime(new Date()); + + // 保存到 emqx_unload_car 表 + loadCarDataSoureMapper.addUnLoadCarDataSourceMapper(unloadCarDataSoure); + logger.info("RFID数据保存到 emqx_unload_car 成功 - DeviceID: {}, LoadCarID: {}, RFID数量: {}", + deviceId, loadCarId, rfidArray.length); + } + + // 同时保存到原有的 emqx_device_weight_rfid 表,确保数据完整性 + DeviceWeightRfid deviceWeightRfid = new DeviceWeightRfid(); + deviceWeightRfid.setId(UUID.randomUUID().toString()); + deviceWeightRfid.setDeviceId(deviceId); + deviceWeightRfid.setSeqNum(seqNum); + deviceWeightRfid.setSendTime(sendTime); + deviceWeightRfid.setWeight(weight); + deviceWeightRfid.setRfid(rfid); + deviceWeightRfid.setFlag(0); + deviceWeightRfid.setCreateTime(new Date()); + deviceWeightRfid.setUpdateTime(new Date()); + + deviceDataMapper.insertWeightRfid(deviceWeightRfid); + logger.info("称重数据保存到 emqx_device_weight_rfid 成功 - DeviceID: {}, SeqNum: {}", deviceId, seqNum); + + } catch (Exception e) { + logger.error("保存称重数据异常: {}", e.getMessage(), e); + } + } + + /** + * 检查是否已存在相同的告警数据 + */ + @Override + public boolean existsAlarmData(String deviceId, Long seqNum) { + try { + int count = deviceDataMapper.countAlarmByDeviceIdAndSeqNum(deviceId, seqNum); + return count > 0; + } catch (Exception e) { + logger.error("检查告警数据是否存在异常: {}", e.getMessage(), e); + return false; + } + } + + /** + * 检查是否已存在相同的心跳数据 + */ + @Override + public boolean existsHeartData(String deviceId, Long sendTime) { + try { + int count = deviceDataMapper.countHeartByDeviceIdAndSendTime(deviceId, sendTime); + return count > 0; + } catch (Exception e) { + logger.error("检查心跳数据是否存在异常: {}", e.getMessage(), e); + return false; + } + } + + /** + * 保存告警数据 + */ + @Override + public void saveAlarmData(String deviceId, Long seqNum, Long sendTime, String uwb, com.alibaba.fastjson.JSONArray uwbArray) { + try { + Date sendDate = new Date(sendTime * 1000); + String formattedTime = utcFormat.format(sendDate); + + logger.info("保存告警数据 - DeviceID: {}, SeqNum: {}, SendTime(UTC0): {}, UWB: {}", + deviceId, seqNum, formattedTime, uwb); + + // 创建对象并设置值 + DeviceAlarm deviceAlarm = new DeviceAlarm(); + String alarmId = UUID.randomUUID().toString(); + deviceAlarm.setId(alarmId); + deviceAlarm.setDeviceId(deviceId); + deviceAlarm.setSeqNum(seqNum); + deviceAlarm.setSendTime(sendTime); + deviceAlarm.setUwb(uwb); + deviceAlarm.setFlag(0); + deviceAlarm.setCreateTime(new Date()); + deviceAlarm.setUpdateTime(new Date()); + + // 保存到数据库 + deviceDataMapper.insertAlarm(deviceAlarm); + logger.info("告警数据保存成功 - DeviceID: {}, SeqNum: {}", deviceId, seqNum); + + // 保存UWB数据到关联表 + if (uwbArray != null && !uwbArray.isEmpty()) { + for (int i = 0; i < uwbArray.size(); i++) { + com.alibaba.fastjson.JSONObject uwbObj = uwbArray.getJSONObject(i); + if (uwbObj != null) { + String id = uwbObj.getString("ID"); + String dist = uwbObj.getString("Dist"); + + if (id != null && dist != null) { + DeviceAlarmUwb deviceAlarmUwb = new DeviceAlarmUwb(); + deviceAlarmUwb.setId(UUID.randomUUID().toString()); + deviceAlarmUwb.setAlarmId(alarmId); + deviceAlarmUwb.setUwbId(id); + deviceAlarmUwb.setDist(dist); + deviceAlarmUwb.setCreateTime(new Date()); + deviceAlarmUwb.setUpdateTime(new Date()); + + deviceDataMapper.insertAlarmUwb(deviceAlarmUwb); + logger.info("UWB数据保存成功 - AlarmID: {}, UWB ID: {}, Dist: {}", alarmId, id, dist); + } + } + } + } + + } catch (Exception e) { + logger.error("保存告警数据异常: {}", e.getMessage(), e); + } + } + + /** + * 保存心跳数据 + */ + @Override + public void saveHeartData(String deviceId, Double sysVol, Integer netRSSI, Long sendTime) { + try { + Date sendDate = new Date(sendTime * 1000); + String formattedTime = utcFormat.format(sendDate); + + logger.info("保存心跳数据 - DeviceID: {}, sysVol: {}, netRSSI: {}, SendTime(UTC0): {}", + deviceId, sysVol, netRSSI, formattedTime); + + // 创建对象并设置值 + DeviceHeart deviceHeart = new DeviceHeart(); + deviceHeart.setId(UUID.randomUUID().toString()); + deviceHeart.setDeviceId(deviceId); + deviceHeart.setSysVol(sysVol); + deviceHeart.setNetRSSI(netRSSI); + deviceHeart.setSendTime(sendTime); + deviceHeart.setFlag(0); + deviceHeart.setCreateTime(new Date()); + deviceHeart.setUpdateTime(new Date()); + + // 保存到数据库 + deviceDataMapper.insertHeart(deviceHeart); + logger.info("心跳数据保存成功 - DeviceID: {}", deviceId); + + } catch (Exception e) { + logger.error("保存心跳数据异常: {}", e.getMessage(), e); + } + } + + /** + * V2.0.1 保存OTA升级进度数据 + */ + @Override + public void saveOtaProgressData(String deviceId, String taskID, Integer progress, String status) { + try { + logger.info("保存OTA升级进度数据 - DeviceID: {}, TaskID: {}, Progress: {}%, Status: {}", + deviceId, taskID, progress, status); + + // 查询是否已存在该任务记录 + OtaProgress existingProgress = otaProgressMapper.selectByDeviceIdAndTaskId(deviceId, taskID); + + if (existingProgress != null) { + // 更新现有记录 + existingProgress.setProgress(progress); + existingProgress.setStatus(status); + existingProgress.setUpdateTime(new Date()); + otaProgressMapper.updateProgressByDeviceIdAndTaskId(existingProgress); + logger.info("OTA升级进度数据更新成功 - DeviceID: {}, TaskID: {}", deviceId, taskID); + } else { + // 创建新记录 + OtaProgress otaProgress = new OtaProgress(); + otaProgress.setId(UUID.randomUUID().toString()); + otaProgress.setDeviceId(deviceId); + otaProgress.setTaskId(taskID); + otaProgress.setProgress(progress); + otaProgress.setStatus(status); + otaProgress.setCreateTime(new Date()); + otaProgress.setUpdateTime(new Date()); + otaProgressMapper.insertOtaProgress(otaProgress); + logger.info("OTA升级进度数据保存成功 - DeviceID: {}, TaskID: {}", deviceId, taskID); + } + + } catch (Exception e) { + logger.error("保存OTA升级进度数据异常: {}", e.getMessage(), e); + } + } + + /** + * V2.0.1 保存设备参数读取响应数据 + */ + @Override + public void saveParamReadData(String deviceId, Long seqNum, com.alibaba.fastjson.JSONObject params) { + try { + logger.info("保存设备参数读取响应 - DeviceID: {}, SeqNum: {}, Params: {}", + deviceId, seqNum, params != null ? params.toJSONString() : "null"); + + // 解析并记录各参数 + if (params != null) { + Integer rs485PollMs = params.getInteger("Rs485_PollMs"); + Integer uwbAlarmMs = params.getInteger("UWB_AlarmMs"); + Integer voiceAlarmMs = params.getInteger("VoiceAlarmMs"); + Integer uwbIgnoreCm = params.getInteger("UWB_IgnoreCm"); + Integer uwbAlarmCm = params.getInteger("UWB_AlarmCm"); + String iccid = params.getString("ICCID"); + + logger.info("设备参数详情 - Rs485_PollMs: {}, UWB_AlarmMs: {}, VoiceAlarmMs: {}, UWB_IgnoreCm: {}, UWB_AlarmCm: {}, ICCID: {}", + rs485PollMs, uwbAlarmMs, voiceAlarmMs, uwbIgnoreCm, uwbAlarmCm, iccid); + } + + // TODO: 需要创建 emqx_device_param_read 表和对应的 Domain/Mapper + // 目前仅记录日志,后续需要实现数据库保存逻辑 + + logger.info("设备参数读取响应保存成功 - DeviceID: {}, SeqNum: {}", deviceId, seqNum); + + } catch (Exception e) { + logger.error("保存设备参数读取响应异常: {}", e.getMessage(), e); + } + } + + /** + * V2.0.1 保存设备参数写入响应数据 + * 更新参数写入状态,用于实现"设备期望值"功能 + */ + @Override + public void saveParamWriteData(String deviceId, Long seqNum, String status, String failReason) { + try { + logger.info("保存设备参数写入响应 - DeviceID: {}, SeqNum: {}, Status: {}, FailReason: {}", + deviceId, seqNum, status, failReason); + + // 创建更新对象 + DeviceParamWrite paramWrite = new DeviceParamWrite(); + paramWrite.setDeviceId(deviceId); + paramWrite.setSeqNum(seqNum); + paramWrite.setStatus(status); + paramWrite.setFailReason(failReason); + paramWrite.setUpdateTime(new Date()); + + // 更新状态(这里简化处理,实际应该根据具体参数名更新) + // 在实际场景中,设备会返回具体哪个参数写入成功/失败 + // 这里需要根据业务需求进一步完善 + + logger.info("设备参数写入响应保存成功 - DeviceID: {}, SeqNum: {}, Status: {}", deviceId, seqNum, status); + + } catch (Exception e) { + logger.error("保存设备参数写入响应异常: {}", e.getMessage(), e); + } + } + + /** + * V2.0.1 保存设备参数期望值(用于设备上线后自动拉平配置) + */ + public void saveParamExpectation(String deviceId, String paramKey, String paramValue) { + try { + logger.info("保存设备参数期望值 - DeviceID: {}, ParamKey: {}, ParamValue: {}", + deviceId, paramKey, paramValue); + + // 查询是否已存在该参数记录 + DeviceParamWrite existingParam = deviceParamWriteMapper.selectByDeviceIdAndParamKey(deviceId, paramKey); + + if (existingParam != null) { + // 更新现有记录的期望值 + existingParam.setParamValue(paramValue); + existingParam.setStatus("Pending"); + existingParam.setFailReason(null); + existingParam.setUpdateTime(new Date()); + deviceParamWriteMapper.updateStatusByDeviceIdAndParamKey(existingParam); + logger.info("设备参数期望值更新成功 - DeviceID: {}, ParamKey: {}", deviceId, paramKey); + } else { + // 创建新记录 + DeviceParamWrite paramWrite = new DeviceParamWrite(); + paramWrite.setId(UUID.randomUUID().toString()); + paramWrite.setDeviceId(deviceId); + paramWrite.setParamKey(paramKey); + paramWrite.setParamValue(paramValue); + paramWrite.setStatus("Pending"); + paramWrite.setCreateTime(new Date()); + paramWrite.setUpdateTime(new Date()); + deviceParamWriteMapper.insertParamWrite(paramWrite); + logger.info("设备参数期望值保存成功 - DeviceID: {}, ParamKey: {}", deviceId, paramKey); + } + + } catch (Exception e) { + logger.error("保存设备参数期望值异常: {}", e.getMessage(), e); + } + } +} diff --git a/bs-loader/src/main/java/com/data/emqx/test/DeviceDataTest.java b/bs-loader/src/main/java/com/data/emqx/test/DeviceDataTest.java new file mode 100644 index 0000000..7dfde87 --- /dev/null +++ b/bs-loader/src/main/java/com/data/emqx/test/DeviceDataTest.java @@ -0,0 +1,113 @@ +package com.data.emqx.test; + +import com.LoaderApplication; +import com.data.emqx.service.IDeviceDataService; +import org.springframework.boot.SpringApplication; +import org.springframework.context.ApplicationContext; + +/** + * @author licd + * @className DeviceDataTest + * @description 设备数据保存测试 + * @date 2026/03/23 + */ +public class DeviceDataTest { + + public static void main(String[] args) { + System.out.println("=== 设备数据保存测试 ==="); + + // 启动Spring Boot应用 + ApplicationContext context = SpringApplication.run(LoaderApplication.class, args); + + // 获取设备数据服务 + IDeviceDataService deviceDataService = context.getBean(IDeviceDataService.class); + + if (deviceDataService == null) { + System.err.println("无法获取DeviceDataService实例"); + return; + } + + System.out.println("成功获取DeviceDataService实例"); + + // 测试保存称重&标签数据 + testSaveWeightRfidData(deviceDataService); + + // 测试保存告警数据 + testSaveAlarmData(deviceDataService); + + // 测试保存心跳数据 + testSaveHeartData(deviceDataService); + + System.out.println("=== 测试完成 ==="); + + // 关闭应用 + System.exit(0); + } + + /** + * 测试保存称重&标签数据 + */ + private static void testSaveWeightRfidData(IDeviceDataService deviceDataService) { + System.out.println("\n=== 测试保存称重&标签数据 ==="); + try { + String deviceId = "test_device_001"; + Long seqNum = 123456789L; + Long sendTime = System.currentTimeMillis() / 1000; + Integer weight = 1000; + String rfid = "e280111122223333,b001111122223333"; + + deviceDataService.saveWeightRfidData(deviceId, seqNum, sendTime, weight, rfid); + System.out.println("称重&标签数据保存测试成功"); + } catch (Exception e) { + System.err.println("称重&标签数据保存测试失败: " + e.getMessage()); + e.printStackTrace(); + } + } + + /** + * 测试保存告警数据 + */ + private static void testSaveAlarmData(IDeviceDataService deviceDataService) { + System.out.println("\n=== 测试保存告警数据 ==="); + try { + String deviceId = "test_device_001"; + Long seqNum = 987654321L; + Long sendTime = System.currentTimeMillis() / 1000; + String uwb = "ID:1234,dist:2000; ID:5678,dist:1500"; + com.alibaba.fastjson.JSONArray uwbArray = new com.alibaba.fastjson.JSONArray(); + com.alibaba.fastjson.JSONObject uwbObj1 = new com.alibaba.fastjson.JSONObject(); + uwbObj1.put("ID", "1234"); + uwbObj1.put("Dist", "2000 cm"); + uwbArray.add(uwbObj1); + com.alibaba.fastjson.JSONObject uwbObj2 = new com.alibaba.fastjson.JSONObject(); + uwbObj2.put("ID", "5678"); + uwbObj2.put("Dist", "1500 cm"); + uwbArray.add(uwbObj2); + + deviceDataService.saveAlarmData(deviceId, seqNum, sendTime, uwb, uwbArray); + System.out.println("告警数据保存测试成功"); + } catch (Exception e) { + System.err.println("告警数据保存测试失败: " + e.getMessage()); + e.printStackTrace(); + } + } + + /** + * 测试保存心跳数据 + */ + private static void testSaveHeartData(IDeviceDataService deviceDataService) { + System.out.println("\n=== 测试保存心跳数据 ==="); + try { + String deviceId = "test_device_001"; + Double sysVol = 12.8; + Integer netRSSI = -75; + Long sendTime = System.currentTimeMillis() / 1000; + + deviceDataService.saveHeartData(deviceId, sysVol, netRSSI, sendTime); + System.out.println("心跳数据保存测试成功"); + } catch (Exception e) { + System.err.println("心跳数据保存测试失败: " + e.getMessage()); + e.printStackTrace(); + } + } +} diff --git a/bs-loader/src/main/resources/application-dev.yml b/bs-loader/src/main/resources/application-dev.yml index b93a1f5..b75f2bb 100644 --- a/bs-loader/src/main/resources/application-dev.yml +++ b/bs-loader/src/main/resources/application-dev.yml @@ -32,13 +32,11 @@ spring: dynamic: primary: master datasource: - master: - # url: jdbc:mysql://localhost:3306/bs-jxc-dev?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&rewriteBatchedStatements=true - # username: bfpt_db - # password: Bfpt@2024# - url: jdbc:mysql://localhost:3306/bs-jxc-dev?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&rewriteBatchedStatements=true - username: root - password: root + master: + url: jdbc:mysql://127.0.0.1:3306/bs-jxc-dev?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8&useSSL=false + username: bfpt_db + password: Bfpt@2024# + driverClassName: com.mysql.cj.jdbc.Driver # 初始连接数 initialSize: 5 # 最小连接池数量 @@ -93,7 +91,7 @@ mybatis-plus: global-config: banner: false db-config: - id-type: + id-type: id_worker logic-delete-field: del_flag logic-delete-value: 2 logic-not-delete-value: 0 @@ -121,5 +119,20 @@ emqx: broker: "tcp://124.71.134.146:1883" clientid: "emqx_clientid" subscriptiontopic: "emqx_topic" - qos: 1 - pubTopic: "emqx_topic_return" \ No newline at end of file + qos: 2 + pubTopic: "emqx_topic_return" + +# 集疏运终端设备MQTT配置 +device: + mqtt: + broker: "tcp://127.0.0.1:1883" + username: "emqx_public" + password: "emqx_public" + clientId: "server_client_001" + +logging: + level: + root: INFO + org.springframework.web: ERROR + file: + name: C:\EMQX\log\emqx.log \ No newline at end of file diff --git a/bs-loader/src/main/resources/application-prod.yml b/bs-loader/src/main/resources/application-prod.yml index dcb4d10..49d4b18 100644 --- a/bs-loader/src/main/resources/application-prod.yml +++ b/bs-loader/src/main/resources/application-prod.yml @@ -33,9 +33,9 @@ spring: primary: master datasource: master: - url: jdbc:mysql://localhost:3306/bs-jxc-dev?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8&useSSL=false - username: root - password: root + url: jdbc:mysql://127.0.0.1:3306/bs-jxc-dev?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true + username: bfpt_db + password: Bfpt@2024# driverClassName: com.mysql.cj.jdbc.Driver # 初始连接数 initialSize: 5 @@ -91,7 +91,7 @@ mybatis-plus: global-config: banner: false db-config: - id-type: + id-type: id_worker logic-delete-field: del_flag logic-delete-value: 2 logic-not-delete-value: 0 @@ -119,5 +119,19 @@ emqx: broker: "tcp://127.0.0.1:1883" clientid: "emqx_clientid" subscriptiontopic: "emqx_topic" - qos: 1 - pubTopic: "emqx_topic_return" \ No newline at end of file + qos: 2 + pubTopic: "emqx_topic_return" + +# 集疏运终端设备MQTT配置 +device: + mqtt: + broker: "tcp://127.0.0.1:1883" + username: "emqx_public" + password: "emqx_public" + clientId: "server_client_001" +logging: + level: + root: INFO + org.springframework.web: ERROR + file: + name: C:\EMQX\log\emqx.log \ No newline at end of file diff --git a/bs-loader/src/main/resources/log4j.properties b/bs-loader/src/main/resources/log4j.properties index f54478a..437947b 100644 --- a/bs-loader/src/main/resources/log4j.properties +++ b/bs-loader/src/main/resources/log4j.properties @@ -1,6 +1,6 @@ log4j.rootLogger=INFO, FILE log4j.appender.FILE=org.apache.log4j.FileAppender -log4j.appender.FILE.File=D:/EMQX/log/emqx_receive.log +log4j.appender.FILE.File=C:/EMQX/log/emqx_receive.log log4j.appender.FILE.layout=org.apache.log4j.PatternLayout log4j.appender.FILE.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1} - %m%n \ No newline at end of file diff --git a/bs-loader/src/main/resources/mybatis/emqx/DeviceDataMapper.xml b/bs-loader/src/main/resources/mybatis/emqx/DeviceDataMapper.xml new file mode 100644 index 0000000..bc85ef9 --- /dev/null +++ b/bs-loader/src/main/resources/mybatis/emqx/DeviceDataMapper.xml @@ -0,0 +1,118 @@ + + + + + + + + + + insert into emqx_device_weight_rfid ( + id, + device_id, + seq_num, + send_time, + weight, + rfid, + flag, + create_time, + update_time) + values ( + #{id}, + #{deviceId}, + #{seqNum}, + #{sendTime}, + #{weight}, + #{rfid}, + #{flag}, + #{createTime}, + #{updateTime} + ) + + + + + + + + insert into emqx_device_alarm ( + id, + device_id, + seq_num, + send_time, + uwb, + flag, + create_time, + update_time) + values ( + #{id}, + #{deviceId}, + #{seqNum}, + #{sendTime}, + #{uwb}, + #{flag}, + #{createTime}, + #{updateTime} + ) + + + + + + + + insert into emqx_device_heart ( + id, + device_id, + sys_vol, + net_rssi, + send_time, + flag, + create_time, + update_time) + values ( + #{id}, + #{deviceId}, + #{sysVol}, + #{netRSSI}, + #{sendTime}, + #{flag}, + #{createTime}, + #{updateTime} + ) + + + + + insert into emqx_device_alarm_uwb ( + id, + alarm_id, + uwb_id, + dist, + create_time, + update_time) + values ( + #{id}, + #{alarmId}, + #{uwbId}, + #{dist}, + #{createTime}, + #{updateTime} + ) + + + diff --git a/bs-loader/src/main/resources/mybatis/emqx/DeviceParamWriteMapper.xml b/bs-loader/src/main/resources/mybatis/emqx/DeviceParamWriteMapper.xml new file mode 100644 index 0000000..678af7d --- /dev/null +++ b/bs-loader/src/main/resources/mybatis/emqx/DeviceParamWriteMapper.xml @@ -0,0 +1,91 @@ + + + + + + insert into emqx_device_param_write ( + id, + device_id, + seq_num, + param_key, + param_value, + status, + fail_reason, + create_time, + update_time + ) values ( + #{id}, + #{deviceId}, + #{seqNum}, + #{paramKey}, + #{paramValue}, + #{status}, + #{failReason}, + #{createTime}, + #{updateTime} + ) + on duplicate key update + seq_num = #{seqNum}, + param_value = #{paramValue}, + status = #{status}, + fail_reason = #{failReason}, + update_time = #{updateTime} + + + + update emqx_device_param_write + set status = #{status}, + seq_num = #{seqNum}, + fail_reason = #{failReason}, + update_time = #{updateTime} + where device_id = #{deviceId} + and param_key = #{paramKey} + + + + + + + + + diff --git a/bs-loader/src/main/resources/mybatis/emqx/LoadCarDataSoureMapper.xml b/bs-loader/src/main/resources/mybatis/emqx/LoadCarDataSoureMapper.xml index dd4199f..0297aec 100644 --- a/bs-loader/src/main/resources/mybatis/emqx/LoadCarDataSoureMapper.xml +++ b/bs-loader/src/main/resources/mybatis/emqx/LoadCarDataSoureMapper.xml @@ -43,6 +43,7 @@ rfid_time, flag, tag, + load_car_id, create_time, update_time) values ( @@ -55,6 +56,7 @@ #{rfidTime}, #{flag}, #{tag}, + #{loadCarId}, #{createTime}, #{updateTime} ) diff --git a/bs-loader/src/main/resources/mybatis/emqx/OtaProgressMapper.xml b/bs-loader/src/main/resources/mybatis/emqx/OtaProgressMapper.xml new file mode 100644 index 0000000..fcf155c --- /dev/null +++ b/bs-loader/src/main/resources/mybatis/emqx/OtaProgressMapper.xml @@ -0,0 +1,51 @@ + + + + + + insert into emqx_device_ota_progress ( + id, + device_id, + task_id, + progress, + status, + fail_reason, + create_time, + update_time + ) values ( + #{id}, + #{deviceId}, + #{taskId}, + #{progress}, + #{status}, + #{failReason}, + #{createTime}, + #{updateTime} + ) + + + + update emqx_device_ota_progress + set progress = #{progress}, + status = #{status}, + fail_reason = #{failReason}, + update_time = #{updateTime} + where device_id = #{deviceId} + and task_id = #{taskId} + + + + +