diff --git a/server/auth/internal/logic/useremailconfirmationlogic.go b/server/auth/internal/logic/useremailconfirmationlogic.go index 163ca27b..07c7d72c 100644 --- a/server/auth/internal/logic/useremailconfirmationlogic.go +++ b/server/auth/internal/logic/useremailconfirmationlogic.go @@ -2,6 +2,7 @@ package logic import ( "fmt" + "fusenapi/model/gmodel" "fusenapi/utils/auth" "fusenapi/utils/basic" "fusenapi/utils/wevent" @@ -34,6 +35,56 @@ func NewUserEmailConfirmationLogic(ctx context.Context, svcCtx *svc.ServiceConte // func (l *UserEmailConfirmationLogic) BeforeLogic(w http.ResponseWriter, r *http.Request) { // } +func FinishRegister(svcCtx *svc.ServiceContext, user *gmodel.FsUser, token *auth.RegisterToken) error { + // 创建签证 + jwtToken, err := auth.GenerateJwtTokenUint64( + auth.StringToHash(*user.PasswordHash), + svcCtx.Config.Auth.AccessExpire, + time.Now().UTC().Unix(), + user.Id, + 0, + ) + + if err != nil { + + return err + } + + event := wevent.NewWebsocketEventSuccess(wevent.UserEmailRegister, token.TraceId) + event.Data = wevent.DataEmailRegister{ + JwtToken: jwtToken, + } + err = CommonNotify(svcCtx.Config.MainAddress, token.Wid, event) + if err != nil { + // logx.Error(err, token.TraceId) + return err + } + + return nil +} + +func CommonNotify(MainAddress, wid string, event *wevent.WebsocketEvent) error { + tp := requests.Post(fmt.Sprintf("%s/api/websocket/common_notify", MainAddress)) + tp.SetBodyJson(requests.M{ + "wid": wid, + "data": event, + }) + + wresp, err := tp.Execute() + if err != nil { + // logx.Error(err, token.TraceId) + return err + } + + result := wresp.Json() + if result.Get("code").Int() != 200 { + // logx.Error(result.Get("message")) + return fmt.Errorf("%s", result.Get("message").Str) + } + + return nil +} + func (l *UserEmailConfirmationLogic) UserEmailConfirmation(req *types.RequestEmailConfirmation, userinfo *auth.UserInfo) (resp *basic.Response) { // 返回值必须调用Set重新返回, resp可以空指针调用 resp.SetStatus(basic.CodeOK, data) // userinfo 传入值时, 一定不为null @@ -59,43 +110,8 @@ func (l *UserEmailConfirmationLogic) UserEmailConfirmation(req *types.RequestEma return resp.SetStatus(basic.CodeDbSqlErr) } - // 创建签证 - jwtToken, err := auth.GenerateJwtTokenUint64( - auth.StringToHash(*user.PasswordHash), - l.svcCtx.Config.Auth.AccessExpire, - time.Now().Unix(), - user.Id, - 0, - ) - - if err != nil { - logx.Error(err, token.TraceId) - return - } - - event := wevent.NewWebsocketEventSuccess(wevent.UserEmailRegister, token.TraceId) - event.Data = wevent.DataEmailRegister{ - JwtToken: jwtToken, - } - - tp := requests.Post(fmt.Sprintf("%s/api/websocket/common_notify", l.svcCtx.Config.MainAddress)) - tp.SetBodyJson(requests.M{ - "wid": token.Wid, - "data": event, - }) - - wresp, err := tp.Execute() - if err != nil { - logx.Error(err, token.TraceId) - return - } - - result := wresp.Json() - if result.Get("code").Int() != 200 { - logx.Error(result.Get("message")) - return - } - logx.Info("success", token.TraceId, jwtToken) + FinishRegister(l.svcCtx, user, token) + logx.Info("success", token.TraceId) case "facebook": case "fusen": @@ -106,38 +122,8 @@ func (l *UserEmailConfirmationLogic) UserEmailConfirmation(req *types.RequestEma return resp.SetStatus(basic.CodeDbSqlErr) } - // 创建签证 - jwtToken, err := auth.GenerateJwtTokenUint64( - auth.StringToHash(*user.PasswordHash), - l.svcCtx.Config.Auth.AccessExpire, - time.Now().Unix(), - user.Id, - 0, - ) - - if err != nil { - logx.Error(err, token.TraceId) - return - } - - event := wevent.NewWebsocketEventSuccess(wevent.UserEmailRegister, token.TraceId) - event.Data = wevent.DataEmailRegister{ - JwtToken: jwtToken, - } - tp := requests.Post(fmt.Sprintf("%s/api/websocket/common_notify", l.svcCtx.Config.MainAddress)) - tp.SetBodyJson(requests.M{ - "wid": token.Wid, - "data": event, - }) - wresp, err := tp.Execute() - if err != nil { - logx.Error(err, token.TraceId) - } - result := wresp.Json() - if result.Get("code").Int() != 200 { - logx.Error(result.Get("message")) - } - logx.Info("success", token.TraceId, jwtToken) + FinishRegister(l.svcCtx, user, token) + logx.Info("success", token.TraceId) } default: diff --git a/server/auth/internal/types/types.go b/server/auth/internal/types/types.go index 583cb8bc..5b25891c 100644 --- a/server/auth/internal/types/types.go +++ b/server/auth/internal/types/types.go @@ -16,7 +16,6 @@ type DataUserLogin struct { type RequestUserRegister struct { Wid string `json:"wid"` // websocket的id - GuestId int64 `json:"guest_id"` // 游客id FirstName string `json:"first_name"` // 首名 LastName string `json:"last_name"` // 名 Resetaurant string `json:"resetaurant"` // 餐厅类型 diff --git a/server/websocket/internal/logic/commonnotifylogic.go b/server/websocket/internal/logic/commonnotifylogic.go index 2e5e6e65..72abe5b6 100644 --- a/server/websocket/internal/logic/commonnotifylogic.go +++ b/server/websocket/internal/logic/commonnotifylogic.go @@ -37,7 +37,7 @@ type commonConnectionNotFoundDataCacheChanItem struct { } // 放入缓冲队列 -func (l *CommonNotifyLogic) pushCommonCache(data commonConnectionNotFoundDataCacheChanItem) { +func (l *CommonNotifyLogic) pushCommonNotifyCache(data commonConnectionNotFoundDataCacheChanItem) { select { case commonConnectionNotFoundDataCacheChan <- data: return @@ -47,45 +47,41 @@ func (l *CommonNotifyLogic) pushCommonCache(data commonConnectionNotFoundDataCac } // 取出元素 -func (l *CommonNotifyLogic) popCommonCache() (data commonConnectionNotFoundDataCacheChanItem) { +func (l *CommonNotifyLogic) popCommonNotifyCache() (data commonConnectionNotFoundDataCacheChanItem) { return <-commonConnectionNotFoundDataCacheChan } // 保证处理消息就一个循环在执行 var consumeCommonCacheData sync.Once -// 消费公共通知未处理的消息 +// 消费公共通知未处理的消息(目前是轮巡方式,待优化) func (l *CommonNotifyLogic) consumeCommonCacheData() { //单例 consumeCommonCacheData.Do(func() { - tick := time.Tick(time.Millisecond * 200) for { - select { - case <-tick: //200毫秒触发一次 - info := l.popCommonCache() - //查询websocket连接 - value, ok := mapConnPool.Load(info.data.Wid) - //没有连接 - if !ok { - info.retryTimes-- - //大于0,则放回队列 - if info.retryTimes > 0 { - l.pushCommonCache(info) - continue - } - //否则直接丢弃消息 + time.Sleep(time.Millisecond * 200) + info := l.popCommonNotifyCache() + //查询websocket连接 + value, ok := mapConnPool.Load(info.data.Wid) + //没有连接 + if !ok { + info.retryTimes-- + //大于0,则放回队列 + if info.retryTimes > 0 { + l.pushCommonNotifyCache(info) continue } - //断言连接 - ws, ok := value.(wsConnectItem) - if !ok { - logx.Error("渲染回调断言websocket连接失败") - continue - } - //发送 - ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, info.data.Data)) + //否则直接丢弃消息 + continue } - + //断言连接 + ws, ok := value.(wsConnectItem) + if !ok { + logx.Error("渲染回调断言websocket连接失败") + continue + } + //发送 + ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, info.data.Data)) } }) } @@ -105,7 +101,7 @@ func (l *CommonNotifyLogic) CommonNotify(req *types.CommonNotifyReq, userinfo *a value, ok := mapConnPool.Load(req.Wid) if !ok { //没找到连接就放到公共缓冲队列 - go l.pushCommonCache(commonConnectionNotFoundDataCacheChanItem{ + go l.pushCommonNotifyCache(commonConnectionNotFoundDataCacheChanItem{ retryTimes: 20, //重试20次 data: types.CommonNotifyReq{ Wid: req.Wid, diff --git a/server/websocket/internal/logic/ws_reuse_last_connect.go b/server/websocket/internal/logic/ws_reuse_last_connect.go index bab5d61b..0790024e 100644 --- a/server/websocket/internal/logic/ws_reuse_last_connect.go +++ b/server/websocket/internal/logic/ws_reuse_last_connect.go @@ -45,6 +45,8 @@ func (w *wsConnectItem) reuseLastConnect(data []byte) { } //是当前自己占用(无需处理) if obj.uniqueId == w.uniqueId { + rsp := w.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, wid) + w.sendToOutChan(rsp) return } else { w.reuseLastConnErrResponse("the wid is used by other people") diff --git a/server_api/auth.api b/server_api/auth.api index a9b221d4..564df270 100644 --- a/server_api/auth.api +++ b/server_api/auth.api @@ -12,25 +12,25 @@ import "basic.api" service auth { @handler UserLoginHandler post /api/auth/login(RequestUserLogin) returns (response); - + @handler UserRegisterHandler post /api/auth/register(RequestUserRegister) returns (response); - + @handler AcceptCookieHandler post /api/auth/accept-cookie(request) returns (response); - + @handler UserGoogleLoginHandler get /api/auth/oauth2/login/google(RequestGoogleLogin) returns (response); - + @handler UserEmailConfirmationHandler get /api/auth/email/confirmation(RequestEmailConfirmation) returns (response); - + @handler UserEmailRegisterHandler post /api/auth/oauth2/register(RequestEmailRegister) returns (response); - + @handler UserResetTokenHandler get /api/auth/reset/token(RequestUserResetToken) returns (response); - + @handler UserResetPasswordHandler post /api/auth/reset/password(RequestUserResetPassword) returns (response); } @@ -52,8 +52,7 @@ type ( type ( // UserAddAddressHandler 用户登录请求结构 RequestUserRegister { - Wid string `json:"wid"` // websocket的id - GuestId int64 `json:"guest_id"` // 游客id + Wid string `json:"wid"` // websocket的id FirstName string `json:"first_name"` // 首名 LastName string `json:"last_name"` // 名 diff --git a/utils/curl/client_resty.go b/utils/curl/client_resty.go index 6cb0423f..022892b1 100644 --- a/utils/curl/client_resty.go +++ b/utils/curl/client_resty.go @@ -9,6 +9,10 @@ import ( "github.com/go-resty/resty/v2" "github.com/zeromicro/go-zero/core/logc" "github.com/zeromicro/go-zero/core/logx" + "github.com/zeromicro/go-zero/core/trace" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + oteltrace "go.opentelemetry.io/otel/trace" ) func NewClient(ctx context.Context, c *Config) Client { @@ -18,6 +22,21 @@ func NewClient(ctx context.Context, c *Config) Client { // 设置超时时间为 5 分钟 client.SetTimeout(5 * time.Minute) + /* 传输链路 */ + tracer := otel.GetTracerProvider().Tracer(trace.TraceName) + spanCtx, span := tracer.Start( + ctx, + "client_resty_send", + oteltrace.WithSpanKind(oteltrace.SpanKindClient), + ) + carrier := &propagation.HeaderCarrier{} + otel.GetTextMapPropagator().Inject(spanCtx, carrier) + for _, cacarrierKey := range carrier.Keys() { + client.SetHeader(cacarrierKey, carrier.Get(cacarrierKey)) + } + defer span.End() + /* 传输链路 */ + if c.HeaderData != nil { for k, v := range c.HeaderData { client = client.SetHeader(k, v)