diff --git a/Environment/Service/MessageHandler.cs b/Environment/Service/MessageHandler.cs
index 61dabcc..29e361d 100644
--- a/Environment/Service/MessageHandler.cs
+++ b/Environment/Service/MessageHandler.cs
@@ -2,6 +2,8 @@
using ChrisKaczor.HomeMonitor.Environment.Service.Data;
using MQTTnet;
using MQTTnet.Client;
+using Microsoft.AspNetCore.SignalR.Client;
+using System.Threading;
namespace ChrisKaczor.HomeMonitor.Environment.Service;
@@ -10,6 +12,7 @@ public class MessageHandler : IHostedService
private readonly IConfiguration _configuration;
private readonly Database _database;
private readonly IMqttClient _mqttClient;
+ private HubConnection _hubConnection;
private readonly MqttFactory _mqttFactory;
private readonly string _topic;
@@ -30,20 +33,26 @@ public class MessageHandler : IHostedService
_mqttClient = _mqttFactory.CreateMqttClient();
_mqttClient.ApplicationMessageReceivedAsync += OnApplicationMessageReceivedAsync;
+
+ _hubConnection = new HubConnectionBuilder().WithUrl(configuration["Environment:Hub:Url"] ?? string.Empty).Build();
}
public async Task StartAsync(CancellationToken cancellationToken)
{
+ await _hubConnection.StartAsync(cancellationToken);
+
var mqttClientOptions = new MqttClientOptionsBuilder().WithTcpServer(_configuration["Mqtt:Server"]).Build();
- await _mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);
+ await _mqttClient.ConnectAsync(mqttClientOptions, cancellationToken);
var mqttSubscribeOptions = _mqttFactory.CreateSubscribeOptionsBuilder().WithTopicFilter($"{_topic}/#").Build();
- await _mqttClient.SubscribeAsync(mqttSubscribeOptions, CancellationToken.None);
+ await _mqttClient.SubscribeAsync(mqttSubscribeOptions, cancellationToken);
}
public async Task StopAsync(CancellationToken cancellationToken)
{
- await _mqttClient.DisconnectAsync(new MqttClientDisconnectOptionsBuilder().Build(), CancellationToken.None);
+ await _mqttClient.DisconnectAsync(new MqttClientDisconnectOptionsBuilder().Build(), cancellationToken);
+
+ await _hubConnection.StopAsync(cancellationToken);
}
private async Task OnApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
@@ -59,6 +68,25 @@ public class MessageHandler : IHostedService
return;
await _database.StoreMessageAsync(message);
+
+ await SendLatestReading(message);
+ }
+
+ private async Task SendLatestReading(Message message)
+ {
+ try
+ {
+ if (_hubConnection.State == HubConnectionState.Disconnected)
+ await _hubConnection.StartAsync();
+
+ var json = JsonSerializer.Serialize(message);
+
+ await _hubConnection.InvokeAsync("SendLatestReading", json);
+ }
+ catch (Exception exception)
+ {
+ WriteLog($"Hub exception: {exception}");
+ }
}
private static void WriteLog(string message)
diff --git a/Environment/Service/Service.csproj b/Environment/Service/Service.csproj
index 1d3735f..0729f44 100644
--- a/Environment/Service/Service.csproj
+++ b/Environment/Service/Service.csproj
@@ -23,6 +23,7 @@
+
diff --git a/Environment/Service/appsettings.Development.json b/Environment/Service/appsettings.Development.json
index 196c711..b51e18b 100644
--- a/Environment/Service/appsettings.Development.json
+++ b/Environment/Service/appsettings.Development.json
@@ -13,8 +13,12 @@
"Host": "localhost",
"User": "sa",
"Password": "newpassword",
+ "Port": 1433,
"Name": "Environment",
"TrustServerCertificate": true
+ },
+ "Hub": {
+ "Url": "http://localhost:8080/environment"
}
}
}
diff --git a/Environment/Service/appsettings.json b/Environment/Service/appsettings.json
index 5daebe4..4826422 100644
--- a/Environment/Service/appsettings.json
+++ b/Environment/Service/appsettings.json
@@ -18,6 +18,9 @@
"Name": "Environment",
"TrustServerCertificate": true,
"Port": 1435
+ },
+ "Hub": {
+ "Url": "http://hub-server/environment"
}
}
}