using System.Text.Json;
using ChrisKaczor.HomeMonitor.Environment.Service.Data;
using MQTTnet;
using MQTTnet.Client;

namespace ChrisKaczor.HomeMonitor.Environment.Service;

/// <summary>
/// Hosted service that connects to an MQTT server, subscribes to every topic
/// beneath the configured root topic, and passes each received message to the
/// database.
/// </summary>
public class MessageHandler : IHostedService, IDisposable
{
    private readonly IConfiguration _configuration;
    private readonly Database _database;
    private readonly IMqttClient _mqttClient;
    private readonly MqttFactory _mqttFactory;
    private readonly string _topic;

    /// <summary>
    /// Creates the handler, ensures the database exists, and wires up the MQTT client.
    /// </summary>
    /// <param name="configuration">Configuration supplying <c>Mqtt:Topic</c> and <c>Mqtt:Server</c>.</param>
    /// <param name="database">Database that received messages are stored in.</param>
    /// <exception cref="InvalidOperationException"><c>Mqtt:Topic</c> is missing or empty.</exception>
    public MessageHandler(IConfiguration configuration, Database database)
    {
        _configuration = configuration;
        _database = database;

        _database.EnsureDatabase();

        _topic = _configuration["Mqtt:Topic"] ?? string.Empty;

        if (string.IsNullOrEmpty(_topic))
        {
            throw new InvalidOperationException("Topic not set");
        }

        _mqttFactory = new MqttFactory();
        _mqttClient = _mqttFactory.CreateMqttClient();
        _mqttClient.ApplicationMessageReceivedAsync += OnApplicationMessageReceivedAsync;
    }

    /// <summary>
    /// Connects to the server named by <c>Mqtt:Server</c> and subscribes to
    /// <c>{topic}/#</c>, i.e. every topic below the configured root.
    /// </summary>
    /// <param name="cancellationToken">Signals that start-up should be aborted.</param>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var mqttClientOptions = new MqttClientOptionsBuilder()
            .WithTcpServer(_configuration["Mqtt:Server"])
            .Build();

        // Flow the host's token so a cancelled start-up does not hang on an unreachable server.
        await _mqttClient.ConnectAsync(mqttClientOptions, cancellationToken);

        var mqttSubscribeOptions = _mqttFactory.CreateSubscribeOptionsBuilder()
            .WithTopicFilter($"{_topic}/#")
            .Build();

        await _mqttClient.SubscribeAsync(mqttSubscribeOptions, cancellationToken);
    }

    /// <summary>
    /// Disconnects from the MQTT server.
    /// </summary>
    /// <param name="cancellationToken">Signals that shutdown should no longer be graceful.</param>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        var disconnectOptions = new MqttClientDisconnectOptionsBuilder().Build();

        await _mqttClient.DisconnectAsync(disconnectOptions, cancellationToken);
    }

    /// <summary>
    /// Unsubscribes the message handler and releases the MQTT client.
    /// </summary>
    public void Dispose()
    {
        _mqttClient.ApplicationMessageReceivedAsync -= OnApplicationMessageReceivedAsync;
        _mqttClient.Dispose();

        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Logs a received message, deserializes its JSON payload, and stores the result.
    /// Messages with an empty payload, a payload that deserializes to null, or
    /// malformed JSON are skipped.
    /// </summary>
    /// <param name="arg">Event data carrying the received application message.</param>
    private async Task OnApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
    {
        var topic = arg.ApplicationMessage.Topic;
        var payload = arg.ApplicationMessage.ConvertPayloadToString();

        WriteLog($"Topic: {topic} = {payload}");

        // Nothing to deserialize; passing null/empty text to the serializer would throw.
        if (string.IsNullOrEmpty(payload))
        {
            return;
        }

        try
        {
            // NOTE(review): this call carries no type argument, so the target type cannot
            // be inferred from here; presumably a generic argument naming the message
            // model was lost. Confirm against Database.StoreMessageAsync and restore it.
            var message = JsonSerializer.Deserialize(payload);

            if (message == null)
            {
                return;
            }

            await _database.StoreMessageAsync(message);
        }
        catch (JsonException exception)
        {
            // One malformed payload should not surface as an unhandled handler failure.
            WriteLog($"Ignoring invalid JSON on topic {topic}: {exception.Message}");
        }
    }

    /// <summary>
    /// Writes a line to standard output.
    /// </summary>
    /// <param name="message">Text to write.</param>
    private static void WriteLog(string message)
    {
        Console.WriteLine(message);
    }
}