数据同步(消息网关)开发指南
前提条件
开发者控制台数据同步设置已经打开启用状态开关,针对单个设备型号,可以单独开关 数据同步和测试通道配置。
查看网关地址及消费凭证
认证身份信息需要使用 MQ_URL、GATEWAY_URL、appKey 和 appSecret 用于鉴权。见:sdk 集成及接收消息示例
- 其中,MQ_URL 是连接节点,具体取值如下表所示:
| 区域 | MQ_URL |
|---|---|
| 中国大陆 | pulsar+ssl://mqp.iotrtc.cn:6651 |
| 美国 | pulsar+ssl://mqp.iotrtcus.com:6651 |
| 德国 | pulsar+ssl://mqp.iotrtceu.com:6651 |
| 新加坡 | pulsar+ssl://mqp.iotrtcx.com:6651 |
- GATEWAY_URL,参见 API 网关协议与地址
- 身份认证需要使用 appKey 和 appSecret,该信息可以从控制台 数据同步 中获取。
1,一个消息只能被一个消费端消费,如果启动了多个消费端,则一个消息会被多个消费端中的一个随机消费。
2,为了满足开发调试和正式生产消息隔离。针对测试环境可以配置测试设备白名单。具体见测试和生产环境说明。
3,私有化部署版本所需的URL、凭证等信息,请咨询相关对接人员。也支持开发调试和正式生产消息隔离。
4,国标设备定义的消息,仅适用于项目中使用了国标设备,一般是私有化部署版本需要。
消息格式
物的属性变更消息
topic:/${productKey}/${deviceName}/thing/event/property/post
消息字段说明如下。
| 参数 | 类型 | 含义 |
|---|---|---|
| deviceType | String | 设备所属品类 |
| gmtCreate | Long | 数据流转消息产生时间,自 1970-1-1 起流逝的毫秒值 |
| iotId | String | 设备的唯一 ID |
| productKey | String | 设备所属品类 |
| deviceName | String | 设备所属品类 |
| items | JSON | 变更的数据集 |
${attribute} | String | 发生变更的属性,具体取值由具体情况确定 |
| value | 具体数据类型由具体情况确定 | 变更值 |
| time | Long | 设备属性发生变化的时间,自 1970-1-1 起流逝的毫秒值 |
消息示例如下。
{
"deviceType": "常电摄像机",
"iotId": "OfEiaUv**********fXQzu",
"productKey": "OfEiaUv******fP",
"gmtCreate": 1737083508000,
"deviceName": "TSV**********fXQzu",
"items": {
"StorageTotalCapacity": {
"value": 0,
"time": 1737083507980
}
}
}
物的事件变更消息
topic:/${productKey}/${deviceName}/thing/event/{tsl.event.identifier}/post
消息字段说明如下。
| 参数 | 类型 | 含义 |
|---|---|---|
| deviceType | String | 设备所属品类 |
| iotId | String | 设备的唯一 ID |
| productKey | String | 设备所属产品的唯一标识符 |
| deviceName | String | 设备名称 |
| identifier | String | 事件标识符,对应事件的 identifier |
| name | String | 事件名称 |
| type | String | 事件类型 |
| time | Long | 设备上报 value 对应的时间,自 1970-1-1 起流逝的毫秒值 |
| value | JSON | 变更的事件属性列表:key-value 键值对 |
| key | String | 属性 key |
| value | 具体数据类型由具体情况确定 | 属性取值 |
消息示例如下:
{
"deviceType": "常电摄像机",
"identifier": "Open****",
"iotId": "OfEiaUv**********fXQzu",
"name": "开****通知",
"time": 1737083508000,
"type": "info",
"productKey": "OfEiaUv******fP",
"deviceName": "TSV**********fXQzu",
"value": {
"KeyID": "x8******DY",
"Method": "fin******t"
}
}
物的状态变更消息
topic:/as/mqtt/status/{productKey}/{deviceName}
消息字段说明如下:
| 参数 | 类型 | 含义 |
|---|---|---|
| status | String | 设备状态。online:上线。offline:离线。 |
| iotId | String | 设备的唯一 ID |
| offlineReasonCode | Integer | 设备下线时,返回的错误码。 1911:设备跟云端之间 TCP 连接断开,导致设备离线。 |
| productKey | String | 设备所属产品的唯一标识符 |
| deviceName | String | 设备名称 |
| time | Long | 设备上、下线的时间,自 1970-1-1 起流逝的毫秒值 |
| clientIp | String | 设备公网出口 IP。 |
消息示例如下:
{
"status": "offline",
"iotId": "OfEiaUv**********fXQzu",
"offlineReasonCode": 1911,
"productKey": "OfEiaUv******fP",
"deviceName": "TSV**********fXQzu",
"time": 1737083508000,
"clientIp": "192.0.2.1"
}
用户绑定变更消息
用户绑定/解绑设备产生的回流消息,用于同步用户与设备的绑定、解绑。
topic:/${productKey}/${deviceName}/thing/awss/enrollee/user
消息字段说明如下:
| 参数 | 类型 | 含义 |
|---|---|---|
| bind | bool | true-绑定;false-解绑 |
| productKey | String | 设备所属产品的唯一标识符 |
| deviceName | String | 设备名称 |
| iotId | String | 设备的唯一 ID |
| messageCreateTime | Long | 消息创建时间 |
| identityInfos | list | 用户信息列表 |
| identityId | String | 用户身份 ID |
| owned | Integer | 拥有标记 0:分享者 1:拥有者 |
| params | Map | 扩展参数 |
消息示例如下:
{
"bind": true,
"productKey": "OfEiaUv******fP",
"deviceName": "TSV**********fXQzu",
"iotId": "OfEiaUv**********fXQzu",
"messageCreateTime": 1737083508000,
"identityInfos": [
{
"identityId": "1121***8",
"owned": 1
}
],
"params": {}
}
国标设备事件消息
topic:/gb/event/post
消息字段说明如下:
| 参数 | 类型 | 含义 |
|---|---|---|
| productKey | String | 设备所属产品的唯一标识符 |
| deviceName | String | 设备名称 |
| channelNo | String | 国标通道号 |
| data | JSON | 数据内容 |
| alarmDescription | String | 报警内容描述 |
| alarmMethod | String | 报警方式,1:电话报警,2:设备报警,3:短信报警,4:GPS 报警,5:视频报警,6:设备故障报警,7:其他报警 |
| alarmMethodDescription | String | 报警方式描述 |
| alarmPriority | String | 报警级别,1:一级警情,2:二级警情,3:三级警情,4:四级警情 |
| alarmPriorityDescription | String | 报警级别描述 |
| alarmTime | String | 报警时间,格式'2025-12-05 13:47:52' |
| alarmType | String | 报警类型。 - 报警方式为 2 时,不携带 AlarmType 为默认的报警设备报警,携带 AlarmType 取值及对应报警类型如下:1-视频丢失报警;2-设备防拆扣报警;3-存储设备磁盘满报警;4-设备高温报警;5-设备低温报警。 - 报警方式为 5 时,取值如下:1-人工视频报警;2-运动目标检测报警;3-遗留物检测报警;4-物体移除检测报警;5-绊线检测报警;6-人侵检测报警;7-逆行检测报警;8-徘徊检测报警;9-流量统计报警;10-密度检测报警;11-视频异常检测排警;12-快速移动报警;13-图像遮挡报警。 - 报警方式为 6 时,取值如下:1-存储设备磁盘故障报警;2-存储设备风扇故障报警。 |
| alarmTypeDescription | String | 报警类型描述 |
| channelId | String | 国标通道 id |
| createTime | String | 事件消息创建时间 |
| deviceId | String | 报警设备编码或报警中心编码(10 位) |
| deviceName | String | 国标设备配置名称 |
| latitude | Double | 经纬度信息 |
| longitude | Double | 经纬度信息 |
消息示例如下:
{
"productKey": "OfEiaUv******fP",
"deviceName": "330**********001",
"channelNo": "330**********001",
"data": {
"alarmDescription": "",
"alarmMethod": "5",
"alarmMethodDescription": "视频报警",
"alarmPriority": "4",
"alarmPriorityDescription": "四级警情",
"alarmTime": "2025-12-05 13:47:52",
"alarmType": "2",
"alarmTypeDescription": "运动目标检测报警",
"channelId": "330**********001",
"createTime": "2025-12-05 05:47:52",
"deviceId": "330**********001",
"deviceName": "Network Video Recorder",
"latitude": 0.0,
"longitude": 0.0
}
}
国标设备状态变更消息
topic:/gb/status/post
消息字段说明如下:
| 参数 | 类型 | 含义 |
|---|---|---|
| productKey | String | 设备所属产品的唯一标识符 |
| deviceName | String | 设备名称 |
| channelNo | String | 国标通道号 |
| data | JSON | 数据内容 |
| status | String | 设备状态。online:上线。offline:离线。 |
| time | Long | 设备上、下线的时间,自 1970-1-1 起流逝的毫秒值 |
| clientIp | String | 设备公网出口 IP。 |
消息示例如下:
{
"productKey": "OfEiaUv******fP",
"deviceName": "330**********001",
"channelNo": "330**********001",
"data": {
"status": "online",
"time": 1764911597121,
"clientIp": "127.0.0.1"
}
}
国标设备获取本地录像文件列表消息
topic:/gb/record/query/callback
消息字段说明如下:
| 参数 | 类型 | 含义 |
|---|---|---|
| taskId | String | 异步查询设备的任务 id |
| productKey | String | 设备所属产品的唯一标识符 |
| deviceName | String | 设备名称 |
| channelNo | String | 国标通道号 |
| protocol | Integer | 录像回放地址协议:1-RTMP 2-RTSP 3-HLS 4-FLV |
| startTime | Long | 本次查询请求的开始时间 自 1970-1-1 起流逝的毫秒值 |
| endTime | Long | 本次查询请求的结束时间 自 1970-1-1 起流逝的毫秒值 |
| recordList | List | 录像文件列表 |
| url | String | 录像回放地址 |
| internalUrl | String | 录像回放内网地址 |
| stream | String | 录像文件流ID |
| startTime | Long | 录像文件的开始时间 自 1970-1-1 起流逝的毫秒值 |
| endTime | Long | 录像文件的结束时间 自 1970-1-1 起流逝的毫秒值 |
| recordType | Integer | 录像类型:0 计划录像,1 报警录像,2 主动录像 |
| streamType | Integer | 事码流类型:0 主码流,1 辅码流 |
| fileSize | Long | 文件大小 单位:Byte |
消息示例如下:
{
"taskId": "kH********",
"productKey": "OfEiaUv******fP",
"deviceName": "330*********001",
"channelNo": "330**********001",
"protocol": 3,
"startTime": 1764913668072,
"endTime": 1764913868072,
"recordList": [
{
"url": "https://********/....",
"internalUrl": "http://********/...",
"stream": "********",
"startTime": 1764913668072,
"endTime": 1764913868072,
"recordType": 0,
"streamType": 0,
"fileSize": 0
}
]
}
国标设备截图结果消息
topic:/gb/snapshot/callback
消息字段说明如下:
| 参数 | 类型 | 含义 |
|---|---|---|
| id | String | 会话ID,调用设备截图接口时返回 |
| success | Boolean | 截图是否成功 |
| productKey | String | 设备所属产品的唯一标识符 |
| deviceName | String | 设备名称 |
| channelNo | String | 国标通道号 |
| data | JSON | 数据内容 |
| url | String | 图片访问URL,1h过期 |
消息示例如下:
{
"id": "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ",
"success": true,
"productKey": "OfEiaUv******fP",
"deviceName": "330**********001",
"channelNo": "330**********001",
"data": {
"url": "https://xxxx..."
}
}
Pulsar 消息网关
支持系统各类即时消息订阅和转发,例如:订阅设备开门数据进行本地存储或消息推送,甚至是实现和其他系统联动等。
工作原理
消息网关通过 Pulsar 主动推送各种被提前订阅的事件数据,以满足对消息实时性和消息持久化的要求。
Pulsar 是一个支持多租户、高性能、持久化、分布式的服务器到服务器之间消息通讯的解决方案。它提供了可靠的消息传递、数据流处理和事件驱动等功能。Pulsar 最初由雅虎开发,后续由 Apache 软件基金会管理。
消息网关作为消息代理,采用了 Pub/Sub(发布/订阅)的设计模式。该设计模式中:
-
发布:生产者将消息发布到主题,然后消费者可以订阅这些主题,处理传入消息,并在处理完成时发送确认。消息网关为每个主题分配了多个分区,根据分区向消费者分发消息。
-
订阅 :当订阅被创建时,所有的消息都将被消息网关保留。只有在消息处理设备确认消息被成功处理后,保留下来的消息才会被丢弃。一个主题可以由多个消费者订阅,并且当消费者成功处理消息时,消息处理设备需要向消息网关发送确认,以便确认可以丢弃该消息。
消息流程

