Un pequeño cliente MQTT Con Elixir y GenServer
¡Hola a todos! 👋
Como les expliqué en mi anterior post, estoy aprendiendo Elixir y estoy más que contento con lo versátil y bien robusto que es el lenguaje, y algo que me gusta mucho es que se pueden utilizar módulos Erlang de manera intuitiva y sin ninguna configuración extraña o compleja 😎.
Y gracias a eso, hoy les voy a escribir un poco de cómo implementar un cliente MQTT con una librería
que se llama emqtt
, que precisamente es una librería Erlang y nos va
permitir, con un GenServer, tener un proceso que esté escuchando de forma constante un tópico de
un broker MQTT.
Antes de empezar, la librería que vamos a usar acá requiere una versión específica de Erlang/OTP: 25.x
,
porque hay un issue donde se confirma que no funciona bien
con la última versión de Erlang/OTP.
¿MQTT?
Pero antes ¿Qué es MQTT? 🤔 pues nada más y nada menos que un protocolo M2M (Machine to Machine), ligero y muy robusto que se utiliza muchísimo en aplicaciones IoT, debido a su relativa simplicidad y su forma de controlar el flujo de información.
Los invito a que se peguen una pasada por este excelente recurso que HiveMQ nos ofrece al respecto de MQTT y todas sus características y ventajas: HiveMQ MQTT Essentials.
Lo básico
Bueno, para empezar por lo básico, necesitamos una aplicación con un Supervisor, que es un proceso que se encarga de monitorear otros procesos y se encarga de la gestión de los hijos.
Es importante tener el supervisor, porque este será el encargado de controlar nuestro GenServer, y garantizar que cuando el proceso termine, la aplicación también lo haga.
Debido a que este patrón es casi que estándar a la hora de escribir programas en Elixir, las herramientas base ya incluyen templates para crear el Supervisor por nosotros, por lo que sólo es necesario ejecutar el siguiente comando:
mix new <nombre-del-aplicativo> --sup
Donde --sup
es la opción que le dice a Mix que cree un proyecto con un Supervisor configurado.
¡Ahora sí, manos a la obra!
El GenServer
GenServer hace referencia a Generic Server o servidor genérico, que nos permite tener un proceso principal encargado de recibir y enviar señales o respuestas a otros procesos, de forma sincrónica o asincrónica, dependiendo de cómo el proceso externo se comunique con nosotros.
Veamos los que vamos a necesitar para nuestro GenServer:
defmodule MqttClient do
@docmodule false
use GenServer
def start_link(_), do: GenServer.start_link(__MODULE__, %{})
@impl true
def init(_) do
Process.flag(:trap_exit, true)
# Stuff
{:ok, state, {:continue, :startMQTT}}
end
@impl true
def handle_continue(:startMQTT, state) do
# stuff
{:noreply, state}
end
@impl true
def handle_info({:publish, publish}, state) do
# stuff
{:noreply, state}
end
@impl true
def handle_call(_data, _from, _state), do: {:ok, state}
@impl true
def handle_cast(_data, _state), do: {:noreply, state}
end
En teoría, es un GenServer clásico, pero vemos que la información va a ser manejada vía handle_info
,
porque la librería MQTT que utilizaremos, no tiene noción de un servidor, por lo que no puede usar
cast (llamada asincrónica a un servidor) o call (llamada sincrónica) y lo que haremos entonces es
utilizar la comunicación entre procesos.
Otro handler que también llama la atención es handle_continue
, que simplemente lo vamos a utilizar
para inicializar el cliente MQTT sin bloquear la inicialización del GenServer, porque todo lo que
se hace en init
bloquea el proceso principal hasta que termine con una respuesta.
Ahora, si nos centramos en handle_info
, vamos a notar que espera un mensaje con la estructura {:publish, _}
,
esto es debido al nombre de los paquetes que utiliza MQTT para enviar información: PUBLISH
. Si deseamos
capturar un paquete diferente, como CONNECT
, entonces sólo es crear una nueva firma para handle_info
con el mensaje de estructura {:connect, _}
.
¿Cómo saber si un método es async o no en un GenServer? Pues si retorna una tupla
{:noreply, _}
, significa que es async, de lo contrario, es un proceso que bloquea la ejecución principal.
Comencemos a llenar los métodos poco a poco:
@impl true
def init(_) do
Process.flag(:trap_exit, true)
# Capturamos la configuración desde config/config.exs | config/runtime.exs
emqtt_options = Application.fetch_env!(:mqtt_client, :options)
{:ok, emqtt_pid} = :emqtt.start_link(emqtt_options)
{:ok, %{emqtt_pid: emqtt_pid}, {:continue, :StartMQTT}}
end
En este primer paso, estamos capturando el pid
del proceso que crea la librería :emqtt
y la estamos
asociando a nuestro GenServer para que se puedan comunicar via handle_info
. Después, pasamos el pid
al estado del GenServer y lo delegamos a handle_continue
para que no bloquee el proceso principal.
emqtt_options
es una lista de opciones que nos permite
especificar, entre otras cosas, usuario y contraseña. En nuestro caso, he definido un archivo
llamado config/runtime.exs
que me permite definir llaves/valor y extraerlas con Application.fetch_env!
.
# config/runtime.exs
config :mqtt_client, :options,
host: System.fetch_env!("MQTT_HOST"),
username: System.fetch_env!("MQTT_USER"),
password: System.fetch_env!("MQTT_PASSWORD")
Ahora, miremos el handle_continue
y por qué decidimos hacer lo que hacemos ahí de forma asincrónica:
@impl true
def handle_continue(:startMQTT, state) do
topic = "hello/world"
{:ok, _} = :emqtt.connect(state.emqtt_pid)
# Nos suscribimos al tema `topic` y le especificamos un QoS 2
{:ok, _prop, _reasonCodes} = :emqtt.subscribe(state.emqtt_pid, {topic, 2})
{:noreply, state}
end
-
Por si no lo han notado ya 😛, decidimos hacer el proceso de conexión de forma async, para que no tengamos que esperar a que la librería MQTT se conecte al broker, y tampoco tengamos que esperar a que la librería pueda suscribirse al tema
hello/world
a la hora de inicializar el GenServer. Imagina si tenemos1_000
procesos corriendo y todos estén bloqueados esperando a que la librería se conecte y se suscriba ¡No hay necesidad! y más porque si no se ha suscrito, pues la librería no va a emitir mensajes y no va a pasar nada 😅. -
Vemos que el estado (
state
), es el segundo argumento de la respuesta de un método del GenServer, por lo que nos permite pasar información entre funciones sin problema ¡Recordemos que Todo en Elixir es inmutable! y es por eso que podemos estar seguros queemqtt_pid
es elpid
que obtuvimos eninit
.
Give me the DATA!
Por último ¡Los datos! 🐉
Cuando el broker MQTT manda los datos que llegan a hello/world
, este nos llega en una estructura tipo Map,
donde es una combinación de varias propiedades o data types.
Basicamente algo así:
%{
client_pid: #PID<0.228.0>,
dup: false,
packet_id: :undefined,
payload: "1.5294297312678025",
properties: %{},
qos: 0,
retain: false,
topic: "hello/world",
via: #Port<0.13>
}
El payload o datos siempre serán de tipo iodata, que es un tipo de datos de Erlang (y por extensión Elixir), que evalua o a un Binary string (un string), o a otros elementos binarios.
Pégale un detalle a esta lista si quieres saber un poco más qué es iodata https://www.erlang.org/doc/reference_manual/typespec#builtin_types.
por lo que podemos hacer algo así:
@impl true
def handle_info({:publish, publish_packet}, state) do
publish_properties = publish_packet.propertes # o también publish_packet[:properties] (Sólo en MQTT v5).
IO.puts("Topic: #{publish_packet.topic} - Payload: #{publish_packet.payload}")
{:noreply, state}
end
¡Simple y sencillo! 🎉
Correrlo sólo es cuestión de mix run --no-halt
😁 y ver la magia.
Conclusión
Fue un poco extraño entender cómo funcionaba la libería cuando estaba tratando de implementar mi propio
cliente MQTT con Elixir y :emqtt
, pero sin dudas aprendí mucho en el proceso. Además, después de desglosar
todo el código en pedazos más digeribles, podemos ver que la estregia de GenServer es muy robusta y
nos permite realizar una aplicación reactiva a eventos relativamente fácil.
Luego, en otro post, les contaré cómo integré Ecto para guardar los datos que recibía en un tema en una base de datos SQLite o PostgreSQL sin muchas complicaciones, permitiendo conocer una vez más lo fácil que es utilizar Elixir.
Espero que les haya gustado y aprendido un poco ¡Chau! 🍻