Un blog personal

Un pequeño cliente MQTT Con Elixir y GenServer

· Esteban Zapata Rojas

¡Hola a todos! 👋

Como les expliqué en mi anterior post, estoy aprendiendo Elixir y estoy más que contento con lo versátil y bien robusto que es el lenguaje, y algo que me gusta mucho es que se pueden utilizar módulos Erlang de manera intuitiva y sin ninguna configuración extraña o compleja 😎.

Y gracias a eso, hoy les voy a escribir un poco de cómo implementar un cliente MQTT con una librería que se llama emqtt, que precisamente es una librería Erlang y nos va permitir, con un GenServer, tener un proceso que esté escuchando de forma constante un tópico de un broker MQTT.

Antes de empezar, la librería que vamos a usar acá requiere una versión específica de Erlang/OTP: 25.x, porque hay un issue donde se confirma que no funciona bien con la última versión de Erlang/OTP.

¿MQTT?

Pero antes ¿Qué es MQTT? 🤔 pues nada más y nada menos que un protocolo M2M (Machine to Machine), ligero y muy robusto que se utiliza muchísimo en aplicaciones IoT, debido a su relativa simplicidad y su forma de controlar el flujo de información.

Los invito a que se peguen una pasada por este excelente recurso que HiveMQ nos ofrece al respecto de MQTT y todas sus características y ventajas: HiveMQ MQTT Essentials.

Lo básico

Bueno, para empezar por lo básico, necesitamos una aplicación con un Supervisor, que es un proceso que se encarga de monitorear otros procesos y se encarga de la gestión de los hijos.

Es importante tener el supervisor, porque este será el encargado de controlar nuestro GenServer, y garantizar que cuando el proceso termine, la aplicación también lo haga.

Debido a que este patrón es casi que estándar a la hora de escribir programas en Elixir, las herramientas base ya incluyen templates para crear el Supervisor por nosotros, por lo que sólo es necesario ejecutar el siguiente comando:

mix new <nombre-del-aplicativo> --sup

Donde --sup es la opción que le dice a Mix que cree un proyecto con un Supervisor configurado.

¡Ahora sí, manos a la obra!

El GenServer

GenServer hace referencia a Generic Server o servidor genérico, que nos permite tener un proceso principal encargado de recibir y enviar señales o respuestas a otros procesos, de forma sincrónica o asincrónica, dependiendo de cómo el proceso externo se comunique con nosotros.

Veamos los que vamos a necesitar para nuestro GenServer:

defmodule MqttClient do
  @docmodule false

  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, %{})

  @impl true
  def init(_) do
    Process.flag(:trap_exit, true)

    # Stuff
    {:ok, state, {:continue, :startMQTT}}
  end

  @impl true
  def handle_continue(:startMQTT, state) do
    # stuff
    {:noreply, state}
  end

  @impl true
  def handle_info({:publish, publish}, state) do
    # stuff
    {:noreply, state}
  end

  @impl true
  def handle_call(_data, _from, _state), do: {:ok, state}

  @impl true
  def handle_cast(_data, _state), do: {:noreply, state}
end

En teoría, es un GenServer clásico, pero vemos que la información va a ser manejada vía handle_info, porque la librería MQTT que utilizaremos, no tiene noción de un servidor, por lo que no puede usar cast (llamada asincrónica a un servidor) o call (llamada sincrónica) y lo que haremos entonces es utilizar la comunicación entre procesos.

Otro handler que también llama la atención es handle_continue, que simplemente lo vamos a utilizar para inicializar el cliente MQTT sin bloquear la inicialización del GenServer, porque todo lo que se hace en init bloquea el proceso principal hasta que termine con una respuesta.

Ahora, si nos centramos en handle_info, vamos a notar que espera un mensaje con la estructura {:publish, _}, esto es debido al nombre de los paquetes que utiliza MQTT para enviar información: PUBLISH. Si deseamos capturar un paquete diferente, como CONNECT, entonces sólo es crear una nueva firma para handle_info con el mensaje de estructura {:connect, _}.

¿Cómo saber si un método es async o no en un GenServer? Pues si retorna una tupla {:noreply, _}, significa que es async, de lo contrario, es un proceso que bloquea la ejecución principal.

Comencemos a llenar los métodos poco a poco:

@impl true
def init(_) do
  Process.flag(:trap_exit, true)

  # Capturamos la configuración desde config/config.exs | config/runtime.exs
  emqtt_options = Application.fetch_env!(:mqtt_client, :options)

  {:ok, emqtt_pid} = :emqtt.start_link(emqtt_options)

  {:ok, %{emqtt_pid: emqtt_pid}, {:continue, :StartMQTT}}
end

En este primer paso, estamos capturando el pid del proceso que crea la librería :emqtt y la estamos asociando a nuestro GenServer para que se puedan comunicar via handle_info. Después, pasamos el pid al estado del GenServer y lo delegamos a handle_continue para que no bloquee el proceso principal.

emqtt_options es una lista de opciones que nos permite especificar, entre otras cosas, usuario y contraseña. En nuestro caso, he definido un archivo llamado config/runtime.exs que me permite definir llaves/valor y extraerlas con Application.fetch_env!.

# config/runtime.exs
config :mqtt_client, :options,
    host: System.fetch_env!("MQTT_HOST"),
    username: System.fetch_env!("MQTT_USER"),
    password: System.fetch_env!("MQTT_PASSWORD")

Ahora, miremos el handle_continue y por qué decidimos hacer lo que hacemos ahí de forma asincrónica:

@impl true
def handle_continue(:startMQTT, state) do
  topic = "hello/world"

  {:ok, _} = :emqtt.connect(state.emqtt_pid)

  # Nos suscribimos al tema `topic` y le especificamos un QoS 2
  {:ok, _prop, _reasonCodes} = :emqtt.subscribe(state.emqtt_pid, {topic, 2})

  {:noreply, state}
end
  1. Por si no lo han notado ya 😛, decidimos hacer el proceso de conexión de forma async, para que no tengamos que esperar a que la librería MQTT se conecte al broker, y tampoco tengamos que esperar a que la librería pueda suscribirse al tema hello/world a la hora de inicializar el GenServer. Imagina si tenemos 1_000 procesos corriendo y todos estén bloqueados esperando a que la librería se conecte y se suscriba ¡No hay necesidad! y más porque si no se ha suscrito, pues la librería no va a emitir mensajes y no va a pasar nada 😅.

  2. Vemos que el estado (state), es el segundo argumento de la respuesta de un método del GenServer, por lo que nos permite pasar información entre funciones sin problema ¡Recordemos que Todo en Elixir es inmutable! y es por eso que podemos estar seguros que emqtt_pid es el pid que obtuvimos en init.

Give me the DATA!

Por último ¡Los datos! 🐉

Cuando el broker MQTT manda los datos que llegan a hello/world, este nos llega en una estructura tipo Map, donde es una combinación de varias propiedades o data types.

Basicamente algo así:

%{
  client_pid: #PID<0.228.0>,
  dup: false,
  packet_id: :undefined,
  payload: "1.5294297312678025",
  properties: %{},
  qos: 0,
  retain: false,
  topic: "hello/world",
  via: #Port<0.13>

}

El payload o datos siempre serán de tipo iodata, que es un tipo de datos de Erlang (y por extensión Elixir), que evalua o a un Binary string (un string), o a otros elementos binarios.

Pégale un detalle a esta lista si quieres saber un poco más qué es iodata https://www.erlang.org/doc/reference_manual/typespec#builtin_types.

por lo que podemos hacer algo así:

@impl true
def handle_info({:publish, publish_packet}, state) do
  publish_properties = publish_packet.propertes # o también publish_packet[:properties] (Sólo en MQTT v5).

  IO.puts("Topic: #{publish_packet.topic} - Payload: #{publish_packet.payload}")

  {:noreply, state}
end

¡Simple y sencillo! 🎉

Correrlo sólo es cuestión de mix run --no-halt 😁 y ver la magia.

Conclusión

Fue un poco extraño entender cómo funcionaba la libería cuando estaba tratando de implementar mi propio cliente MQTT con Elixir y :emqtt, pero sin dudas aprendí mucho en el proceso. Además, después de desglosar todo el código en pedazos más digeribles, podemos ver que la estregia de GenServer es muy robusta y nos permite realizar una aplicación reactiva a eventos relativamente fácil.

Luego, en otro post, les contaré cómo integré Ecto para guardar los datos que recibía en un tema en una base de datos SQLite o PostgreSQL sin muchas complicaciones, permitiendo conocer una vez más lo fácil que es utilizar Elixir.

Espero que les haya gustado y aprendido un poco ¡Chau! 🍻