clockwerk-opensim – Blame information for rev 1

Subversion Repositories:
Rev:
Rev Author Line No. Line
1 vero 1 /*
2 * Copyright (c) Contributors, http://opensimulator.org/
3 * See CONTRIBUTORS.TXT for a full list of copyright holders.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 * * Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * * Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 * * Neither the name of the OpenSimulator Project nor the
13 * names of its contributors may be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
17 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19 * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
20 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27  
28 using System;
29 using System.Collections.Concurrent;
30 using System.Reflection;
31 using System.Threading;
32 using log4net;
33 using OpenSim.Framework;
34 using OpenSim.Framework.Monitoring;
35 using OpenSim.Region.Framework.Scenes;
36  
37 namespace OpenSim.Region.ClientStack.LindenUDP
38 {
39 public struct RefillRequest
40 {
41 public LLUDPClient Client;
42 public ThrottleOutPacketTypeFlags Categories;
43  
44 public RefillRequest(LLUDPClient client, ThrottleOutPacketTypeFlags categories)
45 {
46 Client = client;
47 Categories = categories;
48 }
49 }
50  
51 public class OutgoingQueueRefillEngine
52 {
53 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
54  
55 public bool IsRunning { get; private set; }
56  
57 /// <summary>
58 /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping.
59 /// </summary>
60 public int RequestProcessTimeoutOnStop { get; set; }
61  
62 /// <summary>
63 /// Controls whether we need to warn in the log about exceeding the max queue size.
64 /// </summary>
65 /// <remarks>
66 /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in
67 /// order to avoid spamming the log with lots of warnings.
68 /// </remarks>
69 private bool m_warnOverMaxQueue = true;
70  
71 private BlockingCollection<RefillRequest> m_requestQueue;
72  
73 private CancellationTokenSource m_cancelSource = new CancellationTokenSource();
74  
75 private LLUDPServer m_udpServer;
76  
77 private Stat m_oqreRequestsWaitingStat;
78  
79 /// <summary>
80 /// Used to signal that we are ready to complete stop.
81 /// </summary>
82 private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false);
83  
84 public OutgoingQueueRefillEngine(LLUDPServer server)
85 {
86 RequestProcessTimeoutOnStop = 5000;
87 m_udpServer = server;
88  
89 MainConsole.Instance.Commands.AddCommand(
90 "Debug",
91 false,
92 "debug lludp oqre",
93 "debug lludp oqre <start|stop|status>",
94 "Start, stop or get status of OutgoingQueueRefillEngine.",
95 "If stopped then refill requests are processed directly via the threadpool.",
96 HandleOqreCommand);
97 }
98  
99 public void Start()
100 {
101 lock (this)
102 {
103 if (IsRunning)
104 return;
105  
106 IsRunning = true;
107  
108 m_finishedProcessingAfterStop.Reset();
109  
110 m_requestQueue = new BlockingCollection<RefillRequest>(new ConcurrentQueue<RefillRequest>(), 5000);
111  
112 m_oqreRequestsWaitingStat =
113 new Stat(
114 "OQRERequestsWaiting",
115 "Number of outgong queue refill requests waiting for processing.",
116 "",
117 "",
118 "clientstack",
119 m_udpServer.Scene.Name,
120 StatType.Pull,
121 MeasuresOfInterest.None,
122 stat => stat.Value = m_requestQueue.Count,
123 StatVerbosity.Debug);
124  
125 StatsManager.RegisterStat(m_oqreRequestsWaitingStat);
126  
127 Watchdog.StartThread(
128 ProcessRequests,
129 String.Format("OutgoingQueueRefillEngineThread ({0})", m_udpServer.Scene.Name),
130 ThreadPriority.Normal,
131 false,
132 true,
133 null,
134 int.MaxValue);
135 }
136 }
137  
138 public void Stop()
139 {
140 lock (this)
141 {
142 try
143 {
144 if (!IsRunning)
145 return;
146  
147 IsRunning = false;
148  
149 int requestsLeft = m_requestQueue.Count;
150  
151 if (requestsLeft <= 0)
152 {
153 m_cancelSource.Cancel();
154 }
155 else
156 {
157 m_log.InfoFormat("[OUTGOING QUEUE REFILL ENGINE]: Waiting to write {0} events after stop.", requestsLeft);
158  
159 while (requestsLeft > 0)
160 {
161 if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop))
162 {
163 // After timeout no events have been written
164 if (requestsLeft == m_requestQueue.Count)
165 {
166 m_log.WarnFormat(
167 "[OUTGOING QUEUE REFILL ENGINE]: No requests processed after {0} ms wait. Discarding remaining {1} requests",
168 RequestProcessTimeoutOnStop, requestsLeft);
169  
170 break;
171 }
172 }
173  
174 requestsLeft = m_requestQueue.Count;
175 }
176 }
177 }
178 finally
179 {
180 m_cancelSource.Dispose();
181 StatsManager.DeregisterStat(m_oqreRequestsWaitingStat);
182 m_oqreRequestsWaitingStat = null;
183 m_requestQueue = null;
184 }
185 }
186 }
187  
188 public bool QueueRequest(LLUDPClient client, ThrottleOutPacketTypeFlags categories)
189 {
190 if (m_requestQueue.Count < m_requestQueue.BoundedCapacity)
191 {
192 // m_log.DebugFormat(
193 // "[OUTGOING QUEUE REFILL ENGINE]: Adding request for categories {0} for {1} in {2}",
194 // categories, client.AgentID, m_udpServer.Scene.Name);
195  
196 m_requestQueue.Add(new RefillRequest(client, categories));
197  
198 if (!m_warnOverMaxQueue)
199 m_warnOverMaxQueue = true;
200  
201 return true;
202 }
203 else
204 {
205 if (m_warnOverMaxQueue)
206 {
207 m_log.WarnFormat(
208 "[OUTGOING QUEUE REFILL ENGINE]: Request queue at maximum capacity, not recording request from {0} in {1}",
209 client.AgentID, m_udpServer.Scene.Name);
210  
211 m_warnOverMaxQueue = false;
212 }
213  
214 return false;
215 }
216 }
217  
218 private void ProcessRequests()
219 {
220 try
221 {
222 while (IsRunning || m_requestQueue.Count > 0)
223 {
224 RefillRequest req = m_requestQueue.Take(m_cancelSource.Token);
225  
226 // QueueEmpty callback = req.Client.OnQueueEmpty;
227 //
228 // if (callback != null)
229 // {
230 // try
231 // {
232 // callback(req.Categories);
233 // }
234 // catch (Exception e)
235 // {
236 // m_log.Error("[OUTGOING QUEUE REFILL ENGINE]: ProcessRequests(" + req.Categories + ") threw an exception: " + e.Message, e);
237 // }
238 // }
239  
240 req.Client.FireQueueEmpty(req.Categories);
241 }
242 }
243 catch (OperationCanceledException)
244 {
245 }
246  
247 m_finishedProcessingAfterStop.Set();
248 }
249  
250 private void HandleOqreCommand(string module, string[] args)
251 {
252 if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene)
253 return;
254  
255 if (args.Length != 4)
256 {
257 MainConsole.Instance.Output("Usage: debug lludp oqre <stop|start|status>");
258 return;
259 }
260  
261 string subCommand = args[3];
262  
263 if (subCommand == "stop")
264 {
265 Stop();
266 MainConsole.Instance.OutputFormat("Stopped OQRE for {0}", m_udpServer.Scene.Name);
267 }
268 else if (subCommand == "start")
269 {
270 Start();
271 MainConsole.Instance.OutputFormat("Started OQRE for {0}", m_udpServer.Scene.Name);
272 }
273 else if (subCommand == "status")
274 {
275 MainConsole.Instance.OutputFormat("OQRE in {0}", m_udpServer.Scene.Name);
276 MainConsole.Instance.OutputFormat("Running: {0}", IsRunning);
277 MainConsole.Instance.OutputFormat(
278 "Requests waiting: {0}", IsRunning ? m_requestQueue.Count.ToString() : "n/a");
279 }
280 else
281 {
282 MainConsole.Instance.OutputFormat("Unrecognized OQRE subcommand {0}", subCommand);
283 }
284 }
285 }
286 }