Merge branch 'develop' of https://gitee.com/fusenpack/fusenapi into develop

This commit is contained in:
eson 2023-08-28 10:30:44 +08:00
commit 9b6396dfae
6 changed files with 108 additions and 107 deletions

View File

@ -2,6 +2,7 @@ package logic
import (
"fmt"
"fusenapi/model/gmodel"
"fusenapi/utils/auth"
"fusenapi/utils/basic"
"fusenapi/utils/wevent"
@ -34,6 +35,56 @@ func NewUserEmailConfirmationLogic(ctx context.Context, svcCtx *svc.ServiceConte
// func (l *UserEmailConfirmationLogic) BeforeLogic(w http.ResponseWriter, r *http.Request) {
// }
func FinishRegister(svcCtx *svc.ServiceContext, user *gmodel.FsUser, token *auth.RegisterToken) error {
// 创建签证
jwtToken, err := auth.GenerateJwtTokenUint64(
auth.StringToHash(*user.PasswordHash),
svcCtx.Config.Auth.AccessExpire,
time.Now().UTC().Unix(),
user.Id,
0,
)
if err != nil {
return err
}
event := wevent.NewWebsocketEventSuccess(wevent.UserEmailRegister, token.TraceId)
event.Data = wevent.DataEmailRegister{
JwtToken: jwtToken,
}
err = CommonNotify(svcCtx.Config.MainAddress, token.Wid, event)
if err != nil {
// logx.Error(err, token.TraceId)
return err
}
return nil
}
func CommonNotify(MainAddress, wid string, event *wevent.WebsocketEvent) error {
tp := requests.Post(fmt.Sprintf("%s/api/websocket/common_notify", MainAddress))
tp.SetBodyJson(requests.M{
"wid": wid,
"data": event,
})
wresp, err := tp.Execute()
if err != nil {
// logx.Error(err, token.TraceId)
return err
}
result := wresp.Json()
if result.Get("code").Int() != 200 {
// logx.Error(result.Get("message"))
return fmt.Errorf("%s", result.Get("message").Str)
}
return nil
}
func (l *UserEmailConfirmationLogic) UserEmailConfirmation(req *types.RequestEmailConfirmation, userinfo *auth.UserInfo) (resp *basic.Response) {
// 返回值必须调用Set重新返回, resp可以空指针调用 resp.SetStatus(basic.CodeOK, data)
// userinfo 传入值时, 一定不为null
@ -59,43 +110,8 @@ func (l *UserEmailConfirmationLogic) UserEmailConfirmation(req *types.RequestEma
return resp.SetStatus(basic.CodeDbSqlErr)
}
// 创建签证
jwtToken, err := auth.GenerateJwtTokenUint64(
auth.StringToHash(*user.PasswordHash),
l.svcCtx.Config.Auth.AccessExpire,
time.Now().Unix(),
user.Id,
0,
)
if err != nil {
logx.Error(err, token.TraceId)
return
}
event := wevent.NewWebsocketEventSuccess(wevent.UserEmailRegister, token.TraceId)
event.Data = wevent.DataEmailRegister{
JwtToken: jwtToken,
}
tp := requests.Post(fmt.Sprintf("%s/api/websocket/common_notify", l.svcCtx.Config.MainAddress))
tp.SetBodyJson(requests.M{
"wid": token.Wid,
"data": event,
})
wresp, err := tp.Execute()
if err != nil {
logx.Error(err, token.TraceId)
return
}
result := wresp.Json()
if result.Get("code").Int() != 200 {
logx.Error(result.Get("message"))
return
}
logx.Info("success", token.TraceId, jwtToken)
FinishRegister(l.svcCtx, user, token)
logx.Info("success", token.TraceId)
case "facebook":
case "fusen":
@ -106,38 +122,8 @@ func (l *UserEmailConfirmationLogic) UserEmailConfirmation(req *types.RequestEma
return resp.SetStatus(basic.CodeDbSqlErr)
}
// 创建签证
jwtToken, err := auth.GenerateJwtTokenUint64(
auth.StringToHash(*user.PasswordHash),
l.svcCtx.Config.Auth.AccessExpire,
time.Now().Unix(),
user.Id,
0,
)
if err != nil {
logx.Error(err, token.TraceId)
return
}
event := wevent.NewWebsocketEventSuccess(wevent.UserEmailRegister, token.TraceId)
event.Data = wevent.DataEmailRegister{
JwtToken: jwtToken,
}
tp := requests.Post(fmt.Sprintf("%s/api/websocket/common_notify", l.svcCtx.Config.MainAddress))
tp.SetBodyJson(requests.M{
"wid": token.Wid,
"data": event,
})
wresp, err := tp.Execute()
if err != nil {
logx.Error(err, token.TraceId)
}
result := wresp.Json()
if result.Get("code").Int() != 200 {
logx.Error(result.Get("message"))
}
logx.Info("success", token.TraceId, jwtToken)
FinishRegister(l.svcCtx, user, token)
logx.Info("success", token.TraceId)
}
default:

View File

@ -16,7 +16,6 @@ type DataUserLogin struct {
type RequestUserRegister struct {
Wid string `json:"wid"` // websocket的id
GuestId int64 `json:"guest_id"` // 游客id
FirstName string `json:"first_name"` // 首名
LastName string `json:"last_name"` // 名
Resetaurant string `json:"resetaurant"` // 餐厅类型

View File

@ -37,7 +37,7 @@ type commonConnectionNotFoundDataCacheChanItem struct {
}
// 放入缓冲队列
func (l *CommonNotifyLogic) pushCommonCache(data commonConnectionNotFoundDataCacheChanItem) {
func (l *CommonNotifyLogic) pushCommonNotifyCache(data commonConnectionNotFoundDataCacheChanItem) {
select {
case commonConnectionNotFoundDataCacheChan <- data:
return
@ -47,45 +47,41 @@ func (l *CommonNotifyLogic) pushCommonCache(data commonConnectionNotFoundDataCac
}
// 取出元素
func (l *CommonNotifyLogic) popCommonCache() (data commonConnectionNotFoundDataCacheChanItem) {
func (l *CommonNotifyLogic) popCommonNotifyCache() (data commonConnectionNotFoundDataCacheChanItem) {
return <-commonConnectionNotFoundDataCacheChan
}
// 保证处理消息就一个循环在执行
var consumeCommonCacheData sync.Once
// 消费公共通知未处理的消息
// 消费公共通知未处理的消息(目前是轮巡方式,待优化)
func (l *CommonNotifyLogic) consumeCommonCacheData() {
//单例
consumeCommonCacheData.Do(func() {
tick := time.Tick(time.Millisecond * 200)
for {
select {
case <-tick: //200毫秒触发一次
info := l.popCommonCache()
//查询websocket连接
value, ok := mapConnPool.Load(info.data.Wid)
//没有连接
if !ok {
info.retryTimes--
//大于0则放回队列
if info.retryTimes > 0 {
l.pushCommonCache(info)
continue
}
//否则直接丢弃消息
time.Sleep(time.Millisecond * 200)
info := l.popCommonNotifyCache()
//查询websocket连接
value, ok := mapConnPool.Load(info.data.Wid)
//没有连接
if !ok {
info.retryTimes--
//大于0则放回队列
if info.retryTimes > 0 {
l.pushCommonNotifyCache(info)
continue
}
//断言连接
ws, ok := value.(wsConnectItem)
if !ok {
logx.Error("渲染回调断言websocket连接失败")
continue
}
//发送
ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, info.data.Data))
//否则直接丢弃消息
continue
}
//断言连接
ws, ok := value.(wsConnectItem)
if !ok {
logx.Error("渲染回调断言websocket连接失败")
continue
}
//发送
ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, info.data.Data))
}
})
}
@ -105,7 +101,7 @@ func (l *CommonNotifyLogic) CommonNotify(req *types.CommonNotifyReq, userinfo *a
value, ok := mapConnPool.Load(req.Wid)
if !ok {
//没找到连接就放到公共缓冲队列
go l.pushCommonCache(commonConnectionNotFoundDataCacheChanItem{
go l.pushCommonNotifyCache(commonConnectionNotFoundDataCacheChanItem{
retryTimes: 20, //重试20次
data: types.CommonNotifyReq{
Wid: req.Wid,

View File

@ -45,6 +45,8 @@ func (w *wsConnectItem) reuseLastConnect(data []byte) {
}
//是当前自己占用(无需处理)
if obj.uniqueId == w.uniqueId {
rsp := w.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, wid)
w.sendToOutChan(rsp)
return
} else {
w.reuseLastConnErrResponse("the wid is used by other people")

View File

@ -12,25 +12,25 @@ import "basic.api"
service auth {
@handler UserLoginHandler
post /api/auth/login(RequestUserLogin) returns (response);
@handler UserRegisterHandler
post /api/auth/register(RequestUserRegister) returns (response);
@handler AcceptCookieHandler
post /api/auth/accept-cookie(request) returns (response);
@handler UserGoogleLoginHandler
get /api/auth/oauth2/login/google(RequestGoogleLogin) returns (response);
@handler UserEmailConfirmationHandler
get /api/auth/email/confirmation(RequestEmailConfirmation) returns (response);
@handler UserEmailRegisterHandler
post /api/auth/oauth2/register(RequestEmailRegister) returns (response);
@handler UserResetTokenHandler
get /api/auth/reset/token(RequestUserResetToken) returns (response);
@handler UserResetPasswordHandler
post /api/auth/reset/password(RequestUserResetPassword) returns (response);
}
@ -52,8 +52,7 @@ type (
type (
// UserAddAddressHandler 用户登录请求结构
RequestUserRegister {
Wid string `json:"wid"` // websocket的id
GuestId int64 `json:"guest_id"` // 游客id
Wid string `json:"wid"` // websocket的id
FirstName string `json:"first_name"` // 首名
LastName string `json:"last_name"` // 名

View File

@ -9,6 +9,10 @@ import (
"github.com/go-resty/resty/v2"
"github.com/zeromicro/go-zero/core/logc"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/trace"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
oteltrace "go.opentelemetry.io/otel/trace"
)
func NewClient(ctx context.Context, c *Config) Client {
@ -18,6 +22,21 @@ func NewClient(ctx context.Context, c *Config) Client {
// 设置超时时间为 5 分钟
client.SetTimeout(5 * time.Minute)
/* 传输链路 */
tracer := otel.GetTracerProvider().Tracer(trace.TraceName)
spanCtx, span := tracer.Start(
ctx,
"client_resty_send",
oteltrace.WithSpanKind(oteltrace.SpanKindClient),
)
carrier := &propagation.HeaderCarrier{}
otel.GetTextMapPropagator().Inject(spanCtx, carrier)
for _, cacarrierKey := range carrier.Keys() {
client.SetHeader(cacarrierKey, carrier.Get(cacarrierKey))
}
defer span.End()
/* 传输链路 */
if c.HeaderData != nil {
for k, v := range c.HeaderData {
client = client.SetHeader(k, v)