测试和生产环境
两个环境相互独立,建议你先在测试环境(上图中的客户云平台【测试】)完成调试,验证无误后再迁移到生产环境(上图中的客户云平台【生产】)。如何配置环境见示例代码部分。
- 生产环境:默认情况下,如果前提条件(比如:已经勾选相关配置)都满足时,设备的消息都会打入生产环境消息网关。
- 测试环境(也叫测试通道):测试通道仅会收到测试设备的消息,如需测试,打开该测试设备的测试通道配置 开关。如果已经设置为测试通道,则这些测试设备产生的消息将不会进入到生产环境。如果需要进入到生产环境,则关闭掉该设备的测试通道开关即可。
安全性
-
认证安全:
消息网关系统针对身份认证进行了优化以满足高安全性要求,采用动态令牌机制增强安全,您可忽略实现细节,基于相速提供的 SDK 完成认证。 -
数据安全:
传输安全:基于 SSL 传输业务数据。
基于 Pulsar Java SDK 获取消息
前提条件
已完成产品创建,设备已经可以上报智能事件或物模型自定义的事件,已经设置数据同步。
下载 Pulsar Java SDK
单击下载最新的Pulsar Java SDK ,下载 zip 包至本地,解压缩即可。
SDK集成及接收消息
- 使用 IntelliJ IDEA 打开下载并解压后的源码工程。
- 修改 src/main/java/com/rtcx/open/sdk/example/ConsumerExample.java 文件参数,修改域名和访问秘钥对。
private static final String APP_KEY = "Ks*********************zq";
private static final String APP_SECRET = "g9***************************Ve";
private static final String GATEWAY_URL = "https://gateway.iotrtc.cn";
private static final String MQ_URL = "pulsar+ssl://mqp.iotrtc.cn:6651";
- 修改 src/main/java/com/rtcx/open/sdk/example/ConsumerExample.java 文件参数,修改环境。
MqConsumer mqConsumer = MqConsumer.build()
.mqServiceUrl(MQ_URL)
.gatewayUrl(GATEWAY_URL)
.appKey(APP_KEY)
.appSecret(APP_SECRET)
.env(Env.PROD) //环境,取值:PROD、TEST,默认值:PROD
.messageListener(new MessageListener());
说明
- Env.PROD 代表正式环境。
- Env.TEST 代表测试环境。
- 完整参考示例:src/main/java/com/rtcx/open/sdk/example/ConsumerExample.java:
public class ConsumerExample {
private static final Logger logger = LoggerFactory.getLogger(ConsumerExample.class);
private static final String APP_KEY = "Ks*********************zq";
private static final String APP_SECRET = "g9***************************Ve";
private static final String GATEWAY_URL = "https://gateway.iotrtc.cn";
private static final String MQ_URL = "pulsar+ssl://mqp.iotrtc.cn:6651";
public static void main(String[] args) throws Exception {
MqConsumer mqConsumer = MqConsumer.build()
.mqServiceUrl(MQ_URL)
.gatewayUrl(GATEWAY_URL)
.appKey(APP_KEY)
.appSecret(APP_SECRET)
.env(Env.PROD) //环境,取值:PROD、TEST,默认值:PROD
.messageListener(new MessageListener());
mqConsumer.start();
}
private static class MessageListener implements MqConsumer.IMessageListener {
@Override
public void onMessageArrived(MessageVO messageVO) throws Exception {
try {
String content = messageVO.getContent();
String topic = messageVO.getTopic();
String messageId = messageVO.getMessageId();
logger.info("receive message, topic = {}, messageId = {}, content = {}", topic, messageId, content);
} catch (Exception e) {
logger.error("processMessage occurs error ", e);
}
}
}
}
说明
注意
- 消息最多会保存 1 天,如果 1 天后仍然未被消费,则会被删除。 :::
FAQ
- Q:为什么无法接收到设备信息?
A: 排查步骤如下:- 确认测试设备所属的区域机房,各区域机房的消息是隔离的。
- 确认 Pulsar 客户端的环境/凭证配置正常,确认服务已经启动。
- 确认代码配置的环境参数是否正确,如果是测试环境,则测试设备是否打开测试通道配置开关。
- 确认是否配置了多个消费客户端,如果配置了多个,则只有单个客户端会消费,且是随机的。
- 确认设备是否已经产生了事件,可以通过 DemoApp 消息列表观察。
- 确认控制台上是否已开启消息订阅。