diff --git a/bs-loader/pom.xml b/bs-loader/pom.xml
index d4ad6b8..9d79550 100644
--- a/bs-loader/pom.xml
+++ b/bs-loader/pom.xml
@@ -108,8 +108,6 @@
org.eclipse.paho
org.eclipse.paho.client.mqttv3
1.2.5
- system
- ${project.basedir}/.mvn/wrapper/org.eclipse.paho.client.mqttv3-1.2.5.jar
diff --git a/bs-loader/src/main/java/com/data/emqx/EmqxTimer.java b/bs-loader/src/main/java/com/data/emqx/EmqxTimer.java
index d024364..23497ff 100644
--- a/bs-loader/src/main/java/com/data/emqx/EmqxTimer.java
+++ b/bs-loader/src/main/java/com/data/emqx/EmqxTimer.java
@@ -1,3 +1,11 @@
+/*
+ * @Author: zxf 1532322479@qq.com
+ * @Date: 2026-03-16 16:59:08
+ * @LastEditors: zxf 1532322479@qq.com
+ * @LastEditTime: 2026-05-04 23:13:10
+ * @FilePath: \bs-jxc-test1\bs-jxc\bs-loader\src\main\java\com\data\emqx\EmqxTimer.java
+ * @Description: 这是默认设置,请设置`customMade`, 打开koroFileHeader查看配置 进行设置: https://github.com/OBKoro1/koro1FileHeader/wiki/%E9%85%8D%E7%BD%AE
+ */
package com.data.emqx;
import org.springframework.beans.factory.annotation.Autowired;
diff --git a/bs-loader/src/main/java/com/data/emqx/MqttDeviceClient.java b/bs-loader/src/main/java/com/data/emqx/MqttDeviceClient.java
new file mode 100644
index 0000000..0b8be9f
--- /dev/null
+++ b/bs-loader/src/main/java/com/data/emqx/MqttDeviceClient.java
@@ -0,0 +1,806 @@
+package com.data.emqx;
+
+/**
+ * @author licd
+ * @className MqttDeviceClient
+ * @description 集疏运终端设备MQTT协议接收客户端
+ * @date 2026/03/17
+ */
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.data.emqx.service.IDeviceDataService;
+import org.eclipse.paho.client.mqttv3.*;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+@Component
+public class MqttDeviceClient {
+
+ private static final Logger logger = LoggerFactory.getLogger(MqttDeviceClient.class);
+
+ // Broker配置
+ @Value("${device.mqtt.broker:tcp://127.0.0.1:1883}")
+ private String broker;
+
+ @Value("${device.mqtt.username:emqx_public}")
+ private String username;
+
+ @Value("${device.mqtt.password:emqx_public}")
+ private String password;
+
+ @Value("${device.mqtt.clientId:server_client_001}")
+ private String clientId;
+
+ // V2.0.1 Topic配置 - 动态层级结构
+ // 设备端发布(上行): WgtRfid/device/${did}
+ // 服务器下发(下行): WgtRfid/server/${did}
+ private static final String TOPIC_DEVICE_UPLINK = "WgtRfid/device/#";
+ private static final String TOPIC_SERVER_DOWNLINK_PREFIX = "WgtRfid/server/";
+
+ // 兼容旧版 Topic(V1.0.1)
+ private static final String TOPIC_RFID_WGT = "Topic_RFID_WGT";
+ private static final String TOPIC_HEART = "Topic_Heart";
+ private static final String TOPIC_ALARM = "Topic_ALARM";
+
+ // V2.0.1 QoS配置 - 统一为QoS 0
+ private static final int QOS = 0;
+
+ // V2.0.1 消息类型
+ private static final String MSG_TYPE_WEIGHT_RFID = "weight_rfid";
+ private static final String MSG_TYPE_ALARM = "alarm";
+ private static final String MSG_TYPE_HEART = "heart";
+ private static final String MSG_TYPE_OTA_PROGRESS = "ota_progress";
+ private static final String MSG_TYPE_PARAM_READ = "param_read";
+ private static final String MSG_TYPE_PARAM_WRITE = "param_write";
+
+ private MqttClient mqttClient;
+
+ // 异步ACK发送线程池(避免在messageArrived回调中直接调用publish导致死锁)
+ private final ExecutorService ackSenderExecutor = Executors.newFixedThreadPool(2);
+
+ @Autowired
+ private IDeviceDataService deviceDataService;
+
+ /**
+ * 初始化MQTT客户端连接
+ */
+ @PostConstruct
+ public void init() {
+ // 测试数据保存功能
+// testDataSave();
+ connect();
+
+ }
+
+ /**
+ * 测试数据保存功能
+ */
+ private void testDataSave() {
+ try {
+ logger.info("=== 开始测试数据保存功能 ===");
+
+ // 测试保存称重&标签数据
+ if (deviceDataService != null) {
+ String deviceId = "test_device_001";
+ Long seqNum = 123456789L;
+ Long sendTime = System.currentTimeMillis() / 1000;
+ Integer weight = 1000;
+ String rfid = "e280111122223333,b001111122223333";
+
+ deviceDataService.saveWeightRfidData(deviceId, seqNum, sendTime, weight, rfid);
+ logger.info("称重&标签数据保存测试成功");
+
+ // 测试保存告警数据
+ Long alarmSeqNum = 987654321L;
+ String uwb = "ID:1234,dist:2000; ID:5678,dist:1500";
+ com.alibaba.fastjson.JSONArray uwbArray = new com.alibaba.fastjson.JSONArray();
+ com.alibaba.fastjson.JSONObject uwbObj1 = new com.alibaba.fastjson.JSONObject();
+ uwbObj1.put("ID", "1234");
+ uwbObj1.put("Dist", "2000 cm");
+ uwbArray.add(uwbObj1);
+ com.alibaba.fastjson.JSONObject uwbObj2 = new com.alibaba.fastjson.JSONObject();
+ uwbObj2.put("ID", "5678");
+ uwbObj2.put("Dist", "1500 cm");
+ uwbArray.add(uwbObj2);
+
+ deviceDataService.saveAlarmData(deviceId, alarmSeqNum, sendTime, uwb, uwbArray);
+ logger.info("告警数据保存测试成功");
+
+ // 测试保存心跳数据
+ Double sysVol = 12.8;
+ Integer netRSSI = -75;
+
+ deviceDataService.saveHeartData(deviceId, sysVol, netRSSI, sendTime);
+ logger.info("心跳数据保存测试成功");
+
+ // V2.0.1 测试保存OTA升级进度数据
+ String taskID = "ota_task_001";
+ Integer progress = 50;
+ String otaStatus = "InProgress";
+ deviceDataService.saveOtaProgressData(deviceId, taskID, progress, otaStatus);
+ logger.info("OTA升级进度数据保存测试成功");
+
+ // V2.0.1 测试保存设备参数写入响应
+ Long paramSeqNum = 111222333L;
+ String paramStatus = "OK";
+ String failReason = null;
+ deviceDataService.saveParamWriteData(deviceId, paramSeqNum, paramStatus, failReason);
+ logger.info("设备参数写入响应保存测试成功");
+
+ // V2.0.1 测试保存设备参数期望值(用于设备上线后自动拉平配置)
+ // 需要通过 DeviceDataServiceImpl 直接调用
+ if (deviceDataService instanceof com.data.emqx.service.impl.DeviceDataServiceImpl) {
+ com.data.emqx.service.impl.DeviceDataServiceImpl impl =
+ (com.data.emqx.service.impl.DeviceDataServiceImpl) deviceDataService;
+
+ // 保存几个设备参数期望值
+ impl.saveParamExpectation(deviceId, "Rs485_PollMs", "1000");
+ impl.saveParamExpectation(deviceId, "UWB_AlarmMs", "5000");
+ impl.saveParamExpectation(deviceId, "VoiceAlarmMs", "3000");
+ impl.saveParamExpectation(deviceId, "UWB_IgnoreCm", "50");
+ impl.saveParamExpectation(deviceId, "UWB_AlarmCm", "200");
+ logger.info("设备参数期望值保存测试成功");
+ }
+
+ } else {
+ logger.error("deviceDataService 为 null,无法测试数据保存功能");
+ }
+
+ logger.info("=== 数据保存功能测试完成 ===");
+
+ // V2.0.1 测试设备上线参数同步功能
+ testDeviceParamSync("test_device_001");
+ } catch (Exception e) {
+ logger.error("测试数据保存功能异常: {}", e.getMessage(), e);
+ }
+ }
+
+ /**
+ * 连接MQTT服务器
+ */
+ public void connect() {
+ try {
+ // 创建MQTT客户端
+ mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());
+
+ // 配置连接选项
+ MqttConnectOptions connOpts = new MqttConnectOptions();
+ connOpts.setUserName(username);
+ connOpts.setPassword(password.toCharArray());
+ connOpts.setCleanSession(true);
+ connOpts.setAutomaticReconnect(true);
+ connOpts.setConnectionTimeout(30);
+ connOpts.setKeepAliveInterval(60);
+ connOpts.setMaxInflight(100);
+
+ // 设置断线重连回调
+ mqttClient.setCallback(new MqttCallbackExtended() {
+ @Override
+ public void connectComplete(boolean reconnect, String serverURI) {
+ if (reconnect) {
+ logger.info("MQTT连接已自动重连到: {}", serverURI);
+ subscribeTopics();
+ } else {
+ logger.info("MQTT连接已建立: {}", serverURI);
+ }
+ }
+
+ @Override
+ public void connectionLost(Throwable cause) {
+ logger.error("MQTT连接断开: {}", cause.getMessage(), cause);
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage message) {
+ processMessage(topic, message);
+ }
+
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ logger.debug("消息发送完成");
+ }
+ });
+
+ // 建立连接
+ logger.info("正在连接MQTT Broker: {}", broker);
+ mqttClient.connect(connOpts);
+
+ // 订阅Topic
+ subscribeTopics();
+
+ logger.info("MQTT客户端初始化完成");
+
+ } catch (MqttException e) {
+ logger.error("MQTT连接失败: {}", e.getMessage(), e);
+ }
+ }
+
+ /**
+ * 订阅Topic(V2.0.1 动态层级结构)
+ * 设备端发布到: WgtRfid/device/${did}
+ */
+ private void subscribeTopics() {
+ try {
+ // V2.0.1 使用通配符订阅所有设备的动态Topic
+ IMqttToken token = mqttClient.subscribeWithResponse(TOPIC_DEVICE_UPLINK, QOS);
+ token.waitForCompletion();
+ if (token.isComplete()) {
+ logger.info("订阅成功 - Topic: {}, QoS: {} (V2.0.1动态Topic)", TOPIC_DEVICE_UPLINK, QOS);
+ } else {
+ logger.error("订阅失败 - Topic: {}", TOPIC_DEVICE_UPLINK);
+ }
+
+ // 兼容旧版 Topic(V1.0.1)
+ subscribeTopic(TOPIC_RFID_WGT);
+ subscribeTopic(TOPIC_HEART);
+ subscribeTopic(TOPIC_ALARM);
+
+ } catch (MqttException e) {
+ logger.error("订阅Topic异常: {}", e.getMessage(), e);
+ }
+ }
+
+ /**
+ * 订阅单个Topic
+ */
+ private void subscribeTopic(String topic) {
+ try {
+ IMqttToken token = mqttClient.subscribeWithResponse(topic, QOS);
+ token.waitForCompletion();
+ if (token.isComplete()) {
+ logger.info("订阅成功 - Topic: {}, QoS: {} (旧版兼容)", topic, QOS);
+ } else {
+ logger.error("订阅失败 - Topic: {}", topic);
+ }
+ } catch (MqttException e) {
+ logger.error("订阅Topic异常: {}, Error: {}", topic, e.getMessage());
+ }
+ }
+
+ /**
+ * 处理接收到的消息(兼容V1.0.1和V2.0.1)
+ * V2.0.1 Topic格式: WgtRfid/device/${did} - deviceId从Topic路径提取
+ * V1.0.1 Topic格式: Topic_RFID_WGT/Topic_HEART/Topic_ALARM - deviceId从JSON提取
+ */
+ private void processMessage(String topic, MqttMessage message) {
+ // 调试:确认消息被接收
+ String debugMsg = "===== processMessage 方法被调用 ===== Topic: " + topic + ", QoS: " + message.getQos();
+ logger.info(debugMsg);
+ System.out.println(debugMsg);
+
+ String payload = new String(message.getPayload());
+ logger.info("收到消息 - Topic: {}, Payload: {}", topic, payload);
+
+ try {
+ // 解析JSON(先解析,因为旧版需要从JSON提取deviceId)
+ JSONObject jsonObject = JSON.parseObject(payload);
+
+ // V2.0.1: deviceId从Topic路径提取
+ // V1.0.1: deviceId从JSON提取(兼容旧版)
+ String deviceId = extractDeviceIdFromTopic(topic);
+ boolean isOldProtocol = false;
+
+ // 如果从Topic无法提取deviceId,尝试从JSON提取(旧版兼容)
+ if (deviceId == null || deviceId.isEmpty()) {
+ deviceId = jsonObject.getString("deviceID");
+ isOldProtocol = true;
+ }
+
+ // 调试:显示提取的deviceId
+ String deviceIdMsg = "从" + (isOldProtocol ? "JSON" : "Topic") + "提取的DeviceID: " + (deviceId != null ? deviceId : "null") + " (协议版本: " + (isOldProtocol ? "V1.0.1" : "V2.0.1") + ")";
+ logger.info(deviceIdMsg);
+ System.out.println(deviceIdMsg);
+
+ if (deviceId == null || deviceId.isEmpty()) {
+ logger.error("无法从Topic或JSON提取deviceId: {}", topic);
+ return;
+ }
+
+ // 获取消息类型
+ String messageType = jsonObject.getString("messageType");
+
+ // 旧版兼容:如果没有messageType字段,根据Topic推断消息类型
+ if (messageType == null || messageType.isEmpty()) {
+ messageType = inferMessageTypeFromTopic(topic);
+ }
+
+ if (messageType == null) {
+ logger.error("消息缺少必要字段: messageType");
+ return;
+ }
+
+ // 根据messageType处理不同类型的消息(传递协议版本)
+ switch (messageType) {
+ case MSG_TYPE_WEIGHT_RFID:
+ processWeightRfidData(deviceId, jsonObject, isOldProtocol);
+ break;
+ case MSG_TYPE_ALARM:
+ processAlarmData(deviceId, jsonObject, isOldProtocol);
+ break;
+ case MSG_TYPE_HEART:
+ processHeartData(deviceId, jsonObject);
+ break;
+ case MSG_TYPE_OTA_PROGRESS:
+ processOtaProgressData(deviceId, jsonObject);
+ break;
+ case MSG_TYPE_PARAM_READ:
+ processParamReadData(deviceId, jsonObject);
+ break;
+ case MSG_TYPE_PARAM_WRITE:
+ processParamWriteData(deviceId, jsonObject);
+ break;
+ default:
+ logger.warn("未知的消息类型: {}", messageType);
+ }
+
+ } catch (Exception e) {
+ logger.error("消息处理异常: {}", e.getMessage(), e);
+ }
+ }
+
+ /**
+ * 从Topic路径提取deviceId
+ * V2.0.1 Topic格式: WgtRfid/device/${did}
+ */
+ private String extractDeviceIdFromTopic(String topic) {
+ if (topic == null || topic.isEmpty()) {
+ return null;
+ }
+ String prefix = "WgtRfid/device/";
+ if (topic.startsWith(prefix)) {
+ return topic.substring(prefix.length());
+ }
+ return null;
+ }
+
+ /**
+ * 从Topic推断消息类型(旧版兼容)
+ * V1.0.1 的Topic直接表示消息类型
+ */
+ private String inferMessageTypeFromTopic(String topic) {
+ if (topic == null || topic.isEmpty()) {
+ return null;
+ }
+ switch (topic) {
+ case TOPIC_RFID_WGT:
+ return MSG_TYPE_WEIGHT_RFID;
+ case TOPIC_HEART:
+ return MSG_TYPE_HEART;
+ case TOPIC_ALARM:
+ return MSG_TYPE_ALARM;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * 处理称重&标签数据
+ */
+ private void processWeightRfidData(String deviceId, JSONObject jsonObject, boolean isOldProtocol) {
+ try {
+ // 调试:确认方法被调用
+ String debugMsg = "===== processWeightRfidData 方法被调用 ===== DeviceID: " + deviceId + ", Payload: " + jsonObject.toJSONString() + ", Protocol: " + (isOldProtocol ? "V1.0.1" : "V2.0.1");
+ logger.info(debugMsg);
+ System.out.println(debugMsg);
+
+ logger.info("处理称重&标签数据 - DeviceID: {}", deviceId);
+
+ Long seqNum = jsonObject.getLong("seqNum");
+ if (seqNum == null) {
+ logger.error("非法消息:seqNum 缺失,丢弃消息且不发送ACK。Payload: {}", jsonObject.toJSONString());
+ return;
+ }
+
+ // 检查是否已存在相同数据
+ if (deviceDataService != null && deviceDataService.existsWeightRfidData(deviceId, seqNum)) {
+ logger.info("称重数据已存在,直接返回ACK - DeviceID: {}, SeqNum: {}", deviceId, seqNum);
+ // 异步发送ACK(避免在messageArrived回调中直接调用publish导致死锁)
+ final Long finalSeqNum1 = seqNum;
+ final boolean finalIsOldProtocol1 = isOldProtocol;
+ ackSenderExecutor.execute(() -> sendAck(deviceId, finalSeqNum1, "weight_rfid", finalIsOldProtocol1));
+ return;
+ }
+
+ Long sendTime = jsonObject.getLong("sendTime");
+ Integer weight = jsonObject.getInteger("weight");
+ if (weight == null) {
+ logger.warn("警告:weight 字段缺失,使用默认值0。DeviceID: {}", deviceId);
+ weight = 0;
+ }
+
+ String rfidStr = null;
+ JSONArray rfidArray = jsonObject.getJSONArray("RFID");
+ if (rfidArray != null && !rfidArray.isEmpty()) {
+ List rfidList = rfidArray.toJavaList(String.class);
+ rfidStr = String.join(",", rfidList);
+ } else {
+ rfidStr = "";
+ }
+
+ logger.info("称重数据 - SeqNum: {}, Weight: {}, RFID: {}", seqNum, weight, rfidStr);
+
+ if (deviceDataService != null) {
+ deviceDataService.saveWeightRfidData(deviceId, seqNum, sendTime, weight, rfidStr);
+ }
+
+ // 异步发送ACK(避免在messageArrived回调中直接调用publish导致死锁)
+ final Long finalSeqNum2 = seqNum;
+ final boolean finalIsOldProtocol2 = isOldProtocol;
+ ackSenderExecutor.execute(() -> sendAck(deviceId, finalSeqNum2, "weight_rfid", finalIsOldProtocol2));
+
+ } catch (Exception e) {
+ logger.error("处理称重&标签数据异常: {}", e.getMessage(), e);
+ }
+ }
+
+ /**
+ * 处理告警数据
+ */
+ private void processAlarmData(String deviceId, JSONObject jsonObject, boolean isOldProtocol) {
+ try {
+ logger.info("处理告警数据 - DeviceID: {}", deviceId);
+
+ Long seqNum = jsonObject.getLong("seqNum");
+ if (seqNum == null) {
+ logger.error("非法消息:seqNum 缺失,丢弃消息且不发送ACK。Payload: {}", jsonObject.toJSONString());
+ return;
+ }
+
+ // 检查是否已存在相同数据
+ if (deviceDataService != null && deviceDataService.existsAlarmData(deviceId, seqNum)) {
+ logger.info("告警数据已存在,直接返回ACK - DeviceID: {}, SeqNum: {}", deviceId, seqNum);
+ // 异步发送ACK(避免在messageArrived回调中直接调用publish导致死锁)
+ final Long finalSeqNum3 = seqNum;
+ final boolean finalIsOldProtocol3 = isOldProtocol;
+ ackSenderExecutor.execute(() -> sendAck(deviceId, finalSeqNum3, "alarm", finalIsOldProtocol3));
+ return;
+ }
+
+ Long sendTime = jsonObject.getLong("sendTime");
+
+ StringBuilder uwbInfo = new StringBuilder();
+ JSONArray uwbArray = jsonObject.getJSONArray("UWB");
+ if (uwbArray != null && !uwbArray.isEmpty()) {
+ for (int i = 0; i < uwbArray.size(); i++) {
+ JSONObject uwbObj = uwbArray.getJSONObject(i);
+ if (uwbObj == null) continue;
+
+ Integer id = uwbObj.getInteger("ID");
+ Integer dist = uwbObj.getInteger("dist");
+
+ if (id == null || dist == null) {
+ logger.warn("UWB子项数据不完整,跳过该项。Item: {}", uwbObj.toJSONString());
+ continue;
+ }
+
+ if (uwbInfo.length() > 0) {
+ uwbInfo.append("; ");
+ }
+ uwbInfo.append("ID:").append(id).append(",dist:").append(dist);
+ }
+ }
+
+ logger.info("告警数据 - SeqNum: {}, SendTime: {}, UWB: {}", seqNum, sendTime, uwbInfo);
+
+ if (deviceDataService != null) {
+ deviceDataService.saveAlarmData(deviceId, seqNum, sendTime, uwbInfo.toString(), uwbArray);
+ }
+
+ // 异步发送ACK(避免在messageArrived回调中直接调用publish导致死锁)
+ final Long finalSeqNum4 = seqNum;
+ final boolean finalIsOldProtocol4 = isOldProtocol;
+ ackSenderExecutor.execute(() -> sendAck(deviceId, finalSeqNum4, "alarm", finalIsOldProtocol4));
+
+ } catch (Exception e) {
+ logger.error("处理告警数据异常: {}", e.getMessage(), e);
+ }
+ }
+
+ /**
+ * 处理心跳数据
+ */
+ private void processHeartData(String deviceId, JSONObject jsonObject) {
+ try {
+ logger.info("处理心跳数据 - DeviceID: {}", deviceId);
+
+ Double sysVol = jsonObject.getDouble("sysVol");
+ Integer netRSSI = jsonObject.getInteger("netRSSI");
+ Long sendTime = jsonObject.getLong("sendTime");
+
+ // 检查是否已存在相同数据
+ if (deviceDataService != null && deviceDataService.existsHeartData(deviceId, sendTime)) {
+ logger.info("心跳数据已存在,直接返回ACK - DeviceID: {}, SendTime: {}", deviceId, sendTime);
+ // 心跳数据没有seqNum,不需要发送ACK
+ return;
+ }
+
+ logger.info("心跳数据 - sysVol: {}, netRSSI: {}, sendTime: {}",
+ sysVol, netRSSI, sendTime);
+
+ if (deviceDataService != null) {
+ deviceDataService.saveHeartData(deviceId, sysVol, netRSSI, sendTime);
+ }
+
+ } catch (Exception e) {
+ logger.error("处理心跳数据异常: {}", e.getMessage(), e);
+ }
+ }
+
+ /**
+ * V2.0.1 处理OTA升级进度数据
+ * 设备上报格式:{"messageType": "ota_progress", "taskID": "xxx", "progress": 0-100, "status": "OK/Fail"}
+ */
+ private void processOtaProgressData(String deviceId, JSONObject jsonObject) {
+ try {
+ logger.info("处理OTA升级进度数据 - DeviceID: {}", deviceId);
+
+ String taskID = jsonObject.getString("taskID");
+ Integer progress = jsonObject.getInteger("progress");
+ String status = jsonObject.getString("status");
+
+ logger.info("OTA进度数据 - DeviceID: {}, TaskID: {}, Progress: {}%, Status: {}",
+ deviceId, taskID, progress, status);
+
+ if (deviceDataService != null) {
+ deviceDataService.saveOtaProgressData(deviceId, taskID, progress, status);
+ }
+
+ } catch (Exception e) {
+ logger.error("处理OTA升级进度数据异常: {}", e.getMessage(), e);
+ }
+ }
+
+ /**
+ * V2.0.1 处理设备参数读取响应数据
+ * 设备上报格式:{"messageType": "param_read", "seqNum": xxx, "params": {"Rs485_PollMs": xxx, ...}}
+ */
+ private void processParamReadData(String deviceId, JSONObject jsonObject) {
+ try {
+ logger.info("处理设备参数读取响应 - DeviceID: {}", deviceId);
+
+ Long seqNum = jsonObject.getLong("seqNum");
+ JSONObject params = jsonObject.getJSONObject("params");
+
+ logger.info("参数读取响应 - DeviceID: {}, SeqNum: {}, Params: {}",
+ deviceId, seqNum, params != null ? params.toJSONString() : "null");
+
+ if (deviceDataService != null && params != null) {
+ deviceDataService.saveParamReadData(deviceId, seqNum, params);
+ }
+
+ if (seqNum != null) {
+ // 异步发送ACK(避免在messageArrived回调中直接调用publish导致死锁)
+ final Long finalSeqNum5 = seqNum;
+ ackSenderExecutor.execute(() -> sendAck(deviceId, finalSeqNum5, MSG_TYPE_PARAM_READ));
+ }
+
+ } catch (Exception e) {
+ logger.error("处理设备参数读取响应异常: {}", e.getMessage(), e);
+ }
+ }
+
+ /**
+ * V2.0.1 处理设备参数写入响应数据
+ * 设备上报格式:{"messageType": "param_write", "seqNum": xxx, "status": "OK/Fail", "failReason": "xxx"}
+ */
+ private void processParamWriteData(String deviceId, JSONObject jsonObject) {
+ try {
+ logger.info("处理设备参数写入响应 - DeviceID: {}", deviceId);
+
+ Long seqNum = jsonObject.getLong("seqNum");
+ String status = jsonObject.getString("status");
+ String failReason = jsonObject.getString("failReason");
+
+ logger.info("参数写入响应 - DeviceID: {}, SeqNum: {}, Status: {}, FailReason: {}",
+ deviceId, seqNum, status, failReason);
+
+ if (deviceDataService != null) {
+ deviceDataService.saveParamWriteData(deviceId, seqNum, status, failReason);
+ }
+
+ if (seqNum != null) {
+ // 异步发送ACK(避免在messageArrived回调中直接调用publish导致死锁)
+ final Long finalSeqNum6 = seqNum;
+ ackSenderExecutor.execute(() -> sendAck(deviceId, finalSeqNum6, MSG_TYPE_PARAM_WRITE));
+ }
+
+ } catch (Exception e) {
+ logger.error("处理设备参数写入响应异常: {}", e.getMessage(), e);
+ }
+ }
+
+ /**
+ * 发送ACK回复(兼容V1.0.1和V2.0.1协议)
+ * ACK格式:{"messageType": "xxx", "seqNum": xxx, "ackTime": UTC0时间戳(Uint32)}
+ * V2.0.1: 下行Topic格式: WgtRfid/server/${deviceId}
+ * V1.0.1: 下行Topic格式: ${deviceId}(直接使用设备ID)
+ */
+ private void sendAck(String deviceId, Long seqNum, String messageType) {
+ // 默认使用V2.0.1协议
+ sendAck(deviceId, seqNum, messageType, false);
+ }
+
+ /**
+ * 发送ACK回复(兼容V1.0.1和V2.0.1协议)
+ * @param deviceId 设备ID
+ * @param seqNum 序列号
+ * @param messageType 消息类型
+ * @param isOldProtocol 是否为旧版协议(V1.0.1)
+ */
+ private void sendAck(String deviceId, Long seqNum, String messageType, boolean isOldProtocol) {
+ String startMsg = "开始发送ACK - DeviceID: " + deviceId + ", Type: " + messageType + ", SeqNum: " + seqNum + ", Protocol: " + (isOldProtocol ? "V1.0.1" : "V2.0.1");
+ logger.info(startMsg);
+ System.out.println(startMsg);
+
+ if (seqNum == null) {
+ String errorMsg = "尝试发送ACK但seqNum为空,拒绝发送!DeviceID: " + deviceId;
+ logger.error(errorMsg);
+ System.out.println(errorMsg);
+ return;
+ }
+
+ if (mqttClient == null) {
+ String errorMsg = "MQTT客户端未初始化,无法发送ACK!DeviceID: " + deviceId;
+ logger.error(errorMsg);
+ System.out.println(errorMsg);
+ return;
+ }
+
+ if (!mqttClient.isConnected()) {
+ String errorMsg = "MQTT客户端未连接,无法发送ACK!DeviceID: " + deviceId;
+ logger.error(errorMsg);
+ System.out.println(errorMsg);
+ return;
+ }
+
+ try {
+ JSONObject ackJson = new JSONObject();
+ ackJson.put("messageType", messageType);
+ ackJson.put("seqNum", seqNum);
+
+ long utcSeconds = System.currentTimeMillis() / 1000;
+ ackJson.put("ackTime", utcSeconds);
+
+ String ackPayload = ackJson.toJSONString();
+
+ // 确定下行Topic
+ String downlinkTopic;
+ if (isOldProtocol) {
+ // V1.0.1: 直接使用deviceId作为Topic
+ downlinkTopic = deviceId;
+ } else {
+ // V2.0.1: 使用动态下行Topic
+ downlinkTopic = TOPIC_SERVER_DOWNLINK_PREFIX + deviceId;
+ }
+
+ String prepareMsg = "准备发送ACK消息 - Topic: " + downlinkTopic + ", Payload: " + ackPayload;
+ logger.info(prepareMsg);
+ System.out.println(prepareMsg);
+
+ MqttMessage ackMessage = new MqttMessage(ackPayload.getBytes());
+ ackMessage.setQos(QOS);
+ String createMsg = "ACK消息创建成功 - QoS: " + QOS;
+ logger.info(createMsg);
+ System.out.println(createMsg);
+
+ mqttClient.publish(downlinkTopic, ackMessage);
+ String successMsg = "ACK已成功发送 - DeviceID: " + deviceId + ", Type: " + messageType + ", SeqNum: " + seqNum + ", ackTime: " + utcSeconds + ", Topic: " + downlinkTopic;
+ logger.info(successMsg);
+ System.out.println(successMsg);
+
+ } catch (MqttException e) {
+ String errorMsg = "发送ACK失败 - DeviceID: " + deviceId + ", SeqNum: " + seqNum + ", Error: " + e.getMessage();
+ logger.error(errorMsg, e);
+ System.out.println(errorMsg);
+ } catch (Exception e) {
+ String errorMsg = "发送ACK时发生未知异常 - DeviceID: " + deviceId + ", SeqNum: " + seqNum + ", Error: " + e.getMessage();
+ logger.error(errorMsg, e);
+ System.out.println(errorMsg);
+ }
+ }
+
+ /**
+ * 断开连接
+ */
+ public void disconnect() {
+ try {
+ if (mqttClient != null && mqttClient.isConnected()) {
+ mqttClient.disconnect();
+ mqttClient.close();
+ logger.info("MQTT连接已断开");
+ }
+ } catch (MqttException e) {
+ logger.error("断开MQTT连接异常: {}", e.getMessage(), e);
+ }
+ }
+
+ /**
+ * V2.0.1 设备上线参数同步测试方法
+ * 模拟设备上线后,服务器查询待同步的参数并发送给设备
+ * 这是"设备期望值"功能的核心使用场景
+ */
+ private void testDeviceParamSync(String deviceId) {
+ try {
+ logger.info("=== 开始设备上线参数同步测试 ===");
+ logger.info("设备ID: {}", deviceId);
+
+ // 查询设备待同步的参数(状态为Pending的参数)
+ if (deviceDataService instanceof com.data.emqx.service.impl.DeviceDataServiceImpl) {
+ com.data.emqx.service.impl.DeviceDataServiceImpl impl =
+ (com.data.emqx.service.impl.DeviceDataServiceImpl) deviceDataService;
+
+ // 获取 DeviceParamWriteMapper
+ // 这里为了测试,我们直接模拟查询待同步参数
+ logger.info("查询设备待同步的参数...");
+
+ // 模拟查询结果(实际应该调用 deviceParamWriteMapper.selectPendingParamsByDeviceId)
+ // 这里打印一些测试信息
+ logger.info("设备 {} 的待同步参数列表:", deviceId);
+ logger.info(" - Rs485_PollMs: 1000 (Pending)");
+ logger.info(" - UWB_AlarmMs: 5000 (Pending)");
+ logger.info(" - VoiceAlarmMs: 3000 (Pending)");
+ logger.info(" - UWB_IgnoreCm: 50 (Pending)");
+ logger.info(" - UWB_AlarmCm: 200 (Pending)");
+
+ // 模拟发送参数同步指令到设备
+ sendParamSyncCommand(deviceId);
+
+ logger.info("=== 设备上线参数同步测试完成 ===");
+ }
+
+ } catch (Exception e) {
+ logger.error("设备上线参数同步测试异常: {}", e.getMessage(), e);
+ }
+ }
+
+ /**
+ * 发送参数同步指令到设备
+ * V2.0.1: 下行Topic格式: WgtRfid/server/${deviceId}
+ */
+ private void sendParamSyncCommand(String deviceId) {
+ try {
+ if (mqttClient == null || !mqttClient.isConnected()) {
+ logger.warn("MQTT客户端未连接,无法发送参数同步指令");
+ return;
+ }
+
+ // 构建参数同步指令
+ JSONObject paramSyncJson = new JSONObject();
+ paramSyncJson.put("messageType", "param_write");
+ paramSyncJson.put("seqNum", System.currentTimeMillis());
+
+ // 参数列表
+ JSONObject params = new JSONObject();
+ params.put("Rs485_PollMs", 1000);
+ params.put("UWB_AlarmMs", 5000);
+ params.put("VoiceAlarmMs", 3000);
+ params.put("UWB_IgnoreCm", 50);
+ params.put("UWB_AlarmCm", 200);
+ paramSyncJson.put("params", params);
+
+ String payload = paramSyncJson.toJSONString();
+ String downlinkTopic = TOPIC_SERVER_DOWNLINK_PREFIX + deviceId;
+
+ MqttMessage message = new MqttMessage(payload.getBytes());
+ message.setQos(QOS);
+
+ mqttClient.publish(downlinkTopic, message);
+ logger.info("参数同步指令已发送 - DeviceID: {}, Topic: {}, Payload: {}",
+ deviceId, downlinkTopic, payload);
+
+ } catch (Exception e) {
+ logger.error("发送参数同步指令异常: {}", e.getMessage(), e);
+ }
+ }
+}
diff --git a/bs-loader/src/main/java/com/data/emqx/MqttSubscribeSample.java b/bs-loader/src/main/java/com/data/emqx/MqttSubscribeSample.java
index 3563afb..8b751a8 100644
--- a/bs-loader/src/main/java/com/data/emqx/MqttSubscribeSample.java
+++ b/bs-loader/src/main/java/com/data/emqx/MqttSubscribeSample.java
@@ -7,8 +7,9 @@ package com.data.emqx;
* @date 2024/12/16 14:37
*/
+import com.alibaba.fastjson.JSON;
+import com.data.emqx.domain.LoadCarDataSoure;
import com.data.emqx.service.ILoaderCarDataSourceService;
-import lombok.Synchronized;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -17,7 +18,6 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
-import java.util.Date;
@Component
public class MqttSubscribeSample {
@@ -37,8 +37,6 @@ public class MqttSubscribeSample {
@Value("${emqx.qos}")
private String qos;
- @Value("${emqx.pubTopic}")
- private String pubTopic;
@Autowired
private ILoaderCarDataSourceService loaderCarDataSourceService;
@@ -46,25 +44,24 @@ public class MqttSubscribeSample {
public void getEmqsData() {
- try {
+ try {
MqttClient sampleClient = new MqttClient(broker, clientId);
- // MQTT 连接选项
+ // MQTT 连接选项
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(USERNAME);
connOpts.setPassword(PASSWORD.toCharArray());
- // 保留会话
- connOpts.setCleanSession(true);
+ // 保留会话
+ connOpts.setCleanSession(false);
+ // 自动连接
+ connOpts.setAutomaticReconnect(true);
+ connOpts.setConnectionTimeout(10);
+ connOpts.setKeepAliveInterval(20);
// 建立连接
sampleClient.connect(connOpts);
- // 订阅主题
+ // 订阅主题
sampleClient.subscribe(subscriptionTopic);
- /*try{
- loaderCarDataSourceService.addLoaderCarDataSource(null);
- }catch (Exception e){
- logger.error("消息接收失败:"+e.getMessage());
- }*/
- // 设置回调
+ // 设置回调
sampleClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
@@ -74,16 +71,22 @@ public class MqttSubscribeSample {
@Override
public void messageArrived(String topic, MqttMessage message) {
+ String pubTopic="";
+ String json = new String(message.getPayload());
try {
// 接收到消息的回调
+ logger.info("emqx接收到的数据:" + json);
+ LoadCarDataSoure loadCarDataSoure = JSON.parseObject(json, LoadCarDataSoure.class);
+ pubTopic = loadCarDataSoure.getDeviceId();
loaderCarDataSourceService.addLoaderCarDataSource(message.getPayload());
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 消息发布所需参数
MqttMessage message1 = new MqttMessage(("received success,received time:"+sdf.format(System.currentTimeMillis())).getBytes());
- message.setQos(Integer.valueOf(qos));
+ message.setQos(0);
// 发布消息
sampleClient.publish(pubTopic, message1);
}catch (Exception e){
+ logger.error("emqx接收到的数据:" + json);
logger.error("消息接收异常:"+e.getMessage());
try {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@@ -114,85 +117,13 @@ public class MqttSubscribeSample {
} catch (MqttException me) {
- logger.error("reason " + me.getReasonCode());
- logger.error("msg " + me.getMessage());
- logger.error("loc " + me.getLocalizedMessage());
- logger.error("cause " + me.getCause());
- logger.error("excep " + me);
+ logger.error("reason " + me.getReasonCode());
+ logger.error("msg " + me.getMessage());
+ logger.error("loc " + me.getLocalizedMessage());
+ logger.error("cause " + me.getCause());
+ logger.error("excep " + me);
me.printStackTrace();
}
-
-
-
-
-
- /* String HOST = "tcp://124.71.134.146:1883";
- String TOPIC = "emqx_topic";
- int qos =2;
- String clientid = "emqx_clientid";
- String userName = "test";
- String passWord = "test";
- try {
- // host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
- MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence());
-
- // MQTT的连接设置
- MqttConnectOptions options = new MqttConnectOptions();
- // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
- options.setCleanSession(true);
- // 设置连接的用户名
- options.setUserName(userName);
- // 设置连接的密码
- options.setPassword(passWord.toCharArray());
- // 设置超时时间 单位为秒
- options.setConnectionTimeout(10);
- // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
- options.setKeepAliveInterval(20);
- // 自动重连
- options.setAutomaticReconnect(true);
-
- // 设置回调函数
- client.setCallback(new MqttCallback() {
-
- public void connectionLost(Throwable cause) {
- // 连接丢失后,一般在这里面进行重连
- System.out.println("连接断开,可以做重连");
- }
-
- public void messageArrived(String topic, MqttMessage message) throws Exception {
- *//**
- * 订阅到消息后的回调
- * 该方法由mqtt客户端同步调用,在此方法未正确返回之前,不会发送ack确认消息到broker
- * 一旦该方法向外抛出了异常客户端将异常关闭,当再次连接时;所有QoSl,QoS2且客户端未进行ack确认的消息都将由
- * broker服务器再次发送到客户端
- *//*
- System.out.println("messageId:" +message.getId());
- System.out.println("接收消息主题:" + topic);
- System.out.println("接收消息Qos:" + message.getQos());
- System.out.println("接收消息内容:" + new String(message.getPayload()));
- System.out.println();
- }
-
- public void deliveryComplete(IMqttDeliveryToken token) {
- *//**
- * 消息发布完成且收到ack确认后的回调
- * QoS0:消息被网络发出后触发一次
- * QoS1:当收到broker的PUBACK消息后触发
- * QoS2:当收到broer的PUBCOMP消息后触发
- *//*
- System.out.println("deliveryComplete---------"+ token.isComplete());
- }
-
- });
-
- // 建立连接
- client.connect(options);
-
- //订阅消息
- client.subscribe(TOPIC, qos);
- } catch (Exception e) {
- e.printStackTrace();
- }*/
}
}
diff --git a/bs-loader/src/main/java/com/data/emqx/domain/DeviceAlarm.java b/bs-loader/src/main/java/com/data/emqx/domain/DeviceAlarm.java
new file mode 100644
index 0000000..23a02c2
--- /dev/null
+++ b/bs-loader/src/main/java/com/data/emqx/domain/DeviceAlarm.java
@@ -0,0 +1,36 @@
+package com.data.emqx.domain;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * @author licd
+ * @className DeviceAlarm
+ * @description 设备告警数据
+ * @date 2026/03/23
+ */
+@Data
+public class DeviceAlarm implements Serializable {
+
+ //主键
+ private String id;
+ //设备ID号
+ private String deviceId;
+ //序列号
+ private Long seqNum;
+ //发送时间(UTC0时间戳)
+ private Long sendTime;
+ //UWB异常数据
+ private String uwb;
+ //是否已处理:1:已处理
+ private int flag;
+ //创建时间
+ @JsonFormat(locale = "zh",timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date createTime;
+ //更新时间
+ @JsonFormat(locale = "zh",timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date updateTime;
+}
diff --git a/bs-loader/src/main/java/com/data/emqx/domain/DeviceAlarmUwb.java b/bs-loader/src/main/java/com/data/emqx/domain/DeviceAlarmUwb.java
new file mode 100644
index 0000000..1b023bb
--- /dev/null
+++ b/bs-loader/src/main/java/com/data/emqx/domain/DeviceAlarmUwb.java
@@ -0,0 +1,40 @@
+/*
+ * @Author: zxf 1532322479@qq.com
+ * @Date: 2026-03-24 10:42:27
+ * @LastEditors: zxf 1532322479@qq.com
+ * @LastEditTime: 2026-03-24 10:51:18
+ * @FilePath: \bs-jxc-test1\bs-jxc\bs-loader\src\main\java\com\data\emqx\domain\DeviceAlarmUwb.java
+ * @Description: 这是默认设置,请设置`customMade`, 打开koroFileHeader查看配置 进行设置: https://github.com/OBKoro1/koro1FileHeader/wiki/%E9%85%8D%E7%BD%AE
+ */
+package com.data.emqx.domain;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * @author licd
+ * @className DeviceAlarmUwb
+ * @description 设备告警UWB数据
+ * @date 2026/03/24
+ */
+@Data
+public class DeviceAlarmUwb implements Serializable {
+
+ //主键
+ private String id;
+ //告警ID(关联emqx_device_alarm表的id)
+ private String alarmId;
+ //UWB节点ID
+ private String uwbId;
+ //距离
+ private String dist;
+ //创建时间
+ @JsonFormat(locale = "zh",timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date createTime;
+ //更新时间
+ @JsonFormat(locale = "zh",timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date updateTime;
+}
diff --git a/bs-loader/src/main/java/com/data/emqx/domain/DeviceHeart.java b/bs-loader/src/main/java/com/data/emqx/domain/DeviceHeart.java
new file mode 100644
index 0000000..dc2340a
--- /dev/null
+++ b/bs-loader/src/main/java/com/data/emqx/domain/DeviceHeart.java
@@ -0,0 +1,36 @@
+package com.data.emqx.domain;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * @author licd
+ * @className DeviceHeart
+ * @description 设备心跳数据
+ * @date 2026/03/23
+ */
+@Data
+public class DeviceHeart implements Serializable {
+
+ //主键
+ private String id;
+ //设备ID号
+ private String deviceId;
+ //系统电压
+ private Double sysVol;
+ //网络信号强度
+ private Integer netRSSI;
+ //发送时间(UTC0时间戳)
+ private Long sendTime;
+ //是否已处理:1:已处理
+ private int flag;
+ //创建时间
+ @JsonFormat(locale = "zh",timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date createTime;
+ //更新时间
+ @JsonFormat(locale = "zh",timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date updateTime;
+}
diff --git a/bs-loader/src/main/java/com/data/emqx/domain/DeviceParamWrite.java b/bs-loader/src/main/java/com/data/emqx/domain/DeviceParamWrite.java
new file mode 100644
index 0000000..0b1535a
--- /dev/null
+++ b/bs-loader/src/main/java/com/data/emqx/domain/DeviceParamWrite.java
@@ -0,0 +1,88 @@
+package com.data.emqx.domain;
+
+import java.util.Date;
+
+public class DeviceParamWrite {
+
+ private String id;
+ private String deviceId;
+ private Long seqNum;
+ private String paramKey;
+ private String paramValue;
+ private String status;
+ private String failReason;
+ private Date createTime;
+ private Date updateTime;
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getDeviceId() {
+ return deviceId;
+ }
+
+ public void setDeviceId(String deviceId) {
+ this.deviceId = deviceId;
+ }
+
+ public Long getSeqNum() {
+ return seqNum;
+ }
+
+ public void setSeqNum(Long seqNum) {
+ this.seqNum = seqNum;
+ }
+
+ public String getParamKey() {
+ return paramKey;
+ }
+
+ public void setParamKey(String paramKey) {
+ this.paramKey = paramKey;
+ }
+
+ public String getParamValue() {
+ return paramValue;
+ }
+
+ public void setParamValue(String paramValue) {
+ this.paramValue = paramValue;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public String getFailReason() {
+ return failReason;
+ }
+
+ public void setFailReason(String failReason) {
+ this.failReason = failReason;
+ }
+
+ public Date getCreateTime() {
+ return createTime;
+ }
+
+ public void setCreateTime(Date createTime) {
+ this.createTime = createTime;
+ }
+
+ public Date getUpdateTime() {
+ return updateTime;
+ }
+
+ public void setUpdateTime(Date updateTime) {
+ this.updateTime = updateTime;
+ }
+}
diff --git a/bs-loader/src/main/java/com/data/emqx/domain/DeviceWeightRfid.java b/bs-loader/src/main/java/com/data/emqx/domain/DeviceWeightRfid.java
new file mode 100644
index 0000000..44b5820
--- /dev/null
+++ b/bs-loader/src/main/java/com/data/emqx/domain/DeviceWeightRfid.java
@@ -0,0 +1,38 @@
+package com.data.emqx.domain;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * @author licd
+ * @className DeviceWeightRfid
+ * @description 设备称重&标签数据
+ * @date 2026/03/23
+ */
+@Data
+public class DeviceWeightRfid implements Serializable {
+
+ //主键
+ private String id;
+ //设备ID号
+ private String deviceId;
+ //序列号
+ private Long seqNum;
+ //发送时间(UTC0时间戳)
+ private Long sendTime;
+ //重量(kg)
+ private Integer weight;
+ //RFID标签(逗号分隔)
+ private String rfid;
+ //是否已处理:1:已处理
+ private int flag;
+ //创建时间
+ @JsonFormat(locale = "zh",timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date createTime;
+ //更新时间
+ @JsonFormat(locale = "zh",timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date updateTime;
+}
diff --git a/bs-loader/src/main/java/com/data/emqx/domain/LoadCarDataSoure.java b/bs-loader/src/main/java/com/data/emqx/domain/LoadCarDataSoure.java
index 14af14b..352a048 100644
--- a/bs-loader/src/main/java/com/data/emqx/domain/LoadCarDataSoure.java
+++ b/bs-loader/src/main/java/com/data/emqx/domain/LoadCarDataSoure.java
@@ -44,6 +44,8 @@ public class LoadCarDataSoure implements Serializable {
private int flag;
private String tag;
+ //关联的emqx_load_car表的ID
+ private String loadCarId;
//创建时间
@JsonFormat(locale = "zh",timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
private Date createTime;
@@ -175,4 +177,12 @@ public class LoadCarDataSoure implements Serializable {
public void setTag(String tag) {
this.tag = tag;
}
+
+ public String getLoadCarId() {
+ return loadCarId;
+ }
+
+ public void setLoadCarId(String loadCarId) {
+ this.loadCarId = loadCarId;
+ }
}
diff --git a/bs-loader/src/main/java/com/data/emqx/domain/OtaProgress.java b/bs-loader/src/main/java/com/data/emqx/domain/OtaProgress.java
new file mode 100644
index 0000000..5cad394
--- /dev/null
+++ b/bs-loader/src/main/java/com/data/emqx/domain/OtaProgress.java
@@ -0,0 +1,79 @@
+package com.data.emqx.domain;
+
+import java.util.Date;
+
+public class OtaProgress {
+
+ private String id;
+ private String deviceId;
+ private String taskId;
+ private Integer progress;
+ private String status;
+ private String failReason;
+ private Date createTime;
+ private Date updateTime;
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getDeviceId() {
+ return deviceId;
+ }
+
+ public void setDeviceId(String deviceId) {
+ this.deviceId = deviceId;
+ }
+
+ public String getTaskId() {
+ return taskId;
+ }
+
+ public void setTaskId(String taskId) {
+ this.taskId = taskId;
+ }
+
+ public Integer getProgress() {
+ return progress;
+ }
+
+ public void setProgress(Integer progress) {
+ this.progress = progress;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public String getFailReason() {
+ return failReason;
+ }
+
+ public void setFailReason(String failReason) {
+ this.failReason = failReason;
+ }
+
+ public Date getCreateTime() {
+ return createTime;
+ }
+
+ public void setCreateTime(Date createTime) {
+ this.createTime = createTime;
+ }
+
+ public Date getUpdateTime() {
+ return updateTime;
+ }
+
+ public void setUpdateTime(Date updateTime) {
+ this.updateTime = updateTime;
+ }
+}
diff --git a/bs-loader/src/main/java/com/data/emqx/mapper/DeviceDataMapper.java b/bs-loader/src/main/java/com/data/emqx/mapper/DeviceDataMapper.java
new file mode 100644
index 0000000..b268406
--- /dev/null
+++ b/bs-loader/src/main/java/com/data/emqx/mapper/DeviceDataMapper.java
@@ -0,0 +1,51 @@
+package com.data.emqx.mapper;
+
+import com.data.emqx.domain.DeviceAlarm;
+import com.data.emqx.domain.DeviceAlarmUwb;
+import com.data.emqx.domain.DeviceHeart;
+import com.data.emqx.domain.DeviceWeightRfid;
+
+/**
+ * @author licd
+ * @className DeviceDataMapper
+ * @description 设备数据Mapper
+ * @date 2026/03/23
+ */
+public interface DeviceDataMapper {
+
+ /**
+ * 统计称重&标签数据数量
+ */
+ int countWeightRfidByDeviceIdAndSeqNum(String deviceId, Long seqNum);
+
+ /**
+ * 插入称重&标签数据
+ */
+ void insertWeightRfid(DeviceWeightRfid deviceWeightRfid);
+
+ /**
+ * 统计告警数据数量
+ */
+ int countAlarmByDeviceIdAndSeqNum(String deviceId, Long seqNum);
+
+ /**
+ * 插入告警数据
+ */
+ void insertAlarm(DeviceAlarm deviceAlarm);
+
+ /**
+ * 统计心跳数据数量
+ */
+ int countHeartByDeviceIdAndSendTime(String deviceId, Long sendTime);
+
+ /**
+ * 插入心跳数据
+ */
+ void insertHeart(DeviceHeart deviceHeart);
+
+ /**
+ * 插入告警UWB数据
+ */
+ void insertAlarmUwb(DeviceAlarmUwb deviceAlarmUwb);
+
+}
diff --git a/bs-loader/src/main/java/com/data/emqx/mapper/DeviceParamWriteMapper.java b/bs-loader/src/main/java/com/data/emqx/mapper/DeviceParamWriteMapper.java
new file mode 100644
index 0000000..a3e69bb
--- /dev/null
+++ b/bs-loader/src/main/java/com/data/emqx/mapper/DeviceParamWriteMapper.java
@@ -0,0 +1,34 @@
+package com.data.emqx.mapper;
+
+import com.data.emqx.domain.DeviceParamWrite;
+import org.apache.ibatis.annotations.Param;
+
+import java.util.List;
+
+public interface DeviceParamWriteMapper {
+
+ /**
+ * 插入设备参数写入记录
+ */
+ void insertParamWrite(DeviceParamWrite paramWrite);
+
+ /**
+ * 更新参数写入状态
+ */
+ void updateStatusByDeviceIdAndParamKey(DeviceParamWrite paramWrite);
+
+ /**
+ * 根据设备ID和参数名查询参数
+ */
+ DeviceParamWrite selectByDeviceIdAndParamKey(@Param("deviceId") String deviceId, @Param("paramKey") String paramKey);
+
+ /**
+ * 查询设备所有待同步的参数(状态为Pending)
+ */
+ List selectPendingParamsByDeviceId(String deviceId);
+
+ /**
+ * 根据设备ID查询所有参数记录
+ */
+ List selectByDeviceId(String deviceId);
+}
diff --git a/bs-loader/src/main/java/com/data/emqx/mapper/OtaProgressMapper.java b/bs-loader/src/main/java/com/data/emqx/mapper/OtaProgressMapper.java
new file mode 100644
index 0000000..0103ed0
--- /dev/null
+++ b/bs-loader/src/main/java/com/data/emqx/mapper/OtaProgressMapper.java
@@ -0,0 +1,21 @@
+package com.data.emqx.mapper;
+
+import com.data.emqx.domain.OtaProgress;
+
+public interface OtaProgressMapper {
+
+ /**
+ * 插入OTA升级进度记录
+ */
+ void insertOtaProgress(OtaProgress otaProgress);
+
+ /**
+ * 根据设备ID和任务ID更新进度
+ */
+ void updateProgressByDeviceIdAndTaskId(OtaProgress otaProgress);
+
+ /**
+ * 根据设备ID和任务ID查询进度
+ */
+ OtaProgress selectByDeviceIdAndTaskId(String deviceId, String taskId);
+}
diff --git a/bs-loader/src/main/java/com/data/emqx/service/IDeviceDataService.java b/bs-loader/src/main/java/com/data/emqx/service/IDeviceDataService.java
new file mode 100644
index 0000000..701587c
--- /dev/null
+++ b/bs-loader/src/main/java/com/data/emqx/service/IDeviceDataService.java
@@ -0,0 +1,99 @@
+package com.data.emqx.service;
+
+/**
+ * @author licd
+ * @className IDeviceDataService
+ * @description 设备数据处理服务接口
+ * @date 2026/03/17
+ */
+
+public interface IDeviceDataService {
+
+ /**
+ * 检查是否已存在相同的称重&标签数据
+ *
+ * @param deviceId 设备ID
+ * @param seqNum 序列号
+ * @return true表示已存在,false表示不存在
+ */
+ boolean existsWeightRfidData(String deviceId, Long seqNum);
+
+ /**
+ * 保存称重&标签数据
+ *
+ * @param deviceId 设备ID
+ * @param seqNum 序列号
+ * @param sendTime 发送时间(UTC0时间戳)
+ * @param weight 重量
+ * @param rfid RFID标签(逗号分隔的字符串)
+ */
+ void saveWeightRfidData(String deviceId, Long seqNum, Long sendTime, Integer weight, String rfid);
+
+ /**
+ * 检查是否已存在相同的告警数据
+ *
+ * @param deviceId 设备ID
+ * @param seqNum 序列号
+ * @return true表示已存在,false表示不存在
+ */
+ boolean existsAlarmData(String deviceId, Long seqNum);
+
+ /**
+ * 保存告警数据
+ *
+ * @param deviceId 设备ID
+ * @param seqNum 序列号
+ * @param sendTime 发送时间(UTC0时间戳)
+ * @param uwb UWB异常数据(格式化字符串)
+ * @param uwbArray UWB原始数据数组
+ */
+ void saveAlarmData(String deviceId, Long seqNum, Long sendTime, String uwb, com.alibaba.fastjson.JSONArray uwbArray);
+
+ /**
+ * 检查是否已存在相同的心跳数据
+ *
+ * @param deviceId 设备ID
+ * @param sendTime 发送时间(UTC0时间戳)
+ * @return true表示已存在,false表示不存在
+ */
+ boolean existsHeartData(String deviceId, Long sendTime);
+
+ /**
+ * 保存心跳数据
+ *
+ * @param deviceId 设备ID
+ * @param sysVol 系统电压(Float类型)
+ * @param netRSSI 网络信号强度
+ * @param sendTime 发送时间(UTC0时间戳)
+ */
+ void saveHeartData(String deviceId, Double sysVol, Integer netRSSI, Long sendTime);
+
+ /**
+ * V2.0.1 保存OTA升级进度数据
+ *
+ * @param deviceId 设备ID
+ * @param taskID 升级任务ID
+ * @param progress 升级进度(0-100)
+ * @param status 升级状态(OK/Fail)
+ */
+ void saveOtaProgressData(String deviceId, String taskID, Integer progress, String status);
+
+ /**
+ * V2.0.1 保存设备参数读取响应数据
+ *
+ * @param deviceId 设备ID
+ * @param seqNum 序列号
+ * @param params 参数JSON对象
+ */
+ void saveParamReadData(String deviceId, Long seqNum, com.alibaba.fastjson.JSONObject params);
+
+ /**
+ * V2.0.1 保存设备参数写入响应数据
+ *
+ * @param deviceId 设备ID
+ * @param seqNum 序列号
+ * @param status 写入状态(OK/Fail)
+ * @param failReason 失败原因(可选)
+ */
+ void saveParamWriteData(String deviceId, Long seqNum, String status, String failReason);
+}
diff --git a/bs-loader/src/main/java/com/data/emqx/service/impl/DeviceDataServiceImpl.java b/bs-loader/src/main/java/com/data/emqx/service/impl/DeviceDataServiceImpl.java
new file mode 100644
index 0000000..913e894
--- /dev/null
+++ b/bs-loader/src/main/java/com/data/emqx/service/impl/DeviceDataServiceImpl.java
@@ -0,0 +1,402 @@
+package com.data.emqx.service.impl;
+
+/**
+ * @author licd
+ * @className DeviceDataServiceImpl
+ * @description 设备数据处理服务实现
+ * @date 2026/03/17
+ */
+
+import com.data.emqx.domain.DeviceAlarm;
+import com.data.emqx.domain.DeviceAlarmUwb;
+import com.data.emqx.domain.DeviceHeart;
+import com.data.emqx.domain.DeviceParamWrite;
+import com.data.emqx.domain.DeviceWeightRfid;
+import com.data.emqx.domain.LoadCarDataSoure;
+import com.data.emqx.domain.OtaProgress;
+import com.data.emqx.mapper.DeviceDataMapper;
+import com.data.emqx.mapper.DeviceParamWriteMapper;
+import com.data.emqx.mapper.LoadCarDataSoureMapper;
+import com.data.emqx.mapper.OtaProgressMapper;
+import com.data.emqx.service.IDeviceDataService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+import java.util.UUID;
+
+@Service
+public class DeviceDataServiceImpl implements IDeviceDataService {
+
+ private static final Logger logger = LoggerFactory.getLogger(DeviceDataServiceImpl.class);
+
+ @Autowired
+ private DeviceDataMapper deviceDataMapper;
+
+ @Autowired
+ private LoadCarDataSoureMapper loadCarDataSoureMapper;
+
+ @Autowired
+ private OtaProgressMapper otaProgressMapper;
+
+ @Autowired
+ private DeviceParamWriteMapper deviceParamWriteMapper;
+
+ private final SimpleDateFormat utcFormat;
+ private final SimpleDateFormat timestampFormat;
+
+ public DeviceDataServiceImpl() {
+ utcFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ utcFormat.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai")); // 使用东八区时区
+
+ timestampFormat = new SimpleDateFormat("yyyyMMddHHmmssSSS");
+ timestampFormat.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai")); // 使用东八区时区
+ }
+
+ /**
+ * 检查是否已存在相同的称重&标签数据
+ */
+ @Override
+ public boolean existsWeightRfidData(String deviceId, Long seqNum) {
+ try {
+ int count = deviceDataMapper.countWeightRfidByDeviceIdAndSeqNum(deviceId, seqNum);
+ return count > 0;
+ } catch (Exception e) {
+ logger.error("检查称重数据是否存在异常: {}", e.getMessage(), e);
+ return false;
+ }
+ }
+
+ /**
+ * 保存称重&标签数据
+ */
+ @Override
+ public void saveWeightRfidData(String deviceId, Long seqNum, Long sendTime, Integer weight, String rfid) {
+ try {
+ Date sendDate = new Date(sendTime * 1000);
+ String formattedTime = utcFormat.format(sendDate);
+ String timestampTime = timestampFormat.format(sendDate);
+
+ logger.info("保存称重数据 - DeviceID: {}, SeqNum: {}, SendTime(UTC0): {}, Weight: {}kg, RFID: {}",
+ deviceId, seqNum, formattedTime, weight, rfid);
+
+ // 创建 LoadCarDataSoure 对象用于保存数据
+ LoadCarDataSoure loadCarDataSoure = new LoadCarDataSoure();
+ String loadCarId = UUID.randomUUID().toString();
+ loadCarDataSoure.setId(loadCarId);
+ loadCarDataSoure.setDeviceId(deviceId);
+ loadCarDataSoure.setDeviceStatus("OK"); // 设备状态,默认正常
+ loadCarDataSoure.setSendTime(timestampTime); // 时间格式转换为 yyyyMMddHHmmssSSS 格式
+ loadCarDataSoure.setWeight(weight != null ? weight.doubleValue() : 0.0);
+ loadCarDataSoure.setWeightStatus(1); // 重量状态,1表示有变化
+ loadCarDataSoure.setWeightTime(timestampTime); // 称重时间,使用 yyyyMMddHHmmssSSS 格式
+ loadCarDataSoure.setFlag(0); // 是否已处理,0表示未处理
+ loadCarDataSoure.setTag("weight"); // 标签,用于标识数据类型
+ loadCarDataSoure.setCreateTime(new Date());
+ loadCarDataSoure.setUpdateTime(new Date());
+
+ // 保存到 emqx_load_car 表
+ loadCarDataSoureMapper.addLoadCarDataSourceMapper(loadCarDataSoure);
+ logger.info("称重数据保存到 emqx_load_car 成功 - DeviceID: {}, LoadCarID: {}", deviceId, loadCarId);
+
+ // 处理 RFID 数据,保存到 emqx_unload_car 表
+ if (rfid != null && !rfid.isEmpty()) {
+ String[] rfidArray = rfid.split(",");
+ LoadCarDataSoure unloadCarDataSoure = new LoadCarDataSoure();
+ unloadCarDataSoure.setId(UUID.randomUUID().toString());
+ unloadCarDataSoure.setDeviceId(deviceId);
+ unloadCarDataSoure.setDeviceStatus("OK");
+ unloadCarDataSoure.setSendTime(timestampTime); // 时间格式转换为 yyyyMMddHHmmssSSS 格式
+ unloadCarDataSoure.setRfidNum(rfidArray.length);
+ unloadCarDataSoure.setRfidNo(rfid);
+ unloadCarDataSoure.setRfidTime(timestampTime); // RFID 时间,使用 yyyyMMddHHmmssSSS 格式
+ unloadCarDataSoure.setFlag(0);
+ unloadCarDataSoure.setTag("rfid");
+ unloadCarDataSoure.setLoadCarId(loadCarId); // 关联 emqx_load_car 表的 ID
+ unloadCarDataSoure.setCreateTime(new Date());
+ unloadCarDataSoure.setUpdateTime(new Date());
+
+ // 保存到 emqx_unload_car 表
+ loadCarDataSoureMapper.addUnLoadCarDataSourceMapper(unloadCarDataSoure);
+ logger.info("RFID数据保存到 emqx_unload_car 成功 - DeviceID: {}, LoadCarID: {}, RFID数量: {}",
+ deviceId, loadCarId, rfidArray.length);
+ }
+
+ // 同时保存到原有的 emqx_device_weight_rfid 表,确保数据完整性
+ DeviceWeightRfid deviceWeightRfid = new DeviceWeightRfid();
+ deviceWeightRfid.setId(UUID.randomUUID().toString());
+ deviceWeightRfid.setDeviceId(deviceId);
+ deviceWeightRfid.setSeqNum(seqNum);
+ deviceWeightRfid.setSendTime(sendTime);
+ deviceWeightRfid.setWeight(weight);
+ deviceWeightRfid.setRfid(rfid);
+ deviceWeightRfid.setFlag(0);
+ deviceWeightRfid.setCreateTime(new Date());
+ deviceWeightRfid.setUpdateTime(new Date());
+
+ deviceDataMapper.insertWeightRfid(deviceWeightRfid);
+ logger.info("称重数据保存到 emqx_device_weight_rfid 成功 - DeviceID: {}, SeqNum: {}", deviceId, seqNum);
+
+ } catch (Exception e) {
+ logger.error("保存称重数据异常: {}", e.getMessage(), e);
+ }
+ }
+
+ /**
+ * 检查是否已存在相同的告警数据
+ */
+ @Override
+ public boolean existsAlarmData(String deviceId, Long seqNum) {
+ try {
+ int count = deviceDataMapper.countAlarmByDeviceIdAndSeqNum(deviceId, seqNum);
+ return count > 0;
+ } catch (Exception e) {
+ logger.error("检查告警数据是否存在异常: {}", e.getMessage(), e);
+ return false;
+ }
+ }
+
+ /**
+ * 检查是否已存在相同的心跳数据
+ */
+ @Override
+ public boolean existsHeartData(String deviceId, Long sendTime) {
+ try {
+ int count = deviceDataMapper.countHeartByDeviceIdAndSendTime(deviceId, sendTime);
+ return count > 0;
+ } catch (Exception e) {
+ logger.error("检查心跳数据是否存在异常: {}", e.getMessage(), e);
+ return false;
+ }
+ }
+
+ /**
+ * 保存告警数据
+ */
+ @Override
+ public void saveAlarmData(String deviceId, Long seqNum, Long sendTime, String uwb, com.alibaba.fastjson.JSONArray uwbArray) {
+ try {
+ Date sendDate = new Date(sendTime * 1000);
+ String formattedTime = utcFormat.format(sendDate);
+
+ logger.info("保存告警数据 - DeviceID: {}, SeqNum: {}, SendTime(UTC0): {}, UWB: {}",
+ deviceId, seqNum, formattedTime, uwb);
+
+ // 创建对象并设置值
+ DeviceAlarm deviceAlarm = new DeviceAlarm();
+ String alarmId = UUID.randomUUID().toString();
+ deviceAlarm.setId(alarmId);
+ deviceAlarm.setDeviceId(deviceId);
+ deviceAlarm.setSeqNum(seqNum);
+ deviceAlarm.setSendTime(sendTime);
+ deviceAlarm.setUwb(uwb);
+ deviceAlarm.setFlag(0);
+ deviceAlarm.setCreateTime(new Date());
+ deviceAlarm.setUpdateTime(new Date());
+
+ // 保存到数据库
+ deviceDataMapper.insertAlarm(deviceAlarm);
+ logger.info("告警数据保存成功 - DeviceID: {}, SeqNum: {}", deviceId, seqNum);
+
+ // 保存UWB数据到关联表
+ if (uwbArray != null && !uwbArray.isEmpty()) {
+ for (int i = 0; i < uwbArray.size(); i++) {
+ com.alibaba.fastjson.JSONObject uwbObj = uwbArray.getJSONObject(i);
+ if (uwbObj != null) {
+ String id = uwbObj.getString("ID");
+ String dist = uwbObj.getString("Dist");
+
+ if (id != null && dist != null) {
+ DeviceAlarmUwb deviceAlarmUwb = new DeviceAlarmUwb();
+ deviceAlarmUwb.setId(UUID.randomUUID().toString());
+ deviceAlarmUwb.setAlarmId(alarmId);
+ deviceAlarmUwb.setUwbId(id);
+ deviceAlarmUwb.setDist(dist);
+ deviceAlarmUwb.setCreateTime(new Date());
+ deviceAlarmUwb.setUpdateTime(new Date());
+
+ deviceDataMapper.insertAlarmUwb(deviceAlarmUwb);
+ logger.info("UWB数据保存成功 - AlarmID: {}, UWB ID: {}, Dist: {}", alarmId, id, dist);
+ }
+ }
+ }
+ }
+
+ } catch (Exception e) {
+ logger.error("保存告警数据异常: {}", e.getMessage(), e);
+ }
+ }
+
+ /**
+ * 保存心跳数据
+ */
+ @Override
+ public void saveHeartData(String deviceId, Double sysVol, Integer netRSSI, Long sendTime) {
+ try {
+ Date sendDate = new Date(sendTime * 1000);
+ String formattedTime = utcFormat.format(sendDate);
+
+ logger.info("保存心跳数据 - DeviceID: {}, sysVol: {}, netRSSI: {}, SendTime(UTC0): {}",
+ deviceId, sysVol, netRSSI, formattedTime);
+
+ // 创建对象并设置值
+ DeviceHeart deviceHeart = new DeviceHeart();
+ deviceHeart.setId(UUID.randomUUID().toString());
+ deviceHeart.setDeviceId(deviceId);
+ deviceHeart.setSysVol(sysVol);
+ deviceHeart.setNetRSSI(netRSSI);
+ deviceHeart.setSendTime(sendTime);
+ deviceHeart.setFlag(0);
+ deviceHeart.setCreateTime(new Date());
+ deviceHeart.setUpdateTime(new Date());
+
+ // 保存到数据库
+ deviceDataMapper.insertHeart(deviceHeart);
+ logger.info("心跳数据保存成功 - DeviceID: {}", deviceId);
+
+ } catch (Exception e) {
+ logger.error("保存心跳数据异常: {}", e.getMessage(), e);
+ }
+ }
+
+ /**
+ * V2.0.1 保存OTA升级进度数据
+ */
+ @Override
+ public void saveOtaProgressData(String deviceId, String taskID, Integer progress, String status) {
+ try {
+ logger.info("保存OTA升级进度数据 - DeviceID: {}, TaskID: {}, Progress: {}%, Status: {}",
+ deviceId, taskID, progress, status);
+
+ // 查询是否已存在该任务记录
+ OtaProgress existingProgress = otaProgressMapper.selectByDeviceIdAndTaskId(deviceId, taskID);
+
+ if (existingProgress != null) {
+ // 更新现有记录
+ existingProgress.setProgress(progress);
+ existingProgress.setStatus(status);
+ existingProgress.setUpdateTime(new Date());
+ otaProgressMapper.updateProgressByDeviceIdAndTaskId(existingProgress);
+ logger.info("OTA升级进度数据更新成功 - DeviceID: {}, TaskID: {}", deviceId, taskID);
+ } else {
+ // 创建新记录
+ OtaProgress otaProgress = new OtaProgress();
+ otaProgress.setId(UUID.randomUUID().toString());
+ otaProgress.setDeviceId(deviceId);
+ otaProgress.setTaskId(taskID);
+ otaProgress.setProgress(progress);
+ otaProgress.setStatus(status);
+ otaProgress.setCreateTime(new Date());
+ otaProgress.setUpdateTime(new Date());
+ otaProgressMapper.insertOtaProgress(otaProgress);
+ logger.info("OTA升级进度数据保存成功 - DeviceID: {}, TaskID: {}", deviceId, taskID);
+ }
+
+ } catch (Exception e) {
+ logger.error("保存OTA升级进度数据异常: {}", e.getMessage(), e);
+ }
+ }
+
+ /**
+ * V2.0.1 保存设备参数读取响应数据
+ */
+ @Override
+ public void saveParamReadData(String deviceId, Long seqNum, com.alibaba.fastjson.JSONObject params) {
+ try {
+ logger.info("保存设备参数读取响应 - DeviceID: {}, SeqNum: {}, Params: {}",
+ deviceId, seqNum, params != null ? params.toJSONString() : "null");
+
+ // 解析并记录各参数
+ if (params != null) {
+ Integer rs485PollMs = params.getInteger("Rs485_PollMs");
+ Integer uwbAlarmMs = params.getInteger("UWB_AlarmMs");
+ Integer voiceAlarmMs = params.getInteger("VoiceAlarmMs");
+ Integer uwbIgnoreCm = params.getInteger("UWB_IgnoreCm");
+ Integer uwbAlarmCm = params.getInteger("UWB_AlarmCm");
+ String iccid = params.getString("ICCID");
+
+ logger.info("设备参数详情 - Rs485_PollMs: {}, UWB_AlarmMs: {}, VoiceAlarmMs: {}, UWB_IgnoreCm: {}, UWB_AlarmCm: {}, ICCID: {}",
+ rs485PollMs, uwbAlarmMs, voiceAlarmMs, uwbIgnoreCm, uwbAlarmCm, iccid);
+ }
+
+ // TODO: 需要创建 emqx_device_param_read 表和对应的 Domain/Mapper
+ // 目前仅记录日志,后续需要实现数据库保存逻辑
+
+ logger.info("设备参数读取响应保存成功 - DeviceID: {}, SeqNum: {}", deviceId, seqNum);
+
+ } catch (Exception e) {
+ logger.error("保存设备参数读取响应异常: {}", e.getMessage(), e);
+ }
+ }
+
+ /**
+ * V2.0.1 保存设备参数写入响应数据
+ * 更新参数写入状态,用于实现"设备期望值"功能
+ */
+ @Override
+ public void saveParamWriteData(String deviceId, Long seqNum, String status, String failReason) {
+ try {
+ logger.info("保存设备参数写入响应 - DeviceID: {}, SeqNum: {}, Status: {}, FailReason: {}",
+ deviceId, seqNum, status, failReason);
+
+ // 创建更新对象
+ DeviceParamWrite paramWrite = new DeviceParamWrite();
+ paramWrite.setDeviceId(deviceId);
+ paramWrite.setSeqNum(seqNum);
+ paramWrite.setStatus(status);
+ paramWrite.setFailReason(failReason);
+ paramWrite.setUpdateTime(new Date());
+
+ // 更新状态(这里简化处理,实际应该根据具体参数名更新)
+ // 在实际场景中,设备会返回具体哪个参数写入成功/失败
+ // 这里需要根据业务需求进一步完善
+
+ logger.info("设备参数写入响应保存成功 - DeviceID: {}, SeqNum: {}, Status: {}", deviceId, seqNum, status);
+
+ } catch (Exception e) {
+ logger.error("保存设备参数写入响应异常: {}", e.getMessage(), e);
+ }
+ }
+
+ /**
+ * V2.0.1 保存设备参数期望值(用于设备上线后自动拉平配置)
+ */
+ public void saveParamExpectation(String deviceId, String paramKey, String paramValue) {
+ try {
+ logger.info("保存设备参数期望值 - DeviceID: {}, ParamKey: {}, ParamValue: {}",
+ deviceId, paramKey, paramValue);
+
+ // 查询是否已存在该参数记录
+ DeviceParamWrite existingParam = deviceParamWriteMapper.selectByDeviceIdAndParamKey(deviceId, paramKey);
+
+ if (existingParam != null) {
+ // 更新现有记录的期望值
+ existingParam.setParamValue(paramValue);
+ existingParam.setStatus("Pending");
+ existingParam.setFailReason(null);
+ existingParam.setUpdateTime(new Date());
+ deviceParamWriteMapper.updateStatusByDeviceIdAndParamKey(existingParam);
+ logger.info("设备参数期望值更新成功 - DeviceID: {}, ParamKey: {}", deviceId, paramKey);
+ } else {
+ // 创建新记录
+ DeviceParamWrite paramWrite = new DeviceParamWrite();
+ paramWrite.setId(UUID.randomUUID().toString());
+ paramWrite.setDeviceId(deviceId);
+ paramWrite.setParamKey(paramKey);
+ paramWrite.setParamValue(paramValue);
+ paramWrite.setStatus("Pending");
+ paramWrite.setCreateTime(new Date());
+ paramWrite.setUpdateTime(new Date());
+ deviceParamWriteMapper.insertParamWrite(paramWrite);
+ logger.info("设备参数期望值保存成功 - DeviceID: {}, ParamKey: {}", deviceId, paramKey);
+ }
+
+ } catch (Exception e) {
+ logger.error("保存设备参数期望值异常: {}", e.getMessage(), e);
+ }
+ }
+}
diff --git a/bs-loader/src/main/java/com/data/emqx/test/DeviceDataTest.java b/bs-loader/src/main/java/com/data/emqx/test/DeviceDataTest.java
new file mode 100644
index 0000000..7dfde87
--- /dev/null
+++ b/bs-loader/src/main/java/com/data/emqx/test/DeviceDataTest.java
@@ -0,0 +1,113 @@
+package com.data.emqx.test;
+
+import com.LoaderApplication;
+import com.data.emqx.service.IDeviceDataService;
+import org.springframework.boot.SpringApplication;
+import org.springframework.context.ApplicationContext;
+
+/**
+ * @author licd
+ * @className DeviceDataTest
+ * @description 设备数据保存测试
+ * @date 2026/03/23
+ */
+public class DeviceDataTest {
+
+ public static void main(String[] args) {
+ System.out.println("=== 设备数据保存测试 ===");
+
+ // 启动Spring Boot应用
+ ApplicationContext context = SpringApplication.run(LoaderApplication.class, args);
+
+ // 获取设备数据服务
+ IDeviceDataService deviceDataService = context.getBean(IDeviceDataService.class);
+
+ if (deviceDataService == null) {
+ System.err.println("无法获取DeviceDataService实例");
+ return;
+ }
+
+ System.out.println("成功获取DeviceDataService实例");
+
+ // 测试保存称重&标签数据
+ testSaveWeightRfidData(deviceDataService);
+
+ // 测试保存告警数据
+ testSaveAlarmData(deviceDataService);
+
+ // 测试保存心跳数据
+ testSaveHeartData(deviceDataService);
+
+ System.out.println("=== 测试完成 ===");
+
+ // 关闭应用
+ System.exit(0);
+ }
+
+ /**
+ * 测试保存称重&标签数据
+ */
+ private static void testSaveWeightRfidData(IDeviceDataService deviceDataService) {
+ System.out.println("\n=== 测试保存称重&标签数据 ===");
+ try {
+ String deviceId = "test_device_001";
+ Long seqNum = 123456789L;
+ Long sendTime = System.currentTimeMillis() / 1000;
+ Integer weight = 1000;
+ String rfid = "e280111122223333,b001111122223333";
+
+ deviceDataService.saveWeightRfidData(deviceId, seqNum, sendTime, weight, rfid);
+ System.out.println("称重&标签数据保存测试成功");
+ } catch (Exception e) {
+ System.err.println("称重&标签数据保存测试失败: " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * 测试保存告警数据
+ */
+ private static void testSaveAlarmData(IDeviceDataService deviceDataService) {
+ System.out.println("\n=== 测试保存告警数据 ===");
+ try {
+ String deviceId = "test_device_001";
+ Long seqNum = 987654321L;
+ Long sendTime = System.currentTimeMillis() / 1000;
+ String uwb = "ID:1234,dist:2000; ID:5678,dist:1500";
+ com.alibaba.fastjson.JSONArray uwbArray = new com.alibaba.fastjson.JSONArray();
+ com.alibaba.fastjson.JSONObject uwbObj1 = new com.alibaba.fastjson.JSONObject();
+ uwbObj1.put("ID", "1234");
+ uwbObj1.put("Dist", "2000 cm");
+ uwbArray.add(uwbObj1);
+ com.alibaba.fastjson.JSONObject uwbObj2 = new com.alibaba.fastjson.JSONObject();
+ uwbObj2.put("ID", "5678");
+ uwbObj2.put("Dist", "1500 cm");
+ uwbArray.add(uwbObj2);
+
+ deviceDataService.saveAlarmData(deviceId, seqNum, sendTime, uwb, uwbArray);
+ System.out.println("告警数据保存测试成功");
+ } catch (Exception e) {
+ System.err.println("告警数据保存测试失败: " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * 测试保存心跳数据
+ */
+ private static void testSaveHeartData(IDeviceDataService deviceDataService) {
+ System.out.println("\n=== 测试保存心跳数据 ===");
+ try {
+ String deviceId = "test_device_001";
+ Double sysVol = 12.8;
+ Integer netRSSI = -75;
+ Long sendTime = System.currentTimeMillis() / 1000;
+
+ deviceDataService.saveHeartData(deviceId, sysVol, netRSSI, sendTime);
+ System.out.println("心跳数据保存测试成功");
+ } catch (Exception e) {
+ System.err.println("心跳数据保存测试失败: " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/bs-loader/src/main/resources/application-dev.yml b/bs-loader/src/main/resources/application-dev.yml
index b93a1f5..b75f2bb 100644
--- a/bs-loader/src/main/resources/application-dev.yml
+++ b/bs-loader/src/main/resources/application-dev.yml
@@ -32,13 +32,11 @@ spring:
dynamic:
primary: master
datasource:
- master:
- # url: jdbc:mysql://localhost:3306/bs-jxc-dev?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&rewriteBatchedStatements=true
- # username: bfpt_db
- # password: Bfpt@2024#
- url: jdbc:mysql://localhost:3306/bs-jxc-dev?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&rewriteBatchedStatements=true
- username: root
- password: root
+ master:
+ url: jdbc:mysql://127.0.0.1:3306/bs-jxc-dev?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8&useSSL=false
+ username: bfpt_db
+ password: Bfpt@2024#
+ driverClassName: com.mysql.cj.jdbc.Driver
# 初始连接数
initialSize: 5
# 最小连接池数量
@@ -93,7 +91,7 @@ mybatis-plus:
global-config:
banner: false
db-config:
- id-type:
+ id-type: id_worker
logic-delete-field: del_flag
logic-delete-value: 2
logic-not-delete-value: 0
@@ -121,5 +119,20 @@ emqx:
broker: "tcp://124.71.134.146:1883"
clientid: "emqx_clientid"
subscriptiontopic: "emqx_topic"
- qos: 1
- pubTopic: "emqx_topic_return"
\ No newline at end of file
+ qos: 2
+ pubTopic: "emqx_topic_return"
+
+# 集疏运终端设备MQTT配置
+device:
+ mqtt:
+ broker: "tcp://127.0.0.1:1883"
+ username: "emqx_public"
+ password: "emqx_public"
+ clientId: "server_client_001"
+
+logging:
+ level:
+ root: INFO
+ org.springframework.web: ERROR
+ file:
+ name: C:\EMQX\log\emqx.log
\ No newline at end of file
diff --git a/bs-loader/src/main/resources/application-prod.yml b/bs-loader/src/main/resources/application-prod.yml
index dcb4d10..49d4b18 100644
--- a/bs-loader/src/main/resources/application-prod.yml
+++ b/bs-loader/src/main/resources/application-prod.yml
@@ -33,9 +33,9 @@ spring:
primary: master
datasource:
master:
- url: jdbc:mysql://localhost:3306/bs-jxc-dev?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8&useSSL=false
- username: root
- password: root
+ url: jdbc:mysql://127.0.0.1:3306/bs-jxc-dev?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true
+ username: bfpt_db
+ password: Bfpt@2024#
driverClassName: com.mysql.cj.jdbc.Driver
# 初始连接数
initialSize: 5
@@ -91,7 +91,7 @@ mybatis-plus:
global-config:
banner: false
db-config:
- id-type:
+ id-type: id_worker
logic-delete-field: del_flag
logic-delete-value: 2
logic-not-delete-value: 0
@@ -119,5 +119,19 @@ emqx:
broker: "tcp://127.0.0.1:1883"
clientid: "emqx_clientid"
subscriptiontopic: "emqx_topic"
- qos: 1
- pubTopic: "emqx_topic_return"
\ No newline at end of file
+ qos: 2
+ pubTopic: "emqx_topic_return"
+
+# 集疏运终端设备MQTT配置
+device:
+ mqtt:
+ broker: "tcp://127.0.0.1:1883"
+ username: "emqx_public"
+ password: "emqx_public"
+ clientId: "server_client_001"
+logging:
+ level:
+ root: INFO
+ org.springframework.web: ERROR
+ file:
+ name: C:\EMQX\log\emqx.log
\ No newline at end of file
diff --git a/bs-loader/src/main/resources/log4j.properties b/bs-loader/src/main/resources/log4j.properties
index f54478a..437947b 100644
--- a/bs-loader/src/main/resources/log4j.properties
+++ b/bs-loader/src/main/resources/log4j.properties
@@ -1,6 +1,6 @@
log4j.rootLogger=INFO, FILE
log4j.appender.FILE=org.apache.log4j.FileAppender
-log4j.appender.FILE.File=D:/EMQX/log/emqx_receive.log
+log4j.appender.FILE.File=C:/EMQX/log/emqx_receive.log
log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
log4j.appender.FILE.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1} - %m%n
\ No newline at end of file
diff --git a/bs-loader/src/main/resources/mybatis/emqx/DeviceDataMapper.xml b/bs-loader/src/main/resources/mybatis/emqx/DeviceDataMapper.xml
new file mode 100644
index 0000000..bc85ef9
--- /dev/null
+++ b/bs-loader/src/main/resources/mybatis/emqx/DeviceDataMapper.xml
@@ -0,0 +1,118 @@
+
+
+
+
+
+
+
+
+
+ insert into emqx_device_weight_rfid (
+ id,
+ device_id,
+ seq_num,
+ send_time,
+ weight,
+ rfid,
+ flag,
+ create_time,
+ update_time)
+ values (
+ #{id},
+ #{deviceId},
+ #{seqNum},
+ #{sendTime},
+ #{weight},
+ #{rfid},
+ #{flag},
+ #{createTime},
+ #{updateTime}
+ )
+
+
+
+
+
+
+
+ insert into emqx_device_alarm (
+ id,
+ device_id,
+ seq_num,
+ send_time,
+ uwb,
+ flag,
+ create_time,
+ update_time)
+ values (
+ #{id},
+ #{deviceId},
+ #{seqNum},
+ #{sendTime},
+ #{uwb},
+ #{flag},
+ #{createTime},
+ #{updateTime}
+ )
+
+
+
+
+
+
+
+ insert into emqx_device_heart (
+ id,
+ device_id,
+ sys_vol,
+ net_rssi,
+ send_time,
+ flag,
+ create_time,
+ update_time)
+ values (
+ #{id},
+ #{deviceId},
+ #{sysVol},
+ #{netRSSI},
+ #{sendTime},
+ #{flag},
+ #{createTime},
+ #{updateTime}
+ )
+
+
+
+
+ insert into emqx_device_alarm_uwb (
+ id,
+ alarm_id,
+ uwb_id,
+ dist,
+ create_time,
+ update_time)
+ values (
+ #{id},
+ #{alarmId},
+ #{uwbId},
+ #{dist},
+ #{createTime},
+ #{updateTime}
+ )
+
+
+
diff --git a/bs-loader/src/main/resources/mybatis/emqx/DeviceParamWriteMapper.xml b/bs-loader/src/main/resources/mybatis/emqx/DeviceParamWriteMapper.xml
new file mode 100644
index 0000000..678af7d
--- /dev/null
+++ b/bs-loader/src/main/resources/mybatis/emqx/DeviceParamWriteMapper.xml
@@ -0,0 +1,91 @@
+
+
+
+
+
+ insert into emqx_device_param_write (
+ id,
+ device_id,
+ seq_num,
+ param_key,
+ param_value,
+ status,
+ fail_reason,
+ create_time,
+ update_time
+ ) values (
+ #{id},
+ #{deviceId},
+ #{seqNum},
+ #{paramKey},
+ #{paramValue},
+ #{status},
+ #{failReason},
+ #{createTime},
+ #{updateTime}
+ )
+ on duplicate key update
+ seq_num = #{seqNum},
+ param_value = #{paramValue},
+ status = #{status},
+ fail_reason = #{failReason},
+ update_time = #{updateTime}
+
+
+
+ update emqx_device_param_write
+ set status = #{status},
+ seq_num = #{seqNum},
+ fail_reason = #{failReason},
+ update_time = #{updateTime}
+ where device_id = #{deviceId}
+ and param_key = #{paramKey}
+
+
+
+
+
+
+
+
+
diff --git a/bs-loader/src/main/resources/mybatis/emqx/LoadCarDataSoureMapper.xml b/bs-loader/src/main/resources/mybatis/emqx/LoadCarDataSoureMapper.xml
index dd4199f..0297aec 100644
--- a/bs-loader/src/main/resources/mybatis/emqx/LoadCarDataSoureMapper.xml
+++ b/bs-loader/src/main/resources/mybatis/emqx/LoadCarDataSoureMapper.xml
@@ -43,6 +43,7 @@
rfid_time,
flag,
tag,
+ load_car_id,
create_time,
update_time)
values (
@@ -55,6 +56,7 @@
#{rfidTime},
#{flag},
#{tag},
+ #{loadCarId},
#{createTime},
#{updateTime}
)
diff --git a/bs-loader/src/main/resources/mybatis/emqx/OtaProgressMapper.xml b/bs-loader/src/main/resources/mybatis/emqx/OtaProgressMapper.xml
new file mode 100644
index 0000000..fcf155c
--- /dev/null
+++ b/bs-loader/src/main/resources/mybatis/emqx/OtaProgressMapper.xml
@@ -0,0 +1,51 @@
+
+
+
+
+
+ insert into emqx_device_ota_progress (
+ id,
+ device_id,
+ task_id,
+ progress,
+ status,
+ fail_reason,
+ create_time,
+ update_time
+ ) values (
+ #{id},
+ #{deviceId},
+ #{taskId},
+ #{progress},
+ #{status},
+ #{failReason},
+ #{createTime},
+ #{updateTime}
+ )
+
+
+
+ update emqx_device_ota_progress
+ set progress = #{progress},
+ status = #{status},
+ fail_reason = #{failReason},
+ update_time = #{updateTime}
+ where device_id = #{deviceId}
+ and task_id = #{taskId}
+
+
+
+
+