Contents

golang-go-zero-个人笔记

本系列为作者跟着Mikaelemmmm的b站教学视频学习时做的笔记

https://raw.githubusercontent.com/zeromicro/zero-doc/main/doc/images/architecture.png

go zero 为一个微服务框架

比gin web框架更强大。

必看资料:

  1. 公众号:微服务实践
  2. go-zero-example: https://github.com/zeromicro/zero-examples
  3. zero-contrib: https://github.com/zeromicro/zero-contrib
  4. 微信社区群
  5. go-zero-issue: https://github.com/zeromicro/go-zero/issues

指令

指令页面

指令大全

https://go-zero.dev/cn/assets/images/goctl-cn-ce8593be6eca578a0fc864abb9369bc9.svg

API

1
goctl api go -api user.api -dir ../ -style goZero

RPC

1
2
3
goctl rpc template -o=user.proto

goctl rpc protoc user.proto --go_out=../ --go-grpc_out=../ --zrpc_out=../ --style=goZero

MODEL

1
2
3
goctl model mysql ddl -src="./*.sql" -dir="./sql/model" -c

$ goctl model mysql datasource -url="user:password@tcp(127.0.0.1:3306)/database" -table="*"  -dir="./model"

TEMPLATE

1
2
3
goctl template init --home $HOME/template

goctl rpc new greet --home $HOME/template

docker

1
goctl docker -go hello.go

protoc

1
protoc -I ./ --go_out=paths=source_relative:. --go-grpc_out=paths=source_relative:. userModel.proto

小技巧

很多不知道咋写的地方可以直接进入对象源码里面去看看,特别是zero内置的置对象,源码里面有注释,没注释的看看名称也知道是什么

threading包

使用threading包,不要使用原生goroutine,如果goroutine出现panic是无法recover的。而threading会自动在goroutine启动时使用recover。threading包还有很多安全的api,不要自己使用go原生的,如waitgroup等。

该库是go-zero开源项目的一个组件,用于协程的并发安全,解决协程运行中panic无法解决的问题。

常用函数:

  • func GoSafe(fn func()):安全启动一个协程执行fn,在协程panic时自动recover
  • func RunSafe(fn func()):使用本协程安全运行fn,自动recover fn中产生的panic

proto文件注意事项

微服务时间戳可以直接用int64,使用time.Unix()来转换,没有必要使用第三方库。该什么类型就使用什么类型,把proto文件尽量简化(就像看文档一样)。any也尽量少用,类型尽量是具体的。

goland中proto文件import标红时选择它默认的解决办法(小灯泡)就行了。多个proto导入时,先将没有service的用protoc语句先自己生成:protoc -I ./ --go_out=paths=source_relative:. --go-grpc_out=paths=source_relative:. userModel.proto

然后有service的使用goctl生成:goctl.exe rpc protoc .\user.proto --go_out=../ --go-grpc_out=../ --zrpc_out=../ --style=goZero

坑:yaml里面需要配置Mode:dev才能使用grpcui

微服务一般把model放到rpc里面,api里面只调用接口。如果规模变大,可以把model放外面,api调rpc接口,rpc调model接口。如果想约束数据库操作可以将model放到rpc的internal里面。

api与rpc连接方式:直连,etcd,k8s

template

goctl源码里面有使用templete生成代码,默认使用的是goctl里面tmpl里面的文件,你可以使用命令goctl templete init --home=xxx 来生成自己的tmpl,它会把goctl里面的文件复制到xxx下,如果不使用–home选项默认会生成到用户目录下的.goctl文件夹下,并且在后面如果goctl生成代码时不指定home选项就会使用该文件夹下的tmpl文件。之后就可以修改自己的tmpl文件。注意由于使用的是tmplete包的excute函数,所以每个文件能使用的变量都是不一样的,不要自己造变量。通常情况下这个用法可以解决引入包,或者添加校验等问题。使用的goctl版本号要和tmpl所在文件目录名对应。

