Merge branch 'develop' of https://gitee.com/fusenpack/fusenapi into feature/auth

This commit is contained in:
eson 2023-07-30 15:54:34 +08:00
commit 12f85aecd7
12 changed files with 170 additions and 14 deletions

17
constants/rabbitmq.go Normal file
View File

@ -0,0 +1,17 @@
package constants
type RABBIT_MQ string
// 消息队列队列名
const (
//组装渲染数据队列
RABBIT_MQ_ASSEMBLE_RENDER_DATA RABBIT_MQ = "RABBIT_MQ_ASSEMBLE_RENDER_DATA"
//渲染结果数据队列
RABBIT_MQ_RENDER_RESULT_DATA RABBIT_MQ = "RABBIT_MQ_RENDER_RESULT_DATA"
)
// 队列列表
var MQ_QUEUE_ARR = []RABBIT_MQ{
RABBIT_MQ_ASSEMBLE_RENDER_DATA,
RABBIT_MQ_RENDER_RESULT_DATA,
}

1
go.mod
View File

@ -12,6 +12,7 @@ require (
github.com/hashicorp/raft v1.5.0
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
github.com/streadway/amqp v1.1.0
github.com/stripe/stripe-go/v74 v74.26.0
github.com/zeromicro/go-zero v1.5.4
golang.org/x/image v0.0.0-20190802002840-cff245a6509b

2
go.sum
View File

@ -295,6 +295,8 @@ github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e/go.mod h1:XV66xRDq
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM=
github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=

View File

@ -6,6 +6,7 @@ type Config struct {
rest.RestConf
SourceMysql string
Auth types.Auth
SourceRabbitMq string
{{.auth}}
{{.jwtTrans}}
}

View File

@ -21,6 +21,7 @@ type ServiceContext struct {
SharedState *fsm.StateCluster
MysqlConn *gorm.DB
AllModels *gmodel.AllModelsGen
RabbitMq *initalize.RabbitMqHandle
}
func NewServiceContext(c {{.config}}) *ServiceContext {
@ -32,6 +33,7 @@ func NewServiceContext(c {{.config}}) *ServiceContext {
MysqlConn: conn,
SharedState: StateServer,
AllModels: gmodel.NewAllModels(initalize.InitMysql(c.SourceMysql)),
RabbitMq:initalize.InitRabbitMq(c.SourceRabbitMq, nil),
{{.middlewareAssignment}}
}
}

View File

@ -2,7 +2,8 @@ Name: {{.serviceName}}
Host: {{.host}}
Port: {{.port}}
SourceMysql: fusentest:XErSYmLELKMnf3Dh@tcp(110.41.19.98:3306)/fusentest
SourceRabbitMq: amqp://rabbit001:rabbit001129@110.41.19.98:5672
Auth:
AccessSecret: fusen2023
AccessExpire: 2592000
RefreshAfter: 1592000
RefreshAfter: 1592000

120
initalize/rabbitmq.go Normal file
View File

@ -0,0 +1,120 @@
package initalize
import (
"crypto/tls"
"errors"
"fusenapi/constants"
"github.com/streadway/amqp"
"github.com/zeromicro/go-zero/core/logx"
"log"
)
type RabbitMqHandle struct {
}
// 连接属性
type queueItem struct {
ch *amqp.Channel
queue amqp.Queue
}
// 存储连接
var mapMq = make(map[constants.RABBIT_MQ]queueItem)
func InitRabbitMq(url string, config *tls.Config) *RabbitMqHandle {
if url == "" {
return nil
}
conn, err := amqp.DialTLS(url, config)
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
// 创建一个通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %v", err)
}
//声明队列
for _, queueName := range constants.MQ_QUEUE_ARR {
q, err := ch.QueueDeclare(
string(queueName), // 队列名
true, // 是否持久化
false, // 是否自动删除
false, // 是否排他
false, // 是否等待服务器响应
nil, // 其他参数
)
if err != nil {
log.Fatalf("Failed to declare a queue: %v", err)
}
mapMq[queueName] = queueItem{
ch: ch,
queue: q,
}
}
return &RabbitMqHandle{}
}
// 发送消息
func (h *RabbitMqHandle) SendMsg(queueName constants.RABBIT_MQ, message []byte) error {
object, ok := mapMq[queueName]
if !ok {
return errors.New("unknown queue")
}
// 发送消息到队列
return object.ch.Publish(
"", // exchange如果为空则使用默认交换机
object.queue.Name, // routing key将消息发送到指定队列
false, // 是否等待服务器响应
false, // 是否立即发送
amqp.Publishing{
ContentType: "text/plain", //普通文本
Body: message,
}, // 消息内容
)
}
// 消费消息
func (h *RabbitMqHandle) Consume(queueName constants.RABBIT_MQ, handleFunc func(data []byte) error) error {
object, ok := mapMq[queueName]
if !ok {
return errors.New("unknown queue")
}
msgs, err := object.ch.Consume(
object.queue.Name, // 队列名
object.queue.Name, // 消费者名,如果为空,则是随机生成一个
false, // 自动应答
false, // 是否排他
false, // 是否阻塞
false, // 是否等待服务器响应
nil, // 其他参数
)
if err != nil {
log.Fatalf("Failed to register a consumer: %v", err)
}
//允许20的并发
limit := make(chan struct{}, 20)
defer close(limit)
// 消费消息
for msg := range msgs {
limit <- struct{}{}
go func(m amqp.Delivery) {
if err := recover(); err != nil {
logx.Error(err)
}
defer func() {
<-limit
}()
if err = handleFunc(m.Body); err != nil {
logx.Error("failed to deal with MQ message:", string(m.Body))
return
}
if err = object.ch.Ack(m.DeliveryTag, false); err != nil {
logx.Error("failed to ack mq to delete")
} else {
log.Printf("Consume Mq message success: %s", m.Body)
}
}(msg)
}
return nil
}

