跳到主要内容

数据同步开发指南

更新时间:2025-03-20 19:47:47

前提条件

开发者控制台已经开启数据同步,同时设置数据同步包括推送配置。

查看网关地址及消费凭证

认证身份信息需要使用 MQ_URL、GATEWAY_URL、appKey和appSecret用于鉴权。见:sdk集成及接收消息示例

  1. 其中,MQ_URL 是连接节点,具体取值如下表所示:
区域MQ_URL
中国内地pulsar+ssl://mqp.iotrtc.cn:6651
  1. GATEWAY_URL,参见 API网关协议与地址
  2. 身份认证需要使用appKey和appSecret,该信息可以从控制台 数据同步 中获取。

消息格式

物的属性变更消息

topic:/${productKey}/${deviceName}/thing/event/property/post

消息字段说明如下。

参数类型含义
deviceTypeString设备所属品类
gmtCreateLong数据流转消息产生时间,自1970-1-1起流逝的毫秒值
iotIdString设备的唯一ID
productKeyString设备所属品类
deviceNameString设备所属品类
itemsJSON变更的数据集
${attribute}String发生变更的属性,具体取值由具体情况确定
value具体数据类型由具体情况确定变更值
timeLong设备属性发生变化的时间,自1970-1-1起流逝的毫秒值

消息示例如下。

{
"deviceType": "常电摄像机",
"iotId": "OfEiaUv**********fXQzu",
"productKey": "OfEiaUv******fP",
"gmtCreate": 1737083508000,
"deviceName": "TSV**********fXQzu",
"items": {
"StorageTotalCapacity": {
"value": 0,
"time": 1737083507980
}
}
}

物的事件变更消息

topic:/${productKey}/${deviceName}/thing/event/{tsl.event.identifier}/post

消息字段说明如下。

参数类型含义
deviceTypeString设备所属品类
iotIdString设备的唯一ID
productKeyString设备所属产品的唯一标识符
deviceNameString设备名称
identifierString事件标识符,对应事件的identifier
nameString事件名称
typeString事件类型
timeLong设备上报value对应的时间,自1970-1-1起流逝的毫秒值
valueJSON变更的事件属性列表:key-value键值对
keyString属性key
value具体数据类型由具体情况确定属性取值

消息示例如下:

{
"deviceType": "常电摄像机",
"identifier": "Open****",
"iotId": "OfEiaUv**********fXQzu",
"name": "开****通知",
"time": 1737083508000,
"type": "info",
"productKey": "OfEiaUv******fP",
"deviceName": "TSV**********fXQzu",
"value": {
"KeyID": "x8******DY",
"Method": "fin******t"
}
}

物的状态变更消息

topic:/as/mqtt/status/{productKey}/{deviceName}

消息字段说明如下:

参数类型含义
statusString设备状态。online:上线。offline:离线。
iotIdString设备的唯一ID
offlineReasonCodeInteger设备下线时,返回的错误码。
1911:设备跟云端之间TCP连接断开,导致设备离线。
productKeyString设备所属产品的唯一标识符
deviceNameString设备名称
timeLong设备上、下线的时间,自1970-1-1起流逝的毫秒值
clientIpString设备公网出口IP。

消息示例如下:

{
"status": "offline",
"iotId": "OfEiaUv**********fXQzu",
"offlineReasonCode": 1911,
"productKey": "OfEiaUv******fP",
"deviceName": "TSV**********fXQzu",
"time": 1737083508000,
"clientIp": "192.0.2.1"
}

用户绑定变更消息

用户绑定/解绑设备产生的回流消息,用于同步用户与设备的绑定、解绑。

topic:/${productKey}/${deviceName}/thing/awss/enrollee/user

消息字段说明如下:

参数类型含义
bindbooltrue-绑定;false-解绑
productKeyString设备所属产品的唯一标识符
deviceNameString设备名称
iotIdString设备的唯一ID
messageCreateTimeLong消息创建时间
identityInfoslist用户信息列表
identityIdString用户身份ID
ownedInteger拥有标记
0:分享者
1:拥有者
paramsMap扩展参数

