Dapr 入门教程之发布订阅

Dapr 入门教程之发布订阅

作者:k8s技术圈 2022-09-19 16:08:31

云计算

云原生 Dapr 使用可插拔的消息总线来支持发布-订阅,并将消息传递给 CloudEvents(一个 CNCF 项目) 作为通用的事件信封格式,以提高连接服务的互操作性。

前面我们了解了如果在 Dapr 下面进行服务调用,以及最简单的状态管理,本节我们来了解如何启用 Dapr 的发布/订阅模式,发布者将生成特定主题的消息,而订阅者将监听特定主题的信息。

  • 使用发布服务,开发人员可以重复发布消息到一个主题上。
  • Pub/sub 组件对这些消息进行排队处理。
  • 该主题订阅者将从队列中获取到消息并处理他们。

接下来我们使用的这个示例包含一个发布者:

  • React 前端消息生成器

包含另外 3 个消息订阅者:

  • Node.js 订阅者
  • Python 订阅者
  • C# 订阅者

Dapr 使用可插拔的消息总线来支持发布-订阅,并将消息传递给 CloudEvents(一个 CNCF 项目) 作为通用的事件信封格式,以提高连接服务的互操作性。

我们这里将使用 Redis Streams(在 Redis version = > 5 中启用),当然也可以使用 RabbitMQ、Kafka 等中间件。下图是用来说明组件之间是如何在本地模式下互相连接的。

dapr pub/sub

本地初始化

Dapr 允许你将相同的微服务从本地机器部署到云环境中去,这里为了和大家说明这种便利性,我们先在本地部署这个实例项目,然后再将其部署到 Kubernetes 环境中去。

要在本地使用 Dapr 服务,需要先在本地初始化 Dapr:

$daprinit

由于某些网络原因使用上面的命令可能并不能初始化成功,我们可以使用离线的方式进行安装,前往 https://github.com/dapr/installer-bundle/releases 下载对应系统的 Bundle 包,然后解压,比如我这里是 Mac M1,使用下面的命令下载:

$wgethttps://github.91chi.fun/https://github.com/dapr/installer-bundle/releases/download/v1.8.4/daprbundle_darwin_arm64.tar.gz
$tar-xvfdaprbundle_darwin_arm64.tar.gz
xdaprbundle/
xdaprbundle/README.md
xdaprbundle/dapr
xdaprbundle/details.json
xdaprbundle/dist/
xdaprbundle/dist/daprd_darwin_arm64.tar.gz
xdaprbundle/dist/dashboard_darwin_arm64.tar.gz
xdaprbundle/dist/placement_darwin_arm64.tar.gz
xdaprbundle/docker/
xdaprbundle/docker/daprio-dapr-1.8.4.tar.gz

然后我们可以重新使用下面的命令进行初始化:

$daprinit--from-dirdaprbundle/
Makingthejumptohyperspace...
Localbundleinstallationusing--from-dirflagiscurrentlyapreviewfeatureandissubjecttochange. ItisonlyavailablefromCLIversion1.7onwards.
ℹ️Installingruntimeversion1.8.4
Extractingbinariesandsettingupcomponents...
Daprruntimeinstalledto/Users/cnych/.dapr/bin, youmayrunthefollowingtoaddittoyourpathifyouwanttorundaprddirectly:
exportPATH=$PATH:/Users/cnych/.dapr/bin
8d7366c22fd8: Loadinglayer [==================================================>] 3.697MB/3.697MB
61f7f94319f6: Loadinglayer [==================================================>] 238.6MB/238.6MB
Extractingbinariesandsettingupcomponents... Loadedimage: daprio/dapr:1.8.4
Extractingbinariesandsettingupcomponents...
Extractedbinariesandcompletedcomponentssetup.
ℹ️daprdbinaryhasbeeninstalledto/Users/cnych/.dapr/bin.
ℹ️dapr_placementcontainerisrunning.
ℹ️Use`dockerps`tocheckrunningcontainers.
$daprversion
CLIversion: 1.8.0
Runtimeversion: 1.8.4

默认会启用 ​​zipkin​​​ 这个 tracing 服务,使用上面的命令初始化如果没有对应的容器,则可以使用 ​​docker run --name dapr_zipkin -d -p 9411:9411 dockerproxy.com/openzipkin/zipkin​​​ 启动该服务。同样也需要运行一个 Redis 服务:​​docker run --name dapr_redis -d -p 6379:6379 dockerproxy.com/redislabs/rejson​​。

dapr containers

消息订阅服务

这里我们还是使用前面使用的 quickstarts 这个项目,克隆项目到本地:

