Hush – Rev 2
?pathlinks?
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Hush.Communication;
using ProtoBuf;
using WingMan.Communication;
namespace Hush.Chat
{
public class ChatMessageSynchronizer : IDisposable
{
public delegate void MessageReceived(object sender, ChatMessageReceivedEventArgs e);
public ChatMessageSynchronizer(string topic, MqttCommunication mqttCommunication, TaskScheduler taskScheduler,
CancellationToken cancellationToken)
{
Topic = topic;
MqttCommunication = mqttCommunication;
CancellationToken = cancellationToken;
TaskScheduler = taskScheduler;
mqttCommunication.OnMessageReceived += MqttCommunicationOnOnMessageReceived;
}
private string Topic { get; }
private MqttCommunication MqttCommunication { get; }
private CancellationToken CancellationToken { get; }
private TaskScheduler TaskScheduler { get; }
public void Dispose()
{
MqttCommunication.OnMessageReceived -= MqttCommunicationOnOnMessageReceived;
}
public event MessageReceived OnMessageReceived;
private async void MqttCommunicationOnOnMessageReceived(object sender,
MqttCommunicationMessageReceivedEventArgs e)
{
if (e.Topic != Topic)
return;
using (var memoryStream = new MemoryStream())
{
await e.PayloadStream.CopyToAsync(memoryStream);
memoryStream.Position = 0L;
var lobbyMessage = Serializer.Deserialize<ChatMessage>(memoryStream);
await Task.Delay(0, CancellationToken)
.ContinueWith(
_ => OnMessageReceived?.Invoke(sender,
new ChatMessageReceivedEventArgs(lobbyMessage.Nick, lobbyMessage.Data)),
CancellationToken, TaskContinuationOptions.None, TaskScheduler);
}
}
public async Task Broadcast(string message)
{
using (var memoryStream = new MemoryStream())
{
Serializer.Serialize(memoryStream, new ChatMessage(MqttCommunication.Nick, message));
memoryStream.Position = 0L;
await MqttCommunication.Broadcast(Topic, memoryStream.ToArray());
}
}
}
}
Generated by GNU Enscript 1.6.5.90.