fusenapi/initalize/rabbitmq.go

127 lines
3.1 KiB
Go
Raw Normal View History

2023-07-28 07:10:06 +00:00
package initalize
import (
"crypto/tls"
2023-07-28 09:15:37 +00:00
"errors"
2023-07-28 07:10:06 +00:00
"fusenapi/constants"
"github.com/streadway/amqp"
2023-07-28 09:15:37 +00:00
"github.com/zeromicro/go-zero/core/logx"
2023-07-28 07:10:06 +00:00
"log"
)
2023-07-28 09:15:37 +00:00
type RabbitMqHandle struct {
2023-07-28 07:10:06 +00:00
}
2023-07-28 09:15:37 +00:00
// 连接属性
type queueItem struct {
ch *amqp.Channel
queue amqp.Queue
}
2023-07-31 02:40:54 +00:00
// 队列列表
var mqQueueArr = []constants.RABBIT_MQ{
constants.RABBIT_MQ_ASSEMBLE_RENDER_DATA,
constants.RABBIT_MQ_RENDER_RESULT_DATA,
}
2023-07-28 09:15:37 +00:00
// 存储连接
var mapMq = make(map[constants.RABBIT_MQ]queueItem)
2023-07-28 07:39:15 +00:00
2023-07-28 09:15:37 +00:00
func InitRabbitMq(url string, config *tls.Config) *RabbitMqHandle {
2023-07-28 07:39:15 +00:00
if url == "" {
2023-07-28 09:15:37 +00:00
return nil
2023-07-28 07:39:15 +00:00
}
2023-07-28 07:10:06 +00:00
conn, err := amqp.DialTLS(url, config)
if err != nil {
2023-07-28 07:19:48 +00:00
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
2023-07-28 07:10:06 +00:00
}
// 创建一个通道
ch, err := conn.Channel()
if err != nil {
2023-07-28 07:19:48 +00:00
log.Fatalf("Failed to open a channel: %v", err)
2023-07-28 07:10:06 +00:00
}
//声明队列
2023-07-31 02:40:54 +00:00
for _, queueName := range mqQueueArr {
2023-07-28 07:10:06 +00:00
q, err := ch.QueueDeclare(
string(queueName), // 队列名
true, // 是否持久化
false, // 是否自动删除
false, // 是否排他
false, // 是否等待服务器响应
nil, // 其他参数
)
if err != nil {
log.Fatalf("Failed to declare a queue: %v", err)
}
2023-07-28 09:15:37 +00:00
mapMq[queueName] = queueItem{
ch: ch,
queue: q,
2023-07-28 07:10:06 +00:00
}
}
2023-07-28 09:15:37 +00:00
return &RabbitMqHandle{}
}
// 发送消息
func (h *RabbitMqHandle) SendMsg(queueName constants.RABBIT_MQ, message []byte) error {
object, ok := mapMq[queueName]
if !ok {
return errors.New("unknown queue")
}
// 发送消息到队列
return object.ch.Publish(
"", // exchange如果为空则使用默认交换机
object.queue.Name, // routing key将消息发送到指定队列
false, // 是否等待服务器响应
false, // 是否立即发送
amqp.Publishing{
ContentType: "text/plain", //普通文本
Body: message,
}, // 消息内容
)
}
// 消费消息
func (h *RabbitMqHandle) Consume(queueName constants.RABBIT_MQ, handleFunc func(data []byte) error) error {
object, ok := mapMq[queueName]
if !ok {
return errors.New("unknown queue")
}
msgs, err := object.ch.Consume(
object.queue.Name, // 队列名
2023-07-28 09:43:19 +00:00
object.queue.Name, // 消费者名,如果为空,则是随机生成一个
2023-07-28 09:15:37 +00:00
false, // 自动应答
false, // 是否排他
false, // 是否阻塞
false, // 是否等待服务器响应
nil, // 其他参数
)
if err != nil {
log.Fatalf("Failed to register a consumer: %v", err)
}
//允许20的并发
limit := make(chan struct{}, 20)
defer close(limit)
// 消费消息
for msg := range msgs {
limit <- struct{}{}
go func(m amqp.Delivery) {
if err := recover(); err != nil {
logx.Error(err)
}
defer func() {
<-limit
}()
if err = handleFunc(m.Body); err != nil {
logx.Error("failed to deal with MQ message:", string(m.Body))
return
}
if err = object.ch.Ack(m.DeliveryTag, false); err != nil {
logx.Error("failed to ack mq to delete")
} else {
log.Printf("Consume Mq message success: %s", m.Body)
}
}(msg)
}
return nil
2023-07-28 07:10:06 +00:00
}