mirror of
https://github.com/ckaczor/HomeMonitor.git
synced 2026-01-14 01:25:38 -05:00
Initial setup of environment service
This commit is contained in:
68
Environment/Service/MessageHandler.cs
Normal file
68
Environment/Service/MessageHandler.cs
Normal file
@@ -0,0 +1,68 @@
|
||||
using System.Text.Json;
|
||||
using ChrisKaczor.HomeMonitor.Environment.Service.Data;
|
||||
using MQTTnet;
|
||||
using MQTTnet.Client;
|
||||
|
||||
namespace ChrisKaczor.HomeMonitor.Environment.Service;
|
||||
|
||||
public class MessageHandler : IHostedService
|
||||
{
|
||||
private readonly IConfiguration _configuration;
|
||||
private readonly Database _database;
|
||||
private readonly IMqttClient _mqttClient;
|
||||
|
||||
private readonly MqttFactory _mqttFactory;
|
||||
private readonly string _topic;
|
||||
|
||||
public MessageHandler(IConfiguration configuration, Database database)
|
||||
{
|
||||
_configuration = configuration;
|
||||
_database = database;
|
||||
|
||||
_database.EnsureDatabase();
|
||||
|
||||
_topic = _configuration["Mqtt:Topic"] ?? string.Empty;
|
||||
|
||||
if (string.IsNullOrEmpty(_topic))
|
||||
throw new InvalidOperationException("Topic not set");
|
||||
|
||||
_mqttFactory = new MqttFactory();
|
||||
_mqttClient = _mqttFactory.CreateMqttClient();
|
||||
|
||||
_mqttClient.ApplicationMessageReceivedAsync += OnApplicationMessageReceivedAsync;
|
||||
}
|
||||
|
||||
public async Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var mqttClientOptions = new MqttClientOptionsBuilder().WithTcpServer(_configuration["Mqtt:Server"]).Build();
|
||||
await _mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);
|
||||
|
||||
var mqttSubscribeOptions = _mqttFactory.CreateSubscribeOptionsBuilder().WithTopicFilter($"{_topic}/#").Build();
|
||||
await _mqttClient.SubscribeAsync(mqttSubscribeOptions, CancellationToken.None);
|
||||
}
|
||||
|
||||
public async Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
await _mqttClient.DisconnectAsync(new MqttClientDisconnectOptionsBuilder().Build(), CancellationToken.None);
|
||||
}
|
||||
|
||||
private async Task OnApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
|
||||
{
|
||||
var topic = arg.ApplicationMessage.Topic;
|
||||
var payload = arg.ApplicationMessage.ConvertPayloadToString();
|
||||
|
||||
WriteLog($"Topic: {topic} = {payload}");
|
||||
|
||||
var message = JsonSerializer.Deserialize<Message>(payload);
|
||||
|
||||
if (message == null)
|
||||
return;
|
||||
|
||||
await _database.StoreMessageAsync(message);
|
||||
}
|
||||
|
||||
private static void WriteLog(string message)
|
||||
{
|
||||
Console.WriteLine(message);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user