Message Queue Example using RabbitMQ in Go

go dev.to

Introduction

A message queue is an essential component in a distributed system, allowing asynchronous communication across services for better performance, scalability, and availability.

RabbitMQ is one of the services that can be implemented as a message queue. Especially with the Go programming language.

RabbitMQ is an open source message broker service that provides essential features for versatile and reliable communication for applications, including microservices, IoT, and real-time data.

Project Overview

In this example, we will build a simple advertisement platform. This platform will send a notification if the advertisement is created. The message queue is utilized as a buffer between the advertisement service and the notification service.

There are several reasons why message queue is implemented:

  1. The Message queue allows the messages stored persistently. For instance, if the notification service is down or offline, the messages are still persistent inside the queue. When the notification service is up again, the messages are consumed from the queue.

  2. A message queue allows for reducing coupling between the related services.

This is the architecture diagram of this project.

Develop the Project

Project Setup

The project is implemented using microservices architecture. These are the key components of the project:

  • Advertisement service: the service to handle advertisement feature including create a new advertisement.
  • Notification service: the service for handling the advertisement notification.
  • API Gateway: the main entrypoint for client to accessing the available services.

The project structure is implemented using Go workspace. This is the overview of the project structure.

📦go-mini-ads
 ┣ 📂ads
 ┣ 📂common
 ┃ ┣ 📂models
 ┃ ┣ 📂rabbitmq
 ┃ ┣ 📂utils
 ┣ 📂gateway
 ┣ 📂notification
 ┣ 📜.env
 ┣ 📜.gitignore
 ┣ 📜docker-compose.yml
Enter fullscreen mode Exit fullscreen mode

The very first step is to create a Go workspace. Initialize the Go Workspace using this command.

go work init
Enter fullscreen mode Exit fullscreen mode

After creating the workspace, notice that the go.work file is created. Adjust the go.work file to include the required packages.

go 1.25.5

use (
    ./ads
    ./gateway
    ./notification
    ./common
)
Enter fullscreen mode Exit fullscreen mode

Create the .env file to store the required configurations.

APP_MODE="production"

ADS_SERVICE_BASE_URL="http://ads-service:1323"

RABBITMQ_USER="admin"
RABBITMQ_PASS="secretpassword123"
RABBITMQ_HOST="rabbitmq-service"
RABBITMQ_PORT="5672"
Enter fullscreen mode Exit fullscreen mode

If not using Docker compose, ensure to change the app mode to dev and change ads-service & RABBITMQ_HOST to localhost.

Create Common Utilities

Create a new directory called common. After that, initialize a go module for this package.

go mod init common
Enter fullscreen mode Exit fullscreen mode

Install the required packages.

go get github.com/spf13/viper
go get github.com/rabbitmq/amqp091-go
Enter fullscreen mode Exit fullscreen mode

Inside the common directory, create a new directory called models to store required entities for the services.

Inside the models directory, create a new file called ad.go to store the data model of the advertisement.

package models

type Ad struct {
    ID       string `json:"id"`
    Title    string `json:"title"`
    Content  string `json:"content"`
    Category string `json:"category"`
}

Enter fullscreen mode Exit fullscreen mode

Inside the models directory, create a new file called request.go to store the request entity for creating a new advertisement.

package models

type AdRequest struct {
    Title    string `json:"title"`
    Content  string `json:"content"`
    Category string `json:"category"`
}

Enter fullscreen mode Exit fullscreen mode

Inside the models directory, create a new file called response.go to store the response entity. This entity is using generic which means the type of the data can be adjusted dynamically.

package models

type Response[T any] struct {
    Success bool   `json:"success"`
    Message string `json:"message"`
    Data    T      `json:"data"`
}

Enter fullscreen mode Exit fullscreen mode

After the entities are created, create a new directory called utils to store the utility functions and constants. The utils package will be used across the services.

Create a file called constants.go to store the constant values. The constant values are used for ensuring the consistency of the value.

package utils

const ADS_QUEUE = "ads"

// env value keys
const (
    APP_MODE             = "APP_MODE"
    ADS_SERVICE_BASE_URL = "ADS_SERVICE_BASE_URL"
    RABBITMQ_USER        = "RABBITMQ_USER"
    RABBITMQ_PASS        = "RABBITMQ_PASS"
    RABBITMQ_HOST        = "RABBITMQ_HOST"
    RABBITMQ_PORT        = "RABBITMQ_PORT"
)

const (
    APP_MODE_PRODUCTION = "production"
)

Enter fullscreen mode Exit fullscreen mode

