GO 实现高并发高可用分布式系统:Log微服务的实现

GO 实现高并发高可用分布式系统:Log微服务的实现

作者:陈屹 2022-01-10 19:45:40

开发

架构

分布式 在大数据时代,具备高并发,高可用,理解微服务系统设计的人员需求很大,如果你想从事后台开发,在JD的描述中最常见的要求就是有所谓的“高并发”系统开发经验。

本文转载自微信公众号「Coding迪斯尼」,作者陈屹。转载本文请联系Coding迪斯尼公众号。

在大数据时代,具备高并发,高可用,理解微服务系统设计的人员需求很大,如果你想从事后台开发,在JD的描述中最常见的要求就是有所谓的“高并发”系统开发经验。但我发现在市面上并没有直接针对“高并发”,“高可用”的教程,你搜到的资料往往都是只言片语,要不就是阐述那些令人摸不着头脑的理论。但是技术的掌握必须从实践中来,我找了很久发现很少有指导人动手实践基于微服务的高并发系统开发,因此我希望结合自己的学习和实践经验跟大家分享一下这方面的技术,特别是要强调具体的动手实践来理解和掌握分布式系统设计的理论和技术。

所谓“微服务”其实没什么神奇的地方,它只不过是把我们原来聚合在一起的模块分解成多个独立的,基于服务器程序存在的形式,假设我们开发的后台系统分为日志,存储,业务逻辑,算法逻辑等模块,以前这些模块会聚合成一个整体形成一个复杂庞大的应用程序:

这种方式存在很多问题,第一是过多模块糅合在一起会使得系统设计过于复杂,因为模块直接存在各种逻辑耦合,这使得随着时间的推移,系统的开发和维护变得越来越困难。第二是系统越来越脆弱,只要其中一个模块发送错误或奔溃,整个系统可能就会垮塌。第三是可扩展性不强,系统很难通过硬件性能的增强而实现相应扩展。

要实现高并发,高可用,其基本思路就是将模块拆解,然后让他们成为独立运行的服务器程序,各个模块之间通过消息发送的方式完成配合:

这种模式的好处在于:1,模块之间解耦合,一个模块出问题对整个系统影响很小。2,可扩展,高可用,我们可以将模块部署到不同服务器上,当流量增加,我们只要简单的增加服务器数量就能使得系统的响应能力实现同等扩展。3,鲁棒性增强,由于模块能备份多个,其中一个模块出问题,请求可以重定向到其他同样模块,于是系统的可靠性能大大增强。

当然任何收益都有对应代价,分布式系统的设计开发相比于原来的聚合性系统会多出很多难点。例如负载均衡,服务发现,模块协商,共识达成等,分布式算法强调的就是这些问题的解决,但是理论总是抽象难以理解,倘若不能动手实现一个高可用高并发系统,你看多少理论都是雾里看花,越看越糊涂,所以我们必须通过动手实践来理解和掌握理论,首先我们从最简单的服务入手,那就是日志服务,我们将使用GO来实现。

首先创建根目录,可以命名为go_distributed_system,后面所有服务模块都实现在该目录下,然后创建子目录proglog,进去后我们再创建子目录internel/server/在这里我们实现日志服务的逻辑模块,首先在internel/server下面执行初始化命令:

  1. go mod init internal/server 

