使用Kafka和MongoDB进行Go异步处理

使用Kafka和MongoDB进行Go异步处理

作者:Melvin Vivas 2018-08-19 09:15:25

开发

后端

其他数据库

Kafka

MongoDB 在我前面的博客文章 “我的第一个 Go 微服务:使用 MongoDB 和 Docker 多阶段构建” 中,我创建了一个 Go 微服务示例,它发布一个 REST 式的 http 端点,并将从 HTTP POST 中接收到的数据保存到 MongoDB 数据库。

[[240575]]

在我前面的博客文章 “我的***个 Go 微服务:使用 MongoDB 和 Docker 多阶段构建” 中,我创建了一个 Go 微服务示例,它发布一个 REST 式的 http 端点,并将从 HTTP POST 中接收到的数据保存到 MongoDB 数据库。

在这个示例中,我将数据的保存和 MongoDB 分离,并创建另一个微服务去处理它。我还添加了 Kafka 为消息层服务,这样微服务就可以异步处理它自己关心的东西了。

如果你有时间去看,我将这个博客文章的整个过程录制到 这个视频中了 🙂

下面是这个使用了两个微服务的简单的异步处理示例的上层架构图。

rest-kafka-mongo-microservice-draw-io

微服务 1 —— 是一个 REST 式微服务,它从一个 /POST http 调用中接收数据。接收到请求之后,它从 http 请求中检索数据,并将它保存到 Kafka。保存之后,它通过 /POST 发送相同的数据去响应调用者。

微服务 2 —— 是一个订阅了 Kafka 中的一个主题的微服务,微服务 1 的数据保存在该主题。一旦消息被微服务消费之后,它接着保存数据到 MongoDB 中。

在你继续之前,我们需要能够去运行这些微服务的几件东西:

  1. 下载 Kafka —— 我使用的版本是 kafka_2.11-1.1.0
  2. 安装 librdkafka —— 不幸的是,这个库应该在目标系统中
  3. 安装 Kafka Go 客户端
  4. 运行 MongoDB。你可以去看我的 以前的文章 中关于这一块的内容,那篇文章中我使用了一个 MongoDB docker 镜像。

我们开始吧!

首先,启动 Kafka,在你运行 Kafka 服务器之前,你需要运行 Zookeeper。下面是示例:

  1. $ cd/<download path>/kafka_2.11-1.1.0
  2. $ bin/zookeeper-server-start.sh config/zookeeper.properties

接着运行 Kafka —— 我使用 9092 端口连接到 Kafka。如果你需要改变端口,只需要在 config/server.properties 中配置即可。如果你像我一样是个新手,我建议你现在还是使用默认端口。

  1. $ bin/kafka-server-start.sh config/server.properties

Kafka 跑起来之后,我们需要 MongoDB。它很简单,只需要使用这个 docker-compose.yml 即可。

  1. version:'3'
  2. services:
  3. mongodb:
  4. image: mongo
  5. ports:
  6. -"27017:27017"
  7. volumes:
  8. -"mongodata:/data/db"
  9. networks:
  10. - network1
  11.  
  12. volumes:
  13. mongodata:
  14.  
  15. networks:
  16. network1:

使用 Docker Compose 去运行 MongoDB docker 容器。

  1. docker-compose up

这里是微服务 1 的相关代码。我只是修改了我前面的示例去保存到 Kafka 而不是 MongoDB:

rest-to-kafka/rest-kafka-sample.go

  1. func jobsPostHandler(w http.ResponseWriter, r *http.Request){
  2.  
  3. //Retrieve body from http request
  4. b, err := ioutil.ReadAll(r.Body)
  5. defer r.Body.Close()
  6. if err !=nil{
  7. panic(err)
  8. }
  9.  
  10. //Save data into Job struct
  11. var _job Job
  12. err = json.Unmarshal(b,&_job)
  13. if err !=nil{
  14. http.Error(w, err.Error(),500)
  15. return
  16. }
  17.  
  18. saveJobToKafka(_job)
  19.  
  20. //Convert job struct into json
  21. jsonString, err := json.Marshal(_job)
  22. if err !=nil{
  23. http.Error(w, err.Error(),500)
  24. return
  25. }
  26.  
  27. //Set content-type http header
  28. w.Header().Set("content-type","application/json")
  29.  
  30. //Send back data as response
  31. w.Write(jsonString)
  32.  
  33. }
  34.  
  35. func saveJobToKafka(job Job){
  36.  
  37. fmt.Println("save to kafka")
  38.  
  39. jsonString, err := json.Marshal(job)
  40.  
  41. jobString := string(jsonString)
  42. fmt.Print(jobString)
  43.  
  44. p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers":"localhost:9092"})
  45. if err !=nil{
  46. panic(err)
  47. }
  48.  
  49. // Produce messages to topic (asynchronously)
  50. topic :="jobs-topic1"
  51. for _, word := range []string{string(jobString)}{
  52. p.Produce(&kafka.Message{
  53. TopicPartition: kafka.TopicPartition{Topic:&topic,Partition: kafka.PartitionAny},
  54. Value:[]byte(word),
  55. },nil)
  56. }
  57. }

这里是微服务 2 的代码。在这个代码中最重要的东西是从 Kafka 中消费数据,保存部分我已经在前面的博客文章中讨论过了。这里代码的重点部分是从 Kafka 中消费数据:

