Hush – Blame information for rev 2
?pathlinks?
Rev | Author | Line No. | Line |
---|---|---|---|
1 | office | 1 | using System; |
2 | using System.IO; |
||
3 | using System.Threading; |
||
4 | using System.Threading.Tasks; |
||
5 | using Hush.Communication; |
||
6 | using ProtoBuf; |
||
7 | using WingMan.Communication; |
||
8 | |||
namespace Hush.Chat
{
    /// <summary>
    ///     Connects a single MQTT topic to chat: payloads received on the topic are deserialized
    ///     into <see cref="ChatMessage" /> instances and re-published through
    ///     <see cref="OnMessageReceived" />, while <see cref="Broadcast" /> serializes outgoing text
    ///     and publishes it on the same topic.
    /// </summary>
    public class ChatMessageSynchronizer : IDisposable
    {
        /// <summary>
        ///     Signature of the handlers invoked when a chat message arrives on the topic.
        /// </summary>
        /// <param name="sender">The sender reported by the underlying MQTT communication event.</param>
        /// <param name="e">The nick and data carried by the received chat message.</param>
        public delegate void MessageReceived(object sender, ChatMessageReceivedEventArgs e);

        /// <summary>
        ///     Creates the synchronizer and subscribes it to the incoming messages of
        ///     <paramref name="mqttCommunication" />.
        /// </summary>
        /// <param name="topic">The topic to listen on and to broadcast to.</param>
        /// <param name="mqttCommunication">The MQTT communication used for receiving and sending.</param>
        /// <param name="taskScheduler">The scheduler on which <see cref="OnMessageReceived" /> is raised.</param>
        /// <param name="cancellationToken">Token that cancels the dispatch of received messages.</param>
        /// <exception cref="ArgumentNullException">
        ///     <paramref name="mqttCommunication" /> is <c>null</c>.
        /// </exception>
        public ChatMessageSynchronizer(string topic, MqttCommunication mqttCommunication, TaskScheduler taskScheduler,
            CancellationToken cancellationToken)
        {
            // Fail with a descriptive exception instead of a NullReferenceException on the subscription below.
            if (mqttCommunication == null)
                throw new ArgumentNullException(nameof(mqttCommunication));

            Topic = topic;
            MqttCommunication = mqttCommunication;
            CancellationToken = cancellationToken;
            TaskScheduler = taskScheduler;

            mqttCommunication.OnMessageReceived += MqttCommunicationOnOnMessageReceived;
        }

        /// <summary>The topic whose messages are relayed and on which broadcasts are published.</summary>
        private string Topic { get; }

        /// <summary>The MQTT communication this synchronizer is attached to.</summary>
        private MqttCommunication MqttCommunication { get; }

        /// <summary>Token used to cancel the dispatch of received messages.</summary>
        private CancellationToken CancellationToken { get; }

        /// <summary>The scheduler on which <see cref="OnMessageReceived" /> handlers run.</summary>
        private TaskScheduler TaskScheduler { get; }

        /// <summary>
        ///     Detaches the synchronizer from the MQTT communication so no further messages are relayed.
        /// </summary>
        public void Dispose()
        {
            MqttCommunication.OnMessageReceived -= MqttCommunicationOnOnMessageReceived;
        }

        /// <summary>
        ///     Raised, on the configured task scheduler, for every chat message received on the topic.
        /// </summary>
        public event MessageReceived OnMessageReceived;

        /// <summary>
        ///     Handles a raw MQTT message: ignores other topics, deserializes the payload and raises
        ///     <see cref="OnMessageReceived" /> on the configured task scheduler.
        /// </summary>
        /// <param name="sender">The sender of the MQTT event; forwarded unchanged to subscribers.</param>
        /// <param name="e">The received MQTT message (topic and payload stream).</param>
        private async void MqttCommunicationOnOnMessageReceived(object sender,
            MqttCommunicationMessageReceivedEventArgs e)
        {
            if (e.Topic != Topic)
                return;

            try
            {
                using (var memoryStream = new MemoryStream())
                {
                    // Buffer the payload first so that deserialization reads from a rewound in-memory copy.
                    await e.PayloadStream.CopyToAsync(memoryStream);

                    memoryStream.Position = 0L;

                    var chatMessage = Serializer.Deserialize<ChatMessage>(memoryStream);

                    // The zero-length delay only serves as an antecedent: the continuation is what
                    // moves the event invocation onto the configured task scheduler.
                    await Task.Delay(0, CancellationToken)
                        .ContinueWith(
                            _ => OnMessageReceived?.Invoke(sender,
                                new ChatMessageReceivedEventArgs(chatMessage.Nick, chatMessage.Data)),
                            CancellationToken, TaskContinuationOptions.None, TaskScheduler);
                }
            }
            catch (OperationCanceledException)
            {
                // The token was signalled while the message was being dispatched. Dropping the message
                // is the intended outcome of cancellation; without this handler the exception would
                // escape an async void method, where no caller is able to observe or handle it.
            }
        }

        /// <summary>
        ///     Serializes <paramref name="message" /> together with the nick of the MQTT communication
        ///     into a <see cref="ChatMessage" /> and broadcasts it on the topic.
        /// </summary>
        /// <param name="message">The chat text to send.</param>
        /// <returns>A task that completes when the underlying broadcast has completed.</returns>
        public async Task Broadcast(string message)
        {
            using (var memoryStream = new MemoryStream())
            {
                Serializer.Serialize(memoryStream, new ChatMessage(MqttCommunication.Nick, message));

                // ToArray copies the whole buffer regardless of Position, so no rewind is required.
                await MqttCommunication.Broadcast(Topic, memoryStream.ToArray());
            }
        }
    }
}