This application uses .env file for storing and fetching the configurations. To fetch the values from the .env file, create a new utility function called GetConfig inside the config.go file.

package utils

import (
    "log"
    "os"

    "github.com/spf13/viper"
)

func GetConfig(key string) string {
    isDevelop := os.Getenv(APP_MODE) != APP_MODE_PRODUCTION

    if isDevelop {
        viper.AddConfigPath(".")
        viper.SetConfigFile(".env")

        if err := viper.ReadInConfig(); err != nil {
            log.Fatalf("error when reading configuration file: %s\n", err)
        }

        return viper.GetString(key)
    }

    return os.Getenv(key)
}

Enter fullscreen mode Exit fullscreen mode

To interact with RabbitMQ service conveniently, create a dedicated directory called rabbitmq. Inside this directory, create a new file called rabbitmq.go.

Inside the rabbitmq.go, the RabbitMQ instance will be created, this instance can initialize required setups (like: connection, queue and channel) and provides basic functionalities like publish and consume messages.

First, import the required package and create a struct that act as a RabbitMQ instance.

package rabbitmq

import (
    "common/utils"
    "context"
    "errors"
    "fmt"

    "github.com/rabbitmq/amqp091-go"
    amqp "github.com/rabbitmq/amqp091-go"
)

type RabbitMQ struct {
    Channel *amqp.Channel
    Queue   amqp.Queue
}
Enter fullscreen mode Exit fullscreen mode

Create a function to retrieve the RabbitMQ address.

func GetRabbitMQAddress() string {
    return fmt.Sprintf("amqp://%s:%s@%s:%s/",
        utils.GetConfig(utils.RABBITMQ_USER),
        utils.GetConfig(utils.RABBITMQ_PASS),
        utils.GetConfig(utils.RABBITMQ_HOST),
        utils.GetConfig(utils.RABBITMQ_PORT),
    )
}

Enter fullscreen mode Exit fullscreen mode

Create a function to initialize the RabbitMQ connection. This function returns a connection object that can be used for interacting with the RabbitMQ service.

func InitConnection(address string) (*amqp.Connection, error) {
    conn, err := amqp.Dial(address)
    if err != nil {
        return nil, fmt.Errorf("failed to connect: %s\n", err)
    }

    return conn, nil
}

Enter fullscreen mode Exit fullscreen mode

Create a function to initialize the connection channel. This function returns a connection channel that can be used for using RabbitMQ features like send and consume messages.

func InitChannel(conn *amqp.Connection) (*amqp.Channel, error) {
    ch, err := conn.Channel()
    if err != nil {
        return nil, fmt.Errorf("failed to open channel: %s\n", err)
    }

    return ch, nil
}

Enter fullscreen mode Exit fullscreen mode

Create a function to initialize the queue. The queue uses FIFO principle which means the first message comes in, that message will be retrieved first.

func DeclareQueue(ch *amqp.Channel) (amqp.Queue, error) {
    q, err := ch.QueueDeclare(
        utils.ADS_QUEUE, // name
        true,            // durability
        false,           // delete when unused
        false,           // exclusive
        false,           // no-wait
        amqp.Table{
            amqp.QueueTypeArg: amqp.QueueTypeQuorum,
        },
    )
    if err != nil {
        return amqp.Queue{}, fmt.Errorf("failed to create queue: %s\n", err)
    }

    return q, nil
}

Enter fullscreen mode Exit fullscreen mode

Create a function to initialize the RabbitMQ instance. When initializing this instance, the previous created functions are called and assembled together.

Using this approach makes the code more clean and easy to refactor when there is an additional changes when initializing the instance.

func InitRabbitMQ() (*RabbitMQ, error) {
    address := GetRabbitMQAddress()
    conn, connErr := InitConnection(address)
    ch, chErr := InitChannel(conn)
    q, qErr := DeclareQueue(ch)

    err := errors.Join(connErr, chErr, qErr)

    if err != nil {
        return nil, fmt.Errorf("failed to create RabbitMQ instance: %s\n", err)
    }

    return &RabbitMQ{
        Channel: ch,
        Queue:   q,
    }, nil
}

Enter fullscreen mode Exit fullscreen mode

Create a method for RabbitMQ instance to publish a message to the queue. This method requires context object, queue name and the message content.

func (rq *RabbitMQ) PublishWithCtx(ctx context.Context, queueName string, body []byte) error {
    return rq.Channel.PublishWithContext(ctx,
        "",        // exchange
        queueName, // routing key
        false,     // mandatory
        false,     // immediate
        amqp.Publishing{
            ContentType: "application/json",
            Body:        body,
        })
}

Enter fullscreen mode Exit fullscreen mode