kafka-to-mongo/kafka-mongo-sample.go

  1. func main(){
  2.  
  3. //Create MongoDB session
  4. session := initialiseMongo()
  5. mongoStore.session = session
  6.  
  7. receiveFromKafka()
  8.  
  9. }
  10.  
  11. func receiveFromKafka(){
  12.  
  13. fmt.Println("Start receiving from Kafka")
  14. c, err := kafka.NewConsumer(&kafka.ConfigMap{
  15. "bootstrap.servers":"localhost:9092",
  16. "group.id":"group-id-1",
  17. "auto.offset.reset":"earliest",
  18. })
  19.  
  20. if err !=nil{
  21. panic(err)
  22. }
  23.  
  24. c.SubscribeTopics([]string{"jobs-topic1"},nil)
  25.  
  26. for{
  27. msg, err := c.ReadMessage(-1)
  28.  
  29. if err ==nil{
  30. fmt.Printf("Received from Kafka %s: %s\n", msg.TopicPartition, string(msg.Value))
  31. job := string(msg.Value)
  32. saveJobToMongo(job)
  33. }else{
  34. fmt.Printf("Consumer error: %v (%v)\n", err, msg)
  35. break
  36. }
  37. }
  38.  
  39. c.Close()
  40.  
  41. }
  42.  
  43. func saveJobToMongo(jobString string){
  44.  
  45. fmt.Println("Save to MongoDB")
  46. col := mongoStore.session.DB(database).C(collection)
  47.  
  48. //Save data into Job struct
  49. var _job Job
  50. b :=[]byte(jobString)
  51. err := json.Unmarshal(b,&_job)
  52. if err !=nil{
  53. panic(err)
  54. }
  55.  
  56. //Insert job into MongoDB
  57. errMongo := col.Insert(_job)
  58. if errMongo !=nil{
  59. panic(errMongo)
  60. }
  61.  
  62. fmt.Printf("Saved to MongoDB : %s", jobString)
  63.  
  64. }

我们来演示一下,运行微服务 1。确保 Kafka 已经运行了。

  1. $ go run rest-kafka-sample.go

我使用 Postman 向微服务 1 发送数据。

Screenshot-2018-04-29-22.20.33

这里是日志,你可以在微服务 1 中看到。当你看到这些的时候,说明已经接收到了来自 Postman 发送的数据,并且已经保存到了 Kafka。

Screenshot-2018-04-29-22.22.00

因为我们尚未运行微服务 2,数据被微服务 1 只保存在了 Kafka。我们来消费它并通过运行的微服务 2 来将它保存到 MongoDB。

  1. $ go run kafka-mongo-sample.go

现在,你将在微服务 2 上看到消费的数据,并将它保存到了 MongoDB。

Screenshot-2018-04-29-22.24.15

检查一下数据是否保存到了 MongoDB。如果有数据,我们成功了!

Screenshot-2018-04-29-22.26.39

完整的源代码可以在这里找到:

https://github.com/donvito/learngo/tree/master/rest-kafka-mongo-microservice 

文章来源网络,作者:运维,如若转载,请注明出处:https://shuyeidc.com/wp/306928.html<

(0)
运维的头像运维
上一篇2025-05-26 21:33
下一篇 2025-05-26 21:34

相关推荐

  • 个人主题怎么制作?

    制作个人主题是一个将个人风格、兴趣或专业领域转化为视觉化或结构化内容的过程,无论是用于个人博客、作品集、社交媒体账号还是品牌形象,核心都是围绕“个人特色”展开,以下从定位、内容规划、视觉设计、技术实现四个维度,详细拆解制作个人主题的完整流程,明确主题定位:找到个人特色的核心主题定位是所有工作的起点,需要先回答……

    2025-11-20
    0
  • 社群营销管理关键是什么?

    社群营销的核心在于通过建立有温度、有价值、有归属感的社群,实现用户留存、转化和品牌传播,其管理需贯穿“目标定位-内容运营-用户互动-数据驱动-风险控制”全流程,以下从五个维度展开详细说明:明确社群定位与目标社群管理的首要任务是精准定位,需明确社群的核心价值(如行业交流、产品使用指导、兴趣分享等)、目标用户画像……

    2025-11-20
    0
  • 香港公司网站备案需要什么材料?

    香港公司进行网站备案是一个涉及多部门协调、流程相对严谨的过程,尤其需兼顾中国内地与香港两地的监管要求,由于香港公司注册地与中国内地不同,其网站若主要服务内地用户或使用内地服务器,需根据服务器位置、网站内容性质等,选择对应的备案路径(如工信部ICP备案或公安备案),以下从备案主体资格、流程步骤、材料准备、注意事项……

    2025-11-20
    0
  • 如何企业上云推广

    企业上云已成为数字化转型的核心战略,但推广过程中需结合行业特性、企业痛点与市场需求,构建系统性、多维度的推广体系,以下从市场定位、策略设计、执行落地及效果优化四个维度,详细拆解企业上云推广的实践路径,精准定位:明确目标企业与核心价值企业上云并非“一刀切”的方案,需先锁定目标客户群体,提炼差异化价值主张,客户分层……

    2025-11-20
    0
  • PS设计搜索框的实用技巧有哪些?

    在PS中设计一个美观且功能性的搜索框需要结合创意构思、视觉设计和用户体验考量,以下从设计思路、制作步骤、细节优化及交互预览等方面详细说明,帮助打造符合需求的搜索框,设计前的规划明确使用场景:根据网站或APP的整体风格确定搜索框的调性,例如极简风适合细线条和纯色,科技感适合渐变和发光效果,电商类则可能需要突出搜索……

    2025-11-20
    0

发表回复

您的邮箱地址不会被公开。必填项已用 * 标注