Updates for separate MQTT server

This commit is contained in:
2024-01-07 13:14:00 -05:00
parent 534c38ec82
commit b14974e5fe
6 changed files with 88 additions and 33 deletions

View File

@@ -1,13 +1,13 @@
using Microsoft.AspNetCore.SignalR.Client;
using MQTTnet;
using MQTTnet.Server;
using MQTTnet.Client;
using System.Text.Json;
namespace Service;
public class MessageHandler : IHostedService
{
private MqttServer? _mqttServer;
private IMqttClient? _mqttClient;
private HubConnection? _hubConnection;
private readonly IConfiguration _configuration;
@@ -37,26 +37,25 @@ public class MessageHandler : IHostedService
var mqttFactory = new MqttFactory();
var mqttServerOptions = new MqttServerOptionsBuilder().WithDefaultEndpoint().Build();
_mqttClient = mqttFactory.CreateMqttClient();
var mqttClientOptions = new MqttClientOptionsBuilder().WithTcpServer(_configuration["Mqtt:Server"]).Build();
_mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions);
_mqttServer.InterceptingPublishAsync += OnInterceptingPublishAsync;
_mqttClient.ApplicationMessageReceivedAsync += OnApplicationMessageReceivedAsync;
await _mqttServer.StartAsync();
await _mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);
var mqttSubscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder()
.WithTopicFilter(
f =>
{
f.WithTopic("device-status/#");
})
.Build();
await _mqttClient.SubscribeAsync(mqttSubscribeOptions, CancellationToken.None);
}
private async Task RequestLatestStatus()
{
WriteLog("RequestLatestStatus");
foreach (var device in _deviceRepository.Values)
{
WriteLog($"RequestLatestStatus: {device.Name}");
await SendDeviceStatus(device);
}
}
private async Task OnInterceptingPublishAsync(InterceptingPublishEventArgs arg)
private async Task OnApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
var topic = arg.ApplicationMessage.Topic;
var payload = arg.ApplicationMessage.ConvertPayloadToString();
@@ -82,6 +81,17 @@ public class MessageHandler : IHostedService
}
}
private async Task RequestLatestStatus()
{
WriteLog("RequestLatestStatus");
foreach (var device in _deviceRepository.Values)
{
WriteLog($"RequestLatestStatus: {device.Name}");
await SendDeviceStatus(device);
}
}
private async void OnDeviceTimer(object? state)
{
var device = (Device)state!;
@@ -135,8 +145,8 @@ public class MessageHandler : IHostedService
if (_hubConnection != null)
await _hubConnection.StopAsync(cancellationToken);
if (_mqttServer != null)
await _mqttServer.StopAsync();
if (_mqttClient != null)
await _mqttClient.DisconnectAsync(new MqttClientDisconnectOptionsBuilder().Build(), CancellationToken.None);
}
private static void WriteLog(string message)

View File

@@ -11,5 +11,8 @@
"Telegram": {
"BotToken": "",
"ChatId": ""
},
"Mqtt": {
"Server": "172.23.10.3"
}
}

View File

@@ -15,5 +15,8 @@
},
"DeviceStatus": {
"DelayTime": "00:01:00"
},
"Mqtt": {
"Server": "mosquitto"
}
}

View File

@@ -56,18 +56,6 @@ spec:
app: device-status-service
type: ClusterIP
---
kind: Service
apiVersion: v1
metadata:
name: device-status-service-mqtt
spec:
ports:
- name: client
port: 1883
selector:
app: device-status-service
type: LoadBalancer
---
apiVersion: traefik.containo.us/v1alpha1
kind: IngressRoute
metadata: