Skip to content

Commit 36e8db8

Browse files
Refactoring
1 parent fd0632f commit 36e8db8

4 files changed

Lines changed: 35 additions & 32 deletions

File tree

README.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
# Go RabbitMQ Consumer
1+
![GitHub go.mod Go version (subdirectory of monorepo)](https://img.shields.io/github/go-mod/go-version/elvin-tacirzade/go-amqp?logo=go)
22
[![Go Reference](https://pkg.go.dev/badge/github.com/elvin-tacirzade/go-amqp.svg)](https://pkg.go.dev/github.com/elvin-tacirzade/go-amqp)
33

4+
# Go RabbitMQ Consumer
5+
46
This package uses the package provided by the [RabbitMQ core team](https://github.com/rabbitmq/amqp091-go).
57
## Goals
68
Provide auto reconnect
@@ -9,14 +11,14 @@ Provide auto reconnect
911
go get -u github.com/elvin-tacirzade/go-amqp
1012
```
1113
## Usage
12-
First we call the Init() function. The init() function takes the following parameters:
14+
First we call a New() function. The function takes the following parameters:
1315
1. `user` - Declare a RabbitMQ user.
1416
2. `password` - Declare a RabbitMQ password
1517
3. `host` - Declare a RabbitMQ host
1618
4. `port` -Declare a RabbitMQ port
1719
5. `reconnectTime` - Declare an auto reconnect time
1820

19-
Init() function returns the RabbitMQ struct and error.
21+
New() function returns the RabbitMQ struct and error.
2022

2123
See the [example](https://github.com/elvin-tacirzade/go-amqp/tree/main/example) subdirectory for simple consumers executables.
2224

example/consumer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func main() {
1818
prefetchCount := 5
1919
var deliveries []string
2020

21-
rabbitMQ, err := amqp.Init(rabbitMQUser, rabbitMQPassword, rabbitMQHost, rabbitMQPort, reconnectTime)
21+
rabbitMQ, err := amqp.New(rabbitMQUser, rabbitMQPassword, rabbitMQHost, rabbitMQPort, reconnectTime)
2222
if err != nil {
2323
log.Fatal(err)
2424
}

rabbitmq.go

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,21 @@ import (
88
"time"
99
)
1010

11-
// RabbitMQ struct defines Connection, Channel, queue, delay
11+
// RabbitMQ contains Connection and Channel
1212
type RabbitMQ struct {
1313
Connection *amqp.Connection
1414
Channel *amqp.Channel
1515
queue amqp.Queue
16+
dsn string
1617
delay time.Duration
1718
done chan bool
1819
}
1920

20-
// Init function connect to RabbitMQ and open a channel
21-
func Init(user, password, host, port string, reconnectTime time.Duration) (*RabbitMQ, error) {
22-
url := fmt.Sprintf("amqp://%s:%s@%s:%s/", user, password, host, port)
21+
// New connects to RabbitMQ and open a channel
22+
func New(user, password, host, port string, reconnectTime time.Duration) (*RabbitMQ, error) {
23+
dsn := fmt.Sprintf("amqp://%s:%s@%s:%s/", user, password, host, port)
2324

24-
conn, connErr := amqp.Dial(url)
25+
conn, connErr := amqp.Dial(dsn)
2526
if connErr != nil {
2627
return nil, fmt.Errorf("failed to connect to RabbitMQ: %v", connErr)
2728
}
@@ -32,15 +33,16 @@ func Init(user, password, host, port string, reconnectTime time.Duration) (*Rabb
3233
}
3334

3435
done := make(chan bool)
35-
rabbitMQ := &RabbitMQ{Connection: conn, Channel: ch, delay: reconnectTime, done: done}
3636

37-
go reconnectConnection(rabbitMQ, url)
38-
go reconnectChannel(rabbitMQ)
37+
rabbitMQ := &RabbitMQ{Connection: conn, Channel: ch, dsn: dsn, delay: reconnectTime, done: done}
38+
39+
rabbitMQ.reconnectConnection()
40+
rabbitMQ.reconnectChannel()
3941

4042
return rabbitMQ, nil
4143
}
4244

43-
// ExchangeDeclare function declare an exchange
45+
// ExchangeDeclare declare an exchange
4446
func (r *RabbitMQ) ExchangeDeclare(name, exchangeType string, durable, autoDelete, internal, noWait bool, arguments amqp.Table) error {
4547
err := r.Channel.ExchangeDeclare(name, exchangeType, durable, autoDelete, internal, noWait, arguments)
4648
if err != nil {
@@ -49,7 +51,7 @@ func (r *RabbitMQ) ExchangeDeclare(name, exchangeType string, durable, autoDelet
4951
return nil
5052
}
5153

52-
// QueueDeclare function declare a queue
54+
// QueueDeclare declare a queue
5355
func (r *RabbitMQ) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, arguments amqp.Table) error {
5456
queue, queueErr := r.Channel.QueueDeclare(name, durable, autoDelete, exclusive, noWait, arguments)
5557
if queueErr != nil {
@@ -59,7 +61,7 @@ func (r *RabbitMQ) QueueDeclare(name string, durable, autoDelete, exclusive, noW
5961
return nil
6062
}
6163

62-
// QueueBind function bind a queue
64+
// QueueBind bind a queue
6365
func (r *RabbitMQ) QueueBind(key, exchangeName string, noWait bool, arguments amqp.Table) error {
6466
err := r.Channel.QueueBind(r.queue.Name, key, exchangeName, noWait, arguments)
6567
if err != nil {
@@ -68,7 +70,7 @@ func (r *RabbitMQ) QueueBind(key, exchangeName string, noWait bool, arguments am
6870
return nil
6971
}
7072

71-
// Qos function controls how many messages or how many bytes the server will try to keep on
73+
// Qos controls how many messages or how many bytes the server will try to keep on
7274
// the network for consumers before receiving delivery ack
7375
func (r *RabbitMQ) Qos(prefetchCount, prefetchSize int, global bool) error {
7476
err := r.Channel.Qos(prefetchCount, prefetchSize, global)
@@ -78,7 +80,7 @@ func (r *RabbitMQ) Qos(prefetchCount, prefetchSize int, global bool) error {
7880
return nil
7981
}
8082

81-
// Consume function starts delivering queued messages
83+
// Consume starts delivering queued messages
8284
func (r *RabbitMQ) Consume(consumer string, autoAck, exclusive, noLocal, noWait bool, arguments amqp.Table) (<-chan amqp.Delivery, error) {
8385
deliveries := make(chan amqp.Delivery)
8486

@@ -102,5 +104,4 @@ func (r *RabbitMQ) Consume(consumer string, autoAck, exclusive, noLocal, noWait
102104
}()
103105

104106
return deliveries, nil
105-
106107
}

reconnect.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@ import (
66
"time"
77
)
88

9-
// reconnectConnection function reconnect a connection
10-
func reconnectConnection(rabbitMQ *RabbitMQ, url string) {
9+
// reconnectConnection reconnect a connection
10+
func (r *RabbitMQ) reconnectConnection() {
1111
for {
1212
chanError := make(chan *amqp.Error)
1313

14-
closeReason, ok := <-rabbitMQ.Connection.NotifyClose(chanError)
14+
closeReason, ok := <-r.Connection.NotifyClose(chanError)
1515
if !ok {
1616
log.Println("connection closed by developer")
1717
break
@@ -20,11 +20,11 @@ func reconnectConnection(rabbitMQ *RabbitMQ, url string) {
2020
log.Printf("reason for the connection closed: %v\n", closeReason)
2121

2222
for {
23-
time.Sleep(rabbitMQ.delay)
23+
time.Sleep(r.delay)
2424

25-
conn, connErr := amqp.Dial(url)
25+
conn, connErr := amqp.Dial(r.dsn)
2626
if connErr == nil {
27-
rabbitMQ.Connection = conn
27+
r.Connection = conn
2828
log.Println("connection successfully reconnected")
2929
break
3030
}
@@ -34,29 +34,29 @@ func reconnectConnection(rabbitMQ *RabbitMQ, url string) {
3434
}
3535
}
3636

37-
// reconnectChannel function reconnect a channel
38-
func reconnectChannel(rabbitMQ *RabbitMQ) {
37+
// reconnectChannel reconnect a channel
38+
func (r *RabbitMQ) reconnectChannel() {
3939
for {
4040
chanError := make(chan *amqp.Error)
4141

42-
closeReason, ok := <-rabbitMQ.Channel.NotifyClose(chanError)
42+
closeReason, ok := <-r.Channel.NotifyClose(chanError)
4343

4444
if !ok {
45-
rabbitMQ.done <- true
45+
r.done <- true
4646
log.Println("channel closed by developer")
4747
break
4848
}
4949

5050
log.Printf("reason for the channel closed: %v\n", closeReason)
5151

5252
for {
53-
time.Sleep(rabbitMQ.delay)
53+
time.Sleep(r.delay)
5454

55-
ch, chErr := rabbitMQ.Connection.Channel()
55+
ch, chErr := r.Connection.Channel()
5656
if chErr == nil {
57-
rabbitMQ.Channel = ch
57+
r.Channel = ch
5858
log.Println("channel successfully reconnected")
59-
rabbitMQ.done <- false
59+
r.done <- false
6060
break
6161
}
6262

0 commit comments

Comments
 (0)