ad_parser/queue.go
huangsimin b8f1940cac 123
2018-12-19 18:28:06 +08:00

78 lines
1.4 KiB
Go

package parser
import (
"log"
"runtime"
"sync"
"time"
"github.com/streadway/amqp"
)
// Queue Youmi的队列
type Queue struct {
Url string `yaml:"url"`
Exchange string `yaml:"exchange"`
Routekey string `yaml:"routekey"`
mutex sync.Mutex
conn *amqp.Connection
contentType string
}
// NewQueue youmi queue队列 mq
func NewQueue(url string, exchange, routekey string) *Queue {
que := Queue{Url: url, Exchange: exchange, Routekey: routekey}
que.contentType = "application/json"
if err := que.Connect(); err != nil {
panic(err)
}
return &que
}
// SetContentType 设置发送的类型, 让接收端好判断
func (que *Queue) SetContentType(ct string) {
que.mutex.Lock()
defer que.mutex.Unlock()
que.contentType = ct
}
// Connect 链接MQ
func (que *Queue) Connect() error {
que.mutex.Lock()
defer que.mutex.Unlock()
conn, err := amqp.Dial(que.Url)
if err != nil {
return err
}
que.conn = conn
runtime.SetFinalizer(que.conn, func(obj *amqp.Connection) {
que.conn.Close()
})
return nil
}
// Push 推送数据
func (que *Queue) Push(data []byte) error {
que.mutex.Lock()
defer que.mutex.Unlock()
ch, err := que.conn.Channel()
if err != nil {
log.Println(err)
if err := que.Connect(); err != nil {
panic(err)
}
}
return ch.Publish(que.Exchange, que.Routekey, false, false, amqp.Publishing{
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
ContentType: que.contentType,
Body: data,
})
}