MQTT en Go: Consumidor que Almacena Mensajes en MongoDB

MQTT en Go: Consumidor que Almacena Mensajes en MongoDB

En estos días, he estado migrando algunos de mis scripts de Python a ejecutables escritos en Go. ¿La razón? Para mantener mi habilidad en Go y, en última instancia, porque Python nunca terminó de convencerme por completo. Aprovechando que he configurado mi propio servidor MQTT, he decidido trasladar un script que previamente recibía mensajes en varios tópicos y los reenviaba a otra ubicación. Ahora, en lugar de reenviarlos, los recibo y los almaceno en una base de datos MongoDB.

Uno de los requisitos que me impuse fue que el ejecutable se pudiera lanzar como un servicio utilizando systemd de la siguiente manera:

systemctl start mqttconsumertomongo.service

Para lograr esto, el ejecutable debe leer el archivo de configuración pasándole la ruta como parámetro del comando, de la siguiente manera:

mqttconsumertomongo.exe /path/to/settings.json

Esta configuración permite una integración fluida con systemd y asegura que el programa funcione según lo previsto.

Configuración del servicio systemd

El archivo para lanzar el ejecutable como servicio es realmente muy simple, como pueden ver a continuación:

[Unit]
Description=Consumer mqtt to save mongodb
After=network.target

[Service]
ExecStart=/path/to/mqttconsumertomongo.exe /path/to/settings.json
Type=simple
Restart=on-failure

[Install]
WantedBy=multi-user.target

Si no sabes cómo configurar y lanzar un servicio systemd, puedes consultar esta publicación que escribí.

Código

El código fuente es bastante sencillo, sin más pretensiones que realizar la tarea necesaria: recibir datos y guardarlos en una base de datos MongoDB.

Configuración settings.json

El código está configurado para leer un archivo llamado “settings.json” en la misma ubicación por defecto o para recibir la ruta del archivo como argumento al momento de ejecutarlo, como mencioné anteriormente.

Ejemplo

{
    "mqttSettings": {
        "mqttBrokerURL": "mqtt://localhost:1883",
        "user": "testuser",
        "password": "testpassword",
        "clientId": "testclient",
        "topics": {
            "topic1": 1,
            "topic2": 2
        }
    },
    "mongodbSettings": {
        "url": "mongodb://localhost:27017",
        "authSource": "admin",
        "username": "testuser",
        "password": "testpassword",
        "dbName": "testdb",
        "collection": "testcollection"
    }
}

Una vez configurado el archivo de conexiones y tópicos, este se lee desde main.go y se inician las configuraciones necesarias. La lectura y transformación del archivo a una estructura se realiza en el archivo settings/read_settings.go

Flujo del programa

En el archivo main.gose leen las configuraciones para luego ejecutar tres funciones que se encuentran en el archivos services/srv.go:

  • services.MqttConnection: Función donde se lleva acabo la conexión hacia un servidor MQTT.
  • services.MongoConnection: Función para conectarse a una base de datos MongoDB.
  • services.MqttSubscribe: La función que toma la lista de tópicos y se suscribe a ellos es capaz de recibir comodines del tipo topic/#. El valor numérico representa el nivel de calidad de servicio (QoS) para la suscripción.

Dentro del archivo srv.go, existe una función llamada MessageHandler. Esta función se encarga de manejar los mensajes MQTT que se reciben en los tópicos suscritos y también guárdalos a la base de datos.

func MessageHandler(client mqtt.Client, msg mqtt.Message) {

    var data = map[string]interface{}{}
    err := json.Unmarshal(msg.Payload(), &data)

    if err != nil {
        data["topic"] = msg.Topic()
        data["payload"] = string(msg.Payload())
        mongoClient.Database(settingsValues.MongodbSettings.DbName).Collection(settingsValues.MongodbSettings.Collection).InsertOne(context.Background(), data)

        return
    }
    data["topic"] = msg.Topic()
    collectionValue, isCollection := data["collection"]
    if isCollection {
        collection := fmt.Sprintf("%v", collectionValue)
        mongoClient.Database(settingsValues.MongodbSettings.DbName).Collection(string(collection)).InsertOne(context.Background(), data)
    } else {
        mongoClient.Database(settingsValues.MongodbSettings.DbName).Collection(settingsValues.MongodbSettings.Collection).InsertOne(context.Background(), data)
    }
}

La lógica es sencilla: cuando llega un mensaje al tópico suscrito, se verifica si es de tipo JSON. Si no lo es, se genera un mapa (JSON) con dos propiedades: topic y payload, y se guarda en la colección configurada en mongodbSettings.collection.

En caso de que sea un texto en formato JSON, se verifica la existencia de una propiedad llamada collection. Si esta propiedad existe, el mensaje se guarda en la colección especificada; de lo contrario, se almacena en la colección configurada por defecto.

Test Unitarios

El código incluye algunos test unitarios, aunque no son extensos. En realidad, los creé principalmente para realizar pruebas sencillas sobre cómo hacer mocks para Mongodb y Mqtt, y no tienen la intención de cubrir la mayoría del código ni de ser exhaustivos. Fue más bien un experimento que podría ser útil si necesitas ver cómo se crearon los mocks para conexiones externas utilizando interfaces y extensiones de estructuras (similar a la herencia).

Las interfaces para estos mocks se encuentran en la carpeta interfaces, y los únicos archivos de prueba son aquellos que tienen el prefijo _test.go, siguiendo las convenciones del lenguaje.

Makefile

En la mayoría de mis proyectos, suelo crear archivos Makefile para simplificar el proceso de ejecución de comandos y parámetros repetitivos. Esto facilita enormemente la gestión y automatización de tareas en el desarrollo y la construcción del proyecto.

En el archivo Makefile solo tengo cuatro targets:

  • build.linux
  • build.windows
  • build.darwin
  • run.test

Todos los comandos que dicen “Build” son utilizados para compilar el código en el sistema operativo correspondiente, y el nombre del archivo resultante será mqttconsumertomongo.exe.

Nota: Aunque es posible compilar para Windows desde un sistema que no sea Windows, los archivos Makefile no funcionarán en Windows a menos que se utilice Windows Subsystem for Linux (WSL).

El último objetivo (target) se utiliza únicamente para ejecutar las pruebas unitarias, como mencioné anteriormente.

Todas las compilaciones incluyen ciertos flags para reducir el tamaño del archivo resultante:

go build -ldflags="-s -w" -o mqttconsumertomongo.exe main.go

ldflags se refiere a linker flags y los flags que se le pasan son -s y -w. El primero elimina la tabla de símbolos y la información de depuración, y el segundo elimina la información de depuración DWARF. Esto resulta en la reducción de un archivo de 12MB a aproximadamente 8MB.

-trimpath durante la compilación en Go se utiliza para eliminar las rutas de acceso absolutas de los archivos en la información de depuración incluida en el binario final.

El código pueden clonarlo desde este repositorio.