Create a method for RabbitMQ instance to consume the messages from the queue. This method requires queue name.

func (rq *RabbitMQ) Consume(queueName string) (<-chan amqp091.Delivery, error) {
    return rq.Channel.Consume(
        queueName, // queue
        "",        // consumer
        true,      // auto-ack
        false,     // exclusive
        false,     // no-local
        false,     // no-wait
        nil,       // args
    )
}

Enter fullscreen mode Exit fullscreen mode

Create Ads Service

Create a new directory called ads. After that, initialize a go module for this package.

go mod init ads-service
Enter fullscreen mode Exit fullscreen mode

Install the required packages.

go get github.com/google/uuid
go get github.com/labstack/echo/v5
go get github.com/labstack/echo/v5/middleware
go get github.com/rabbitmq/amqp091-go
Enter fullscreen mode Exit fullscreen mode

Create a new file called main.go to store the main logic inside this service.

Import the required packages.

package main

import (
    "common/models"
    "common/rabbitmq"
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "time"

    "github.com/google/uuid"
    "github.com/labstack/echo/v5"
    "github.com/labstack/echo/v5/middleware"
)

var RMQ *rabbitmq.RabbitMQ
Enter fullscreen mode Exit fullscreen mode

Inside the main function, initialize the Echo application and RabbitMQ instance.

func main() {
    var err error

    e := echo.New()
    e.Use(middleware.RequestLogger())

    RMQ, err = rabbitmq.InitRabbitMQ()

    if err != nil {
        log.Fatalf("failed to init RabbitMQ: %s\n", err)
    }

    // create controller here

    // start the server
    if err := e.Start(":1323"); err != nil {
        e.Logger.Error("failed to start server", "error", err)
    }
}
Enter fullscreen mode Exit fullscreen mode

After the Echo and RabbitMQ instance are initialized, Create a controller for creating the advertisement data.

func main() {
    // other codes...
    e.POST("/ads/create", func(c *echo.Context) error {
        req := new(models.AdRequest)

        // bind request body
        if err := c.Bind(req); err != nil {
            return c.JSON(http.StatusBadRequest, models.Response[any]{
                Success: false,
                Message: "request invalid",
            })
        }

        // create advertisement record
        record := models.Ad{
            ID:       uuid.NewString(),
            Title:    req.Title,
            Content:  req.Content,
            Category: req.Category,
        }

        // send record to the message queue
        if err := sendToQueue(record); err != nil {
            return c.JSON(http.StatusInternalServerError, models.Response[any]{
                Success: false,
                Message: err.Error(),
            })
        }

        // return created advertisement record
        return c.JSON(http.StatusCreated, models.Response[models.Ad]{
            Success: true,
            Message: "ad created",
            Data:    record,
        })
    })
}
Enter fullscreen mode Exit fullscreen mode

Notice that in the create advertisement controller, the sendToQueue is called for sending the advertisement record to the message queue.

Create a function to store the advertisement record to the message queue called sendToQueue. Create this function outside the main function.

func sendToQueue(data models.Ad) error {
    // create context object
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // parse advertisement data
    body, err := json.Marshal(data)
    if err != nil {
        return fmt.Errorf("failed to parse data: %s\n", err)
    }

    // send advertisement data to the message queue
    err = RMQ.PublishWithCtx(ctx, RMQ.Queue.Name, body)
    if err != nil {
        return fmt.Errorf("failed to publish message: %s\n", err)
    }

    return nil
}

Enter fullscreen mode Exit fullscreen mode

The sendToQueue function sends the advertisement record to the message queue by calling the PublishWithCtx method from the RabbitMQ instance.

Create a Dockerfile for running this service as a Docker container.

FROMgolang:1.25ASbuild-stage

WORKDIR /app

COPY . .

WORKDIR /app/ads

RUN go mod download
RUN CGO_ENABLED=0 GOOS=linux go build -o /adsapp

FROMgcr.io/distroless/base-debian11ASbuild-release-stage

WORKDIR /

COPY --from=build-stage /adsapp /adsapp

EXPOSE 1323

USER nonroot:nonroot

ENTRYPOINT [ "./adsapp" ]
Enter fullscreen mode Exit fullscreen mode

Create API Gateway Service

Create a new directory called gateway. After that, initialize a go module for this package.

go mod init gateway-service
Enter fullscreen mode Exit fullscreen mode

Install the required packages.

go get github.com/labstack/echo/v5
go get github.com/labstack/echo/v5/middleware
Enter fullscreen mode Exit fullscreen mode

Create a main function inside the main.go file. Then, import the required packages.

package main

