数据同步开发指南
前提条件
开发者控制台已经开启数据同步,同时设置数据同步包括推送配置。
查看网关地址及消费凭证
认证身份信息需要使用 MQ_URL、GATEWAY_URL、appKey和appSecret用于鉴权。见:sdk集成及接收消息示例
- 其中,MQ_URL 是连接节点,具体取值如下表所示:
区域 | MQ_URL |
---|---|
中国内地 | pulsar+ssl://mqp.iotrtc.cn:6651 |
- GATEWAY_URL,参见 API网关协议与地址
- 身份认证需要使用appKey和appSecret,该信息可以从控制台 数据同步 中获取。
消息格式
物的属性变更消息
topic:/${productKey}/${deviceName}/thing/event/property/post
消息字段说明如下。
参数 | 类型 | 含义 |
---|---|---|
deviceType | String | 设备所属品类 |
gmtCreate | Long | 数据流转消息产生时间,自1970-1-1起流逝的毫秒值 |
iotId | String | 设备的唯一ID |
productKey | String | 设备所属品类 |
deviceName | String | 设备所属品类 |
items | JSON | 变更的数据集 |
${attribute} | String | 发生变更的属性,具体取值由具体情况确定 |
value | 具体数据类型由具体情况确定 | 变更值 |
time | Long | 设备属性发生变化的时间,自1970-1-1起流逝的毫秒值 |
消息示例如下。
{
"deviceType": "常电摄像机",
"iotId": "OfEiaUv**********fXQzu",
"productKey": "OfEiaUv******fP",
"gmtCreate": 1737083508000,
"deviceName": "TSV**********fXQzu",
"items": {
"StorageTotalCapacity": {
"value": 0,
"time": 1737083507980
}
}
}
物的事件变更消息
topic:/${productKey}/${deviceName}/thing/event/{tsl.event.identifier}/post
消息字段说明如下。
参数 | 类型 | 含义 |
---|---|---|
deviceType | String | 设备所属品类 |
iotId | String | 设备的唯一ID |
productKey | String | 设备所属产品的唯一标识符 |
deviceName | String | 设备名称 |
identifier | String | 事件标识符,对应事件的identifier |
name | String | 事件名称 |
type | String | 事件类型 |
time | Long | 设备上报value对应的时间,自1970-1-1起流逝的毫秒值 |
value | JSON | 变更的事件属性列表:key-value键值对 |
key | String | 属性key |
value | 具体数据类型由具体情况确定 | 属性取值 |
消息示例如下:
{
"deviceType": "常电摄像机",
"identifier": "Open****",
"iotId": "OfEiaUv**********fXQzu",
"name": "开****通知",
"time": 1737083508000,
"type": "info",
"productKey": "OfEiaUv******fP",
"deviceName": "TSV**********fXQzu",
"value": {
"KeyID": "x8******DY",
"Method": "fin******t"
}
}
物的状态变更消息
topic:/as/mqtt/status/{productKey}/{deviceName}
消息字段说明如下:
参数 | 类型 | 含义 |
---|---|---|
status | String | 设备状态。online:上线。offline:离线。 |
iotId | String | 设备的唯一ID |
offlineReasonCode | Integer | 设备下线时,返回的错误码。 1911:设备跟云端之间TCP连接断开,导致设备离线。 |
productKey | String | 设备所属产品的唯一标识符 |
deviceName | String | 设备名称 |
time | Long | 设备上、下线的时间,自1970-1-1起流逝的毫秒值 |
clientIp | String | 设备公网出口IP。 |
消息示例如下:
{
"status": "offline",
"iotId": "OfEiaUv**********fXQzu",
"offlineReasonCode": 1911,
"productKey": "OfEiaUv******fP",
"deviceName": "TSV**********fXQzu",
"time": 1737083508000,
"clientIp": "192.0.2.1"
}
用户绑定变更消息
用户绑定/解绑设备产生的回流消息,用于同步用户与设备的绑定、解绑。
topic:/${productKey}/${deviceName}/thing/awss/enrollee/user
消息字段说明如下:
参数 | 类型 | 含义 |
---|---|---|
bind | bool | true-绑定;false-解绑 |
productKey | String | 设备所属产品的唯一标识符 |
deviceName | String | 设备名称 |
iotId | String | 设备的唯一ID |
messageCreateTime | Long | 消息创建时间 |
identityInfos | list | 用户信息列表 |
identityId | String | 用户身份ID |
owned | Integer | 拥有标记 0:分享者 1:拥有者 |
params | Map | 扩展参数 |
消息示例如下:
{
"bind": true,
"productKey": "OfEiaUv******fP",
"deviceName": "TSV**********fXQzu",
"iotId": "OfEiaUv**********fXQzu",
"messageCreateTime": 1737083508000,
"identityInfos": [
{
"identityId": "1121***8",
"owned": 1
}
],
"params": {
}
}
Pulsar消息网关
支持系统各类即时消息订阅和转发,例如:订阅设备开门数据进行本地存储或消息推送,甚至是实现和其他系统联动等。
工作原理
消息网关通 过 Pulsar 主动推送各种被提前订阅的事件数据,以满足对消息实时性和消息持久化的要求。
Pulsar 是一个支持多租户、高性能、持久化、分布式的服务器到服务器之间消息通讯的解决方案。它提供了可靠的消息传递、数据流处理和事件驱动等功能。Pulsar最初由雅虎开发,后续由Apache软件基金会管理。
消息网关作为消息代理,采用了Pub/Sub(发布/订阅)的设计模式。该设计模式中:
-
发布:生产者将消息发布到主题,然后消费者可以订阅这些主题,处理传入消息,并在处理完成时发送确认。消息网关为每个主题分配了多个分区,根据分区向消费者分发消息。
-
订阅:当订阅被创建时,所有的消息都将被消息网关保留。只有在消息处理设备确认消息被成功处理后,保留下来的消息才会被丢弃。一个主题可以由多个消费者订阅,并且当消费者成功处理消息时,消息处理设备需要向消息网关发送确认,以便确认可以丢弃该消息。
消息流程
安全性
-
认证安全:
消息网关系统针对身份认证进行了优化以满足高安全性要求,采用动态令牌机制增强安全,您可忽略实现细节,基于相速提供的SDK完成认证。 -
数据安全:
传输安全:基于SSL传输业务数据。
基于Pulsar Java SDK获取消息
前提条件
已完成产品创建,设备已经可以上报智能事件或物模型自定义的事件,已经设置数据同步。
下载Pulsar Java SDK
单击下载最新的Pulsar Java SDK,下载zip包至本地,解压缩即可。
SDK集成及接收消息
集成代码示例如下:
public class ConsumerExample {
private static final Logger logger = LoggerFactory.getLogger(ConsumerExample.class);
private static final String APP_KEY = "appKey";
private static final String APP_SECRET = "appSecret";
private static final String GATEWAY_URL = "xxxxx";
private static final String MQ_URL = "xxxxxx";
public static void main(String[] args) throws Exception {
MqConsumer mqConsumer = MqConsumer.build()
.mqServiceUrl(MQ_URL)
.gatewayUrl(GATEWAY_URL)
.appKey(APP_KEY)
.appSecret(APP_SECRET)
.messageListener(new MessageListener());
mqConsumer.start();
}
private static class MessageListener implements MqConsumer.IMessageListener {
@Override
public void onMessageArrived(MessageVO messageVO) throws Exception {
try {
String content = messageVO.getContent();
String topic = messageVO.getTopic();
String messageId = messageVO.getMessageId();
logger.info("receive message, topic = {}, messageId = {}, content = {}", topic, messageId, content);
} catch (Exception e) {
logger.error("processMessage occurs error ", e);
}
}
}
}
- 消息最多会保存 1 天,如果 1 天后仍然未被消费,则会被删除。
FAQ
- Q:为什么无法接收到设备信息?
A: 排查步骤如下:- 确认设备是否已经产生了事件,可以通过DemoApp消息列表观察。
- 确认控制台上是否已开启消息订阅。
- 确认Pulsar客户端的环境/凭证配置正常,确认服务已经启动。