Contents

golang-go-zero-教程-Looklook

go-zero官网

go-zero详细文档

go-zero looklook github

本系列为作者跟着Mikaelemmmm的b站教学视频学习时做的笔记

looklook文档

Mikaelemmmm哥用爱发电,希望大家给looklook项目点个star

项目架构图和业务架构图

https://github.com/Mikaelemmmm/go-zero-looklook/raw/main/doc/chinese/images/1/gozerolooklook.png

https://github.com/Mikaelemmmm/go-zero-looklook/raw/main/doc/chinese/images/1/go-zero-looklook-service.png

zerolooklook使用的中间件

zeromicro/go-queue里面的dq,kq和rabbitmq

dq基于redis,kq基于kafka,rabbitmq基于rabbitmq。都是封装好的消息队列使用很方便。

dq的asynq可以做延时任务,基于cron库,调用schedule模块来布置定时任务。用于分布式高可用,防止单点故障(相对直接使用cron布置任务)

looklook架构讲解

线上部署

cdn=》防火墙(可以是云服务自带的防火墙直接配)=》负载均衡=》nginx集群=》k8s集群(api:使用goctl生成的k8s yaml文件会暴露出访问端口,直接把端口配置到nginx里面,另外api内部调用rpc的服务;rpc服务不暴露端口给外面,仅在k8s集群内部的网段开放服务,供api调用和调用内部中间件;mq,job,schedule也只被rpc调用不暴露给外面)=》使用filebeat来获取日志并发给kafka=》kafka将存储日志消息=》go-stash消费并过滤日志消息并push到elasticsearch

jeager做链路监控,prometheus做服务器监控,grafana查看Prometheus监控信息

项目目录讲解

app里面是所有的业务

common里面放常用的定制组件

deploy是部署相关的文件

doc是项目的文档

docker-compose.yaml和docker-compose-env.yaml分别是业务容器和环境容器的docker-compose文件

部署过程

项目文档

启动过程中es可能会启动比较慢,依赖es的kibana,jeager也起不来。

mysql在windows可能因为权限起不来

日志收集使用filebeat搜集容器文件夹内容,然后输出给kafka,所以需要进入kafka创建3个topic,对应业务架构里面支付成功通知所有订阅者(2个)和项目架构的log搜集(1个)。

1
2
3
4
docker exec -it kafka /bin/sh
cd /opt/kafka/bin/
./kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 -partitions 1 --topic looklook-log
./kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 -partitions 1 --topic payment-update-paystatus-topic

mysql的容器初次使用时需要配置允许远程连接

1
2
3
4
5
docker exec -it mysql mysql -uroot -p
##输入密码:PXDN93VRKUm8TeE7
use mysql;
update user set host='%' where user='root';
FLUSH PRIVILEGES;

并在mysql中导入sql文件来构建表

modd的img需要你配置modd.conf,配置很简单

1
2
3
4
5
#usercenter
app/usercenter/cmd/rpc/**/*.go {
    prep: go build -o data/server/usercenter-rpc  -v app/usercenter/cmd/rpc/usercenter.go
    daemon +sigkill: ./data/server/usercenter-rpc -f app/usercenter/cmd/rpc/etc/usercenter.yaml
}

app/usercenter/cmd/rpc/**/*.go表示这些文件变化就出发下面的代码块

代码块里面按格式写就可以重新构建项目,实现热重载

查看looklook容器日志

1
docker logs -f looklook

go-stash可能会比kafka启动的早所以如果日志收集出问题可以选择重启一下go-stash

定时任务

app/mqueue/scheduler/logic/settleRecordJob.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func (l *MqueueScheduler) settleRecordScheduler()  {

	task := asynq.NewTask(jobtype.ScheduleSettleRecord, nil)
	// every one minute exec
	entryID, err := l.svcCtx.Scheduler.Register("*/1 * * * *", task)
	if err != nil {
		logx.WithContext(l.ctx).Errorf("!!!MqueueSchedulerErr!!! ====> 【settleRecordScheduler】 registered  err:%+v , task:%+v",err,task)
	}
	fmt.Printf("【settleRecordScheduler】 registered an  entry: %q \n", entryID)
}

自定义jwt验证不通过响应

在生成server的语句里面添加选项WithUnauthorizedCallback

