174 lines
4.3 KiB
Go
174 lines
4.3 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/bwmarrin/snowflake"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
)
|
|
|
|
var taskQueue = NewQueue()
|
|
var waitQueue = NewQueue()
|
|
var readyQueue = NewQueue()
|
|
var errorQueue = NewQueue()
|
|
|
|
var snowNode *snowflake.Node
|
|
|
|
func initSnowflake() {
|
|
if snowNode == nil {
|
|
node, err := snowflake.NewNode(1)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
snowNode = node
|
|
}
|
|
}
|
|
|
|
func init() {
|
|
|
|
log.SetFlags(log.Llongfile | log.LstdFlags)
|
|
initOplog()
|
|
initSnowflake()
|
|
|
|
engine.GET("/", func(c *gin.Context) {
|
|
c.JSON(200, Response{Code: 200, Message: "Home Page"})
|
|
})
|
|
|
|
task := engine.Group("/task")
|
|
task.GET("/get", GetTask)
|
|
task.POST("/put", PutTask)
|
|
|
|
task.POST("/content", ContentTask)
|
|
|
|
task.POST("/error", ErrorTask)
|
|
|
|
task.GET("/ready", ReadyTask)
|
|
task.POST("/ack", AckTask)
|
|
}
|
|
|
|
// GetTask 获取当前一条任务列表
|
|
func GetTask(c *gin.Context) {
|
|
if itask, ok := taskQueue.Pop(); ok {
|
|
task := itask.(*Task)
|
|
waitQueue.Push(task.data["taskid"], task)
|
|
c.JSON(http.StatusOK, Response{Code: 200, Message: "", Data: task})
|
|
return
|
|
}
|
|
c.JSON(http.StatusOK, Response{Code: 204, Message: "No Task"})
|
|
}
|
|
|
|
// PutTask 把一条任务放入队列
|
|
func PutTask(c *gin.Context) {
|
|
|
|
u := c.PostForm("url")
|
|
if u != "" {
|
|
data := NewTask()
|
|
now := time.Now()
|
|
tid := snowNode.Generate().Base64()
|
|
label := c.PostForm("label")
|
|
data.Store("taskid", tid)
|
|
data.Store("url", u)
|
|
data.Store("ts", now.UnixNano())
|
|
data.Store("label", label)
|
|
data.Store("content_condition", c.PostForm("content_condition"))
|
|
if callback := c.PostForm("callback"); callback != "" {
|
|
data.Store("callback", callback)
|
|
}
|
|
taskQueue.Push(tid, data)
|
|
oplog.Write(data)
|
|
c.JSON(http.StatusOK, Response{Code: 200, Message: "ok", Data: data})
|
|
return
|
|
}
|
|
|
|
c.JSON(http.StatusOK, Response{Code: 400, Message: "url 不错在"})
|
|
return
|
|
}
|
|
|
|
// ContentTask 把一条任务放入队列
|
|
func ContentTask(c *gin.Context) {
|
|
var err error
|
|
tid, ok := c.GetPostForm("taskid")
|
|
if !ok {
|
|
c.JSON(http.StatusOK, Response{Code: 404, Message: "taskid is not set"})
|
|
}
|
|
|
|
if iv, ok := waitQueue.Remove(tid); ok {
|
|
if content, ok := c.GetPostForm("content"); ok {
|
|
task := iv.(*Task)
|
|
task.Store("content", content)
|
|
task.Store("status", "ready")
|
|
readyQueue.Push(tid, task) // 进入回调发送队列.TODO: 内容持久化
|
|
c.JSON(200, Response{Code: 200, Data: task})
|
|
// log.Println("start callback")
|
|
if label, ok := task.Load("label"); ok {
|
|
log.Println(label.(string), tid)
|
|
}
|
|
go CallbackServer(task)
|
|
return
|
|
}
|
|
}
|
|
|
|
c.JSON(200, Response{Code: 404, Message: fmt.Sprintln("response: ", err)})
|
|
}
|
|
|
|
// AckTask 确认整个任务流程完成.
|
|
func AckTask(c *gin.Context) {
|
|
tid := c.PostForm("taskid")
|
|
if tid != "" {
|
|
if itask, ok := readyQueue.Get(tid); ok {
|
|
task := itask.(*Task)
|
|
if status, ok := task.Load("status"); ok {
|
|
if status.(string) == "readying" {
|
|
task.Store("status", "readied")
|
|
c.JSON(http.StatusOK, Response{Code: 200, Message: fmt.Sprintf("task %s readied", tid)})
|
|
return
|
|
}
|
|
c.JSON(http.StatusOK, Response{Code: 200, Message: fmt.Sprintf("task %s is not readying", tid)})
|
|
return
|
|
}
|
|
}
|
|
} else {
|
|
c.JSON(http.StatusOK, Response{Code: 404, Message: fmt.Sprintf("query taskid params must exist")})
|
|
return
|
|
}
|
|
c.JSON(http.StatusOK, Response{Code: 404, Message: fmt.Sprintf("taskid: %s is not found", tid)})
|
|
}
|
|
|
|
// ReadyTask 已完成的任务. 读取期间为readying状态.
|
|
func ReadyTask(c *gin.Context) {
|
|
tid := c.Query("taskid")
|
|
if tid != "" {
|
|
if itask, ok := readyQueue.Get(tid); ok {
|
|
task := itask.(*Task)
|
|
task.Store("status", "readying")
|
|
c.JSON(http.StatusOK, Response{Code: 200, Data: task})
|
|
return
|
|
}
|
|
} else {
|
|
c.JSON(http.StatusOK, Response{Code: 404, Message: fmt.Sprintf("query taskid params must exist")})
|
|
return
|
|
}
|
|
c.JSON(http.StatusOK, Response{Code: 404, Message: fmt.Sprintf("taskid: %s is not found", tid)})
|
|
}
|
|
|
|
// ErrorTask 任务错误无法完成
|
|
func ErrorTask(c *gin.Context) {
|
|
tid := c.PostForm("taskid")
|
|
errorStr := c.PostForm("error")
|
|
if itask, ok := waitQueue.Remove(tid); ok {
|
|
task := itask.(*Task)
|
|
task.Store("error", errorStr)
|
|
errorQueue.Push(tid, task) // 进入回调发送队列.TODO: 内容持久化
|
|
c.JSON(http.StatusOK, Response{Code: 200})
|
|
// log.Println(errorStr)
|
|
go ErrorCallbackServer(task)
|
|
return
|
|
}
|
|
c.JSON(http.StatusOK, Response{Code: 404})
|
|
return
|
|
}
|