View File

@ -5,4 +5,5 @@ SourceMysql: fusentest:XErSYmLELKMnf3Dh@tcp(110.41.19.98:3306)/fusentest
Auth:
AccessSecret: fusen2023
AccessExpire: 2592000
RefreshAfter: 1592000
RefreshAfter: 1592000
SourceRabbitMq: amqp://rabbit001:rabbit001129@110.41.19.98:5672

View File

@ -8,6 +8,7 @@ import (
type Config struct {
rest.RestConf
SourceMysql string
Auth types.Auth
SourceMysql string
Auth types.Auth
SourceRabbitMq string
}

View File

@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"fusenapi/constants"
"fusenapi/initalize"
"fusenapi/server/websocket/internal/types"
"fusenapi/utils/auth"
"fusenapi/utils/id_generator"
@ -64,13 +65,14 @@ var (
// 每个连接的连接基本属性
type wsConnectItem struct {
conn *websocket.Conn //websocket的连接
closeChan chan struct{} //ws连接关闭chan
isClose bool //是否已经关闭
uniqueId uint64 //ws连接唯一标识
inChan chan []byte //接受消息缓冲通道
outChan chan []byte //发送回客户端的消息
mutex sync.Mutex //互斥锁
renderProperty renderProperty //扩展云渲染属性
rabbitMq *initalize.RabbitMqHandle
closeChan chan struct{} //ws连接关闭chan
isClose bool //是否已经关闭
uniqueId uint64 //ws连接唯一标识
inChan chan []byte //接受消息缓冲通道
outChan chan []byte //发送回客户端的消息
mutex sync.Mutex //互斥锁
renderProperty renderProperty //扩展云渲染属性
}
func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.ResponseWriter, r *http.Request) {
@ -100,6 +102,7 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp
uniqueId := websocketIdGenerator.Get()
ws := wsConnectItem{
conn: conn,
rabbitMq: l.svcCtx.RabbitMq,
uniqueId: uniqueId,
closeChan: make(chan struct{}, 1),
inChan: make(chan []byte, 1000),
@ -272,7 +275,7 @@ func (w *wsConnectItem) dealwithReciveData(data []byte) {
switch parseInfo.T {
//图片渲染
case constants.WEBSOCKET_RENDER_IMAGE:
go w.SendToCloudRender(d)
w.SendToCloudRender(d)
default:
}

View File

@ -59,7 +59,12 @@ func (w *wsConnectItem) SendToCloudRender(data []byte) {
Option: 1, //0删除 1添加
Key: key,
}
// TODO 数据发送给云渲染服务器
//发送给对应的流水线组装数据
if err := w.rabbitMq.SendMsg(constants.RABBIT_MQ_ASSEMBLE_RENDER_DATA, data); err != nil {
logx.Error("发送渲染任务数据到MQ失败:", string(data))
continue
}
logx.Info("发送渲染数据到rabbitmq成功:", string(data))
}
}
}

View File

@ -21,6 +21,7 @@ type ServiceContext struct {
MysqlConn *gorm.DB
AllModels *gmodel.AllModelsGen
RabbitMq *initalize.RabbitMqHandle
}
func NewServiceContext(c config.Config) *ServiceContext {
@ -29,9 +30,10 @@ func NewServiceContext(c config.Config) *ServiceContext {
return &ServiceContext{
Config: c,
MysqlConn: conn,
SharedState: StateServer,
MysqlConn: initalize.InitMysql(c.SourceMysql),
AllModels: gmodel.NewAllModels(initalize.InitMysql(c.SourceMysql)),
RabbitMq: initalize.InitRabbitMq(c.SourceRabbitMq, nil),
}
}
func (svcCtx *ServiceContext) ParseJwtToken(r *http.Request) (jwt.MapClaims, error) {