Drasi Sources SDK
什么是Drasi数据源(Source)?Source提供了与系统的连接,Drasi 可以将这些系统视为变化源。source 在 Drasi 中执行三个重要功能:
[*]处理源系统生成的更改日志/源,并将这些更改推送到使用该源作为输入的每个连续查询。
[*]将源更改数据转换为一致的属性图数据模型,以便订阅的 Continuous Queries 可以使用该数据,就像使用 Nodes 和 Relations 的图形一样。对于图形源(如 Gremlin),无需转换。但对于非图形源,例如 PostgreSQL 和 Kubernetes,Source 会转换数据(更多详细信息在各个 Sources 部分中提供)。
[*]为 Continuous Queries 提供一种方法,以便在启动时查询源系统以初始化 Continuous Query 结果的状态。
Drasi Sources SDK 是一个用于实现 Drasi 数据源的多语言开发工具包,目前支持 Java、.NET、Rust 等编程语言。这个 SDK 的主要目的是帮助开发者创建和管理 Drasi 平台的数据源。扩展Drasi Sources的文档参见 https://github.com/drasi-project/docs/blob/main/docs/content/how-to-guides/extend-drasi/implement-a-source.md
每个数据源由两个核心部分组成:
[*]两个主要组件:
a) Source Reactivator(数据源响应器):负责监控和处理数据变化
[*]监听源数据存储的变更流
[*]将数据转换为图结构
[*]将变更推送到持续查询系统
[*]支持状态存储,用于保存游标等信息
b) Source Proxy(数据源代理):负责初始数据的获取和加载 :
[*]处理新的持续查询的初始化
[*]通过查询数据存储获取初始状态
[*]将数据转换为图结构
[*]支持的数据操作:
[*]节点(Node)的创建和管理
[*]关系(Relation)的创建和管理
[*]属性(Properties)的管理
[*]变更事件的处理
[*]控制事件的处理
[*] 特点和优势
[*]多语言支持:提供 Java、.NET、Rust 等多种语言的实现
[*]异步处理:支持异步流式处理数据变更
[*]状态管理:提供状态存储功能,支持游标管理
[*]配置灵活:支持自定义配置属性
[*]容器化部署:支持容器化部署和管理
[*]事件驱动:基于事件驱动架构处理数据变更
SDK 的设计理念是提供一个统一的接口来实现各种数据源的接入,同时保持足够的灵活性以适应不同的使用场景。无论是简单的数据源还是复杂的数据处理系统,都可以通过这个 SDK 来实现与 Drasi 平台的集成。
实现自定义数据源的步骤
第一步:实现Source Proxy(数据源代理)
Source Proxy主要负责在查询部署时获取初始数据。它需要提供一个HTTP服务器,并实现/acquire接口来处理初始数据的加载。
using System.Runtime.CompilerServices;
using System.Text.Json.Nodes;
using Drasi.Source.SDK;
using Drasi.Source.SDK.Models;
using Microsoft.Extensions.Configuration;
var proxy = new SourceProxyBuilder()
.UseBootstrapHandler<BootstrapHandler>()
.Build();
await proxy.StartAsync();
class BootstrapHandler : IBootstrapHandler
{
public BootstrapHandler(IConfiguration configuration)
{
Console.WriteLine($"Connection string: {configuration["connectionString"]}");
}
public async IAsyncEnumerable<SourceElement> Bootstrap(BootstrapRequest request, CancellationToken cancellationToken = default)
{
if (request.NodeLabels.Contains("Person"))
{
yield return new SourceElement("person-1", ["Person"], new JsonObject
{
{ "name", "Alice" },
{ "age", 30 }
});
yield return new SourceElement("person-2", ["Person"], new JsonObject
{
{ "name", "Bob" },
{ "age", 40 }
});
}
if (request.RelationLabels.Contains("Knows"))
{
yield return new SourceElement("1-2", ["Knows"], new JsonObject
{
{ "since", 2010 }
}, "person-1", "person-2");
}
}
}
数据模型
[*]SourceElement 类
[*]支持节点和关系数据
[*]JSON 属性支持
[*]标签系统
第二步:实现Source Reactivator(数据源响应器)
Source Reactivator负责监控数据变化并通过Dapr的pub/sub(发布/订阅)功能将变化事件发送给其他组件。
数据变化事件格式
所有的数据变化事件都需要包含三个必须字段:
[*]op:操作类型
[*]payload:数据负载
[*]ts_ms:时间戳(毫秒)
1. 新增数据事件格式
{ "op": "i",// i 表示 insert(插入) "payload": { "after": { "id": "001", "labels": ["用户", "VIP"], "properties": { "name": "张三", "age": 30 } }, "before": {},// 新增时before为空 "source": { "table": "node",// node表示节点,relation表示关系 "ts_ms": "1676908799000" } }, "ts_ms": 1676908799000}2. 更新数据事件格式
{ "op": "u",// u 表示 update(更新) "payload": { "after": { "id": "001", "labels": ["用户", "VIP"], "properties": { "name": "张三", "age": 31 } }, "before": { "id": "001", "labels": ["用户", "VIP"], "properties": { "name": "张三", "age": 30 } }, "source": { "table": "node", "ts_ms": "1676908799000" } }, "ts_ms": 1676908799000}3. 删除数据事件格式
{ "op": "d",// d 表示 delete(删除) "payload": { "after": {},// 删除时after为空 "before": { "id": "001", "labels": ["用户", "VIP"], "properties": { "name": "张三", "age": 31 } }, "source": { "table": "node", "ts_ms": "1676908799000" } }, "ts_ms": 1676908799000}
注册你的数据源
创建SourceProvider配置
要注册新的数据源类型,你需要创建一个SourceProvider配置文件。这个配置描述了数据源的组件和配置选项。
apiVersion: v1
kind: SourceProvider
name: MySource
spec:
services:
proxy:
image: my-proxy
externalImage: true
dapr:
app-port: "80"
reactivator:
image: my-reactivator
externalImage: true
deprovisionHandler: true
dapr:
app-port: "80"
config_schema:
type: object
properties:
connectionString: # sample config property
type: string
使用数据源
创建Source配置文件来使用已注册的数据源:
apiVersion: v1
kind: Source
name: test-source
spec:
kind: MySource
properties:
connectionString: "my-connection-string"
部署和管理命令
# 注册数据源提供者drasi apply -f source-provider.yaml# 查看所有可用的数据源类型drasi list sourceprovider# 部署具体的数据源实例drasi apply -f source.yaml调试和验证
[*]配置文件验证:
[*]使用Drasi CLI:drasi apply --dry-run -f your-source.yaml
[*]使用VSCode插件:安装Drasi VSCode扩展,可以自动验证配置文件
[*]常见问题排查:
[*]确保Docker镜像已正确推送到镜像仓库
[*]检查服务端口配置是否正确
[*]验证数据库连接信息
[*]查看容器日志排查问题
[*]最佳实践:
[*]在开发环境中充分测试
[*]使用环境变量管理敏感信息
[*]实现健康检查接口
[*]添加详细的日志记录
常见问题
[*]Q: 如何确保数据源的安全性? A: 使用环境变量存储敏感信息,启用SSL连接,实施适当的访问控制。
[*]Q: 数据源支持哪些类型的数据变化监控? A: 支持新增(insert)、更新(update)和删除(delete)三种基本操作的监控。
[*]Q: 如何处理大量数据的初始加载? A: 考虑使用分页加载,实现断点续传,或者使用批量处理机制。
页:
[1]