opensim-development – Blame information for rev 1

Subversion Repositories:
Rev:
Rev Author Line No. Line
1 eva 1 /*
2 * Copyright (c) Contributors, http://opensimulator.org/
3 * See CONTRIBUTORS.TXT for a full list of copyright holders.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 * * Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * * Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 * * Neither the name of the OpenSimulator Project nor the
13 * names of its contributors may be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
17 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19 * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
20 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27  
28 using System;
29 using System.Collections;
30 using System.Threading;
31 using System.Reflection;
32 using log4net;
33 using HttpServer;
34 using OpenSim.Framework;
35 using OpenSim.Framework.Monitoring;
36 using Amib.Threading;
37 using System.IO;
38 using System.Text;
39 using System.Collections.Generic;
40  
41 namespace OpenSim.Framework.Servers.HttpServer
42 {
43 public class PollServiceRequestManager
44 {
45 private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
46  
47 private readonly BaseHttpServer m_server;
48  
49 private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>();
50 private static List<PollServiceHttpRequest> m_longPollRequests = new List<PollServiceHttpRequest>();
51  
52 private uint m_WorkerThreadCount = 0;
53 private Thread[] m_workerThreads;
54  
55 private bool m_running = true;
56  
57 private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2);
58  
59 // private int m_timeout = 1000; // increase timeout 250; now use the event one
60  
61 public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout)
62 {
63 m_server = pSrv;
64 m_WorkerThreadCount = pWorkerThreadCount;
65 m_workerThreads = new Thread[m_WorkerThreadCount];
66 }
67  
68 public void Start()
69 {
70 //startup worker threads
71 for (uint i = 0; i < m_WorkerThreadCount; i++)
72 {
73 m_workerThreads[i]
74 = Watchdog.StartThread(
75 PoolWorkerJob,
76 string.Format("PollServiceWorkerThread{0}:{1}", i, m_server.Port),
77 ThreadPriority.Normal,
78 false,
79 false,
80 null,
81 int.MaxValue);
82 }
83  
84 Watchdog.StartThread(
85 this.CheckLongPollThreads,
86 string.Format("LongPollServiceWatcherThread:{0}", m_server.Port),
87 ThreadPriority.Normal,
88 false,
89 true,
90 null,
91 1000 * 60 * 10);
92 }
93  
94 private void ReQueueEvent(PollServiceHttpRequest req)
95 {
96 if (m_running)
97 {
98 // delay the enqueueing for 100ms. There's no need to have the event
99 // actively on the queue
100 Timer t = new Timer(self => {
101 ((Timer)self).Dispose();
102 m_requests.Enqueue(req);
103 });
104  
105 t.Change(100, Timeout.Infinite);
106  
107 }
108 }
109  
110 public void Enqueue(PollServiceHttpRequest req)
111 {
112 if (m_running)
113 {
114 if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll)
115 {
116 lock (m_longPollRequests)
117 m_longPollRequests.Add(req);
118 }
119 else
120 m_requests.Enqueue(req);
121 }
122 }
123  
124 private void CheckLongPollThreads()
125 {
126 // The only purpose of this thread is to check the EQs for events.
127 // If there are events, that thread will be placed in the "ready-to-serve" queue, m_requests.
128 // If there are no events, that thread will be back to its "waiting" queue, m_longPollRequests.
129 // All other types of tasks (Inventory handlers, http-in, etc) don't have the long-poll nature,
130 // so if they aren't ready to be served by a worker thread (no events), they are placed
131 // directly back in the "ready-to-serve" queue by the worker thread.
132 while (m_running)
133 {
134 Thread.Sleep(500);
135 Watchdog.UpdateThread();
136  
137 // List<PollServiceHttpRequest> not_ready = new List<PollServiceHttpRequest>();
138 lock (m_longPollRequests)
139 {
140 if (m_longPollRequests.Count > 0 && m_running)
141 {
142 List<PollServiceHttpRequest> ready = m_longPollRequests.FindAll(req =>
143 (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id) || // there are events in this EQ
144 (Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) // no events, but timeout
145 );
146  
147 ready.ForEach(req =>
148 {
149 m_requests.Enqueue(req);
150 m_longPollRequests.Remove(req);
151 });
152  
153 }
154  
155 }
156 }
157 }
158  
159 public void Stop()
160 {
161 m_running = false;
162 // m_timeout = -10000; // cause all to expire
163 Thread.Sleep(1000); // let the world move
164  
165 foreach (Thread t in m_workerThreads)
166 Watchdog.AbortThread(t.ManagedThreadId);
167  
168 PollServiceHttpRequest wreq;
169  
170 lock (m_longPollRequests)
171 {
172 if (m_longPollRequests.Count > 0 && m_running)
173 m_longPollRequests.ForEach(req => m_requests.Enqueue(req));
174 }
175  
176 while (m_requests.Count() > 0)
177 {
178 try
179 {
180 wreq = m_requests.Dequeue(0);
181 wreq.DoHTTPGruntWork(
182 m_server, wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id));
183 }
184 catch
185 {
186 }
187 }
188  
189 m_longPollRequests.Clear();
190 m_requests.Clear();
191 }
192  
193 // work threads
194  
195 private void PoolWorkerJob()
196 {
197 while (m_running)
198 {
199 PollServiceHttpRequest req = m_requests.Dequeue(5000);
200 //m_log.WarnFormat("[YYY]: Dequeued {0}", (req == null ? "null" : req.PollServiceArgs.Type.ToString()));
201  
202 Watchdog.UpdateThread();
203 if (req != null)
204 {
205 try
206 {
207 if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id))
208 {
209 Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id);
210  
211 if (responsedata == null)
212 continue;
213  
214 if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll) // This is the event queue
215 {
216 try
217 {
218 req.DoHTTPGruntWork(m_server, responsedata);
219 }
220 catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
221 {
222 // Ignore it, no need to reply
223 }
224 }
225 else
226 {
227 m_threadPool.QueueWorkItem(x =>
228 {
229 try
230 {
231 req.DoHTTPGruntWork(m_server, responsedata);
232 }
233 catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
234 {
235 // Ignore it, no need to reply
236 }
237  
238 return null;
239 }, null);
240 }
241 }
242 else
243 {
244 if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
245 {
246 req.DoHTTPGruntWork(
247 m_server, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
248 }
249 else
250 {
251 ReQueueEvent(req);
252 }
253 }
254 }
255 catch (Exception e)
256 {
257 m_log.ErrorFormat("Exception in poll service thread: " + e.ToString());
258 }
259 }
260 }
261 }
262 }
263 }