From b14974e5fe792380a5074b8dbc924e52d7facdf4 Mon Sep 17 00:00:00 2001 From: Chris Kaczor Date: Sun, 7 Jan 2024 13:14:00 -0500 Subject: [PATCH] Updates for separate MQTT server --- DeviceStatus/Sender/sender.py | 2 +- DeviceStatus/Service/MessageHandler.cs | 50 ++++++++++-------- .../Service/appsettings.Development.json | 3 ++ DeviceStatus/Service/appsettings.json | 3 ++ DeviceStatus/Service/deploy/manifest.yaml | 12 ----- Mosquitto/mosquitto.yaml | 51 +++++++++++++++++++ 6 files changed, 88 insertions(+), 33 deletions(-) create mode 100644 Mosquitto/mosquitto.yaml diff --git a/DeviceStatus/Sender/sender.py b/DeviceStatus/Sender/sender.py index 260dd6d..659e4b0 100755 --- a/DeviceStatus/Sender/sender.py +++ b/DeviceStatus/Sender/sender.py @@ -64,7 +64,7 @@ while True: device.last_status = pin_status - info = client.publish(device.name, str(pin_status), 1) + info = client.publish("device-status/" % (device.name), str(pin_status), 1) info.wait_for_publish() diff --git a/DeviceStatus/Service/MessageHandler.cs b/DeviceStatus/Service/MessageHandler.cs index 0f8aa11..f668385 100644 --- a/DeviceStatus/Service/MessageHandler.cs +++ b/DeviceStatus/Service/MessageHandler.cs @@ -1,13 +1,13 @@ using Microsoft.AspNetCore.SignalR.Client; using MQTTnet; -using MQTTnet.Server; +using MQTTnet.Client; using System.Text.Json; namespace Service; public class MessageHandler : IHostedService { - private MqttServer? _mqttServer; + private IMqttClient? _mqttClient; private HubConnection? _hubConnection; private readonly IConfiguration _configuration; @@ -37,26 +37,25 @@ public class MessageHandler : IHostedService var mqttFactory = new MqttFactory(); - var mqttServerOptions = new MqttServerOptionsBuilder().WithDefaultEndpoint().Build(); + _mqttClient = mqttFactory.CreateMqttClient(); + var mqttClientOptions = new MqttClientOptionsBuilder().WithTcpServer(_configuration["Mqtt:Server"]).Build(); - _mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions); - _mqttServer.InterceptingPublishAsync += OnInterceptingPublishAsync; + _mqttClient.ApplicationMessageReceivedAsync += OnApplicationMessageReceivedAsync; - await _mqttServer.StartAsync(); + await _mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None); + + var mqttSubscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder() + .WithTopicFilter( + f => + { + f.WithTopic("device-status/#"); + }) + .Build(); + + await _mqttClient.SubscribeAsync(mqttSubscribeOptions, CancellationToken.None); } - private async Task RequestLatestStatus() - { - WriteLog("RequestLatestStatus"); - - foreach (var device in _deviceRepository.Values) - { - WriteLog($"RequestLatestStatus: {device.Name}"); - await SendDeviceStatus(device); - } - } - - private async Task OnInterceptingPublishAsync(InterceptingPublishEventArgs arg) + private async Task OnApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg) { var topic = arg.ApplicationMessage.Topic; var payload = arg.ApplicationMessage.ConvertPayloadToString(); @@ -82,6 +81,17 @@ public class MessageHandler : IHostedService } } + private async Task RequestLatestStatus() + { + WriteLog("RequestLatestStatus"); + + foreach (var device in _deviceRepository.Values) + { + WriteLog($"RequestLatestStatus: {device.Name}"); + await SendDeviceStatus(device); + } + } + private async void OnDeviceTimer(object? state) { var device = (Device)state!; @@ -135,8 +145,8 @@ public class MessageHandler : IHostedService if (_hubConnection != null) await _hubConnection.StopAsync(cancellationToken); - if (_mqttServer != null) - await _mqttServer.StopAsync(); + if (_mqttClient != null) + await _mqttClient.DisconnectAsync(new MqttClientDisconnectOptionsBuilder().Build(), CancellationToken.None); } private static void WriteLog(string message) diff --git a/DeviceStatus/Service/appsettings.Development.json b/DeviceStatus/Service/appsettings.Development.json index 0235418..928518a 100644 --- a/DeviceStatus/Service/appsettings.Development.json +++ b/DeviceStatus/Service/appsettings.Development.json @@ -11,5 +11,8 @@ "Telegram": { "BotToken": "", "ChatId": "" + }, + "Mqtt": { + "Server": "172.23.10.3" } } diff --git a/DeviceStatus/Service/appsettings.json b/DeviceStatus/Service/appsettings.json index d3f854c..c4f0575 100644 --- a/DeviceStatus/Service/appsettings.json +++ b/DeviceStatus/Service/appsettings.json @@ -15,5 +15,8 @@ }, "DeviceStatus": { "DelayTime": "00:01:00" + }, + "Mqtt": { + "Server": "mosquitto" } } diff --git a/DeviceStatus/Service/deploy/manifest.yaml b/DeviceStatus/Service/deploy/manifest.yaml index 220d1c8..cb1a578 100644 --- a/DeviceStatus/Service/deploy/manifest.yaml +++ b/DeviceStatus/Service/deploy/manifest.yaml @@ -56,18 +56,6 @@ spec: app: device-status-service type: ClusterIP --- -kind: Service -apiVersion: v1 -metadata: - name: device-status-service-mqtt -spec: - ports: - - name: client - port: 1883 - selector: - app: device-status-service - type: LoadBalancer ---- apiVersion: traefik.containo.us/v1alpha1 kind: IngressRoute metadata: diff --git a/Mosquitto/mosquitto.yaml b/Mosquitto/mosquitto.yaml new file mode 100644 index 0000000..ac29793 --- /dev/null +++ b/Mosquitto/mosquitto.yaml @@ -0,0 +1,51 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: mosquitto +spec: + replicas: 1 + selector: + matchLabels: + app: mosquitto + template: + metadata: + labels: + app: mosquitto + spec: + containers: + - name: mosquitto + image: eclipse-mosquitto + ports: + - containerPort: 1883 + - containerPort: 9001 + volumeMounts: + - mountPath: /mosquitto/config/mosquitto.conf + subPath: mosquitto.conf + name: config + volumes: + - name: config + configMap: + name: mosquitto-config +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: mosquitto-config +data: + mosquitto.conf: | + # DO NOT USE IN PRODUCTION + allow_anonymous true + + listener 1883 + protocol mqtt +--- +apiVersion: v1 +kind: Service +metadata: + name: mosquitto +spec: + type: LoadBalancer + selector: + app: mosquitto + ports: + - port: 1883 \ No newline at end of file