import (
    "bytes"
    "common/models"
    "common/utils"
    "encoding/json"
    "fmt"
    "net/http"
    "time"

    "github.com/labstack/echo/v5"
    "github.com/labstack/echo/v5/middleware"
)

Enter fullscreen mode Exit fullscreen mode

Inside the main function, initialize the Echo application and start the server as well.

func main() {
    e := echo.New()
    e.Use(middleware.RequestLogger())

    e.POST("/api/ads", createAd)

    if err := e.Start(":8080"); err != nil {
        e.Logger.Error("failed to start server", "error", err)
    }
}

Enter fullscreen mode Exit fullscreen mode

Create a controller called createAd to create a new advertisement record. This controller sends HTTP request to the advertisement service to create a new advertisement record.

Before sending the HTTP request, parse the request body and initialize the HTTP client.

func createAd(c *echo.Context) error {
    adReq := new(models.AdRequest)

    if err := c.Bind(adReq); err != nil {
        return c.JSON(http.StatusBadRequest, models.Response[any]{
            Success: false,
            Message: "request invalid",
        })
    }

    // create HTTP client with 10-second request timeout
    client := &http.Client{
        Timeout: 10 * time.Second,
    }

    // parse request body as a JSON
    body, err := json.Marshal(adReq)
    if err != nil {
        return c.JSON(http.StatusBadRequest, models.Response[any]{
            Success: false,
            Message: "request invalid",
        })
    }

    adsServiceUrl := utils.GetConfig(utils.ADS_SERVICE_BASE_URL)
    adsEndpoint := fmt.Sprintf("%s/ads/create", adsServiceUrl)

    // create HTTP POST request to the advertisement service
    request, err := http.NewRequest(http.MethodPost, adsEndpoint, bytes.NewBuffer(body))

    if err != nil {
        return c.JSON(http.StatusInternalServerError, models.Response[any]{
            Success: false,
            Message: "failed to create a request",
        })
    }

    request.Header.Set("Content-Type", "application/json")
}

Enter fullscreen mode Exit fullscreen mode

After the request body is parsed and HTTP client is initialized, send the HTTP request and store the response body in a JSON format.

func createAd(c *echo.Context) error {
    // other codes...
    // send HTTP request
    response, err := client.Do(request)
    if err != nil || response.StatusCode != http.StatusCreated {
        return c.JSON(http.StatusInternalServerError, models.Response[any]{
            Success: false,
            Message: "failed to create ad",
        })
    }

    // close the response body after parse process is completed
    defer response.Body.Close()

    // variable to store the response body
    var data models.Response[models.Ad]

    // parse response body and store in the "data" variable
    err = json.NewDecoder(response.Body).Decode(&data)
    if err != nil {
        return c.JSON(http.StatusInternalServerError, models.Response[any]{
            Success: false,
            Message: "failed to parse ad data",
        })
    }

    return c.JSON(http.StatusCreated, data)
}

Enter fullscreen mode Exit fullscreen mode

Create a Dockerfile for running this service as a Docker container.

FROMgolang:1.25ASbuild-stage

WORKDIR /app

COPY . .

WORKDIR /app/gateway

RUN go mod download
RUN CGO_ENABLED=0 GOOS=linux go build -o /gatewayapp

FROMgcr.io/distroless/base-debian11ASbuild-release-stage

WORKDIR /

COPY --from=build-stage /gatewayapp /gatewayapp

EXPOSE 1323

USER nonroot:nonroot

ENTRYPOINT [ "./gatewayapp" ]
Enter fullscreen mode Exit fullscreen mode

Create Notification Service

Create a new directory called notification. After that, initialize a go module for this package.

go mod init notification-service
Enter fullscreen mode Exit fullscreen mode

Install the required package.

go get github.com/rabbitmq/amqp091-go
Enter fullscreen mode Exit fullscreen mode

Create a main function inside the main.go file. Then, import the required packages.

package main

import (
    "common/models"
    "common/rabbitmq"
    "encoding/json"
    "fmt"
    "log"
    "strings"
)

var RMQ *rabbitmq.RabbitMQ
Enter fullscreen mode Exit fullscreen mode

In the main function, initialize the RabbitMQ instance and consume the incoming messages from the queue.

The incoming messages are retrieved from the message queue via delivery channel. Consume the messages inside the goroutine.