go-zero超时配置

超时有三个地方配置:

api,rpc的server还有rpc的client

NonBlock

另外rpc的client的NonBlock选项默认为false,必须等依赖的rpc启动后自己才能启动,设置为true后可以不等依赖启动

yaml文件格式

yaml文件与config.go对应规则遵循匿名字段的规则,及匿名字段的属性可以在结构体上直接访问,另外yaml的短横线-表示切片的元素

Postman的example

Postman的example可以记录一对请求和响应结果

go-zero负载均衡

go-zero底层负载均衡使用p2c算法,使用EWMA计算服务器繁忙负载率

p2c:Pick of 2 choices,从可用节点列表中随机选择两个节点,选择负载率较低的进行请求

简书 千杉沐雪

sqlx中timestamp的空值问题

博客园 jackluo

对应go中的time.Time,空值为nil,但是nil可能会报错,可以直接去掉这个字段。另外通过字段上面加上 .omitempty 标签也可以解决这个问题。

mysql注意事项

一定要配置utf8mb4

1
root:password@/name?parseTime=True&loc=Local&charset=utf8mb4&collation=utf8mb4_unicode_ci

关于返回数组

github api语法

go-zero支持保留golang关键字,不支持time.Time数据类型,用int64表示

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
RecommendRequest {
  Cursor int64 `json:"cursor"`
  Ps     int64 `form:"ps,default=20"` // 每页大小
}

RecommendResponse {
  Products      []*Product `json:"products"`
  IsEnd         bool       `json:"is_end"`         // 是否最后一页
  RecommendTime int64      `json:"recommend_time"` // 商品列表最后一个商品的推荐时间
}

接入pprof

go-zero 如何接入 pprof

  1. 内置pprof记录:查询进程id然后发送USR2信号:kill -USR2 [pid],日志文件中会输出 pprof 的文件的保存路径,对文件进行分析即可
  2. web pprof:添加pprof服务,并注册到ServiceGroup
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main
import (
	//...
	_ "net/http/pprof"
	//...
)

var configFile = flag.String("f", "etc/bigdataapi-api.yaml", "the config file")

function main() {
	flag.Parse()

	var c config.Config
	conf.MustLoad(*configFile, &c)

	ctx := svc.NewServiceContext(c)

	svcGroup := service.NewServiceGroup()
	defer svcGroup.Stop()

	server := rest.MustNewServer(c.RestConf)
	svcGroup.Add(server)
	svcGroup.Add(pprofServer{})

	handler.RegisterHandlers(server, ctx)
	fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
	svcGroup.Start()
}

type pprofServer struct{}

func (pprofServer) Start() {
	addr := "0.0.0.0:8080"
	fmt.Printf("Start pprof server, listen addr %s\n", addr)
	err := http.ListenAndServe(addr, nil)
	if err != nil {
		log.Fatal(err)
	}
}

func (pprofServer) Stop() {
	fmt.Printf("Stop pprof server\n")
}

dtm

dtm官网

Mikealmmmm dtm保姆级教程核心代码

基础

安装运行DTM

docker安装

1
docker run -itd  --name dtm -p 36789:36789 -p 36790:36790  yedf/dtm:latest

二进制包下载安装

执行命令./dtm 即可运行

快速开始

DTM是什么:DTM是一款开源的分布式事务管理器,解决跨数据库、跨服务、跨语言栈更新数据的一致性问题。

示例详解

接入代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// 具体业务微服务地址
const qsBusi = "http://localhost:8081/api/busi_saga"
req := &gin.H{"amount": 30} // 微服务的载荷
// DtmServer为DTM服务的地址,是一个url
DtmServer := "http://localhost:36789/api/dtmsvr"
saga := dtmcli.NewSaga(DtmServer, shortuuid.New()).
  // 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 补偿操作为url: qsBusi+"/TransOutCompensate"
  Add(qsBusi+"/TransOut", qsBusi+"/TransOutCompensate", req).
  // 添加一个TransIn的子事务,正向操作为url: qsBusi+"/TransIn", 补偿操作为url: qsBusi+"/TransInCompensate"
  Add(qsBusi+"/TransIn", qsBusi+"/TransInCompensate", req)