git clone [-b <dapr_version_tag>] https://github.com/dapr/quickstarts.git

进入 tutorials/pub_sub 目录下面:

  pub-sub git:(622b7d9) ls
README.md deploy makefile message_b.json node-subscriber react-form
csharp-subscriber img message_a.json message_c.json python-subscriber

运行 Node 消息订阅服务

首先我们使用 Dapr 运行 node 消息订阅服务,导航到 node-subscriber 目录,安装依赖:

$ cd node-subscriber
$ npm install # 或者 yarn

执行如下所示命令运行 node 消息订阅服务:

$ dapr run --app-id node-subscriber --app-port3000node app.js
ℹ️ Starting Dapr with id node-subscriber. HTTP Port: 50728. gRPC Port: 50729
INFO[0000] starting Dapr Runtime -- version 1.8.4 -- commit 18575823c74318c811d6cd6f57ffac76d5debe93 app_id=node-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
# ......
INFO[0000] app is subscribed to the following topics: [A B] through pubsub=pubsub app_id=node-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
INFO[0000] dapr initialized. Status: Running. Init Elapsed 312.69599999999997ms app_id=node-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
INFO[0000] placement tables updated, version: 0app_id=node-subscriber instance=MBP2022.local scope=dapr.runtime.actor.internal.placement type=log ver=1.8.4
ℹ️ Updating metadata for app command: node app.js
You're up and running! Both Dapr and your app logs will appear here.

上面命令中的 app-id 是微服务的唯一标识符,–app-port 是 Node 应用程序运行的端口,最后,运行应用程序的命令是 node app.js。

运行 Python 消息订阅服务

接下来使用 Dapr 运行 Python 消息订阅服务,导航到 python-subscriber 目录:

$cdpython-subscriber

安装应用依赖:

$pip3install-rrequirements.txt

同样再次使用 dapr run 来运行该订阅服务:

$daprrun--app-idpython-subscriber--app-port5001python3app.py
ℹ️StartingDaprwithidpython-subscriber. HTTPPort: 55508.gRPCPort: 55509
INFO[0000] startingDaprRuntime--version1.8.4--commit18575823c74318c811d6cd6f57ffac76d5debe93app_id=python-subscriberinstance=MBP2022.localscope=dapr.runtimetype=logver=1.8.4
INFO[0000] loglevelsetto: infoapp_id=python-subscriberinstance=MBP2022.localscope=dapr.runtimetype=logver=1.8.4
INFO[0000] enabledgRPCmetricsmiddlewareapp_id=python-subscriberinstance=MBP2022.localscope=dapr.runtime.grpc.internaltype=logver=1.8.4
INFO[0000] internalgRPCserverisrunningonport55514app_id=python-subscriberinstance=MBP2022.localscope=dapr.runtimetype=logver=1.8.4
INFO[0000] applicationprotocol: http. waitingonport5001.Thiswillblockuntiltheappislisteningonthatport. app_id=python-subscriberinstance=MBP2022.localscope=dapr.runtimetype=logver=1.8.4
INFO[0000] applicationdiscoveredonport5001app_id=python-subscriberinstance=MBP2022.localscope=dapr.runtimetype=logver=1.8.4
WARN[0000] [DEPRECATIONNOTICE] Addingadefaultcontenttypetoincomingserviceinvocationrequestsisdeprecatedandwillberemovedinthefuture. Seehttps://docs.dapr.io/operations/support/support-preview-features/ for more details. You can opt into the new behavior today by setting the configuration option `ServiceInvocation.NoDefaultContentType` to true. app_id=python-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
==APP==*ServingFlaskapp"app" (lazyloading)
==APP==*Environment: production
==APP==WARNING: Thisisadevelopmentserver. Donotuseitinaproductiondeployment.
==APP==UseaproductionWSGIserverinstead.
==APP==*Debugmode: off
==APP==*Runningonhttp://127.0.0.1:5001/ (Press CTRL+C to quit)
ℹ️Updatingmetadataforappcommand: python3app.py
You're up and running! Both Dapr and your app logs will appear here.

由于我们这里没有 C# 环境,所以只运行 Node 和 Python 这两个消息订阅服务了。

消息发布服务

接下来我们来运行 React 这个前端消息发布服务,同样先导航到 react-form 项目目录下面:

$cdreact-form

然后执行下面的命令安装依赖并构建服务:

$npmrunbuildclient
$npminstall

构建完成后可以使用下面的 dapr 命令来启动该前端服务:

$daprrun--app-idreact-form--app-port8080npmrunstart
ℹ️StartingDaprwithidreact-form. HTTPPort: 57303.gRPCPort: 57304
INFO[0000] startingDaprRuntime--version1.8.4--commit18575823c74318c811d6cd6f57ffac76d5debe93app_id=react-forminstance=MBP2022.localscope=dapr.runtimetype=logver=1.8.4
# ......
==APP==
==APP==>react-form@1.0.0start
==APP==>nodeserver.js
==APP==
==APP==Listeningonport8080!
INFO[0000] applicationdiscoveredonport8080app_id=react-forminstance=MBP2022.localscope=dapr.runtimetype=logver=1.8.4
# ......
INFO[0000] daprinitialized. Status: Running. InitElapsed760.39msapp_id=react-forminstance=MBP2022.localscope=dapr.runtimetype=logver=1.8.4
ℹ️Updatingmetadataforappcommand: npmrunstart
You're up and running! Both Dapr and your app logs will appear here.

INFO[0001] placementtablesupdated, version: 0app_id=react-forminstance=MBP2022.localscope=dapr.runtime.actor.internal.placementtype=logver=1.8.4

当看到 == APP == Listening on port 8080! 这样的日志时,表示应用启动成功了。然后我们就可以在浏览器中访问 http://localhost:8080 访问前端应用了。

前端页面

比如现在我们选择消息类型 A,然后随便输入一些消息内容,点击 Submit 发送,然后观察上面的 Node 和 Python 这两个消息订阅者服务的日志。

选择一个主题,输入一些文字,然后发送一条信息!观察通过你们各自的 Dapr 的日志。

Dapr 消息订阅发布服务

注意,Node 订阅者接收类型为 A 和 B 的消息,而 Python 订阅者接收类型为 A和 C 的消息,所以注意每个控制台窗口的日志显示。

此外 Dapr CLI 提供了一个机制来发布消息用于测试,比如我们可以使用如下命令来发布一条消息:

$daprpublish--publish-app-idreact-form--pubsubpubsub--topicA--data-filemessage_a.json

dapr cli publish

到这里我们就完成了使用 Dapr 来进行消息订阅发布的功能演示。

在 Kubernetes 中运行

上面我们是将演示服务在本地部署的,我们知道使用 Dapr 开发的服务是和平台没关系的,可以很轻松迁移到云环境,比如现在我们再将上面的示例应用部署到 Kubernetes 集群中。

要在 Kubernetes 中运行相同的代码,首先需要设置 Redis 存储,然后部署微服务,将使用相同的微服务,但最终架构有所不同:

运行在K8s

前面我们已经使用 Helm 安装了 bitnami 下面的 redis 应用:

$helmrepoaddbitnamihttps://charts.bitnami.com/bitnami
$helmrepoupdate
$helminstallredisbitnami/redis

有了 Redis 服务过后,接着我们需要创建一个发布订阅的 Component 组件,前文是创建的一个使用 Redis 的状态管理组件,对应的组件资源清单如下所示:

#deploy/redis.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.redis
version: v1
metadata:
#Thesesettingswillworkoutoftheboxifyouuse`helminstall
#bitnami/redis`. Ifyouhaveyourownsetup, replace
#`redis-master:6379`withyourownRedismasteraddress, andthe
#RedispasswordwithyourownSecret's name. For more information,
#seehttps://docs.dapr.io/operations/components/component-secrets .
-name: redisHost
value: redis-master:6379
-name: redisPassword
secretKeyRef:
name: redis
key: redis-password
auth:
secretStore: kubernetes

直接应用上面的资源清单即可:

$kubectlapply-fdeploy/redis.yaml
component.dapr.io/pubsubcreated
$kubectlgetcomponents
NAMEAGE
pubsub26s
statestore45h

现在我们就有了一个使用 Redis 为中间件的发布订阅组件了,注意上面对象的类型为 ​​pubsub.redis​​。

redis pubsub

接着我们就可以部署 Python、Node 和 React-form 这 3 个微服了:

$kubectlapply-fdeploy/node-subscriber.yaml
$kubectlapply-fdeploy/python-subscriber.yaml
$kubectlapply-fdeploy/react-form.yaml

部署后查看 Pod 的状态:

$kubectlgetpods
NAMEREADYSTATUSRESTARTSAGE
node-subscriber-5b5777c785-z8jzn2/2Running030m
python-subscriber-76d9fc6c87-ffj7r2/2Running030m
react-form-68db4b7777-7qmtj2/2Running030m

react-form 这个微服务会通过一个 LoadBalancer 类型的 Service 来对外暴露服务:

