package initalize import ( "context" "crypto/tls" "errors" "fusenapi/constants" "fusenapi/utils/mq_consumer_factory" "log" "strings" "sync" "github.com/streadway/amqp" "github.com/zeromicro/go-zero/core/logx" ) type RabbitMqHandle struct { } // 连接属性 type queueItem struct { ch *amqp.Channel queue amqp.Queue } // 存储连接 var mapMq = make(map[constants.RABBIT_MQ]queueItem) func InitRabbitMq(url string, config *tls.Config) *RabbitMqHandle { url = strings.Trim(url, " ") if url == "" { return nil } conn, err := amqp.DialTLS(url, config) if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } // 创建一个通道 ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } //声明队列 for _, queueName := range constants.MqQueueArr { q, err := ch.QueueDeclare( string(queueName), // 队列名 true, // 是否持久化 false, // 是否自动删除 false, // 是否排他 false, // 是否等待服务器响应 nil, // 其他参数 ) if err != nil { log.Fatalf("Failed to declare a queue: %v", err) } mapMq[queueName] = queueItem{ ch: ch, queue: q, } } return &RabbitMqHandle{} } // 发送消息 func (h *RabbitMqHandle) SendMsg(queueName constants.RABBIT_MQ, message []byte) error { object, ok := mapMq[queueName] if !ok { return errors.New("unknown queue") } // 发送消息到队列 return object.ch.Publish( "", // exchange,如果为空,则使用默认交换机 object.queue.Name, // routing key,将消息发送到指定队列 false, // 是否等待服务器响应 false, // 是否立即发送 amqp.Publishing{ ContentType: "text/plain", //普通文本 Body: message, }, // 消息内容 ) } // 消费消息 func (h *RabbitMqHandle) Consume(ctx context.Context, queueName constants.RABBIT_MQ, handle mq_consumer_factory.MqHandle) { object, ok := mapMq[queueName] if !ok { panic("unknown queue") } go func() { select { case <-ctx.Done(): panic("err ctx deadline") } }() msgs, err := object.ch.Consume( object.queue.Name, // 队列名 object.queue.Name, // 消费者名,如果为空,则是随机生成一个 false, // 自动应答 false, // 是否排他 false, // 是否阻塞 false, // 是否等待服务器响应 nil, // 其他参数 ) if err != nil { log.Fatalf("Failed to register a consumer: %v", err) } //允许20的并发 limit := make(chan struct{}, 20) wait := sync.WaitGroup{} defer close(limit) // 消费消息 for msg := range msgs { limit <- struct{}{} wait.Add(1) go func(m amqp.Delivery) { if err := recover(); err != nil { logx.Error(err) } defer func() { <-limit wait.Done() }() if err = handle.Run(ctx, m.Body); err != nil { logx.Error("failed to deal with MQ message:", string(m.Body)) return } if err = object.ch.Ack(m.DeliveryTag, false); err != nil { logx.Error("failed to ack mq to delete") } else { log.Printf("Consume Mq message success: %s", m.Body) } }(msg) } wait.Wait() }