opensim-development – Blame information for rev 1
?pathlinks?
Rev | Author | Line No. | Line |
---|---|---|---|
1 | eva | 1 | using System; |
2 | using System.Threading; |
||
3 | using System.Runtime.CompilerServices; |
||
4 | using System.Diagnostics; |
||
5 | |||
6 | namespace Amib.Threading.Internal |
||
7 | { |
||
8 | |||
9 | #region WorkItemsGroup class |
||
10 | |||
11 | /// <summary> |
||
12 | /// Queues work items for a SmartThreadPool and limits how many work items of this group are handed to the pool at once. |
||
13 | /// </summary> |
||
14 | public class WorkItemsGroup : WorkItemsGroupBase |
||
15 | { |
||
16 | #region Private members |
||
17 | |||
/// <summary>
/// Object used by this group's <c>lock</c> statements to serialize access
/// to the queue and the counters below.
/// </summary>
private readonly object _lock = new object();

/// <summary>
/// The SmartThreadPool instance that created this WorkItemsGroup; work
/// items are forwarded to it for execution.
/// </summary>
private readonly SmartThreadPool _stp;

/// <summary>
/// Backing delegate of the <see cref="OnIdle"/> event.
/// </summary>
private event WorkItemsGroupIdleHandler _onIdle;

/// <summary>
/// True while the group is suspended; a suspended group keeps work items
/// in its own queue instead of forwarding them to the SmartThreadPool.
/// </summary>
private bool _isSuspended;

/// <summary>
/// How many work items of this WorkItemsGroup may run at once.
/// </summary>
private int _concurrency;

/// <summary>
/// Priority queue that holds work items until they are passed on to the
/// SmartThreadPool.
/// </summary>
private readonly PriorityQueue _workItemsQueue;

/// <summary>
/// Number of work items handed to the SmartThreadPool that have not yet
/// reported completion. Compared against <see cref="_concurrency"/> to
/// enforce the concurrency limit.
/// </summary>
private int _workItemsInStpQueue;

/// <summary>
/// Number of work items of this group currently running in the
/// SmartThreadPool. Used together with Cancel to work out whether new
/// work items can be sent to the STP.
/// </summary>
private int _workItemsExecutingInStp;

/// <summary>
/// Start information of this WorkItemsGroup.
/// </summary>
private readonly WIGStartInfo _workItemsGroupStartInfo;

/// <summary>
/// Signaled when all of the WorkItemsGroup's work items have completed.
/// Created in the signaled state.
/// </summary>
private readonly ManualResetEvent _isIdleWaitHandle = EventWaitHandleFactory.CreateManualResetEvent(true);

/// <summary>
/// Object shared by all work items this group generates, so that they can
/// all be marked as canceled in O(1). Replaced by a fresh instance on Cancel.
/// </summary>
private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
||
77 | |||
78 | #endregion |
||
79 | |||
80 | #region Construction |
||
81 | |||
/// <summary>
/// Creates a work items group that forwards its work items to the given
/// SmartThreadPool.
/// </summary>
/// <param name="stp">The SmartThreadPool that owns this group.</param>
/// <param name="concurrency">
/// How many work items of this group may run at once; must be greater than zero.
/// </param>
/// <param name="wigStartInfo">
/// Start information; the group keeps its own read-only copy.
/// </param>
/// <exception cref="ArgumentOutOfRangeException">
/// Thrown when <paramref name="concurrency"/> is zero or negative.
/// </exception>
public WorkItemsGroup(
    SmartThreadPool stp,
    int concurrency,
    WIGStartInfo wigStartInfo)
{
    if (concurrency < 1)
    {
        // Some compact platforms lack the overload that takes the actual value.
        throw new ArgumentOutOfRangeException(
            "concurrency",
#if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE)
            concurrency,
#endif
            "concurrency must be greater than zero");
    }

    _stp = stp;
    _concurrency = concurrency;

    // Copy the caller's start info and freeze the copy.
    WIGStartInfo readOnlyStartInfo = new WIGStartInfo(wigStartInfo).AsReadOnly();
    _workItemsGroupStartInfo = readOnlyStartInfo;

    _workItemsQueue = new PriorityQueue();
    Name = "WorkItemsGroup";

    // The _workItemsInStpQueue gets the number of currently executing work items,
    // because once a work item is executing, it cannot be cancelled.
    _workItemsInStpQueue = _workItemsExecutingInStp;

    _isSuspended = readOnlyStartInfo.StartSuspended;
}
||
108 | |||
109 | #endregion |
||
110 | |||
111 | #region WorkItemsGroupBase Overrides |
||
112 | |||
/// <summary>
/// Gets or sets how many work items of this group may run at once.
/// Raising the value immediately tries to dispatch one queued work item
/// per newly available slot.
/// </summary>
public override int Concurrency
{
    get
    {
        return _concurrency;
    }
    set
    {
        Debug.Assert(value > 0);

        int previous = _concurrency;
        _concurrency = value;

        // Only an increase opens new slots that need to be filled.
        int addedSlots = value - previous;
        if (addedSlots <= 0)
        {
            return;
        }

        EnqueueToSTPNextNWorkItem(addedSlots);
    }
}
||
128 | |||
/// <summary>
/// Gets the number of work items still held in this group's own queue,
/// i.e. not yet passed to the SmartThreadPool.
/// </summary>
public override int WaitingCallbacks
{
    get
    {
        int waiting = _workItemsQueue.Count;
        return waiting;
    }
}
||
133 | |||
/// <summary>
/// Returns the state objects of all work items currently waiting in this
/// group's queue, in the queue's enumeration order.
/// </summary>
/// <returns>An array with one entry per queued work item.</returns>
public override object[] GetStates()
{
    lock (_lock)
    {
        int pending = _workItemsQueue.Count;
        object[] result = new object[pending];

        int index = 0;
        foreach (WorkItem queued in _workItemsQueue)
        {
            result[index++] = queued.GetWorkItemResult().State;
        }

        return result;
    }
}
||
148 | |||
/// <summary>
/// Gets the read-only start information this group was created with.
/// </summary>
public override WIGStartInfo WIGStartInfo
{
    get
    {
        return _workItemsGroupStartInfo;
    }
}
||
156 | |||
/// <summary>
/// Starts the Work Items Group if it is currently suspended; does nothing
/// otherwise. On start, up to <c>min(queued items, concurrency)</c> work
/// items are dispatched.
/// </summary>
public override void Start()
{
    if (_isSuspended)
    {
        _isSuspended = false;

        int toDispatch = Math.Min(_workItemsQueue.Count, _concurrency);
        EnqueueToSTPNextNWorkItem(toDispatch);
    }
}
||
171 | |||
/// <summary>
/// Cancels the work items of this group: marks the shared cancellation
/// object as canceled, empties the group's queue, resets the in-STP
/// counter and installs a fresh cancellation object for future work items.
/// </summary>
/// <param name="abortExecution">
/// When true, the SmartThreadPool is additionally asked to cancel/abort
/// this group's work items via <c>CancelAbortWorkItemsGroup</c>.
/// </param>
public override void Cancel(bool abortExecution)
{
    lock (_lock)
    {
        // Flag every work item that shares the current cancellation object.
        _canceledWorkItemsGroup.IsCanceled = true;

        _workItemsInStpQueue = 0;
        _workItemsQueue.Clear();

        // Work items enqueued from now on get their own, un-canceled object.
        _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
    }

    if (!abortExecution)
    {
        return;
    }

    _stp.CancelAbortWorkItemsGroup(this);
}
||
187 | |||
/// <summary>
/// Waits on this group's idle wait handle for at most the given time.
/// </summary>
/// <param name="millisecondsTimeout">Timeout passed to the wait, in milliseconds.</param>
/// <returns>
/// The result of <c>STPEventWaitHandle.WaitOne</c>; presumably true when
/// the group became idle before the timeout elapsed.
/// </returns>
public override bool WaitForIdle(int millisecondsTimeout)
{
    SmartThreadPool.ValidateWorkItemsGroupWaitForIdle(this);

    bool signaled = STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, false);
    return signaled;
}
||
196 | |||
/// <summary>
/// Event whose handlers are invoked when the group becomes idle after a
/// work item completes. Subscriptions are stored in the private
/// <c>_onIdle</c> delegate.
/// </summary>
public override event WorkItemsGroupIdleHandler OnIdle
{
    add
    {
        _onIdle += value;
    }
    remove
    {
        _onIdle -= value;
    }
}
||
202 | |||
203 | #endregion |
||
204 | |||
205 | #region Private methods |
||
206 | |||
/// <summary>
/// Hooks this group's started/completed callbacks onto a work item result
/// so the group is told when the work item starts and when it completes.
/// </summary>
/// <param name="wir">
/// Result object of the work item; must also implement IInternalWorkItemResult.
/// </param>
private void RegisterToWorkItemCompletion(IWorkItemResult wir)
{
    IInternalWorkItemResult internalResult = (IInternalWorkItemResult)wir;

    internalResult.OnWorkItemStarted += OnWorkItemStartedCallback;
    internalResult.OnWorkItemCompleted += OnWorkItemCompletedCallback;
}
||
213 | |||
/// <summary>
/// Dispatches up to <c>_concurrency</c> queued work items, unless the
/// group is suspended. Judging by the name, this is called when the
/// owning SmartThreadPool starts.
/// </summary>
public void OnSTPIsStarting()
{
    if (!_isSuspended)
    {
        EnqueueToSTPNextNWorkItem(_concurrency);
    }
}
||
223 | |||
/// <summary>
/// Makes <paramref name="count"/> attempts to move the next queued work
/// item to the SmartThreadPool. A non-positive count does nothing.
/// </summary>
/// <param name="count">Number of dispatch attempts to make.</param>
public void EnqueueToSTPNextNWorkItem(int count)
{
    int remaining = count;
    while (remaining > 0)
    {
        EnqueueToSTPNextWorkItem(null, false);
        --remaining;
    }
}
||
231 | |||
/// <summary>
/// Work item callback that raises the idle notification with the handlers
/// currently subscribed.
/// </summary>
/// <param name="state">Unused.</param>
/// <returns>Always null.</returns>
private object FireOnIdle(object state)
{
    WorkItemsGroupIdleHandler handlers = _onIdle;
    FireOnIdleImpl(handlers);
    return null;
}
||
237 | |||
/// <summary>
/// Invokes every handler of the given idle delegate with this group as
/// the argument. An exception thrown by one handler is swallowed so the
/// remaining handlers still run.
/// </summary>
/// <param name="onIdle">The delegate to invoke; may be null.</param>
[MethodImpl(MethodImplOptions.NoInlining)]
private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle)
{
    if (onIdle != null)
    {
        Delegate[] subscribers = onIdle.GetInvocationList();
        for (int i = 0; i < subscribers.Length; ++i)
        {
            WorkItemsGroupIdleHandler handler = (WorkItemsGroupIdleHandler)subscribers[i];
            try
            {
                handler(this);
            }
            catch { } // Suppress exceptions
        }
    }
}
||
256 | |||
/// <summary>
/// Callback subscribed to a work item's started notification; counts one
/// more work item of this group as executing.
/// </summary>
/// <param name="workItem">The work item that started (not used).</param>
private void OnWorkItemStartedCallback(WorkItem workItem)
{
    lock (_lock)
    {
        _workItemsExecutingInStp++;
    }
}
||
264 | |||
/// <summary>
/// Callback subscribed to a work item's completed notification; releases
/// the finished item's slot and tries to dispatch the next queued item.
/// </summary>
/// <param name="workItem">The work item that completed (not used).</param>
private void OnWorkItemCompletedCallback(WorkItem workItem)
{
    // No new work item to add; the counters must be decremented.
    const bool decrementCounters = true;
    EnqueueToSTPNextWorkItem(null, decrementCounters);
}
||
269 | |||
/// <summary>
/// Adds a work item to this group; it is dispatched to the
/// SmartThreadPool as soon as the concurrency limit and the suspended
/// state allow.
/// </summary>
/// <param name="workItem">The work item to add.</param>
internal override void Enqueue(WorkItem workItem)
{
    // A newly added item never releases a slot, so no counter decrement.
    EnqueueToSTPNextWorkItem(workItem, false);
}
||
274 | |||
/// <summary>
/// Convenience overload: queues the given work item without decrementing
/// the in-STP counters.
/// </summary>
/// <param name="workItem">The work item to queue; may be null.</param>
private void EnqueueToSTPNextWorkItem(WorkItem workItem)
{
    const bool decrementCounters = false;
    EnqueueToSTPNextWorkItem(workItem, decrementCounters);
}
||
279 | |||
/// <summary>
/// Central bookkeeping routine of the group. Under the group lock it:
/// 1. optionally releases the slot of a work item that just completed,
/// 2. optionally adds a new work item to the group's queue,
/// 3. marks the group idle when nothing is queued and nothing is in the STP,
/// 4. otherwise hands the next queued work item to the SmartThreadPool if
///    the group is not suspended and the concurrency limit allows it.
/// </summary>
/// <param name="workItem">
/// A new work item to add to the group's queue, or null to only dispatch.
/// </param>
/// <param name="decrementWorkItemsInStpQueue">
/// True when called because a work item completed; both in-STP counters are
/// then decremented (never below zero) and the idle event may be raised.
/// </param>
private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue)
{
    lock (_lock)
    {
        // Got here from OnWorkItemCompletedCallback()
        if (decrementWorkItemsInStpQueue)
        {
            --_workItemsInStpQueue;

            // Cancel() resets this counter to zero, so clamp rather than go negative.
            if (_workItemsInStpQueue < 0)
            {
                _workItemsInStpQueue = 0;
            }

            --_workItemsExecutingInStp;

            if (_workItemsExecutingInStp < 0)
            {
                _workItemsExecutingInStp = 0;
            }
        }

        // If the work item is not null then enqueue it
        if (null != workItem)
        {
            // Share the group's cancellation object so Cancel() can flag this item.
            workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup;

            RegisterToWorkItemCompletion(workItem.GetWorkItemResult());
            _workItemsQueue.Enqueue(workItem);

            // First item of a group with nothing in the STP: the group stops being idle.
            if ((1 == _workItemsQueue.Count) &&
                (0 == _workItemsInStpQueue))
            {
                _stp.RegisterWorkItemsGroup(this);
                IsIdle = false;
                _isIdleWaitHandle.Reset();
            }
        }

        // If the work items queue of the group is empty then quit
        if (0 == _workItemsQueue.Count)
        {
            if (0 == _workItemsInStpQueue)
            {
                // Nothing queued and nothing in the STP: the group is idle.
                _stp.UnregisterWorkItemsGroup(this);
                IsIdle = true;
                _isIdleWaitHandle.Set();

                // A non-null delegate always has at least one target, so the
                // null check alone decides whether there are subscribers.
                if (decrementWorkItemsInStpQueue && _onIdle != null)
                {
                    _stp.QueueWorkItem(new WorkItemCallback(FireOnIdle));
                }
            }
            return;
        }

        if (_isSuspended)
        {
            return;
        }

        if (_workItemsInStpQueue < _concurrency)
        {
            // Only WorkItem instances are ever enqueued above, so a direct cast
            // is used; a type mismatch would surface here instead of as a null.
            WorkItem nextWorkItem = (WorkItem)_workItemsQueue.Dequeue();
            try
            {
                _stp.Enqueue(nextWorkItem);
            }
            catch (ObjectDisposedException)
            {
                // The STP has been shutdown
            }

            // NOTE(review): the counter is incremented even when the STP rejected
            // the item because it was disposed; kept as in the original.
            ++_workItemsInStpQueue;
        }
    }
}
||
356 | |||
357 | #endregion |
||
358 | } |
||
359 | |||
360 | #endregion |
||
361 | } |