// 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务
err := saga.Submit()

时序图:

  • 成功
https://dtm.pub/assets/saga_normal.a2849672.jpg
  • 失败
https://dtm.pub/assets/saga_rollback.8da8593f.jpg

接入参考

go-zero对接

dtm配置,在dtm项目文件夹下生成配置文件

1
cp conf.sample.yml conf.yml

如果使用etcd就将配置中的以下代码解开注释

MicroService:
 Driver: 'dtm-driver-gozero' # name of the driver to handle register/discover
 Target: 'etcd://localhost:2379/dtmservice' # register dtm server to this url
 EndPoint: 'localhost:36790'

开发接入

main.go

1
2
// 下面这行导入gozero的dtm驱动
import _ "github.com/dtm-labs/driver-gozero"

logic

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// dtm已经通过前面的配置,注册到下面这个地址,因此在dtmgrpc中使用该地址
var dtmServer = "etcd://localhost:2379/dtmservice"

// 下面从配置文件中Load配置,然后通过BuildTarget获得业务服务的地址
var c zrpc.RpcClientConf
conf.MustLoad(*configFile, &c)
busiServer, err := c.BuildTarget()

  // 使用dtmgrpc生成一个消息型分布式事务并提交
	gid := dtmgrpc.MustGenGid(dtmServer)
	msg := dtmgrpc.NewMsgGrpc(dtmServer, gid).
    // 事务的第一步为调用trans.TransSvcClient.TransOut
    // 可以从trans.pb.go中找到上述方法对应的Method名称为"/trans.TransSvc/TransOut"
    // dtm需要从dtm服务器调用该方法,所以不走强类型
    // 而是走动态的url: busiServer+"/trans.TransSvc/TransOut"
		Add(busiServer+"/trans.TransSvc/TransOut", &busi.BusiReq{Amount: 30, UserId: 1}).
		Add(busiServer+"/trans.TransSvc/TransIn", &busi.BusiReq{Amount: 30, UserId: 2})
  // 上面这个是两阶段消息的形式,在Mikealm的程序里面的SAGA形式是这么写的:
  // saga := dtmgrpc.NewSagaGrpc(dtmServer, gid).
	// 	Add(orderRpcBusiServer+"/pb.order/create", orderRpcBusiServer+"/pb.order/createRollback", createOrderReq).
	// 	Add(stockRpcBusiServer+"/pb.stock/deduct", stockRpcBusiServer+"/pb.stock/deductRollback", deductReq)
	err := msg.Submit()

注意事项

  1. rpc服务的pb路径

在order-api的logic中,调用后端rpc服务时候,找到rpc的pb路径时,一定要找pb.go文件中invoke的路径,因为grpc在生成pb文件时候下方也有一堆有路径的,一定要找到invoke那个路径

正确的

https://github.com/Mikaelemmmm/gozerodtm/raw/main/docimage/pb_url_right.png
  1. dtm的回滚补偿

在使用dtm的grpc时候,在grpc中的服务发生错误了,希望执行后面的rollback必须返回 : status.Error(codes.Aborted, dtmcli.ResultFailure) , 返回其他错误,不会执行你的rollback操作,dtm会一直重试,如下图:

https://github.com/Mikaelemmmm/gozerodtm/raw/main/docimage/rollback.png
  1. barrier的空补偿、悬挂等

barrier库在准备工作中已经建立了

每个与db交互的服务只要用到了barrier,那要给这个服务使用到的mysql账号分配barrier库的权限

  1. barrier在rpc中本地事务