$kubectlgetsvc
NAMETYPECLUSTER-IPEXTERNAL-IPPORT(S) AGE
node-subscriber-daprClusterIPNone<none>80/TCP,50001/TCP,50002/TCP,9090/TCP31m
python-subscriber-daprClusterIPNone<none>80/TCP,50001/TCP,50002/TCP,9090/TCP31m
react-formLoadBalancer10.110.199.146192.168.0.5180:32510/TCP30m
react-form-daprClusterIPNone<none>80/TCP,50001/TCP,50002/TCP,9090/TCP30m

然后我们就可以通过分配的 EXTERNAL-IP 访问前端服务了。同样在前端页面发送几个不同的消息通知,然后使用 kubectl logs 观察 Node 和 Python 订阅服务的日志。

$kubectllogs--selectorapp=node-subscriber-cnode-subscriber
$kubectllogs--selectorapp=python-subscriber-cpython-subscriber

pub-sub on K8s

如何工作

现在,我们已经在本地和 Kubernetes 中运行了订阅发布示例应用,接下来我们来分析下这是如何工作的。该应用程序分为两个订阅者和一个发布者。

Node 消息订阅服务

重新导航到 node-scriber 目录并查看 Node.js 订阅者代码 app.js,该服务通过 Express 暴露了三个 API 端点。第一个是 GET 端点:

app.get("/dapr/subscribe", (_req, res) => {
res.json([
{
pubsubname: "pubsub",
topic: "A",
route: "A",
},
{
pubsubname: "pubsub",
topic: "B",
route: "B",
},
]);
});

该段代码是告诉 Dapr 要订阅 pubsub 这个组件的哪些主题,其中的 ​​route​​ 表示使用路由到那个端点来处理消息,当部署(本地或 Kubernetes)时,Dapr 将调用服务以确定它是否订阅了任何内容。其他两个端点是后端点:

app.post("/A", (req, res) => {
console.log("A: ", req.body.data.message);
res.sendStatus(200);
});

app.post("/B", (req, res) => {
console.log("B: ", req.body.data.message);
res.sendStatus(200);
});

这两个端点处理来自每个主题类型的消息,我们这里只是记录消息,当然在更复杂的应用程序中,这里就是需要处理业务逻辑的地方了。

此外我们也可以直接通过创建一个 Subscription 的对象来声明在哪些服务里面来订阅组件中的哪些主题。

Python 消息订阅服务

同样导航到 python-subscriber 目录,查看 Python 订阅服务的代码文件 app.py。与 Node.js 订阅者一样,我们暴露了三个 API 端点,只是这里使用的是 flask,第一个是 GET 端点:

@app.route('/dapr/subscribe', methods=['GET'])
defsubscribe():
subscriptions= [{
'pubsubname': 'pubsub', 'topic': 'A', 'route': 'A'
}, {
'pubsubname': 'pubsub', 'topic': 'C', 'route': 'C'
}]
returnjsonify(subscriptions)

同样的方式,这是告诉 Dapr 要订阅 pubsub 组件的哪些主题,这里我们订阅的组件名为 pubsub 的,主题为 A 和 C,这些主题的消息通过其他两个路由进行处理:

@app.route('/A', methods=['POST'])
defa_subscriber():
print(f'A: {request.json}', flush=True)
print('Received message "{}" on topic "{}"'.format(request.json['data']['message'], request.json['topic']), flush=True)
returnjson.dumps({'success':True}), 200, {'ContentType':'application/json'}

@app.route('/C', methods=['POST'])
defc_subscriber():
print(f'C: {request.json}', flush=True)
print('Received message "{}" on topic "{}"'.format(request.json['data']['message'], request.json['topic']), flush=True)
returnjson.dumps({'success':True}), 200, {'ContentType':'application/json'}

React 前端应用

上面是两个订阅服务,接下来查看下发布者,我们的发布者分为客户端和服务器

客户端是一个简单的单页 React 应用程序,使用 Create React App 启动,相关的客户端代码位于react-form/client/src/MessageForm.js,当用户提交表单时,将使用最新的聚合 JSON 数据更新 React 状态。默认情况下,数据设置为:

{
messageType: "A",
message: ""
};

提交表单后,聚合的 JSON 数据将发送到服务器:

fetch("/publish", {
headers: {
Accept: "application/json",
"Content-Type": "application/json",
},
method: "POST",
body: JSON.stringify(this.state),
});

服务端是一个典型的 express 应用程序,它暴露了一个 POST 端点:/publish。这样可以从客户端接收请求,并根据 Dapr 发布它们。Express 内置的 JSON 中间件函数用于解析传入请求中的 JSON:

app.use(express.json());

