78 lines
1.4 KiB
Go
78 lines
1.4 KiB
Go
package parser
|
|
|
|
import (
|
|
"log"
|
|
"runtime"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/streadway/amqp"
|
|
)
|
|
|
|
// Queue Youmi的队列
|
|
type Queue struct {
|
|
url string `yaml:"url"`
|
|
exchange string `yaml:"exchange"`
|
|
routekey string `yaml:"routekey"`
|
|
|
|
mutex sync.Mutex
|
|
conn *amqp.Connection
|
|
|
|
contentType string
|
|
}
|
|
|
|
// NewQueue youmi queue队列 mq
|
|
func NewQueue(url string, exchange, routekey string) *Queue {
|
|
que := Queue{url: url, exchange: exchange, routekey: routekey}
|
|
que.contentType = "application/json"
|
|
|
|
if err := que.Connect(); err != nil {
|
|
panic(err)
|
|
}
|
|
return &que
|
|
}
|
|
|
|
// SetContentType 设置发送的类型, 让接收端好判断
|
|
func (que *Queue) SetContentType(ct string) {
|
|
que.mutex.Lock()
|
|
defer que.mutex.Unlock()
|
|
|
|
que.contentType = ct
|
|
}
|
|
|
|
// Connect 链接MQ
|
|
func (que *Queue) Connect() error {
|
|
que.mutex.Lock()
|
|
defer que.mutex.Unlock()
|
|
|
|
conn, err := amqp.Dial(que.url)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
que.conn = conn
|
|
runtime.SetFinalizer(que.conn, func(obj *amqp.Connection) {
|
|
que.conn.Close()
|
|
})
|
|
return nil
|
|
}
|
|
|
|
// Push 推送数据
|
|
func (que *Queue) Push(data []byte) error {
|
|
que.mutex.Lock()
|
|
defer que.mutex.Unlock()
|
|
|
|
ch, err := que.conn.Channel()
|
|
if err != nil {
|
|
log.Println(err)
|
|
if err := que.Connect(); err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
return ch.Publish(que.exchange, que.routekey, false, false, amqp.Publishing{
|
|
DeliveryMode: amqp.Persistent,
|
|
Timestamp: time.Now(),
|
|
ContentType: que.contentType,
|
|
Body: data,
|
|
})
|
|
}
|