package main

import (
	"fmt"
	"log"
	"net/http"
	"strconv"
	"time"

	"github.com/bwmarrin/snowflake"
	"github.com/gin-gonic/gin"
)

// Task queues, one per stage of a task's life cycle as used by the handlers
// in this file:
//   - taskQueue:  tasks submitted via PutTask and not yet handed out.
//   - waitQueue:  tasks handed out by GetTask, awaiting content or an error.
//   - readyQueue: tasks whose content was delivered via ContentTask.
//   - errorQueue: tasks reported as failed via ErrorTask.
var (
	taskQueue  = NewQueue()
	waitQueue  = NewQueue()
	readyQueue = NewQueue()
	errorQueue = NewQueue()
)

// snowNode generates task IDs; it is set by initSnowflake.
var snowNode *snowflake.Node

// initSnowflake creates the package-level snowflake node (node ID 1) if it
// has not been created yet. It panics when the node cannot be created.
func initSnowflake() {
	if snowNode != nil {
		return
	}
	node, err := snowflake.NewNode(1)
	if err != nil {
		panic(err)
	}
	snowNode = node
}

// init configures logging, initializes the oplog and the snowflake node, and
// registers the home page and the /task routes on the engine.
func init() {
	log.SetFlags(log.Llongfile | log.LstdFlags)
	initOplog()
	initSnowflake()

	engine.GET("/", func(c *gin.Context) {
		c.JSON(200, Response{Code: 200, Message: "Home Page"})
	})

	task := engine.Group("/task")
	task.GET("/get", GetTask)
	task.POST("/put", PutTask)
	task.POST("/content", ContentTask)
	task.POST("/error", ErrorTask)
	task.GET("/ready", ReadyTask)
	task.POST("/ack", AckTask)
}

// GetTask pops one pending task, moves it into the wait queue (keyed by its
// task ID) and returns it. It responds with code 204 when no task is pending.
func GetTask(c *gin.Context) {
	itask, ok := taskQueue.Pop()
	if !ok {
		c.JSON(http.StatusOK, Response{Code: 204, Message: "No Task"})
		return
	}
	task := itask.(*Task)
	waitQueue.Push(task.data["taskid"], task)
	c.JSON(http.StatusOK, Response{Code: 200, Message: "", Data: task})
}

// postFormIntOr returns the POST form value key parsed as an int, or def when
// the value is missing or is not a valid integer.
func postFormIntOr(c *gin.Context, key string, def int) int {
	v, err := strconv.Atoi(c.PostForm(key))
	if err != nil {
		return def
	}
	return v
}

// PutTask creates a task from the posted form and puts it on the pending
// queue.
//
// Form fields: "url" (required), "label", "content_condition", and the
// optional "carrayhash" and "callback" (stored only when non-empty).
// "waitcapture" and "retry" default to 1 when missing or not an integer.
// The new task is also written to the oplog. Responds with code 400 when
// "url" is empty.
func PutTask(c *gin.Context) {
	u := c.PostForm("url")
	if u == "" {
		// NOTE(review): the message looks like a typo for "url 不存在"
		// ("url does not exist"); kept byte-identical because clients may
		// depend on it.
		c.JSON(http.StatusOK, Response{Code: 400, Message: "url 不错在"})
		return
	}

	data := NewTask()
	now := time.Now()
	tid := snowNode.Generate().Base64()
	label := c.PostForm("label")

	if carrayhash := c.PostForm("carrayhash"); carrayhash != "" {
		data.Store("carrayhash", carrayhash)
	}
	if callback := c.PostForm("callback"); callback != "" {
		data.Store("callback", callback)
	}

	data.Store("taskid", tid)
	data.Store("url", u)
	data.Store("ts", now.UnixNano())
	data.Store("label", label)
	data.Store("content_condition", c.PostForm("content_condition"))
	data.Store("waitcapture", postFormIntOr(c, "waitcapture", 1))
	data.Store("retry", postFormIntOr(c, "retry", 1))

	taskQueue.Push(tid, data)
	oplog.Write(data)
	c.JSON(http.StatusOK, Response{Code: 200, Message: "ok", Data: data})
}

