This commit is contained in:
laodaming 2023-08-02 11:42:03 +08:00
parent 3388b446fd
commit c08c1e7eba

View File

@ -7,6 +7,7 @@ import (
"github.com/streadway/amqp" "github.com/streadway/amqp"
"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"log" "log"
"sync"
) )
type RabbitMqHandle struct { type RabbitMqHandle struct {
@ -100,16 +101,19 @@ func (h *RabbitMqHandle) Consume(queueName constants.RABBIT_MQ, handleFunc func(
} }
//允许20的并发 //允许20的并发
limit := make(chan struct{}, 20) limit := make(chan struct{}, 20)
wait := sync.WaitGroup{}
defer close(limit) defer close(limit)
// 消费消息 // 消费消息
for msg := range msgs { for msg := range msgs {
limit <- struct{}{} limit <- struct{}{}
wait.Add(1)
go func(m amqp.Delivery) { go func(m amqp.Delivery) {
if err := recover(); err != nil { if err := recover(); err != nil {
logx.Error(err) logx.Error(err)
} }
defer func() { defer func() {
<-limit <-limit
wait.Done()
}() }()
if err = handleFunc(m.Body); err != nil { if err = handleFunc(m.Body); err != nil {
logx.Error("failed to deal with MQ message:", string(m.Body)) logx.Error("failed to deal with MQ message:", string(m.Body))
@ -122,5 +126,6 @@ func (h *RabbitMqHandle) Consume(queueName constants.RABBIT_MQ, handleFunc func(
} }
}(msg) }(msg)
} }
wait.Wait()
return nil return nil
} }