func main() {
    var err error

    // initialize RabbitMQ instance
    RMQ, err = rabbitmq.InitRabbitMQ()

    if err != nil {
        log.Fatalf("failed to init RabbitMQ: %s\n", err)
    }

    // consume messages
    msgs, err := RMQ.Consume(RMQ.Queue.Name)

    if err != nil {
        log.Fatalf("failed to consume queue: %s\n", err)
    }

    var forever chan struct{}

    // parse the messages
    go func() {
        for d := range msgs {
            parseMessage(d.Body)
        }
    }()

    log.Println("[*] Waiting for messages. To exit press CTRL+C")
    <-forever
}
Enter fullscreen mode Exit fullscreen mode

The parseMessage function parses the incoming messages from the queue. This is the complete implementation of the function.

func parseMessage(body []byte) {
    var record models.Ad

    if err := json.Unmarshal(body, &record); err != nil {
        log.Fatalf("failed to parse message: %s\n", err)
    }

    fmt.Println("send notification...")
    fmt.Println(strings.Repeat("=", 5))
    fmt.Println("title: ", record.Title)
    fmt.Println("content: ", record.Content)
    fmt.Println("category: ", record.Category)
    fmt.Println(strings.Repeat("=", 5))
    fmt.Println("notification sent")
}

Enter fullscreen mode Exit fullscreen mode

Create a Dockerfile for running this service as a Docker container.

FROMgolang:1.25ASbuild-stage

WORKDIR /app

COPY . .

WORKDIR /app/notification

RUN go mod download
RUN CGO_ENABLED=0 GOOS=linux go build -o /notifapp

FROMgcr.io/distroless/base-debian11ASbuild-release-stage

WORKDIR /

COPY --from=build-stage /notifapp /notifapp

EXPOSE 1323

USER nonroot:nonroot

ENTRYPOINT [ "./notifapp" ]
Enter fullscreen mode Exit fullscreen mode

Configure the Services

The services can be run and managed easily using Docker compose. This is the complete configuration of the Docker compose for this project.

services:
  ads-service:
    build:
      context: .
      dockerfile: ads/Dockerfile
    container_name: ads-service
    environment:
      - APP_MODE=${APP_MODE}
      - RABBITMQ_USER=${RABBITMQ_USER}
      - RABBITMQ_PASS=${RABBITMQ_PASS}
      - RABBITMQ_HOST=${RABBITMQ_HOST}
      - RABBITMQ_PORT=${RABBITMQ_PORT}
    ports:
      - "1323:1323"
    restart: on-failure
    networks:
      - my-network
    depends_on:
      - rabbitmq-service

  gateway-service:
    build:
      context: .
      dockerfile: gateway/Dockerfile
    container_name: gateway-service
    environment:
      - APP_MODE=${APP_MODE}
      - ADS_SERVICE_BASE_URL=${ADS_SERVICE_BASE_URL}
    ports:
      - 8080:8080
    restart: on-failure
    networks:
      - my-network
    depends_on:
      - ads-service

  notification-service:
    build:
      context: .
      dockerfile: notification/Dockerfile
    container_name: notification-service
    environment:
      - APP_MODE=${APP_MODE}
      - RABBITMQ_USER=${RABBITMQ_USER}
      - RABBITMQ_PASS=${RABBITMQ_PASS}
      - RABBITMQ_HOST=${RABBITMQ_HOST}
      - RABBITMQ_PORT=${RABBITMQ_PORT}
    restart: on-failure
    networks:
      - my-network
    depends_on:
      - rabbitmq-service

  rabbitmq-service:
    image: rabbitmq:4-management
    ports:
      - "5672:5672"
      - "15672:15672"
    container_name: rabbitmq-service
    environment:
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_USER}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASS}
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    networks:
      - my-network

volumes:
  rabbitmq_data:

networks:
  my-network:
    driver: bridge
Enter fullscreen mode Exit fullscreen mode

Run the Project

The services can be run using this command. This command run all the specified services in the Docker compose file.

docker compose up -d
Enter fullscreen mode Exit fullscreen mode

Ensure all the services are healthy. Check it using docker ps. If all the containers status are Up. you are good to go.

Try to send the HTTP request. This request is sent to the gateway service.

curl -X POST "http://localhost:8080/api/ads" \
  -H "Content-Type: application/json" \
  -d '{"title":"new ad","content":"ad content","category":"anything but new"}'
Enter fullscreen mode Exit fullscreen mode

If successful, the notification service should be able to parse the message from the queue. Check it using docker logs notification-service

send notification...
=====
title:  new ad
content:  ad content
category:  anything but new
=====
notification sent
Enter fullscreen mode Exit fullscreen mode

Sources

⭐️ If you found this post is helpful, don't forget to like and save it.

💬 If you have any feedbacks, please let me know in the comment section down below.

Thank you

Source: dev.to

arrow_back Back to Tutorials