1
2
3
server := rest.MustNewServer(c.RestConf, rest.WithUnauthorizedCallback(func(w http.ResponseWriter, r *http.Request, err error) {
  w.WriteHeader(200)
}))

但是通常前端来处理返回给用户的结果,后端只需要直接将错误代码发给前端。

微信支付api

这个api可以直接用,线上测试没问题

技术栈过多的问题

Mikaelmmmm希望做一个大而全的后端项目。

如果觉得技术栈太多,可以在docker-compose里面将不熟悉的内容都注释掉,不跑一些中间件。looklook的模块划分的很好,把相关的代码注释掉即可。

官方文档笔记

1.开发环境部署

项目介绍

本项目开发环境推荐docker-compose,使用直链方式,测试、线上部署使用k8s

  • app:所有业务代码包含api、rpc以及mq(消息队列、延迟队列、定时任务)
  • common:通用组件 error、middleware、interceptor、tool、ctxdata等
  • data:该项目包含该目录依赖所有中间件(mysql、es、redis、grafana等)产生的数据。
  • deploy:
    • filebeat: docker部署filebeat配置
    • go-stash:go-stash配置
    • nginx: nginx网关配置
    • prometheus : prometheus配置
    • goctl: 该项目goctl的template
  • doc : 该项目系列文档
  • modd.conf : modd热加载配置文件

技术栈

k8s,go-zero,nginx网关,filebeat,kafka,go-stash,elasticsearch,kibana,prometheus,grafana,jaeger,go-queue,asynq,asynqmon,dtm,docker,docker-compose,mysql,redis,modd,jenkins,gitlab,harbor

nginx网关

容器内部nginx端口是8081,使用docker暴露出去8888映射端口8081,这样外部通过8888来访问网关

使用location来匹配每个服务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
server{
      listen 8081;
      access_log /var/log/nginx/looklook.com_access.log;
      error_log /var/log/nginx/looklook.com_error.log;


      location ~ /order/ {
           proxy_set_header Host $http_host;
           proxy_set_header X-Real-IP $remote_addr;
           proxy_set_header REMOTE-HOST $remote_addr;
           proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
           proxy_pass http://looklook:1001;
      }
}

通过go-zero自带的jwt鉴权之后,我们可以在ctx中拿到userId,

1
2
3
4
func (l *DetailLogic) Detail(req types.UserInfoReq) (*types.UserInfoResp, error) {
	userId := ctxdata.GetUidFromCtx(l.ctx)
	......
}

鉴权服务

go-zero官方文档

go-zero从jwt token解析后会将用户生成token时传入的kv原封不动的放在http.Request的Context中,因此我们可以通过Context就可以拿到你想要的值

在desc/api文件中定义go-zero自带的jwt中间件

@server(
	...
	jwt: JwtAuth
)
service usercenter {
	......
}

会在 go-zero-looklook/app/usercenter/cmd/api/internal/handler/routes.go里面添加一句

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// Code generated by goctl. DO NOT EDIT.
package handler

	........

	server.AddRoutes(
		[]rest.Route{
			...
		},
		rest.WithJwt(serverCtx.Config.JwtAuth.AccessSecret),
		...
	)
}

用户发起请求资源 -> nginx网关->匹配到对应服务模块 -> auth模块->identity-api ->identity-rpc -> 用户请求的资源

用户服务

model中定义了Trans方法暴露事务给logic

在usercenter-rpc注册成功之后,需要请求token给前端登陆,直接在rigister内部生成获取token的logic实例并调用它的方法

1
2
3
4
5
6
7
8
//2、Generate the token, so that the service doesn't call rpc internally
generateTokenLogic :=NewGenerateTokenLogic(l.ctx,l.svcCtx)
tokenResp,err:=generateTokenLogic.GenerateToken(&usercenter.GenerateTokenReq{
  UserId: userId,
})
if err != nil {
  return nil, errors.Wrapf(ErrGenerateTokenError, "GenerateToken userId : %d", userId)
}

详见官网文档

民宿服务

详见官网文档

【小技巧】 mapreduce

【小技巧】model cache、singleflight

订单服务

详见官网文档

这里写了延迟队列的用法,rpc创建订单的同时生成一个asynq延迟消息,延迟消息时间到了就在go-zero-looklook/app/mqueue/cmd/job/internal/logic/closeOrder.go中进行处理,这里会查询order rpc该订单的状态,如果已经支付则消费消息直接返回nil,否则修改订单消息为取消。