使用了barrier,model中与db交互时候必须要用事务,并且一定要跟barrier用同一个事务

logic:

https://github.com/Mikaelemmmm/gozerodtm/raw/main/docimage/barrier_tx_logic.png

model:

https://github.com/Mikaelemmmm/gozerodtm/raw/main/docimage/barrier_tx_model.png

go-zero博客

老文档

新文档

企业级RPC框架zRPC

zRPC支持直连和基于etcd服务发现两种方式

https://raw.githubusercontent.com/zeromicro/zero-doc/main/doc/images/zrpc.png
comment
  • discov: 服务发现模块,基于etcd实现服务发现功能
  • resolver: 服务注册模块,实现了gRPC的resolver.Builder接口并注册到gRPC
  • interceptor: 拦截器,对请求和响应进行拦截处理(内置拦截器实现自适应熔断(breaker)和prometheus指标收集等)
  • balancer: 负载均衡模块,实现了p2c负载均衡算法,并注册到gRPC
  • client: zRPC客户端,负责发起请求
  • server: zRPC服务端,负责处理请求

threading

该库是go-zero开源项目的一个组件,用于协程的并发安全,解决协程运行中panic无法解决的问题。

常用函数:

  • func GoSafe(fn func()):安全启动一个协程执行fn,在协程panic时自动recover
  • func RunSafe(fn func()):使用本协程安全运行fn,自动recover fn中产生的panic

stringx包

go zero 博客 高效的关键词替换和敏感词过滤工具

利用高效的Trie树建立关键词树

函数

  • func NewReplacer(mapping map[string]string) Replacer
  • func NewTrie(words []string, opts …TrieOption) Trie:这里的type TrieOption func(trie *trieNode)看了源码只有func WithMask(mask rune) TrieOption,用于后面屏蔽词的替换字符

Replacer方法

  • func (r *replacer) Replace(text string) string:根据mapping替换text

Trie方法

  • func (n *trieNode) Filter(text string) (sentence string, keywords []string, found bool):返回值第一个是替换屏蔽词后的结果,第二个是找到的屏蔽词,第三个是是否有屏蔽词
  • func (n *trieNode) FindKeywords(text string) []string:返回找到的屏蔽词

缓存设计

持久层缓存

持久层缓存

https://go-zero.dev/cn/assets/images/redis-cache-11-c2989ee89c9f5f204d294bbed81f1bad.webp

缓存只删除不更新

行记录始终只存储一份,即主键对应行记录

唯一索引仅缓存主键值,不直接缓存行记录(参考mysql索引思想)

防缓存穿透设计,缓存中没有的添加*记录,并使用Once查询数据库,默认一分钟

不缓存多行记录

业务层缓存

业务层缓存

https://go-zero.dev/cn/assets/images/biz-redis-02-c74e333a1fd8a3af9c5f12314a9fb1cd.svg

进程内缓存工具collection.Cache

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// 初始化 cache,其中 WithLimit 可以指定最大缓存的数量
c, err := collection.NewCache(time.Minute, collection.WithLimit(10000))
if err != nil {
  panic(err)
}

// 设置缓存
c.Set("key", user)

// 获取缓存,ok:是否存在
v, ok := c.Get("key")

// 删除缓存
c.Del("key")

// 获取缓存,如果 key 不存在的,则会调用 func 去生成缓存
v, err := c.Take("key", func() (interface{}, error) {
  return user, nil
})

并发编程

mapreduce官方文档

https://raw.githubusercontent.com/zeromicro/zero-doc/main/doc/images/mr.png

简单使用Finish函数

并发执行多个函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
func productDetail(uid, pid int64) (*ProductDetail, error) {
    var pd ProductDetail
    err := mr.Finish(func() (err error) {
        pd.User, err = userRpc.User(uid)
        return
    }, func() (err error) {
        pd.Store, err = storeRpc.Store(pid)
        return
    }, func() (err error) {
        pd.Order, err = orderRpc.Order(pid)
        return
    })

    if err != nil {
        log.Printf("product detail error: %v", err)
        return nil, err
    }

    return &pd, nil
}

