133 lines
3.1 KiB
Go
133 lines
3.1 KiB
Go
package initalize
|
||
|
||
import (
|
||
"context"
|
||
"crypto/tls"
|
||
"errors"
|
||
"fusenapi/constants"
|
||
"fusenapi/utils/mq_consumer_factory"
|
||
"github.com/streadway/amqp"
|
||
"github.com/zeromicro/go-zero/core/logx"
|
||
"log"
|
||
"sync"
|
||
)
|
||
|
||
// RabbitMqHandle is the stateless handle returned by InitRabbitMq. Its
// methods operate on the package-level queue registry rather than on fields
// of the handle itself.
type RabbitMqHandle struct{}
|
||
|
||
// 连接属性
|
||
type queueItem struct {
|
||
ch *amqp.Channel
|
||
queue amqp.Queue
|
||
}
|
||
|
||
// 存储连接
|
||
var mapMq = make(map[constants.RABBIT_MQ]queueItem)
|
||
|
||
func InitRabbitMq(url string, config *tls.Config) *RabbitMqHandle {
|
||
if url == "" {
|
||
return nil
|
||
}
|
||
conn, err := amqp.DialTLS(url, config)
|
||
if err != nil {
|
||
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
|
||
}
|
||
// 创建一个通道
|
||
ch, err := conn.Channel()
|
||
if err != nil {
|
||
log.Fatalf("Failed to open a channel: %v", err)
|
||
}
|
||
//声明队列
|
||
for _, queueName := range constants.MqQueueArr {
|
||
q, err := ch.QueueDeclare(
|
||
string(queueName), // 队列名
|
||
true, // 是否持久化
|
||
false, // 是否自动删除
|
||
false, // 是否排他
|
||
false, // 是否等待服务器响应
|
||
nil, // 其他参数
|
||
)
|
||
if err != nil {
|
||
log.Fatalf("Failed to declare a queue: %v", err)
|
||
}
|
||
mapMq[queueName] = queueItem{
|
||
ch: ch,
|
||
queue: q,
|
||
}
|
||
}
|
||
return &RabbitMqHandle{}
|
||
}
|
||
|
||
// 发送消息
|
||
func (h *RabbitMqHandle) SendMsg(queueName constants.RABBIT_MQ, message []byte) error {
|
||
object, ok := mapMq[queueName]
|
||
if !ok {
|
||
return errors.New("unknown queue")
|
||
}
|
||
// 发送消息到队列
|
||
return object.ch.Publish(
|
||
"", // exchange,如果为空,则使用默认交换机
|
||
object.queue.Name, // routing key,将消息发送到指定队列
|
||
false, // 是否等待服务器响应
|
||
false, // 是否立即发送
|
||
amqp.Publishing{
|
||
ContentType: "text/plain", //普通文本
|
||
Body: message,
|
||
}, // 消息内容
|
||
)
|
||
}
|
||
|
||
// 消费消息
|
||
func (h *RabbitMqHandle) Consume(ctx context.Context, queueName constants.RABBIT_MQ, handle mq_consumer_factory.MqHandle) {
|
||
object, ok := mapMq[queueName]
|
||
if !ok {
|
||
panic("unknown queue")
|
||
}
|
||
go func() {
|
||
select {
|
||
case <-ctx.Done():
|
||
panic("err ctx deadline")
|
||
}
|
||
}()
|
||
msgs, err := object.ch.Consume(
|
||
object.queue.Name, // 队列名
|
||
object.queue.Name, // 消费者名,如果为空,则是随机生成一个
|
||
false, // 自动应答
|
||
false, // 是否排他
|
||
false, // 是否阻塞
|
||
false, // 是否等待服务器响应
|
||
nil, // 其他参数
|
||
)
|
||
if err != nil {
|
||
log.Fatalf("Failed to register a consumer: %v", err)
|
||
}
|
||
//允许20的并发
|
||
limit := make(chan struct{}, 20)
|
||
wait := sync.WaitGroup{}
|
||
defer close(limit)
|
||
// 消费消息
|
||
for msg := range msgs {
|
||
limit <- struct{}{}
|
||
wait.Add(1)
|
||
go func(m amqp.Delivery) {
|
||
if err := recover(); err != nil {
|
||
logx.Error(err)
|
||
}
|
||
defer func() {
|
||
<-limit
|
||
wait.Done()
|
||
}()
|
||
if err = handle.Run(ctx, m.Body); err != nil {
|
||
logx.Error("failed to deal with MQ message:", string(m.Body))
|
||
return
|
||
}
|
||
if err = object.ch.Ack(m.DeliveryTag, false); err != nil {
|
||
logx.Error("failed to ack mq to delete")
|
||
} else {
|
||
log.Printf("Consume Mq message success: %s", m.Body)
|
||
}
|
||
}(msg)
|
||
}
|
||
wait.Wait()
|
||
}
|