From c08c1e7eba21f08f3858e9c859f9948c23b1ef13 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Wed, 2 Aug 2023 11:42:03 +0800 Subject: [PATCH] fix --- initalize/rabbitmq.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/initalize/rabbitmq.go b/initalize/rabbitmq.go index fc92d496..040a3a00 100644 --- a/initalize/rabbitmq.go +++ b/initalize/rabbitmq.go @@ -7,6 +7,7 @@ import ( "github.com/streadway/amqp" "github.com/zeromicro/go-zero/core/logx" "log" + "sync" ) type RabbitMqHandle struct { @@ -100,16 +101,19 @@ func (h *RabbitMqHandle) Consume(queueName constants.RABBIT_MQ, handleFunc func( } //允许20的并发 limit := make(chan struct{}, 20) + wait := sync.WaitGroup{} defer close(limit) // 消费消息 for msg := range msgs { limit <- struct{}{} + wait.Add(1) go func(m amqp.Delivery) { if err := recover(); err != nil { logx.Error(err) } defer func() { <-limit + wait.Done() }() if err = handleFunc(m.Body); err != nil { logx.Error("failed to deal with MQ message:", string(m.Body)) @@ -122,5 +126,6 @@ func (h *RabbitMqHandle) Consume(queueName constants.RABBIT_MQ, handleFunc func( } }(msg) } + wait.Wait() return nil }