package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"time"

	"github.com/bwmarrin/snowflake"
	"github.com/gin-gonic/gin"
)

// taskQueue holds tasks submitted through PutTask until GetTask pops them.
var taskQueue = NewQueue()

// waitQueue is where ContentTask looks up the task a reported result belongs to.
// NOTE(review): no handler visible in this file pushes into waitQueue —
// presumably it is filled elsewhere; confirm.
var waitQueue = NewQueue()

// readyQueue holds tasks whose content has been reported through ContentTask.
var readyQueue = NewQueue()

// snowNode generates the unique task ids; it is assigned once in init.
var snowNode *snowflake.Node

// init configures logging, opens the oplog, creates the snowflake node and
// registers the HTTP routes on engine. It panics if the snowflake node cannot
// be created.
func init() {
	log.SetFlags(log.Llongfile | log.LstdFlags)
	initOplog()

	// Create a new Node with a Node number of 1
	node, err := snowflake.NewNode(1)
	if err != nil {
		panic(err)
	}
	snowNode = node

	engine.GET("/", func(c *gin.Context) {
		c.JSON(200, Response{Code: 200, Message: "", Data: nil})
	})

	task := engine.Group("/task")
	task.GET("/get", GetTask)
	task.POST("/put", PutTask)
	task.POST("/content", ContentTask)
	task.GET("/ready", ReadyTask)
}

// GetTask pops one task from taskQueue and returns it in the response data.
// When Pop reports that nothing was available it replies with Code 204.
func GetTask(c *gin.Context) {
	if result, ok := taskQueue.Pop(); ok {
		c.JSON(http.StatusOK, Response{Code: 200, Message: "", Data: result})
		return
	}
	c.JSON(http.StatusOK, Response{Code: 204, Message: "No Task"})
}

// PutTask creates a task for the "url" form field and pushes it onto taskQueue.
// The task is given a snowflake-generated "taskid" and a nanosecond "ts"
// timestamp, is written to the oplog, and is echoed back in the response.
// An empty or missing "url" yields Code 400.
func PutTask(c *gin.Context) {
	u := c.PostForm("url")
	if u == "" {
		c.JSON(http.StatusOK, Response{Code: 400, Message: "url 不存在"})
		return
	}

	data := NewTask()
	now := time.Now()
	tid := snowNode.Generate().Base64()
	data.Store("taskid", tid)
	data.Store("url", u)
	data.Store("ts", now.UnixNano())
	taskQueue.Push(tid, data)
	oplog.Write(data)
	c.JSON(http.StatusOK, Response{Code: 200, Message: "ok", Data: data})
}

// ContentTask accepts the result of a task and moves the task from waitQueue
// to readyQueue.
//
// The "response" form field must hold a JSON-encoded Response whose Code is
// 200 and whose Data is an object carrying "taskid" and "content". On success
// the task gets its "content" stored, "is_read" set to "false" and "status"
// set to "ready". Malformed input yields Code 400 and an unknown taskid yields
// Code 404; the HTTP status is always 200, like the other handlers here.
func ContentTask(c *gin.Context) {
	raw := c.PostForm("response")
	response := &Response{}
	if err := json.Unmarshal([]byte(raw), response); err != nil {
		c.JSON(http.StatusOK, Response{Code: 400, Message: fmt.Sprintf("invalid response json: %v", err)})
		return
	}
	if response.Code != 200 {
		c.JSON(http.StatusOK, Response{Code: 400, Message: fmt.Sprintf("response code %v is not 200", response.Code)})
		return
	}

	// encoding/json decodes a JSON object held in an interface value as
	// map[string]interface{}, never as the named type gin.H, so an unchecked
	// assertion to gin.H would panic on every well-formed payload.
	var data map[string]interface{}
	switch v := response.Data.(type) {
	case map[string]interface{}:
		data = v
	case gin.H:
		data = v
	default:
		c.JSON(http.StatusOK, Response{Code: 400, Message: "response data must be a json object"})
		return
	}

	tid, ok := data["taskid"]
	if !ok {
		c.JSON(http.StatusOK, Response{Code: 400, Message: "response data has no taskid"})
		return
	}

	iv, ok := waitQueue.Remove(tid)
	if !ok {
		c.JSON(http.StatusOK, Response{Code: 404, Message: fmt.Sprintf("taskid: %v is not found", tid)})
		return
	}

	task := iv.(*Task)
	task.Store("content", data["content"])
	task.Store("is_read", "false")
	task.Store("status", "ready")
	readyQueue.Push(tid, task) // enters the callback send queue. TODO: persist the content
	c.JSON(http.StatusOK, Response{Code: 200, Message: "ok"})
}

// AckTask confirms that the whole flow for a task has completed.
func AckTask(c *gin.Context) { tid := c.Query("taskid") if tid != "" { if itask, ok := readyQueue.Get(tid); ok { task := itask.(*Task) if status, ok := task.Load("status"); ok { if status.(string) == "readying" { task.Store("status", "readied") c.JSON(http.StatusOK, Response{Code: 200, Message: fmt.Sprintf("task %s readied", tid)}) return } c.JSON(http.StatusOK, Response{Code: 200, Message: fmt.Sprintf("task %s is not readying", tid)}) return } } } else { c.JSON(http.StatusOK, Response{Code: 404, Message: fmt.Sprintf("query taskid params must exist")}) return } c.JSON(http.StatusOK, Response{Code: 404, Message: fmt.Sprintf("taskid: %s is not found", tid)}) } // ReadyTask 把一条任务放入队列 func ReadyTask(c *gin.Context) { tid := c.Query("taskid") if tid != "" { if itask, ok := readyQueue.Get(tid); ok { task := itask.(*Task) task.Store("status", "readying") c.JSON(http.StatusOK, Response{Code: 200, Data: task}) return } } else { c.JSON(http.StatusOK, Response{Code: 404, Message: fmt.Sprintf("query taskid params must exist")}) return } c.JSON(http.StatusOK, Response{Code: 404, Message: fmt.Sprintf("taskid: %s is not found", tid)}) }