go-zero官网
go-zero详细文档
go-zero looklook github
本系列为作者跟着Mikaelemmmm的b站教学视频学习时做的笔记
looklook文档
Mikaelemmmm哥用爱发电,希望大家给looklook项目点个star
项目架构图和业务架构图
zerolooklook使用的中间件
zeromicro/go-queue里面的dq,kq和rabbitmq
dq基于redis,kq基于kafka,rabbitmq基于rabbitmq。都是封装好的消息队列使用很方便。
dq的asynq可以做延时任务,基于cron库,调用schedule模块来布置定时任务。用于分布式高可用,防止单点故障(相对直接使用cron布置任务)
looklook架构讲解
线上部署
cdn=》防火墙(可以是云服务自带的防火墙直接配)=》负载均衡=》nginx集群=》k8s集群(api:使用goctl生成的k8s yaml文件会暴露出访问端口,直接把端口配置到nginx里面,另外api内部调用rpc的服务;rpc服务不暴露端口给外面,仅在k8s集群内部的网段开放服务,供api调用和调用内部中间件;mq,job,schedule也只被rpc调用不暴露给外面)=》使用filebeat来获取日志并发给kafka=》kafka将存储日志消息=》go-stash消费并过滤日志消息并push到elasticsearch
jeager做链路监控,prometheus做服务器监控,grafana查看Prometheus监控信息
项目目录讲解
app里面是所有的业务
common里面放常用的定制组件
deploy是部署相关的文件
doc是项目的文档
docker-compose.yaml和docker-compose-env.yaml分别是业务容器和环境容器的docker-compose文件
部署过程
见项目文档
启动过程中es可能会启动比较慢,依赖es的kibana,jeager也起不来。
mysql在windows可能因为权限起不来
日志收集使用filebeat搜集容器文件夹内容,然后输出给kafka,所以需要进入kafka创建3个topic,对应业务架构里面支付成功通知所有订阅者(2个)和项目架构的log搜集(1个)。
1
2
3
4
|
docker exec -it kafka /bin/sh
cd /opt/kafka/bin/
./kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 -partitions 1 --topic looklook-log
./kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 -partitions 1 --topic payment-update-paystatus-topic
|
mysql的容器初次使用时需要配置允许远程连接
1
2
3
4
5
|
docker exec -it mysql mysql -uroot -p
##输入密码:PXDN93VRKUm8TeE7
use mysql;
update user set host='%' where user='root';
FLUSH PRIVILEGES;
|
并在mysql中导入sql文件来构建表
modd的img需要你配置modd.conf,配置很简单
1
2
3
4
5
|
#usercenter
app/usercenter/cmd/rpc/**/*.go {
prep: go build -o data/server/usercenter-rpc -v app/usercenter/cmd/rpc/usercenter.go
daemon +sigkill: ./data/server/usercenter-rpc -f app/usercenter/cmd/rpc/etc/usercenter.yaml
}
|
app/usercenter/cmd/rpc/**/*.go表示这些文件变化就出发下面的代码块
代码块里面按格式写就可以重新构建项目,实现热重载
查看looklook容器日志
1
|
docker logs -f looklook
|
go-stash可能会比kafka启动的早所以如果日志收集出问题可以选择重启一下go-stash
定时任务
app/mqueue/scheduler/logic/settleRecordJob.go
1
2
3
4
5
6
7
8
9
10
|
func (l *MqueueScheduler) settleRecordScheduler() {
task := asynq.NewTask(jobtype.ScheduleSettleRecord, nil)
// every one minute exec
entryID, err := l.svcCtx.Scheduler.Register("*/1 * * * *", task)
if err != nil {
logx.WithContext(l.ctx).Errorf("!!!MqueueSchedulerErr!!! ====> 【settleRecordScheduler】 registered err:%+v , task:%+v",err,task)
}
fmt.Printf("【settleRecordScheduler】 registered an entry: %q \n", entryID)
}
|
自定义jwt验证不通过响应
在生成server的语句里面添加选项WithUnauthorizedCallback
1
2
3
|
server := rest.MustNewServer(c.RestConf, rest.WithUnauthorizedCallback(func(w http.ResponseWriter, r *http.Request, err error) {
w.WriteHeader(200)
}))
|
但是通常前端来处理返回给用户的结果,后端只需要直接将错误代码发给前端。
微信支付api
这个api可以直接用,线上测试没问题
技术栈过多的问题
Mikaelmmmm希望做一个大而全的后端项目。
如果觉得技术栈太多,可以在docker-compose里面将不熟悉的内容都注释掉,不跑一些中间件。looklook的模块划分的很好,把相关的代码注释掉即可。
官方文档笔记
1.开发环境部署
项目介绍
本项目开发环境推荐docker-compose,使用直链方式,测试、线上部署使用k8s
- app:所有业务代码包含api、rpc以及mq(消息队列、延迟队列、定时任务)
- common:通用组件 error、middleware、interceptor、tool、ctxdata等
- data:该项目包含该目录依赖所有中间件(mysql、es、redis、grafana等)产生的数据。
- deploy:
- filebeat: docker部署filebeat配置
- go-stash:go-stash配置
- nginx: nginx网关配置
- prometheus : prometheus配置
- goctl: 该项目goctl的template
- doc : 该项目系列文档
- modd.conf : modd热加载配置文件
技术栈
k8s,go-zero,nginx网关,filebeat,kafka,go-stash,elasticsearch,kibana,prometheus,grafana,jaeger,go-queue,asynq,asynqmon,dtm,docker,docker-compose,mysql,redis,modd,jenkins,gitlab,harbor
nginx网关
容器内部nginx端口是8081,使用docker暴露出去8888映射端口8081,这样外部通过8888来访问网关
使用location来匹配每个服务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
server{
listen 8081;
access_log /var/log/nginx/looklook.com_access.log;
error_log /var/log/nginx/looklook.com_error.log;
location ~ /order/ {
proxy_set_header Host $http_host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header REMOTE-HOST $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_pass http://looklook:1001;
}
}
|
通过go-zero自带的jwt鉴权之后,我们可以在ctx中拿到userId,
1
2
3
4
|
func (l *DetailLogic) Detail(req types.UserInfoReq) (*types.UserInfoResp, error) {
userId := ctxdata.GetUidFromCtx(l.ctx)
......
}
|
鉴权服务
go-zero官方文档
go-zero从jwt token解析后会将用户生成token时传入的kv原封不动的放在http.Request的Context中,因此我们可以通过Context就可以拿到你想要的值
在desc/api文件中定义go-zero自带的jwt中间件
@server(
...
jwt: JwtAuth
)
service usercenter {
......
}
会在 go-zero-looklook/app/usercenter/cmd/api/internal/handler/routes.go里面添加一句
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// Code generated by goctl. DO NOT EDIT.
package handler
........
server.AddRoutes(
[]rest.Route{
...
},
rest.WithJwt(serverCtx.Config.JwtAuth.AccessSecret),
...
)
}
|
用户发起请求资源 -> nginx网关->匹配到对应服务模块 -> auth模块->identity-api ->identity-rpc -> 用户请求的资源
用户服务
model中定义了Trans方法暴露事务给logic
在usercenter-rpc注册成功之后,需要请求token给前端登陆,直接在rigister内部生成获取token的logic实例并调用它的方法
1
2
3
4
5
6
7
8
|
//2、Generate the token, so that the service doesn't call rpc internally
generateTokenLogic :=NewGenerateTokenLogic(l.ctx,l.svcCtx)
tokenResp,err:=generateTokenLogic.GenerateToken(&usercenter.GenerateTokenReq{
UserId: userId,
})
if err != nil {
return nil, errors.Wrapf(ErrGenerateTokenError, "GenerateToken userId : %d", userId)
}
|
详见官网文档
民宿服务
详见官网文档
【小技巧】 mapreduce
【小技巧】model cache、singleflight
订单服务
详见官网文档
这里写了延迟队列的用法,rpc创建订单的同时生成一个asynq延迟消息,延迟消息时间到了就在go-zero-looklook/app/mqueue/cmd/job/internal/logic/closeOrder.go中进行处理,这里会查询order rpc该订单的状态,如果已经支付则消费消息直接返回nil,否则修改订单消息为取消。
支付服务
详见官网文档
这里查询order-rpc获取订单详情,并根据返回的totalPrice,description和openid(注册的时候获取,userId对应openid)生成微信预支付订单并返回给前端
当前端拿着微信预处理订单发起支付,用户输入密码支付成功后,微信服务器会回调服务器,回调地址在配置中填写
1
2
3
|
WxPayConf:
...
NotifyUrl : http://xxx.xxx.com/payment/v1/thirdPayment/thirdPaymentWxPayCallback
|
这里生成微信订单使用的时第三方库wechatpay-apiv3/wechatpay-go
github.com/wechatpay-apiv3/wechatpay-go/services/payments/jsapi
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
// 3、create wechat pay pre pay order
wxPayClient, err := svc.NewWxPayClientV3(l.svcCtx.Config)
if err != nil {
return nil, err
}
jsApiSvc := jsapi.JsapiApiService{Client: wxPayClient}
// Get the prepay_id, as well as the parameters and signatures needed to invoke the payment
resp, _, err := jsApiSvc.PrepayWithRequestPayment(l.ctx,
jsapi.PrepayRequest{
Appid: core.String(l.svcCtx.Config.WxMiniConf.AppId),
Mchid: core.String(l.svcCtx.Config.WxPayConf.MchId),
Description: core.String(description),
OutTradeNo: core.String(createPaymentResp.Sn),
Attach: core.String(description),
NotifyUrl: core.String(l.svcCtx.Config.WxPayConf.NotifyUrl),
Amount: &jsapi.Amount{
Total: core.Int64(totalPrice),
},
Payer: &jsapi.Payer{
Openid: core.String(openId),
},
},
)
if err != nil {
return nil, errors.Wrapf(ErrWxPayError, "Failed to initiate WeChat payment pre-order err : %v , userId: %d , orderSn:%s", err, userId, orderSn)
}
|
微信回调回来之后,调用verifyAndUpdateState 将流水单号改为已支付。
verifyAndUpdateState方法,查询单号是否存在,比对回调回来的金额与创建时候金额是否一致。这里不用在校验签名了,前一步的sdk已经做了处理了
支付回调成功之后,会给用户发送一个入驻码,去了商家那里要展示这个码,商家通过后台核对码,其实就是美团的样子,我们去美团下单,美团会给你个码,用户拿着这个码去入住或者消费等。
verifyAndUpdateState调用了rpc的UpdateTradeState方法,核心做了两件事情,第一是更新支付状态,第二通过go-queue向消息队列(kafka)发送了一条日志消息
UpdateHomestayOrderTradeState更改订单状态,在发送一条asynq给mqueue-job队列,让mqueue-job发送微信小程序模版消息给用户
消息-延迟-定时队列
asynq特点
- 直接基于redis,一般项目都有redis,而asynq本身就是基于redis所以可以少维护一个中间件
- 支持消息队列、延迟队列、定时任务调度 , 因为希望项目支持定时任务而asynq直接就支持
- 有webui界面,每个任务都可以暂停、归档、通过ui界面查看成功失败、监控
go-queue特点:kafka的吞吐量高
如何使用
这里只介绍kq,注意kq的topic需要你自己预先建立好
消费者
自己使用serviceGroup改造,目录结构还是延续api的基本差不多, 将handler改成了listen , 将logic换成了mqs
首先在etc/yaml里面添加kafka消费者客户端配置
1
2
3
4
5
6
7
8
9
10
|
#kq
PaymentUpdateStatusConf:
Name: PaymentUpdateStatus
Brokers:
- kafka:9092
Group: payment-update-paystatus-group
Topic: payment-update-paystatus-topic
Offset: first
Consumers: 1
Processors: 1
|
修改internal/config/config.go
1
2
3
4
5
|
type Config struct {
...
// kq : pub sub
PaymentUpdateStatusConf kq.KqConf
}
|
首先还是在main函数外设置配置文件
1
|
var configFile = flag.String("f", "etc/order.yaml", "Specify the config file")
|
解析flag,新建config.Config对象,并载入文件
1
2
3
4
|
flag.Parse()
var c config.Config
conf.MustLoad(*configFile, &c)
|
调用c.SetUp()
1
2
3
4
|
// log、prometheus、trace、metricsUrl.
if err := c.SetUp(); err != nil {
panic(err)
}
|
main中不直接使用rest.MustNewServer方法新建rest服务然后调用其Start()方法,而是通过ServiceGroup来管理服务
1
2
|
serviceGroup := service.NewServiceGroup()
defer serviceGroup.Stop()
|
serviceGroup管理的service是一个接口
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// Service is the interface that groups Start and Stop methods.
Service interface {
Starter //Start
Stopper //Stop
}
Starter interface {
Start()
}
Stopper interface {
Stop()
}
|
只要你的服务实现了这2个接口,就可以加入到serviceGroup统一管理,正常api里面的server := rest.MustNewServer(c.RestConf)
也实现了这两个接口
internal/listen/listen.go相当于router.go,可以添加多个kq或者dq
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// back to all consumers
func Mqs(c config.Config) []service.Service {
svcContext := svc.NewServiceContext(c)
ctx := context.Background()
var services []service.Service
//kq :pub sub
services = append(services, KqMqs(c, ctx, svcContext)...)
return services
}
|
KqMqs函数在internal/listen/kqMqs.go里面,这相当于api里面的handler,每个handler可以对应多个kqService
1
2
3
4
5
6
7
8
9
|
//pub sub use kq (kafka)
func KqMqs(c config.Config, ctx context.Context, svcContext *svc.ServiceContext) []service.Service {
return []service.Service{
//Listening for changes in consumption flow status
kq.MustNewQueue(c.PaymentUpdateStatusConf, kqMq.NewPaymentUpdateStatusMq(ctx, svcContext)),
//.....
}
}
|
kq.MustNewQueue函数签名为
1
|
func MustNewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) queue.MessageQueue
|
ConsumeHandler是一个接口,用于消费消息
1
2
3
|
ConsumeHandler interface {
Consume(key, value string) error
}
|
生产者
首先在etc/yaml配置kq
1
2
3
4
5
|
#pay success notify order-mq for kq(kafka pub sub)
KqPaymentUpdatePayStatusConf:
Brokers:
- kafka:9092
Topic: payment-update-paystatus-topic
|
修改internal/config/config.go
1
2
3
4
|
type Config struct {
...
KqPaymentUpdatePayStatusConf KqConfig
}
|
修改internal/svc/serviceContext.go
1
2
3
4
5
6
7
8
9
10
11
|
type ServiceContext struct {
...
KqueuePaymentUpdatePayStatusClient *kq.Pusher
}
func NewServiceContext(c config.Config) *ServiceContext {
return &ServiceContext{
...
KqueuePaymentUpdatePayStatusClient: kq.NewPusher(c.KqPaymentUpdatePayStatusConf.Brokers,c.KqPaymentUpdatePayStatusConf.Topic),
}
}
|
后面就能在其他地方使用该客户端Push消息了,可以预先将需要序列化的消息结构体放到common目录下的子目录
1
2
3
4
|
func (l *UpdateTradeStateLogic) pubKqPaySuccess(orderSn string,payStatus int64) error{
...
return l.svcCtx.KqueuePaymentUpdatePayStatusClient.Push(string(body))
}
|
分布式事务
本项目服务划分相对独立一些,所以目前没有使用到分布式事务
错误处理
grpc的err对应的错误码其实就是一个uint32 , 我们自己定义错误用uint32然后在rpc的全局拦截器返回时候转成grpc的err,就可以了
所以我们自己定义全局错误码在app/common/xerr
日志收集
filebeat,kafka,go-stash和elk直接配置好容器并挂载好容器内配置就好了
链路追踪
go-zero底层已经帮我们把代码跟链路追踪对接的代码已经写好了,默认支持jaeger、zinpink。我们只需要在我们的业务代码配置中,也就是你的业务配置的yaml中配置参数即可。
服务监控
go-zero已经在代码中给我们集成好了prometheus。当我们启动api、rpc都会额外启动一个goroutine 提供prometheus的服务
order-mq这种使用serviceGroup管理的服务,在启动文件main中要显示调用一下SetUp才可以,api、rpc不需要,配置都一样
配置步骤
配置prometheus与grafana:容器和容器内配置
业务配置:只需要在业务配置文件中配置即可
部署环境搭建
gitlab : 放代码,可以做ci
jenkins:做cd发布项目
harbor : 镜像仓库
k8s : 运行服务
发布服务到k8s