Hush – Blame information for rev 2

Subversion Repositories:
Rev:
Rev Author Line No. Line
1 office 1 using System;
2 using System.IO;
3 using System.Threading;
4 using System.Threading.Tasks;
5 using Hush.Communication;
6 using ProtoBuf;
7 using WingMan.Communication;
8  
2 office 9 namespace Hush.Chat
1 office 10 {
2 office 11 public class ChatMessageSynchronizer : IDisposable
1 office 12 {
2 office 13 public delegate void MessageReceived(object sender, ChatMessageReceivedEventArgs e);
1 office 14  
2 office 15 public ChatMessageSynchronizer(string topic, MqttCommunication mqttCommunication, TaskScheduler taskScheduler,
1 office 16 CancellationToken cancellationToken)
17 {
2 office 18 Topic = topic;
1 office 19 MqttCommunication = mqttCommunication;
20 CancellationToken = cancellationToken;
21 TaskScheduler = taskScheduler;
22  
23 mqttCommunication.OnMessageReceived += MqttCommunicationOnOnMessageReceived;
24 }
25  
2 office 26 private string Topic { get; }
27  
1 office 28 private MqttCommunication MqttCommunication { get; }
29  
30 private CancellationToken CancellationToken { get; }
31 private TaskScheduler TaskScheduler { get; }
32  
33 public void Dispose()
34 {
35 MqttCommunication.OnMessageReceived -= MqttCommunicationOnOnMessageReceived;
36 }
37  
2 office 38 public event MessageReceived OnMessageReceived;
1 office 39  
40 private async void MqttCommunicationOnOnMessageReceived(object sender,
41 MqttCommunicationMessageReceivedEventArgs e)
42 {
2 office 43 if (e.Topic != Topic)
1 office 44 return;
45  
46 using (var memoryStream = new MemoryStream())
47 {
48 await e.PayloadStream.CopyToAsync(memoryStream);
49  
50 memoryStream.Position = 0L;
51  
2 office 52 var lobbyMessage = Serializer.Deserialize<ChatMessage>(memoryStream);
1 office 53  
54 await Task.Delay(0, CancellationToken)
55 .ContinueWith(
2 office 56 _ => OnMessageReceived?.Invoke(sender,
57 new ChatMessageReceivedEventArgs(lobbyMessage.Nick, lobbyMessage.Data)),
1 office 58 CancellationToken, TaskContinuationOptions.None, TaskScheduler);
59 }
60 }
61  
62 public async Task Broadcast(string message)
63 {
64 using (var memoryStream = new MemoryStream())
65 {
2 office 66 Serializer.Serialize(memoryStream, new ChatMessage(MqttCommunication.Nick, message));
1 office 67  
68 memoryStream.Position = 0L;
69  
2 office 70 await MqttCommunication.Broadcast(Topic, memoryStream.ToArray());
1 office 71 }
72 }
73 }
74 }