226 lines
4.5 KiB
Go
226 lines
4.5 KiB
Go
package fusenrender
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/474420502/requests"
|
|
"github.com/gorilla/websocket"
|
|
"github.com/lni/dragonboat/v4"
|
|
"github.com/lni/dragonboat/v4/client"
|
|
)
|
|
|
|
var UidCreater = NewUniqueId(1)
|
|
var PopChannel chan *QueueItem = make(chan *QueueItem, 3) // chan *QueueItem = make(chan *QueueItem, 1)
|
|
|
|
var upgrader = websocket.Upgrader{
|
|
ReadBufferSize: 1024,
|
|
WriteBufferSize: 1024,
|
|
}
|
|
|
|
func HttpListen(ns *dragonboat.NodeHost, port int) {
|
|
// http.HandleFunc("/api/render/render_notify", callbackHandler)
|
|
http.HandleFunc("/api/render/queue/push", pushRenderTaskHandler)
|
|
http.HandleFunc("/api/ws/render/queue/pop", queueHandler)
|
|
|
|
log.Printf(":%d", port)
|
|
http.ListenAndServe(fmt.Sprintf(":%d", port), nil)
|
|
}
|
|
|
|
func queueHandler(w http.ResponseWriter, r *http.Request) {
|
|
var err error
|
|
conn, err := upgrader.Upgrade(w, r, nil)
|
|
if err != nil {
|
|
log.Println(err)
|
|
return
|
|
}
|
|
defer conn.Close()
|
|
|
|
log.Println("建立连接", DequeueHandler.RefCountAdd(1))
|
|
defer func() {
|
|
log.Println("退出连接", DequeueHandler.RefCountAdd(-1))
|
|
}()
|
|
|
|
for {
|
|
|
|
item := <-PopChannel
|
|
|
|
if item == nil {
|
|
continue
|
|
}
|
|
|
|
var wdata []byte
|
|
switch data := item.Data.(type) {
|
|
case string:
|
|
wdata = []byte(data)
|
|
case []byte:
|
|
wdata = data
|
|
default:
|
|
wdata, err = json.Marshal(data)
|
|
if err != nil {
|
|
log.Println(err)
|
|
continue
|
|
}
|
|
}
|
|
|
|
err = conn.SetWriteDeadline(time.Now().Add(time.Second * 8))
|
|
if err != nil {
|
|
log.Println(err)
|
|
log.Println("重新回队")
|
|
stateClient.PushItem(nil, item)
|
|
return
|
|
}
|
|
|
|
// 写回消息
|
|
err = conn.WriteMessage(websocket.BinaryMessage, wdata)
|
|
if err != nil {
|
|
log.Println(err)
|
|
log.Println("重新回队")
|
|
stateClient.PushItem(nil, item)
|
|
return
|
|
}
|
|
// 打印消息
|
|
fmt.Printf("%s 处理完成. len\n", conn.RemoteAddr())
|
|
}
|
|
}
|
|
|
|
// RenderCallback is declared without any fields.
// The original in-body comment read "define the form struct"; the form
// structure itself is the Form type.
type RenderCallback struct{}
|
|
|
|
// Form describes an upload form and the JSON name of each of its fields.
// The JSON names match the form keys that callbackHandler fills in by hand.
type Form struct {
	UserID       int    `json:"user_id"`
	GuestID      int    `json:"guest_id"`
	APIType      int    `json:"api_type"`
	UploadBucket int    `json:"upload_bucket"`
	FileKey      string `json:"file_key"`
	FileData     []byte `json:"file_data"` // encoding/json renders []byte as base64
	MetaData     string `json:"meta_data"`
}
|
|
|
|
// RequestCallback结构体
|
|
type RequestCallback struct {
|
|
Sign string `json:"sign"`
|
|
Time int64 `json:"time"`
|
|
Info Info `json:"info"`
|
|
}
|
|
|
|
// Info is the "info" object of a RequestCallback. callbackHandler forwards
// TaskID as the upload's file key and Image as its file data.
type Info struct {
	TaskID string `json:"task_id"`
	Image  string `json:"image"`
}
|
|
|
|
func callbackHandler(w http.ResponseWriter, r *http.Request) {
|
|
var req RequestCallback
|
|
json.NewDecoder(r.Body).Decode(&req)
|
|
|
|
urlstr := "http://localhost:9900/api/upload/upload-file-base"
|
|
|
|
form := requests.M{}
|
|
form["user_id"] = 0
|
|
form["guest_id"] = 0
|
|
form["api_type"] = 2
|
|
form["upload_bucket"] = 2
|
|
form["file_key"] = req.Info.TaskID
|
|
form["file_data"] = req.Info.Image
|
|
form["meta_data"] = ""
|
|
|
|
log.Println(req)
|
|
|
|
resp, err := requests.Post(urlstr).SetBodyFormData(form).Execute()
|
|
log.Println(resp.ContentString(), err)
|
|
|
|
// log.Println(req)
|
|
}
|
|
|
|
func pushRenderTaskHandler(w http.ResponseWriter, r *http.Request) {
|
|
// 1. 读取Body内容
|
|
body, err := io.ReadAll(r.Body)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
// 2. 定义结构体
|
|
|
|
item := QueueItem{}
|
|
|
|
// 3. 解析JSON到结构体
|
|
err = json.Unmarshal(body, &item)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
stateClient.PushItem(nil, &item)
|
|
}
|
|
|
|
type StateClient struct {
|
|
nh *dragonboat.NodeHost
|
|
}
|
|
|
|
// stateClient is the package-level StateClient that the HTTP and WebSocket
// handlers in this file push items through.
// NOTE(review): no assignment to it is visible in this file — confirm it is
// initialised before the handlers start serving requests.
var stateClient *StateClient
|
|
|
|
func (cli *StateClient) GetNoOPSession() *client.Session {
|
|
return cli.nh.GetNoOPSession(shardID)
|
|
}
|
|
|
|
func (cli *StateClient) PushItem(cs *client.Session, item *QueueItem) {
|
|
if cs == nil {
|
|
cs = cli.GetNoOPSession()
|
|
}
|
|
|
|
cmd := &CmdEnqueue{Command: Command{Group: "unity3d"}}
|
|
cmd.Item = item
|
|
data, err := FsPasser.PackToBytes(cmd)
|
|
|
|
if err != nil {
|
|
log.Println(err)
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
_, err = cli.nh.SyncPropose(ctx, cs, data)
|
|
if err != nil {
|
|
log.Println(err)
|
|
}
|
|
|
|
// log.Println("enqueue", len(result.Data))
|
|
cancel()
|
|
}
|
|
|
|
func (cli *StateClient) PopItem(cs *client.Session, group string) (*QueueItem, error) {
|
|
if cs == nil {
|
|
cs = cli.GetNoOPSession()
|
|
}
|
|
|
|
cmd := &CmdDequeue{Command{Group: "unity3d"}}
|
|
data, err := FsPasser.PackToBytes(cmd)
|
|
if err != nil {
|
|
log.Println(err)
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
result, err := cli.nh.SyncPropose(ctx, cs, data)
|
|
cancel()
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(result.Data) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
var item QueueItem
|
|
err = item.Decode(result.Data)
|
|
if err != nil {
|
|
log.Println(err)
|
|
}
|
|
return &item, nil
|
|
}
|