这里开发的模块会被其他模块引用,所以我们需要创建mod文件。首先我们需要完成日志系统所需的底层数据结构,创建log.go文件,相应代码如下:

  1. package server 
  2.  
  3. import ( 
  4.     "fmt" 
  5.     "sync" 
  6.  
  7. type Log struct { 
  8.     mu sync.Mutex 
  9.     records [] Record  
  10.  
  11. func NewLog() *Log { 
  12.     return &Log{ch : make(chan Record),}  
  13.  
  14. func(c *Log) Append(record Record) (uint64, error) { 
  15.      c.mu.Lock() 
  16.     defer c.mu.Unlock() 
  17.     record.Offset = uint64(len(c.records)) 
  18.     c.records = append(c.records, record) 
  19.     return record.Offset, nil  
  20.  
  21. func (c *Log) Read(offset uint64)(Record, error) { 
  22.     c.mu.Lock() 
  23.     defer c.mu.Unlock() 
  24.     if offset >= uint64(len(c.records)) { 
  25.         return Record{}, ErrOffsetNotFound  
  26.     } 
  27.  
  28.     return c.records[offset], nil  
  29.  
  30. type Record struct { 
  31.     Value []byte `json:"value"
  32.     Offset uint64 `json:"offset"
  33.  
  34. var ErrOffsetNotFound = fmt.Errorf("offset not found"

由于我们的日志服务将以http服务器程序的方式接收日志读写请求,因此多个读或写请求会同时执行,所以我们需要对records数组进行互斥操作,因此使用了互斥锁,在每次读取records数组前先获得锁,这样能防止服务在同时接收多个读写请求时破坏掉数据的一致性。

所有的日志读写请求会以http POST 和 GET的方式发起,数据通过json来封装,所以我们下面将创建一个http服务器对象,新建文件http.go,完成如下代码:

  1. package server  
  2.  
  3. import ( 
  4.     "encoding/json" 
  5.     "net/http" 
  6.     "github.com/gorilla/mux" 
  7.  
  8. func NewHttpServer(addr string) *http.Server { 
  9.     httpsrv := newHttpServer() 
  10.     r := mux.NewRouter() 
  11.     r.HandleFunc("/", httpsrv.handleLogWrite).Methods("POST"
  12.     r.HandleFunc("/", httpsrv.hadnleLogRead).Methods("GET"
  13.  
  14.     return &http.Server{ 
  15.         Addr : addr, 
  16.         Handler: r, 
  17.     } 
  18.  
  19. type httpServer struct{ 
  20.     Log *Log  
  21.  
  22. func newHttpServer() *httpServer { 
  23.     return &httpServer { 
  24.         Log: NewLog(), 
  25.     } 
  26.  
  27. type WriteRequest struct { 
  28.     Record Record `json:"record"
  29.  
  30. type WriteResponse struct { 
  31.     Offset uint64 `json:"offset"
  32.  
  33. type ReadRequest struct { 
  34.     Offset uint64 `json:"offset"
  35.  
  36. type ReadResponse struct { 
  37.     Record Record `json:"record"
  38.  
  39. func (s *httpServer) handleLogWrite(w http.ResponseWriter, r * http.Request) { 
  40.     var req WriteRequest  
  41.     //服务以json格式接收请求 
  42.     err := json.NewDecoder(r.Body).Decode(&req) 
  43.     if err != nil { 
  44.         http.Error(w, err.Error(), http.StatusBadRequest) 
  45.         return  
  46.     } 
  47.  
  48.     off, err := s.Log.Append(req.Record) 
  49.     if err != nil { 
  50.         http.Error(w, err.Error(), http.StatusInternalServerError) 
  51.         return  
  52.     } 
  53.  
  54.     res := WriteResponse{Offset: off
  55.     //服务以json格式返回结果 
  56.     err = json.NewEncoder(w).Encode(res) 
  57.     if err != nil { 
  58.         http.Error(w, err.Error(), http.StatusInternalServerError) 
  59.         return  
  60.     } 
  61.  
  62. func (s *httpServer) hadnleLogRead(w http.ResponseWriter, r *http.Request) { 
  63.     var req ReadRequest  
  64.     err := json.NewDecoder(r.Body).Decode(&req) 
  65.     if err != nil { 
  66.         http.Error(w, err.Error(), http.StatusBadRequest) 
  67.         return  
  68.     } 
  69.  
  70.     record, err := s.Log.Read(req.Offset) 
  71.     if err == ErrOffsetNotFound { 
  72.         http.Error(w, err.Error(), http.StatusNotFound) 
  73.         return 
  74.     } 
  75.  
  76.     if err != nil { 
  77.         http.Error(w, err.Error(), http.StatusInternalServerError) 
  78.         return  
  79.     } 
  80.  
  81.     res := ReadResponse{Record: record} 
  82.     err = json.NewEncoder(w).Encode(res) 
  83.     if err != nil { 
  84.         http.Error(w, err.Error(), http.StatusInternalServerError) 
  85.         return 
  86.     } 

上面代码显示出“分布式”,“微服务”的特点。相应的功能代码以单独服务器的形式运行,通过网络来接收服务请求,这对应“分布式”,每个独立模块只完成一个特定任务,这就对应“微服务”,由于这种方式可以同时在不同的机器上运行,于是展示了“可扩展性”。

同时服务既然以http 服务器的形式存在,因此服务的请求和返回也要走Http形式,同时数据以Json方式进行封装。同时实现的逻辑很简单,但有日志写请求时,我们把请求解析成Record结构体后加入到队列末尾,当有读取日志的请求时,我们获得客户端发来的读取偏移,然后取出对应的记录,封装成json格式后返回给客户。

完成了服务器的代码后,我们需要将服务器运行起来,为了达到模块化的目的,我们把服务器的启动放置在另一个地方,在proglog根目录下创建cmd/server,在里面添加main.go:

  1. package main  
  2.  
  3. import ( 
  4.     "log" 
  5.     "internal/server" 
  6.  
  7. func main() { 
  8.     srv := server.NewHttpServer(":8080"
  9.     log.Fatal(srv.ListenAndServe()) 

同时为了能够引用internal/server下面的模块,我们需要在cmd/server下先通过go mod init cmd/server进行初始化,然后在go.mod文件中添加如下一行:

  1. replace internal/server => ../../internal/server 

然后执行命令 go mod tidy,这样本地模块就知道根据给定的目录转换去引用模块,最后使用go run main.go启动日志服务,现在我们要做的是测试服务器的可用性,我们同样在目录下创建server_test.go,然后编写测试代码,基本逻辑就是想服务器发送日志写请求,然后再发送读请求,并比较读到的数据是否和我们写入的数据一致,代码如下:

  1. package main 
  2.  
  3. import( 
  4.     "encoding/json" 
  5.     "net/http" 
  6.     "internal/server" 
  7.     "bytes" 
  8.     "testing" 
  9.     "io/ioutil" 
  10.  
  11. func TestServerLogWrite(t *testing.T) { 
  12.     var tests = []struct { 
  13.         request server.WriteRequest 
  14.         want_response server.WriteResponse  
  15.     } { 
  16.         {request: server.WriteRequest{server.Record{[]byte(`this is log request 1`), 0}},  
  17.          want_response:  server.WriteResponse{Offset: 0, },}, 
  18.          {request: server.WriteRequest{server.Record{[]byte(`this is log request 2`), 0}},  
  19.          want_response:  server.WriteResponse{Offset: 1, },}, 
  20.          {request: server.WriteRequest{server.Record{[]byte(`this is log request 3`), 0}},  
  21.          want_response:  server.WriteResponse{Offset: 2, },}, 
  22.     } 
  23.  
  24.     for _, test := range tests { 
  25.         //将请求转换成json格式并post给日志服务 
  26.         request := &test.request  
  27.         request_json, err := json.Marshal(request) 
  28.         if err != nil { 
  29.             t.Errorf("convert request to json fail"
  30.             return  
  31.         } 
  32.  
  33.         resp, err := http.Post("http://localhost:8080""application/json",bytes.NewBuffer(request_json)) 
  34.         defer resp.Body.Close() 
  35.         if err != nil { 
  36.             t.Errorf("http post request fail: %v", err) 
  37.             return 
  38.         } 
  39.  
  40.         //解析日志服务返回结果 
  41.         body, err := ioutil.ReadAll(resp.Body) 
  42.         var response server.WriteResponse  
  43.         err = json.Unmarshal([]byte(body), &response) 
  44.         if err != nil { 
  45.             t.Errorf("Unmarshal write response fail: %v", err) 
  46.         } 
  47.  
  48.         //检测结果是否与预期一致 
  49.         if response.Offset != test.want_response.Offset { 
  50.             t.Errorf("got offset: %d, but want offset: %d", response.Offset, test.want_response.Offset) 
  51.         } 
  52.  
  53.     } 
  54.  
  55.     var read_tests = []struct { 
  56.         request server.ReadRequest  
  57.         want server.ReadResponse  
  58.     } { 
  59.         {request: server.ReadRequest{Offset : 0,},  
  60.         want: server.ReadResponse{server.Record{[]byte(`this is log request 1`), 0}} }, 
  61.         {request: server.ReadRequest{Offset : 1,},  
  62.         want: server.ReadResponse{server.Record{[]byte(`this is log request 2`), 0}} }, 
  63.         {request: server.ReadRequest{Offset : 2,},  
  64.         want: server.ReadResponse{server.Record{[]byte(`this is log request 3`), 0}} }, 
  65.     } 
  66.  
  67.     for _, test := range read_tests { 
  68.         request := test.request  
  69.         request_json , err := json.Marshal(request) 
  70.         if err != nil { 
  71.             t.Errorf("convert read request to json fail"
  72.             return  
  73.         } 
  74.  
  75.         //将请求转换为json并放入GET请求体 
  76.         client := &http.Client{} 
  77.         req, err := http.NewRequest(http.MethodGet, "http://localhost:8080", bytes.NewBuffer(request_json)) 
  78.         req.Header.Set("Content-Type""application/json"
  79.         resp, err := client.Do(req) 
  80.         if err != nil { 
  81.             t.Errorf("read request fail: %v", err) 
  82.             return  
  83.         } 
  84.  
  85.         //解析读请求返回的结果 
  86.         defer resp.Body.Close() 
  87.         body, err := ioutil.ReadAll(resp.Body) 
  88.         var response server.ReadResponse 
  89.         err = json.Unmarshal([]byte(body), &response) 
  90.         if err != nil { 
  91.             t.Errorf("Unmarshal read response fail: %v", err) 
  92.             return  
  93.         } 
  94.  
  95.         res := bytes.Compare(response.Record.Value, test.want.Record.Value) 
  96.         if res != 0 { 
  97.             t.Errorf("got value: %q, but want value : %q", response.Record.Value, test.want.Record.Value) 
  98.         } 
  99.     } 
  100.  

完成上面代码后,使用go test运行,结果如下图所示:

从结果看到,我们的测试能通过,也就是无论是向日志服务提交写入请求还是读取请求,所得的结果跟我们预想的一致。总结一下,本节我们设计了一个简单的JSON/HTTP 日志服务,它能够接收基于JSON的http写请求和读请求,后面我们还会研究基于gPRC技术的微服务开发技术.

代码获取

https://github.com/wycl16514/golang_distribute_system_log_service.git

 

文章来源网络,作者:运维,如若转载,请注明出处:https://shuyeidc.com/wp/233499.html<

(0)
运维的头像运维
上一篇2025-04-20 16:03
下一篇 2025-04-20 16:04

相关推荐

  • 个人主题怎么制作?

    制作个人主题是一个将个人风格、兴趣或专业领域转化为视觉化或结构化内容的过程,无论是用于个人博客、作品集、社交媒体账号还是品牌形象,核心都是围绕“个人特色”展开,以下从定位、内容规划、视觉设计、技术实现四个维度,详细拆解制作个人主题的完整流程,明确主题定位:找到个人特色的核心主题定位是所有工作的起点,需要先回答……

    2025-11-20
    0
  • 社群营销管理关键是什么?

    社群营销的核心在于通过建立有温度、有价值、有归属感的社群,实现用户留存、转化和品牌传播,其管理需贯穿“目标定位-内容运营-用户互动-数据驱动-风险控制”全流程,以下从五个维度展开详细说明:明确社群定位与目标社群管理的首要任务是精准定位,需明确社群的核心价值(如行业交流、产品使用指导、兴趣分享等)、目标用户画像……

    2025-11-20
    0
  • 香港公司网站备案需要什么材料?

    香港公司进行网站备案是一个涉及多部门协调、流程相对严谨的过程,尤其需兼顾中国内地与香港两地的监管要求,由于香港公司注册地与中国内地不同,其网站若主要服务内地用户或使用内地服务器,需根据服务器位置、网站内容性质等,选择对应的备案路径(如工信部ICP备案或公安备案),以下从备案主体资格、流程步骤、材料准备、注意事项……

    2025-11-20
    0
  • 如何企业上云推广

    企业上云已成为数字化转型的核心战略,但推广过程中需结合行业特性、企业痛点与市场需求,构建系统性、多维度的推广体系,以下从市场定位、策略设计、执行落地及效果优化四个维度,详细拆解企业上云推广的实践路径,精准定位:明确目标企业与核心价值企业上云并非“一刀切”的方案,需先锁定目标客户群体,提炼差异化价值主张,客户分层……

    2025-11-20
    0
  • PS设计搜索框的实用技巧有哪些?

    在PS中设计一个美观且功能性的搜索框需要结合创意构思、视觉设计和用户体验考量,以下从设计思路、制作步骤、细节优化及交互预览等方面详细说明,帮助打造符合需求的搜索框,设计前的规划明确使用场景:根据网站或APP的整体风格确定搜索框的调性,例如极简风适合细线条和纯色,科技感适合渐变和发光效果,电商类则可能需要突出搜索……

    2025-11-20
    0

发表回复

您的邮箱地址不会被公开。必填项已用 * 标注