支付服务

详见官网文档

这里查询order-rpc获取订单详情,并根据返回的totalPrice,description和openid(注册的时候获取,userId对应openid)生成微信预支付订单并返回给前端

当前端拿着微信预处理订单发起支付,用户输入密码支付成功后,微信服务器会回调服务器,回调地址在配置中填写

1
2
3
WxPayConf:
  ...
  NotifyUrl : http://xxx.xxx.com/payment/v1/thirdPayment/thirdPaymentWxPayCallback

这里生成微信订单使用的时第三方库wechatpay-apiv3/wechatpay-go

github.com/wechatpay-apiv3/wechatpay-go/services/payments/jsapi

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 3、create wechat pay pre pay order

wxPayClient, err := svc.NewWxPayClientV3(l.svcCtx.Config)
if err != nil {
  return nil, err
}
jsApiSvc := jsapi.JsapiApiService{Client: wxPayClient}

// Get the prepay_id, as well as the parameters and signatures needed to invoke the payment
resp, _, err := jsApiSvc.PrepayWithRequestPayment(l.ctx,
  jsapi.PrepayRequest{
    Appid:       core.String(l.svcCtx.Config.WxMiniConf.AppId),
    Mchid:       core.String(l.svcCtx.Config.WxPayConf.MchId),
    Description: core.String(description),
    OutTradeNo:  core.String(createPaymentResp.Sn),
    Attach:      core.String(description),
    NotifyUrl:   core.String(l.svcCtx.Config.WxPayConf.NotifyUrl),
    Amount: &jsapi.Amount{
      Total: core.Int64(totalPrice),
    },
    Payer: &jsapi.Payer{
      Openid: core.String(openId),
    },
  },
)
if err != nil {
  return nil, errors.Wrapf(ErrWxPayError, "Failed to initiate WeChat payment pre-order err : %v , userId: %d , orderSn:%s", err, userId, orderSn)
}

微信回调回来之后,调用verifyAndUpdateState 将流水单号改为已支付。

verifyAndUpdateState方法,查询单号是否存在,比对回调回来的金额与创建时候金额是否一致。这里不用在校验签名了,前一步的sdk已经做了处理了

支付回调成功之后,会给用户发送一个入驻码,去了商家那里要展示这个码,商家通过后台核对码,其实就是美团的样子,我们去美团下单,美团会给你个码,用户拿着这个码去入住或者消费等。

verifyAndUpdateState调用了rpc的UpdateTradeState方法,核心做了两件事情,第一是更新支付状态,第二通过go-queue向消息队列(kafka)发送了一条日志消息

UpdateHomestayOrderTradeState更改订单状态,在发送一条asynq给mqueue-job队列,让mqueue-job发送微信小程序模版消息给用户

消息-延迟-定时队列

asynq特点

  • 直接基于redis,一般项目都有redis,而asynq本身就是基于redis所以可以少维护一个中间件
  • 支持消息队列、延迟队列、定时任务调度 , 因为希望项目支持定时任务而asynq直接就支持
  • 有webui界面,每个任务都可以暂停、归档、通过ui界面查看成功失败、监控

go-queue特点:kafka的吞吐量高

如何使用

这里只介绍kq,注意kq的topic需要你自己预先建立好

消费者

自己使用serviceGroup改造,目录结构还是延续api的基本差不多, 将handler改成了listen , 将logic换成了mqs

首先在etc/yaml里面添加kafka消费者客户端配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
#kq
PaymentUpdateStatusConf:
  Name: PaymentUpdateStatus
  Brokers:
    - kafka:9092
  Group: payment-update-paystatus-group
  Topic: payment-update-paystatus-topic
  Offset: first
  Consumers: 1
  Processors: 1

修改internal/config/config.go

1
2
3
4
5
type Config struct {
	...
	// kq : pub sub
	PaymentUpdateStatusConf kq.KqConf
}

首先还是在main函数外设置配置文件

1
var configFile = flag.String("f", "etc/order.yaml", "Specify the config file")

解析flag,新建config.Config对象,并载入文件

1
2
3
4
flag.Parse()
var c config.Config

conf.MustLoad(*configFile, &c)

调用c.SetUp()