消息示例如下:

{
"bind": true,
"productKey": "OfEiaUv******fP",
"deviceName": "TSV**********fXQzu",
"iotId": "OfEiaUv**********fXQzu",
"messageCreateTime": 1737083508000,
"identityInfos": [
{
"identityId": "1121***8",
"owned": 1
}
],
"params": {
}
}

Pulsar消息网关

支持系统各类即时消息订阅和转发,例如:订阅设备开门数据进行本地存储或消息推送,甚至是实现和其他系统联动等。

工作原理

消息网关通过 Pulsar 主动推送各种被提前订阅的事件数据,以满足对消息实时性和消息持久化的要求。

信息

Pulsar 是一个支持多租户、高性能、持久化、分布式的服务器到服务器之间消息通讯的解决方案。它提供了可靠的消息传递、数据流处理和事件驱动等功能。Pulsar最初由雅虎开发,后续由Apache软件基金会管理。

消息网关作为消息代理,采用了Pub/Sub(发布/订阅)的设计模式。该设计模式中:

  • 发布:生产者将消息发布到主题,然后消费者可以订阅这些主题,处理传入消息,并在处理完成时发送确认。消息网关为每个主题分配了多个分区,根据分区向消费者分发消息。

  • 订阅:当订阅被创建时,所有的消息都将被消息网关保留。只有在消息处理设备确认消息被成功处理后,保留下来的消息才会被丢弃。一个主题可以由多个消费者订阅,并且当消费者成功处理消息时,消息处理设备需要向消息网关发送确认,以便确认可以丢弃该消息。

消息流程

图片

安全性

  • 认证安全:
    消息网关系统针对身份认证进行了优化以满足高安全性要求,采用动态令牌机制增强安全,您可忽略实现细节,基于相速提供的SDK完成认证。

  • 数据安全:
    传输安全:基于SSL传输业务数据。

基于Pulsar Java SDK获取消息

前提条件

已完成产品创建,设备已经可以上报智能事件或物模型自定义的事件,已经设置数据同步。

下载Pulsar Java SDK

单击下载最新的Pulsar Java SDK,下载zip包至本地,解压缩即可。

SDK集成及接收消息

集成代码示例如下:

public class ConsumerExample {
private static final Logger logger = LoggerFactory.getLogger(ConsumerExample.class);

private static final String APP_KEY = "appKey";
private static final String APP_SECRET = "appSecret";
private static final String GATEWAY_URL = "xxxxx";
private static final String MQ_URL = "xxxxxx";

public static void main(String[] args) throws Exception {
MqConsumer mqConsumer = MqConsumer.build()
.mqServiceUrl(MQ_URL)
.gatewayUrl(GATEWAY_URL)
.appKey(APP_KEY)
.appSecret(APP_SECRET)
.messageListener(new MessageListener());
mqConsumer.start();
}

private static class MessageListener implements MqConsumer.IMessageListener {
@Override
public void onMessageArrived(MessageVO messageVO) throws Exception {
try {
String content = messageVO.getContent();
String topic = messageVO.getTopic();
String messageId = messageVO.getMessageId();
logger.info("receive message, topic = {}, messageId = {}, content = {}", topic, messageId, content);
} catch (Exception e) {
logger.error("processMessage occurs error ", e);
}
}
}
}
  1. topic定义详见消息格式中topic定义
  2. content内容详见消息格式中消息字段说明
注意
  • 消息最多会保存 1 天,如果 1 天后仍然未被消费,则会被删除。

FAQ

  1. Q:为什么无法接收到设备信息?
    A: 排查步骤如下:
    1. 确认设备是否已经产生了事件,可以通过DemoApp消息列表观察。
    2. 确认控制台上是否已开启消息订阅。
    3. 确认Pulsar客户端的环境/凭证配置正常,确认服务已经启动。