fusenapi/initalize/rabbitmq.go

133 lines
3.1 KiB
Go
Raw Normal View History

2023-07-28 07:10:06 +00:00
package initalize
import (
2023-08-04 09:45:36 +00:00
"context"
2023-07-28 07:10:06 +00:00
"crypto/tls"
2023-07-28 09:15:37 +00:00
"errors"
2023-07-28 07:10:06 +00:00
"fusenapi/constants"
2023-08-07 03:44:54 +00:00
"fusenapi/utils/mq_consumer_factory"
2023-07-28 07:10:06 +00:00
"github.com/streadway/amqp"
2023-07-28 09:15:37 +00:00
"github.com/zeromicro/go-zero/core/logx"
2023-07-28 07:10:06 +00:00
"log"
2023-08-02 03:42:03 +00:00
"sync"
2023-07-28 07:10:06 +00:00
)
2023-07-28 09:15:37 +00:00
type RabbitMqHandle struct {
2023-07-28 07:10:06 +00:00
}
2023-07-28 09:15:37 +00:00
// 连接属性
type queueItem struct {
ch *amqp.Channel
queue amqp.Queue
}
// 存储连接
var mapMq = make(map[constants.RABBIT_MQ]queueItem)
2023-07-28 07:39:15 +00:00
2023-07-28 09:15:37 +00:00
func InitRabbitMq(url string, config *tls.Config) *RabbitMqHandle {
2023-07-28 07:39:15 +00:00
if url == "" {
2023-07-28 09:15:37 +00:00
return nil
2023-07-28 07:39:15 +00:00
}
2023-07-28 07:10:06 +00:00
conn, err := amqp.DialTLS(url, config)
if err != nil {
2023-07-28 07:19:48 +00:00
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
2023-07-28 07:10:06 +00:00
}
// 创建一个通道
ch, err := conn.Channel()
if err != nil {
2023-07-28 07:19:48 +00:00
log.Fatalf("Failed to open a channel: %v", err)
2023-07-28 07:10:06 +00:00
}
//声明队列
2023-08-08 10:35:29 +00:00
for _, queueName := range constants.MqQueueArr {
2023-07-28 07:10:06 +00:00
q, err := ch.QueueDeclare(
string(queueName), // 队列名
true, // 是否持久化
false, // 是否自动删除
false, // 是否排他
false, // 是否等待服务器响应
nil, // 其他参数
)
if err != nil {
log.Fatalf("Failed to declare a queue: %v", err)
}
2023-07-28 09:15:37 +00:00
mapMq[queueName] = queueItem{
ch: ch,
queue: q,
2023-07-28 07:10:06 +00:00
}
}
2023-07-28 09:15:37 +00:00
return &RabbitMqHandle{}
}
// 发送消息
func (h *RabbitMqHandle) SendMsg(queueName constants.RABBIT_MQ, message []byte) error {
object, ok := mapMq[queueName]
if !ok {
return errors.New("unknown queue")
}
// 发送消息到队列
return object.ch.Publish(
"", // exchange如果为空则使用默认交换机
object.queue.Name, // routing key将消息发送到指定队列
false, // 是否等待服务器响应
false, // 是否立即发送
amqp.Publishing{
ContentType: "text/plain", //普通文本
Body: message,
}, // 消息内容
)
}
// 消费消息
2023-08-07 03:44:54 +00:00
func (h *RabbitMqHandle) Consume(ctx context.Context, queueName constants.RABBIT_MQ, handle mq_consumer_factory.MqHandle) {
2023-07-28 09:15:37 +00:00
object, ok := mapMq[queueName]
if !ok {
2023-08-07 03:44:54 +00:00
panic("unknown queue")
2023-07-28 09:15:37 +00:00
}
2023-08-04 09:45:36 +00:00
go func() {
select {
case <-ctx.Done():
panic("err ctx deadline")
}
}()
2023-07-28 09:15:37 +00:00
msgs, err := object.ch.Consume(
object.queue.Name, // 队列名
2023-07-28 09:43:19 +00:00
object.queue.Name, // 消费者名,如果为空,则是随机生成一个
2023-07-28 09:15:37 +00:00
false, // 自动应答
false, // 是否排他
false, // 是否阻塞
false, // 是否等待服务器响应
nil, // 其他参数
)
if err != nil {
log.Fatalf("Failed to register a consumer: %v", err)
}
//允许20的并发
limit := make(chan struct{}, 20)
2023-08-02 03:42:03 +00:00
wait := sync.WaitGroup{}
2023-07-28 09:15:37 +00:00
defer close(limit)
// 消费消息
for msg := range msgs {
limit <- struct{}{}
2023-08-02 03:42:03 +00:00
wait.Add(1)
2023-07-28 09:15:37 +00:00
go func(m amqp.Delivery) {
if err := recover(); err != nil {
logx.Error(err)
}
defer func() {
<-limit
2023-08-02 03:42:03 +00:00
wait.Done()
2023-07-28 09:15:37 +00:00
}()
2023-08-08 04:22:15 +00:00
if err = handle.Run(ctx, m.Body); err != nil {
2023-07-28 09:15:37 +00:00
logx.Error("failed to deal with MQ message:", string(m.Body))
return
}
if err = object.ch.Ack(m.DeliveryTag, false); err != nil {
logx.Error("failed to ack mq to delete")
} else {
log.Printf("Consume Mq message success: %s", m.Body)
}
}(msg)
}
2023-08-02 03:42:03 +00:00
wait.Wait()
2023-07-28 07:10:06 +00:00
}