这样我们可以获取到提交的 messageType,可以确定使用哪个主题来发布消息。要使用 Dapr 来发布消息,同样也是直接使用 Dapr 提供的 API 端点 http://localhost:<DAPR_URL>/publish/<PUBSUB_NAME>/<TOPIC> 即可,根据获取到的数据构建 Dapr 消息发布的 URL,提交 JSON 数据,POST 请求还需要在成功完成后返回响应中的成功代码。

constpublishUrl=`${daprUrl}/publish/${pubsubName}/${req.body?.messageType}`;
awaitaxios.post(publishUrl, req.body);
returnres.sendStatus(200);

daprUrl 的地址所在的端口可以用下面的代码来获取:

constdaprUrl=`http://localhost:${process.env.DAPR_HTTP_PORT || 3500}/v1.0`;

默认情况下,Dapr 在 3500 上运行,但如果我们在本地运行 Dapr 并将其设置为其他端口(使用 CLI run 命令中的 –app-port 标志),则该端口将作为环境变量注入应用程序。

此外服务端还通过将默认主页 / 路由请求转发到构建的客户端代码来托管 React 应用程序本身:

app.get("/", function (_req, res) {
res.sendFile(path.join(__dirname, "client/build", "index.html"));
});

所以我们可以直接通过服务端来访问到前端页面。

发布-订阅模式是我们微服务开发中非常重要的一个模式,可以用来实现高可伸缩性和松耦合。发布订阅通常用于需要高度可伸缩的大型应用程序,发布和订阅应用程序通常比传统的 client/server 应用程序具有更好的伸缩性。Pub-sub 允许我们完全解耦组件,发布者不必知道他们的任何订阅者,订阅者也不必知道发布者。这使得开发人员可以编写更精简的微服务,而不会直接依赖彼此。

从上面的示例可以看出 Dapr 中使用发布订阅模式进行开发就完全变成了面向 ​​localhost​​ 编程了。

文章来源网络,作者:运维,如若转载,请注明出处:https://shuyeidc.com/wp/230968.html<

(0)
运维的头像运维
上一篇2025-04-19 11:47
下一篇 2025-04-19 11:49

相关推荐

  • 个人主题怎么制作?

    制作个人主题是一个将个人风格、兴趣或专业领域转化为视觉化或结构化内容的过程,无论是用于个人博客、作品集、社交媒体账号还是品牌形象,核心都是围绕“个人特色”展开,以下从定位、内容规划、视觉设计、技术实现四个维度,详细拆解制作个人主题的完整流程,明确主题定位:找到个人特色的核心主题定位是所有工作的起点,需要先回答……

    2025-11-20
    0
  • 社群营销管理关键是什么?

    社群营销的核心在于通过建立有温度、有价值、有归属感的社群,实现用户留存、转化和品牌传播,其管理需贯穿“目标定位-内容运营-用户互动-数据驱动-风险控制”全流程,以下从五个维度展开详细说明:明确社群定位与目标社群管理的首要任务是精准定位,需明确社群的核心价值(如行业交流、产品使用指导、兴趣分享等)、目标用户画像……

    2025-11-20
    0
  • 香港公司网站备案需要什么材料?

    香港公司进行网站备案是一个涉及多部门协调、流程相对严谨的过程,尤其需兼顾中国内地与香港两地的监管要求,由于香港公司注册地与中国内地不同,其网站若主要服务内地用户或使用内地服务器,需根据服务器位置、网站内容性质等,选择对应的备案路径(如工信部ICP备案或公安备案),以下从备案主体资格、流程步骤、材料准备、注意事项……

    2025-11-20
    0
  • 如何企业上云推广

    企业上云已成为数字化转型的核心战略,但推广过程中需结合行业特性、企业痛点与市场需求,构建系统性、多维度的推广体系,以下从市场定位、策略设计、执行落地及效果优化四个维度,详细拆解企业上云推广的实践路径,精准定位:明确目标企业与核心价值企业上云并非“一刀切”的方案,需先锁定目标客户群体,提炼差异化价值主张,客户分层……

    2025-11-20
    0
  • PS设计搜索框的实用技巧有哪些?

    在PS中设计一个美观且功能性的搜索框需要结合创意构思、视觉设计和用户体验考量,以下从设计思路、制作步骤、细节优化及交互预览等方面详细说明,帮助打造符合需求的搜索框,设计前的规划明确使用场景:根据网站或APP的整体风格确定搜索框的调性,例如极简风适合细线条和纯色,科技感适合渐变和发光效果,电商类则可能需要突出搜索……

    2025-11-20
    0

发表回复

您的邮箱地址不会被公开。必填项已用 * 标注