ad_parser/queue.go

78 lines
1.4 KiB
Go
Raw Normal View History

2018-12-19 09:51:39 +00:00
package parser
import (
"log"
"runtime"
"sync"
"time"
"github.com/streadway/amqp"
)
// Queue Youmi的队列
type Queue struct {
2018-12-19 10:28:06 +00:00
Url string `yaml:"url"`
Exchange string `yaml:"exchange"`
Routekey string `yaml:"routekey"`
2018-12-19 09:51:39 +00:00
mutex sync.Mutex
conn *amqp.Connection
contentType string
}
// NewQueue youmi queue队列 mq
func NewQueue(url string, exchange, routekey string) *Queue {
2018-12-19 10:28:06 +00:00
que := Queue{Url: url, Exchange: exchange, Routekey: routekey}
2018-12-19 09:51:39 +00:00
que.contentType = "application/json"
if err := que.Connect(); err != nil {
panic(err)
}
return &que
}
// SetContentType sets the content type attached to subsequently published
// messages, so that the receiving side can tell how to decode the body.
// The update is performed while holding the queue's mutex.
func (que *Queue) SetContentType(ct string) {
	que.mutex.Lock()
	que.contentType = ct
	que.mutex.Unlock()
}
// Connect 链接MQ
func (que *Queue) Connect() error {
que.mutex.Lock()
defer que.mutex.Unlock()
2018-12-19 10:28:06 +00:00
conn, err := amqp.Dial(que.Url)
2018-12-19 09:51:39 +00:00
if err != nil {
return err
}
que.conn = conn
runtime.SetFinalizer(que.conn, func(obj *amqp.Connection) {
que.conn.Close()
})
return nil
}
// Push 推送数据
func (que *Queue) Push(data []byte) error {
que.mutex.Lock()
defer que.mutex.Unlock()
ch, err := que.conn.Channel()
if err != nil {
log.Println(err)
if err := que.Connect(); err != nil {
panic(err)
}
}
2018-12-19 10:28:06 +00:00
return ch.Publish(que.Exchange, que.Routekey, false, false, amqp.Publishing{
2018-12-19 09:51:39 +00:00
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
ContentType: que.contentType,
Body: data,
})
}