1
2
3
4
// log、prometheus、trace、metricsUrl.
if err := c.SetUp(); err != nil {
  panic(err)
}

main中不直接使用rest.MustNewServer方法新建rest服务然后调用其Start()方法,而是通过ServiceGroup来管理服务

1
2
serviceGroup := service.NewServiceGroup()
defer serviceGroup.Stop()

serviceGroup管理的service是一个接口

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// Service is the interface that groups Start and Stop methods.
Service interface {
  Starter //Start
  Stopper //Stop
}

Starter interface {
  Start()
}

Stopper interface {
  Stop()
}

只要你的服务实现了这2个接口,就可以加入到serviceGroup统一管理,正常api里面的server := rest.MustNewServer(c.RestConf)也实现了这两个接口

internal/listen/listen.go相当于router.go,可以添加多个kq或者dq

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// back to all consumers
func Mqs(c config.Config) []service.Service {

	svcContext := svc.NewServiceContext(c)
	ctx := context.Background()

	var services []service.Service

	//kq :pub sub
	services = append(services, KqMqs(c, ctx, svcContext)...)

	return services
}

KqMqs函数在internal/listen/kqMqs.go里面,这相当于api里面的handler,每个handler可以对应多个kqService

1
2
3
4
5
6
7
8
9
//pub sub use kq (kafka)
func KqMqs(c config.Config, ctx context.Context, svcContext *svc.ServiceContext) []service.Service {

	return []service.Service{
		//Listening for changes in consumption flow status
		kq.MustNewQueue(c.PaymentUpdateStatusConf, kqMq.NewPaymentUpdateStatusMq(ctx, svcContext)),
		//.....
	}
}

kq.MustNewQueue函数签名为

1
func MustNewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) queue.MessageQueue

ConsumeHandler是一个接口,用于消费消息

1
2
3
ConsumeHandler interface {
  Consume(key, value string) error
}
生产者

首先在etc/yaml配置kq

1
2
3
4
5
#pay success notify order-mq for kq(kafka pub sub)
KqPaymentUpdatePayStatusConf:
  Brokers:
    - kafka:9092
  Topic: payment-update-paystatus-topic

修改internal/config/config.go

1
2
3
4
type Config struct {
	...
	KqPaymentUpdatePayStatusConf KqConfig
}

修改internal/svc/serviceContext.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
type ServiceContext struct {
	...
	KqueuePaymentUpdatePayStatusClient *kq.Pusher
}

func NewServiceContext(c config.Config) *ServiceContext {
	return &ServiceContext{
		...
		KqueuePaymentUpdatePayStatusClient: kq.NewPusher(c.KqPaymentUpdatePayStatusConf.Brokers,c.KqPaymentUpdatePayStatusConf.Topic),
	}
}

后面就能在其他地方使用该客户端Push消息了,可以预先将需要序列化的消息结构体放到common目录下的子目录

1
2
3
4
func (l *UpdateTradeStateLogic) pubKqPaySuccess(orderSn string,payStatus int64) error{
	...
	return  l.svcCtx.KqueuePaymentUpdatePayStatusClient.Push(string(body))
}

分布式事务

本项目服务划分相对独立一些,所以目前没有使用到分布式事务

错误处理

grpc的err对应的错误码其实就是一个uint32 , 我们自己定义错误用uint32然后在rpc的全局拦截器返回时候转成grpc的err,就可以了

所以我们自己定义全局错误码在app/common/xerr

日志收集

filebeat,kafka,go-stash和elk直接配置好容器并挂载好容器内配置就好了

链路追踪

go-zero底层已经帮我们把代码跟链路追踪对接的代码已经写好了,默认支持jaeger、zinpink。我们只需要在我们的业务代码配置中,也就是你的业务配置的yaml中配置参数即可。

服务监控

go-zero已经在代码中给我们集成好了prometheus。当我们启动api、rpc都会额外启动一个goroutine 提供prometheus的服务

order-mq这种使用serviceGroup管理的服务,在启动文件main中要显示调用一下SetUp才可以,api、rpc不需要,配置都一样

配置步骤

配置prometheus与grafana:容器和容器内配置

业务配置:只需要在业务配置文件中配置即可

部署环境搭建

gitlab : 放代码,可以做ci

jenkins:做cd发布项目

harbor : 镜像仓库

k8s : 运行服务

发布服务到k8s

 |