本系列为作者跟着Mikaelemmmm 的b站教学视频学习时做的笔记
go zero 为一个微服务框架
比gin web框架更强大。
必看资料:
公众号:微服务实践
go-zero-example: https://github.com/zeromicro/zero-examples
zero-contrib: https://github.com/zeromicro/zero-contrib
微信社区群
go-zero-issue: https://github.com/zeromicro/go-zero/issues
指令
指令页面
指令大全
API
1
goctl api go -api user.api -dir ../ -style goZero
RPC
1
2
3
goctl rpc template -o=user.proto
goctl rpc protoc user.proto --go_out=../ --go-grpc_out=../ --zrpc_out=../ --style=goZero
MODEL
1
2
3
goctl model mysql ddl -src="./*.sql" -dir="./sql/model" -c
$ goctl model mysql datasource -url="user:password@tcp(127.0.0.1:3306)/database" -table="*" -dir="./model"
TEMPLATE
1
2
3
goctl template init --home $HOME /template
goctl rpc new greet --home $HOME /template
docker
1
goctl docker -go hello.go
protoc
1
protoc -I ./ --go_out=paths =source_relative:. --go-grpc_out=paths =source_relative:. userModel.proto
小技巧
很多不知道咋写的地方可以直接进入对象源码里面去看看,特别是zero内置的置对象,源码里面有注释,没注释的看看名称也知道是什么
threading包
使用threading包,不要使用原生goroutine,如果goroutine出现panic是无法recover的。而threading会自动在goroutine启动时使用recover。threading包还有很多安全的api,不要自己使用go原生的,如waitgroup等。
该库是go-zero开源项目的一个组件,用于协程的并发安全,解决协程运行中panic无法解决的问题。
常用函数:
func GoSafe(fn func()):安全启动一个协程执行fn,在协程panic时自动recover
func RunSafe(fn func()):使用本协程安全运行fn,自动recover fn中产生的panic
proto文件注意事项
微服务时间戳可以直接用int64,使用time.Unix()来转换,没有必要使用第三方库。该什么类型就使用什么类型,把proto文件尽量简化(就像看文档一样)。any也尽量少用,类型尽量是具体的。
goland中proto文件import标红时选择它默认的解决办法(小灯泡)就行了。多个proto导入时,先将没有service的用protoc语句先自己生成:protoc -I ./ --go_out=paths=source_relative:. --go-grpc_out=paths=source_relative:. userModel.proto
然后有service的使用goctl生成:goctl.exe rpc protoc .\user.proto --go_out=../ --go-grpc_out=../ --zrpc_out=../ --style=goZero
坑:yaml里面需要配置Mode:dev
才能使用grpcui
微服务一般把model放到rpc里面,api里面只调用接口。如果规模变大,可以把model放外面,api调rpc接口,rpc调model接口。如果想约束数据库操作可以将model放到rpc的internal里面。
api与rpc连接方式:直连,etcd,k8s
template
goctl源码里面有使用templete生成代码,默认使用的是goctl里面tmpl里面的文件,你可以使用命令goctl templete init --home=xxx
来生成自己的tmpl,它会把goctl里面的文件复制到xxx下,如果不使用–home选项默认会生成到用户目录下的.goctl文件夹下,并且在后面如果goctl生成代码时不指定home选项就会使用该文件夹下的tmpl文件。之后就可以修改自己的tmpl文件。注意由于使用的是tmplete包的excute函数,所以每个文件能使用的变量都是不一样的,不要自己造变量。通常情况下这个用法可以解决引入包,或者添加校验等问题。使用的goctl版本号要和tmpl所在文件目录名对应。
go-zero超时配置
超时有三个地方配置:
api,rpc的server还有rpc的client
NonBlock
另外rpc的client的NonBlock选项默认为false,必须等依赖的rpc启动后自己才能启动,设置为true后可以不等依赖启动
yaml文件格式
yaml文件与config.go对应规则遵循匿名字段的规则,及匿名字段的属性可以在结构体上直接访问,另外yaml的短横线-
表示切片的元素
Postman的example
Postman的example可以记录一对请求和响应结果
go-zero负载均衡
go-zero底层负载均衡使用p2c算法,使用EWMA计算服务器繁忙负载率
p2c:Pick of 2 choices,从可用节点列表中随机选择两个节点,选择负载率较低的进行请求
简书 千杉沐雪
sqlx中timestamp的空值问题
博客园 jackluo
对应go中的time.Time,空值为nil,但是nil可能会报错,可以直接去掉这个字段。另外通过字段上面加上 .omitempty 标签也可以解决这个问题。
mysql注意事项
一定要配置utf8mb4
1
root:password@/name?parseTime=True&loc=Local&charset=utf8mb4&collation=utf8mb4_unicode_ci
关于返回数组
github api语法
go-zero支持保留golang关键字,不支持time.Time数据类型,用int64表示
1
2
3
4
5
6
7
8
9
10
RecommendRequest {
Cursor int64 `json:"cursor"`
Ps int64 `form:"ps,default=20"` // 每页大小
}
RecommendResponse {
Products []*Product `json:"products"`
IsEnd bool `json:"is_end"` // 是否最后一页
RecommendTime int64 `json:"recommend_time"` // 商品列表最后一个商品的推荐时间
}
接入pprof
go-zero 如何接入 pprof
内置pprof记录:查询进程id然后发送USR2信号:kill -USR2 [pid]
,日志文件中会输出 pprof 的文件的保存路径,对文件进行分析即可
web pprof:添加pprof服务,并注册到ServiceGroup
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main
import (
//...
_ "net/http/pprof"
//...
)
var configFile = flag.String ("f" , "etc/bigdataapi-api.yaml" , "the config file" )
function main () {
flag.Parse ()
var c config.Config
conf.MustLoad (*configFile, &c)
ctx := svc.NewServiceContext (c)
svcGroup := service.NewServiceGroup ()
defer svcGroup.Stop ()
server := rest.MustNewServer (c.RestConf)
svcGroup.Add (server)
svcGroup.Add (pprofServer{})
handler.RegisterHandlers (server, ctx)
fmt.Printf ("Starting server at %s:%d...\n" , c.Host, c.Port)
svcGroup.Start ()
}
type pprofServer struct {}
func (pprofServer) Start () {
addr := "0.0.0.0:8080"
fmt.Printf ("Start pprof server, listen addr %s\n" , addr)
err := http.ListenAndServe (addr, nil )
if err != nil {
log.Fatal (err)
}
}
func (pprofServer) Stop () {
fmt.Printf ("Stop pprof server\n" )
}
dtm
dtm官网
Mikealmmmm dtm保姆级教程核心代码
基础
安装运行DTM
docker安装
1
docker run -itd --name dtm -p 36789:36789 -p 36790:36790 yedf/dtm:latest
二进制包下载安装
执行命令./dtm 即可运行
快速开始
DTM是什么:DTM是一款开源的分布式事务管理器,解决跨数据库、跨服务、跨语言栈更新数据的一致性问题。
示例详解
接入代码
1
2
3
4
5
6
7
8
9
10
11
12
// 具体业务微服务地址
const qsBusi = "http://localhost:8081/api/busi_saga"
req := &gin.H{"amount" : 30 } // 微服务的载荷
// DtmServer为DTM服务的地址,是一个url
DtmServer := "http://localhost:36789/api/dtmsvr"
saga := dtmcli.NewSaga (DtmServer, shortuuid.New ()).
// 添加一个TransOut的子事务,正向操作为url: qsBusi+"/TransOut", 补偿操作为url: qsBusi+"/TransOutCompensate"
Add (qsBusi+"/TransOut" , qsBusi+"/TransOutCompensate" , req).
// 添加一个TransIn的子事务,正向操作为url: qsBusi+"/TransIn", 补偿操作为url: qsBusi+"/TransInCompensate"
Add (qsBusi+"/TransIn" , qsBusi+"/TransInCompensate" , req)
// 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务
err := saga.Submit ()
时序图:
接入参考
go-zero对接
dtm配置,在dtm项目文件夹下生成配置文件
1
cp conf.sample.yml conf.yml
如果使用etcd就将配置中的以下代码解开注释
MicroService:
Driver: 'dtm-driver-gozero' # name of the driver to handle register/discover
Target: 'etcd://localhost:2379/dtmservice' # register dtm server to this url
EndPoint: 'localhost:36790'
开发接入
main.go
1
2
// 下面这行导入gozero的dtm驱动
import _ "github.com/dtm-labs/driver-gozero"
logic
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// dtm已经通过前面的配置,注册到下面这个地址,因此在dtmgrpc中使用该地址
var dtmServer = "etcd://localhost:2379/dtmservice"
// 下面从配置文件中Load配置,然后通过BuildTarget获得业务服务的地址
var c zrpc.RpcClientConf
conf.MustLoad (*configFile, &c)
busiServer, err := c.BuildTarget ()
// 使用dtmgrpc生成一个消息型分布式事务并提交
gid := dtmgrpc.MustGenGid (dtmServer)
msg := dtmgrpc.NewMsgGrpc (dtmServer, gid).
// 事务的第一步为调用trans.TransSvcClient.TransOut
// 可以从trans.pb.go中找到上述方法对应的Method名称为"/trans.TransSvc/TransOut"
// dtm需要从dtm服务器调用该方法,所以不走强类型
// 而是走动态的url: busiServer+"/trans.TransSvc/TransOut"
Add (busiServer+"/trans.TransSvc/TransOut" , &busi.BusiReq{Amount: 30 , UserId: 1 }).
Add (busiServer+"/trans.TransSvc/TransIn" , &busi.BusiReq{Amount: 30 , UserId: 2 })
// 上面这个是两阶段消息的形式,在Mikealm的程序里面的SAGA形式是这么写的:
// saga := dtmgrpc.NewSagaGrpc(dtmServer, gid).
// Add(orderRpcBusiServer+"/pb.order/create", orderRpcBusiServer+"/pb.order/createRollback", createOrderReq).
// Add(stockRpcBusiServer+"/pb.stock/deduct", stockRpcBusiServer+"/pb.stock/deductRollback", deductReq)
err := msg.Submit ()
注意事项
rpc服务的pb路径
在order-api的logic中,调用后端rpc服务时候,找到rpc的pb路径时,一定要找pb.go文件中invoke的路径,因为grpc在生成pb文件时候下方也有一堆有路径的,一定要找到invoke那个路径
正确的
dtm的回滚补偿
在使用dtm的grpc时候,在grpc中的服务发生错误了,希望执行后面的rollback必须返回 : status.Error(codes.Aborted, dtmcli.ResultFailure) , 返回其他错误,不会执行你的rollback操作,dtm会一直重试,如下图:
barrier的空补偿、悬挂等
barrier库在准备工作中已经建立了
每个与db交互的服务只要用到了barrier,那要给这个服务使用到的mysql账号分配barrier库的权限
barrier在rpc中本地事务
使用了barrier,model中与db交互时候必须要用事务,并且一定要跟barrier用同一个事务
logic:
model:
go-zero博客
老文档
新文档
企业级RPC框架zRPC
zRPC支持直连和基于etcd服务发现两种方式
comment
discov: 服务发现模块,基于etcd实现服务发现功能
resolver: 服务注册模块,实现了gRPC的resolver.Builder接口并注册到gRPC
interceptor: 拦截器,对请求和响应进行拦截处理(内置拦截器实现自适应熔断(breaker)和prometheus指标收集等)
balancer: 负载均衡模块,实现了p2c负载均衡算法,并注册到gRPC
client: zRPC客户端,负责发起请求
server: zRPC服务端,负责处理请求
threading
该库是go-zero开源项目的一个组件,用于协程的并发安全,解决协程运行中panic无法解决的问题。
常用函数:
func GoSafe(fn func()):安全启动一个协程执行fn,在协程panic时自动recover
func RunSafe(fn func()):使用本协程安全运行fn,自动recover fn中产生的panic
stringx包
go zero 博客 高效的关键词替换和敏感词过滤工具
利用高效的Trie树建立关键词树
函数
func NewReplacer(mapping map[string]string) Replacer
func NewTrie(words []string, opts …TrieOption) Trie:这里的type TrieOption func(trie *trieNode)
看了源码只有func WithMask(mask rune) TrieOption
,用于后面屏蔽词的替换字符
Replacer方法
func (r *replacer) Replace(text string) string:根据mapping替换text
Trie方法
func (n *trieNode) Filter(text string) (sentence string, keywords []string, found bool):返回值第一个是替换屏蔽词后的结果,第二个是找到的屏蔽词,第三个是是否有屏蔽词
func (n *trieNode) FindKeywords(text string) []string:返回找到的屏蔽词
缓存设计
持久层缓存
持久层缓存
缓存只删除不更新
行记录始终只存储一份,即主键对应行记录
唯一索引仅缓存主键值,不直接缓存行记录(参考mysql索引思想)
防缓存穿透设计,缓存中没有的添加*记录,并使用Once查询数据库,默认一分钟
不缓存多行记录
业务层缓存
业务层缓存
进程内缓存工具collection.Cache
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 初始化 cache,其中 WithLimit 可以指定最大缓存的数量
c, err := collection.NewCache (time.Minute, collection.WithLimit (10000 ))
if err != nil {
panic (err)
}
// 设置缓存
c.Set ("key" , user)
// 获取缓存,ok:是否存在
v, ok := c.Get ("key" )
// 删除缓存
c.Del ("key" )
// 获取缓存,如果 key 不存在的,则会调用 func 去生成缓存
v, err := c.Take ("key" , func () (interface {}, error ) {
return user, nil
})
并发编程
mapreduce官方文档
简单使用Finish函数
并发执行多个函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func productDetail (uid, pid int64 ) (*ProductDetail, error ) {
var pd ProductDetail
err := mr.Finish (func () (err error ) {
pd.User, err = userRpc.User (uid)
return
}, func () (err error ) {
pd.Store, err = storeRpc.Store (pid)
return
}, func () (err error ) {
pd.Order, err = orderRpc.Order (pid)
return
})
if err != nil {
log.Printf ("product detail error: %v" , err)
return nil , err
}
return &pd, nil
}
复杂使用:MapReduce
并发对多个对象执行mapreduce
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func checkLegal (uids []int64 ) ([]int64 , error ) {
r, err := mr.MapReduce (func (source chan <- interface {}) {
for _, uid := range uids {
source <- uid
}
}, func (item interface {}, writer mr.Writer, cancel func (error )) {
uid := item.(int64 )
ok, err := check (uid)
if err != nil {
cancel (err)
}
if ok {
writer.Write (uid)
}
}, func (pipe <-chan interface {}, writer mr.Writer, cancel func (error )) {
var uids []int64
for p := range pipe {
uids = append (uids, p.(int64 ))
}
writer.Write (uids)
})
if err != nil {
log.Printf ("check error: %v" , err)
return nil , err
}
return r.([]int64 ), nil
}
func check (uid int64 ) (bool , error ) {
// do something check user legal
return true , nil
}
调用cancel立即返回nil, error
流处理
fx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func outputStream (ch chan int ) {
fx.From (func (source chan <- interface {}) {
for c := range ch {
source <- c
}
}).Walk (func (item interface {}, pipe chan <- interface {}) {
count := item.(int )
pipe <- count
}).Filter (func (item interface {}) bool {
itemInt := item.(int )
if itemInt%2 == 0 {
return true
}
return false
}).ForEach (func (item interface {}) {
fmt.Println (item)
})
}
mysql
mysql
go-queue
godoc官方文档
github官方文档
example见github
dq
consumer example
1
2
3
4
5
6
7
8
9
consumer := dq.NewConsumer (dq.DqConf{
Redis: redis.RedisConf{
Host: "localhost:6379" ,
Type: redis.NodeType,
},
})
consumer.Consume (func (body []byte ) {
fmt.Println (string (body))
})
producer example
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
producer := dq.NewProducer ([]dq.Beanstalk{
{
Endpoint: "localhost:11300" ,
Tube: "tube" ,
},
{
Endpoint: "localhost:11300" ,
Tube: "tube" ,
},
})
for i := 1000 ; i < 1005 ; i++ {
_, err := producer.Delay ([]byte (strconv.Itoa (i)), time.Second*5 )
if err != nil {
fmt.Println (err)
}
}
kq
kq不支持创建topic
consumer example
config.json
1
2
3
4
5
6
7
8
9
Name : kq
Brokers :
- 127.0.0.1 :19092
- 127.0.0.1 :19092
- 127.0.0.1 :19092
Group : adhoc
Topic : kq
Offset : first
Consumers : 1
go文件
1
2
3
4
5
6
7
8
9
var c kq.KqConf
conf.MustLoad ("config.json" , &c)
q := kq.MustNewQueue (c, kq.WithHandle (func (k, v string ) error {
fmt.Printf ("=> %s\n" , v)
return nil
}))
defer q.Stop ()
q.Start ()
producer example
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
type message struct {
Key string `json:"key"`
Value string `json:"value"`
Payload string `json:"message"`
}
pusher := kq.NewPusher ([]string {
"127.0.0.1:19092" ,
"127.0.0.1:19092" ,
"127.0.0.1:19092" ,
}, "kq" )
ticker := time.NewTicker (time.Millisecond)
for round := 0 ; round < 3 ; round++ {
select {
case <-ticker.C:
count := rand.Intn (100 )
m := message{
Key: strconv.FormatInt (time.Now ().UnixNano (), 10 ),
Value: fmt.Sprintf ("%d,%d" , round, count),
Payload: fmt.Sprintf ("%d,%d" , round, count),
}
body, err := json.Marshal (m)
if err != nil {
log.Fatal (err)
}
fmt.Println (string (body))
if err := pusher.Push (string (body)); err != nil {
log.Fatal (err)
}
}
}
cmdline.EnterToContinue ()