// ContentTask attaches the posted content to a waiting task and moves the
// task to the ready queue.
//
// Form fields: "taskid" and "content" (both required). On success the task
// gets status "ready", is returned in the response, and CallbackServer is
// started for it in a new goroutine. Responds with code 404 when a field is
// missing or the task is not in the wait queue.
func ContentTask(c *gin.Context) {
	tid, ok := c.GetPostForm("taskid")
	if !ok {
		c.JSON(http.StatusOK, Response{Code: 404, Message: "taskid is not set"})
		return
	}

	// Validate the content before touching the wait queue: removing the task
	// first would drop it for good when the request carries no content.
	content, ok := c.GetPostForm("content")
	if !ok {
		c.JSON(http.StatusOK, Response{Code: 404, Message: "content is not set"})
		return
	}

	iv, ok := waitQueue.Remove(tid)
	if !ok {
		c.JSON(http.StatusOK, Response{Code: 404, Message: fmt.Sprintf("taskid: %s is not found", tid)})
		return
	}

	task := iv.(*Task)
	task.Store("content", content)
	task.Store("status", "ready")
	readyQueue.Push(tid, task) // Enters the callback send queue. TODO: persist the content.
	c.JSON(http.StatusOK, Response{Code: 200, Data: task})

	if label, ok := task.Load("label"); ok {
		log.Println(label, tid)
	}
	go CallbackServer(task)
}

// AckTask confirms that the whole task flow has completed.
//
// Form field: "taskid" (required). A ready task whose status is "readying"
// is switched to "readied". Responds with code 404 when "taskid" is empty,
// the task is not in the ready queue, or the task has no status.
func AckTask(c *gin.Context) {
	tid := c.PostForm("taskid")
	if tid == "" {
		c.JSON(http.StatusOK, Response{Code: 404, Message: "query taskid params must exist"})
		return
	}

	if itask, ok := readyQueue.Get(tid); ok {
		task := itask.(*Task)
		if status, ok := task.Load("status"); ok {
			// A non-string status is treated as "not readying" instead of
			// panicking on the type assertion.
			if s, _ := status.(string); s == "readying" {
				task.Store("status", "readied")
				c.JSON(http.StatusOK, Response{Code: 200, Message: fmt.Sprintf("task %s readied", tid)})
				return
			}
			c.JSON(http.StatusOK, Response{Code: 200, Message: fmt.Sprintf("task %s is not readying", tid)})
			return
		}
	}

	c.JSON(http.StatusOK, Response{Code: 404, Message: fmt.Sprintf("taskid: %s is not found", tid)})
}

// ReadyTask returns a finished task; while it is being read its status is "readying".
func ReadyTask(c *gin.Context) { tid := c.Query("taskid") if tid != "" { if itask, ok := readyQueue.Get(tid); ok { task := itask.(*Task) task.Store("status", "readying") c.JSON(http.StatusOK, Response{Code: 200, Data: task}) return } } else { c.JSON(http.StatusOK, Response{Code: 404, Message: fmt.Sprintf("query taskid params must exist")}) return } c.JSON(http.StatusOK, Response{Code: 404, Message: fmt.Sprintf("taskid: %s is not found", tid)}) } // ErrorTask 任务错误无法完成 func ErrorTask(c *gin.Context) { tid := c.PostForm("taskid") errorStr := c.PostForm("error") if itask, ok := waitQueue.Remove(tid); ok { task := itask.(*Task) task.Store("error", errorStr) errorQueue.Push(tid, task) // 进入回调发送队列.TODO: 内容持久化 c.JSON(http.StatusOK, Response{Code: 200}) // log.Println(errorStr) go ErrorCallbackServer(task) return } c.JSON(http.StatusOK, Response{Code: 404}) return }