package parser import ( "log" "runtime" "sync" "time" "github.com/streadway/amqp" ) // Queue Youmi的队列 type Queue struct { Url string `yaml:"url"` Exchange string `yaml:"exchange"` Routekey string `yaml:"routekey"` mutex sync.Mutex conn *amqp.Connection contentType string } // NewQueue youmi queue队列 mq func NewQueue(url string, exchange, routekey string) *Queue { que := Queue{Url: url, Exchange: exchange, Routekey: routekey} que.contentType = "application/json" if err := que.Connect(); err != nil { panic(err) } return &que } // SetContentType 设置发送的类型, 让接收端好判断 func (que *Queue) SetContentType(ct string) { que.mutex.Lock() defer que.mutex.Unlock() que.contentType = ct } // Connect 链接MQ func (que *Queue) Connect() error { que.mutex.Lock() defer que.mutex.Unlock() conn, err := amqp.Dial(que.Url) if err != nil { return err } que.conn = conn runtime.SetFinalizer(que.conn, func(obj *amqp.Connection) { que.conn.Close() }) return nil } // Push 推送数据 func (que *Queue) Push(data []byte) error { que.mutex.Lock() defer que.mutex.Unlock() ch, err := que.conn.Channel() if err != nil { log.Println(err) if err := que.Connect(); err != nil { panic(err) } } return ch.Publish(que.Exchange, que.Routekey, false, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, Timestamp: time.Now(), ContentType: que.contentType, Body: data, }) }