复杂使用:MapReduce

并发对多个对象执行mapreduce

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func checkLegal(uids []int64) ([]int64, error) {
    r, err := mr.MapReduce(func(source chan<- interface{}) {
        for _, uid := range uids {
            source <- uid
        }
    }, func(item interface{}, writer mr.Writer, cancel func(error)) {
        uid := item.(int64)
        ok, err := check(uid)
        if err != nil {
            cancel(err)
        }
        if ok {
            writer.Write(uid)
        }
    }, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
        var uids []int64
        for p := range pipe {
            uids = append(uids, p.(int64))
        }
        writer.Write(uids)
    })
    if err != nil {
        log.Printf("check error: %v", err)
        return nil, err
    }

    return r.([]int64), nil
}

func check(uid int64) (bool, error) {
    // do something check user legal
    return true, nil
}

调用cancel立即返回nil, error

流处理

fx

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func outputStream(ch chan int) {
    fx.From(func(source chan<- interface{}) {
        for c := range ch {
            source <- c
        }
    }).Walk(func(item interface{}, pipe chan<- interface{}) {
        count := item.(int)
        pipe <- count
    }).Filter(func(item interface{}) bool {
        itemInt := item.(int)
        if itemInt%2 == 0 {
            return true
        }
        return false
    }).ForEach(func(item interface{}) {
        fmt.Println(item)
    })
}

mysql

mysql

go-queue

godoc官方文档

github官方文档

example见github

dq

consumer example
1
2
3
4
5
6
7
8
9
consumer := dq.NewConsumer(dq.DqConf{
	Redis: redis.RedisConf{
		Host: "localhost:6379",
		Type: redis.NodeType,
	},
})
consumer.Consume(func(body []byte) {
	fmt.Println(string(body))
})
producer example
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
producer := dq.NewProducer([]dq.Beanstalk{
	{
		Endpoint: "localhost:11300",
		Tube:     "tube",
	},
	{
		Endpoint: "localhost:11300",
		Tube:     "tube",
	},
})	

for i := 1000; i < 1005; i++ {
	_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5)
	if err != nil {
		fmt.Println(err)
	}
}

kq

kq不支持创建topic

consumer example

config.json

1
2
3
4
5
6
7
8
9
Name: kq
Brokers:
- 127.0.0.1:19092
- 127.0.0.1:19092
- 127.0.0.1:19092
Group: adhoc
Topic: kq
Offset: first
Consumers: 1

go文件

1
2
3
4
5
6
7
8
9
var c kq.KqConf
conf.MustLoad("config.json", &c)

q := kq.MustNewQueue(c, kq.WithHandle(func(k, v string) error {
	fmt.Printf("=> %s\n", v)
	return nil
}))
defer q.Stop()
q.Start()
producer example
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
type message struct {
	Key     string `json:"key"`
	Value   string `json:"value"`
	Payload string `json:"message"`
}


pusher := kq.NewPusher([]string{
	"127.0.0.1:19092",
	"127.0.0.1:19092",
	"127.0.0.1:19092",
}, "kq")

ticker := time.NewTicker(time.Millisecond)
for round := 0; round < 3; round++ {
	select {
	case <-ticker.C:
		count := rand.Intn(100)
		m := message{
			Key:     strconv.FormatInt(time.Now().UnixNano(), 10),
			Value:   fmt.Sprintf("%d,%d", round, count),
			Payload: fmt.Sprintf("%d,%d", round, count),
		}
		body, err := json.Marshal(m)
		if err != nil {
			log.Fatal(err)
		}

		fmt.Println(string(body))
		if err := pusher.Push(string(body)); err != nil {
			log.Fatal(err)
		}
	}
}
cmdline.EnterToContinue()
 |