clockwerk-opensim – Blame information for rev 1

Subversion Repositories:
Rev:
Rev Author Line No. Line
1 vero 1 /*
2 * Copyright (c) Contributors, http://opensimulator.org/
3 * See CONTRIBUTORS.TXT for a full list of copyright holders.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 * * Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * * Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 * * Neither the name of the OpenSimulator Project nor the
13 * names of its contributors may be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
17 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19 * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
20 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27  
28 using System;
29 using System.Collections;
30 using System.Threading;
31 using System.Reflection;
32 using log4net;
33 using HttpServer;
34 using OpenSim.Framework;
35 using OpenSim.Framework.Monitoring;
36 using Amib.Threading;
37 using System.IO;
38 using System.Text;
39 using System.Collections.Generic;
40  
41 namespace OpenSim.Framework.Servers.HttpServer
42 {
43 public class PollServiceRequestManager
44 {
private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

/// <summary>
/// Is the poll service request manager running?
/// </summary>
/// <remarks>
/// Can be running either synchronously or asynchronously
/// </remarks>
public bool IsRunning { get; private set; }

/// <summary>
/// Is the poll service performing responses asynchronously (with its own threads) or synchronously (via
/// external calls)?
/// </summary>
public bool PerformResponsesAsync { get; private set; }

/// <summary>
/// Number of responses actually processed and sent to viewer (or aborted due to error).
/// </summary>
/// <remarks>
/// NOTE(review): this counter is incremented with a plain <c>++</c> from several threads, so the value
/// may under-count under load. Treat it as approximate.
/// </remarks>
public int ResponsesProcessed { get; private set; }

/// <summary>
/// HTTP server that responses are sent through.
/// </summary>
private readonly BaseHttpServer m_server;

/// <summary>
/// Requests that are ready to be examined by a worker.
/// </summary>
private readonly BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>();

/// <summary>
/// Long-poll requests waiting for events or for their timeout. Also used as the lock object that guards
/// the list, which is why the reference must never change (hence <c>readonly</c>).
/// </summary>
/// <remarks>
/// The field is static, so the list is shared by every manager instance in the process.
/// </remarks>
private static readonly List<PollServiceHttpRequest> m_longPollRequests = new List<PollServiceHttpRequest>();

/// <summary>
/// Number of worker thread slots, fixed at construction.
/// </summary>
private readonly uint m_WorkerThreadCount;

/// <summary>
/// Worker thread slots. Entries stay null until <see cref="Start"/> spawns the workers.
/// </summary>
private readonly Thread[] m_workerThreads;

// NOTE(review): the arguments are presumably idle timeout (ms), max worker threads and min worker
// threads - confirm against the Amib.Threading SmartThreadPool constructor.
private readonly SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2);
75  
76 // private int m_timeout = 1000; // increase timeout 250; now use the event one
77  
/// <summary>
/// Creates a manager bound to the given HTTP server and registers its two pull statistics
/// ("QueuedPollResponses" and "ProcessedPollResponses").
/// </summary>
/// <param name="pSrv">Server used to send responses; its port labels the registered statistics.</param>
/// <param name="performResponsesAsync">Stored in <see cref="PerformResponsesAsync"/>.</param>
/// <param name="pWorkerThreadCount">Number of worker thread slots to allocate.</param>
/// <param name="pTimeout">Not read by this constructor.</param>
public PollServiceRequestManager(
    BaseHttpServer pSrv, bool performResponsesAsync, uint pWorkerThreadCount, int pTimeout)
{
    m_server = pSrv;
    PerformResponsesAsync = performResponsesAsync;
    m_WorkerThreadCount = pWorkerThreadCount;
    m_workerThreads = new Thread[m_WorkerThreadCount];

    // Reports the current length of the ready queue each time the stat is pulled.
    Stat queuedResponses = new Stat(
        "QueuedPollResponses",
        "Number of poll responses queued for processing.",
        "",
        "",
        "httpserver",
        m_server.Port.ToString(),
        StatType.Pull,
        MeasuresOfInterest.AverageChangeOverTime,
        stat => stat.Value = m_requests.Count(),
        StatVerbosity.Debug);
    StatsManager.RegisterStat(queuedResponses);

    // Reports the running total of responses handled by this manager.
    Stat processedResponses = new Stat(
        "ProcessedPollResponses",
        "Number of poll responses processed.",
        "",
        "",
        "httpserver",
        m_server.Port.ToString(),
        StatType.Pull,
        MeasuresOfInterest.AverageChangeOverTime,
        stat => stat.Value = ResponsesProcessed,
        StatVerbosity.Debug);
    StatsManager.RegisterStat(processedResponses);
}
112  
/// <summary>
/// Marks the manager as running and, when responses are performed asynchronously, starts the worker
/// threads and the long-poll watcher thread.
/// </summary>
public void Start()
{
    IsRunning = true;

    // In synchronous mode no threads are started here.
    if (!PerformResponsesAsync)
        return;

    // One watchdog-tracked worker per slot; each runs PoolWorkerJob.
    for (uint slot = 0; slot < m_WorkerThreadCount; slot++)
    {
        string workerName = string.Format("PollServiceWorkerThread{0}:{1}", slot, m_server.Port);

        m_workerThreads[slot] = Watchdog.StartThread(
            PoolWorkerJob,
            workerName,
            ThreadPriority.Normal,
            false,
            false,
            null,
            int.MaxValue);
    }

    // A single watcher thread moves ready long-poll requests over to the worker queue.
    string watcherName = string.Format("LongPollServiceWatcherThread:{0}", m_server.Port);

    Watchdog.StartThread(
        this.CheckLongPollThreads,
        watcherName,
        ThreadPriority.Normal,
        false,
        true,
        null,
        1000 * 60 * 10);
}
143  
/// <summary>
/// Puts a request back on the ready queue after a short delay. Does nothing when the manager is not
/// running.
/// </summary>
/// <param name="req">Request to re-enqueue.</param>
private void ReQueueEvent(PollServiceHttpRequest req)
{
    if (!IsRunning)
        return;

    // Hold the request off the queue for 100ms; there is no need to keep it actively queued.
    // The one-argument Timer constructor passes the timer itself as the callback state, which lets
    // the callback dispose it once it has fired.
    TimerCallback enqueueLater = state =>
    {
        ((Timer)state).Dispose();
        m_requests.Enqueue(req);
    };

    Timer delay = new Timer(enqueueLater);
    delay.Change(100, Timeout.Infinite);
}
159  
/// <summary>
/// Accepts a new poll request. Long-poll requests go to the waiting list; every other type goes
/// straight to the ready queue. Requests are ignored while the manager is not running.
/// </summary>
/// <param name="req">Request to queue.</param>
public void Enqueue(PollServiceHttpRequest req)
{
    if (!IsRunning)
        return;

    bool isLongPoll = req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll;

    if (!isLongPoll)
    {
        m_requests.Enqueue(req);
        return;
    }

    lock (m_longPollRequests)
    {
        m_longPollRequests.Add(req);
    }
}
173  
/// <summary>
/// Body of the long-poll watcher thread. Every 500ms it moves long-poll requests that have events, or
/// that have timed out, from the waiting list to the ready queue.
/// </summary>
private void CheckLongPollThreads()
{
    // The only purpose of this thread is to check the EQs for events.
    // If there are events, that thread will be placed in the "ready-to-serve" queue, m_requests.
    // If there are no events, that thread will be back to its "waiting" queue, m_longPollRequests.
    // All other types of tasks (Inventory handlers, http-in, etc) don't have the long-poll nature,
    // so if they aren't ready to be served by a worker thread (no events), they are placed
    // directly back in the "ready-to-serve" queue by the worker thread.
    while (IsRunning)
    {
        Thread.Sleep(500);
        Watchdog.UpdateThread();

        lock (m_longPollRequests)
        {
            if (m_longPollRequests.Count > 0 && IsRunning)
            {
                List<PollServiceHttpRequest> ready = m_longPollRequests.FindAll(IsReadyToServe);

                ready.ForEach(req =>
                {
                    m_requests.Enqueue(req);
                    m_longPollRequests.Remove(req);
                });
            }
        }
    }
}

/// <summary>
/// Decides whether a waiting long-poll request should be handed to a worker: either it has events or
/// it has exceeded its timeout.
/// </summary>
/// <param name="req">Waiting long-poll request.</param>
/// <returns>
/// True when the request has events, has timed out, or could not be checked because the check threw.
/// </returns>
private bool IsReadyToServe(PollServiceHttpRequest req)
{
    try
    {
        return req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id) // there are events in this EQ
            || (Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms; // no events, but timeout
    }
    catch (Exception e)
    {
        // A failing callback must not end the watcher thread, otherwise no long-poll request would
        // ever be served again. Report the request as ready so it leaves the waiting list;
        // WaitPerformResponse has its own exception handling for the same callback.
        m_log.ErrorFormat("Exception in long poll watcher thread: {0}", e);
        return true;
    }
}
208  
/// <summary>
/// Stops the manager: aborts the worker threads, answers every outstanding request (long-poll ones
/// included) with its "no events" response, and empties both queues.
/// </summary>
public void Stop()
{
    IsRunning = false;
    Thread.Sleep(1000); // let the world move

    // Slots are still null when Start() never spawned workers (synchronous mode or never started).
    foreach (Thread t in m_workerThreads)
    {
        if (t != null)
            Watchdog.AbortThread(t.ManagedThreadId);
    }

    // Hand the waiting long-poll requests to the ready queue so they are answered below.
    // IsRunning is already false here, so it must not be part of the condition.
    // The list is cleared inside the lock because other threads lock on it too.
    lock (m_longPollRequests)
    {
        m_longPollRequests.ForEach(req => m_requests.Enqueue(req));
        m_longPollRequests.Clear();
    }

    while (m_requests.Count() > 0)
    {
        try
        {
            PollServiceHttpRequest wreq = m_requests.Dequeue(0);

            // NOTE(review): Dequeue(0) is assumed to return null when nothing is available - confirm
            // against BlockingQueue.
            if (wreq == null)
                continue;

            ResponsesProcessed++;
            wreq.DoHTTPGruntWork(
                m_server, wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id));
        }
        catch (Exception e)
        {
            // Best effort during shutdown: keep draining, but leave a trace of what failed.
            m_log.WarnFormat("Exception while flushing poll requests on stop: {0}", e);
        }
    }

    m_requests.Clear();
}
243  
244 // work threads
245  
/// <summary>
/// Body of each worker thread: while the manager is running, refresh the watchdog entry and then
/// process one queued request (or wait for one).
/// </summary>
private void PoolWorkerJob()
{
    for (; IsRunning; )
    {
        Watchdog.UpdateThread();
        WaitPerformResponse();
    }
}
254  
/// <summary>
/// Takes one request from the ready queue and handles it.
/// </summary>
/// <remarks>
/// If the request has events, they are fetched and sent. Long-poll requests, and all requests in
/// synchronous mode, are sent on the calling thread; the rest are sent via the thread pool.
/// If it has no events, the "no events" response is sent once the request has timed out;
/// otherwise the request is re-queued after a short delay.
/// Exceptions are logged and not propagated.
/// </remarks>
public void WaitPerformResponse()
{
    // NOTE(review): 5000 is presumably a wait in milliseconds, with null returned when nothing
    // arrives - confirm against BlockingQueue.Dequeue.
    PollServiceHttpRequest req = m_requests.Dequeue(5000);

    if (req == null)
        return;

    try
    {
        if (!req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id))
        {
            if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
            {
                // Timed out without events: answer with the "no events" response.
                ResponsesProcessed++;
                req.DoHTTPGruntWork(
                    m_server, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
            }
            else
            {
                ReQueueEvent(req);
            }

            return;
        }

        Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id);

        // NOTE(review): a null result ends handling here; the request is neither answered nor
        // re-queued. Confirm this is intended.
        if (responsedata == null)
            return;

        // This is the event queue.
        // Even if we're not running we can still perform responses by explicit request.
        if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll
            || !PerformResponsesAsync)
        {
            try
            {
                ResponsesProcessed++;
                req.DoHTTPGruntWork(m_server, responsedata);
            }
            catch (ObjectDisposedException e) // Browser aborted before we could read body, server closed the stream
            {
                // No reply is possible any more; just record it.
                m_log.Error(e);
            }
        }
        else
        {
            m_threadPool.QueueWorkItem(x =>
            {
                try
                {
                    ResponsesProcessed++;
                    req.DoHTTPGruntWork(m_server, responsedata);
                }
                catch (Exception e)
                {
                    // Includes ObjectDisposedException (browser aborted, stream closed). Nothing
                    // may escape into the pool thread, so every failure is only logged.
                    m_log.Error(e);
                }

                return null;
            }, null);
        }
    }
    catch (Exception e)
    {
        // Pass the exception as a format argument: its text may contain braces, which must not be
        // interpreted as format placeholders.
        m_log.ErrorFormat("Exception in poll service thread: {0}", e);
    }
}
330 }
331 }