ad_parser/queue.go

78 lines
1.4 KiB
Go
Raw Normal View History

2018-12-19 09:51:39 +00:00
package parser
import (
"log"
"runtime"
"sync"
"time"
"github.com/streadway/amqp"
)
// Queue Youmi的队列
type Queue struct {
url string `yaml:"url"`
exchange string `yaml:"exchange"`
routekey string `yaml:"routekey"`
mutex sync.Mutex
conn *amqp.Connection
contentType string
}
// NewQueue youmi queue队列 mq
func NewQueue(url string, exchange, routekey string) *Queue {
que := Queue{url: url, exchange: exchange, routekey: routekey}
que.contentType = "application/json"
if err := que.Connect(); err != nil {
panic(err)
}
return &que
}
// SetContentType 设置发送的类型, 让接收端好判断
func (que *Queue) SetContentType(ct string) {
que.mutex.Lock()
defer que.mutex.Unlock()
que.contentType = ct
}
// Connect 链接MQ
func (que *Queue) Connect() error {
que.mutex.Lock()
defer que.mutex.Unlock()
conn, err := amqp.Dial(que.url)
if err != nil {
return err
}
que.conn = conn
runtime.SetFinalizer(que.conn, func(obj *amqp.Connection) {
que.conn.Close()
})
return nil
}
// Push 推送数据
func (que *Queue) Push(data []byte) error {
que.mutex.Lock()
defer que.mutex.Unlock()
ch, err := que.conn.Channel()
if err != nil {
log.Println(err)
if err := que.Connect(); err != nil {
panic(err)
}
}
return ch.Publish(que.exchange, que.routekey, false, false, amqp.Publishing{
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
ContentType: que.contentType,
Body: data,
})
}