opensim-development – Blame information for rev 1

Subversion Repositories:
Rev:
Rev Author Line No. Line
1 eva 1 #region Release History
2  
3 // Smart Thread Pool
4 // 7 Aug 2004 - Initial release
5 //
6 // 14 Sep 2004 - Bug fixes
7 //
8 // 15 Oct 2004 - Added new features
9 // - Work items return result.
10 // - Support waiting synchronization for multiple work items.
11 // - Work items can be cancelled.
12 // - Passage of the caller thread’s context to the thread in the pool.
13 // - Minimal usage of WIN32 handles.
14 // - Minor bug fixes.
15 //
16 // 26 Dec 2004 - Changes:
17 // - Removed static constructors.
18 // - Added finalizers.
19 // - Changed Exceptions so they are serializable.
20 // - Fixed the bug in one of the SmartThreadPool constructors.
21 // - Changed the SmartThreadPool.WaitAll() so it will support any number of waiters.
22 // The SmartThreadPool.WaitAny() is still limited by the .NET Framework.
23 // - Added PostExecute with options on which cases to call it.
24 // - Added option to dispose of the state objects.
25 // - Added a WaitForIdle() method that waits until the work items queue is empty.
26 // - Added an STPStartInfo class for the initialization of the thread pool.
27 // - Changed exception handling so if a work item throws an exception it
28 // is rethrown at GetResult(), rather then firing an UnhandledException event.
29 // Note that PostExecute exception are always ignored.
30 //
31 // 25 Mar 2005 - Changes:
32 // - Fixed lost of work items bug
33 //
34 // 3 Jul 2005: Changes.
35 // - Fixed bug where Enqueue() throws an exception because PopWaiter() returned null, hardly reconstructed.
36 //
37 // 16 Aug 2005: Changes.
38 // - Fixed bug where the InUseThreads becomes negative when canceling work items.
39 //
40 // 31 Jan 2006 - Changes:
41 // - Added work items priority
42 // - Removed support of chained delegates in callbacks and post executes (nobody really use this)
43 // - Added work items groups
44 // - Added work items groups idle event
45 // - Changed SmartThreadPool.WaitAll() behavior so when it gets empty array
46 // it returns true rather then throwing an exception.
47 // - Added option to start the STP and the WIG as suspended
48 // - Exception behavior changed, the real exception is returned by an
49 // inner exception
50 // - Added option to keep the Http context of the caller thread. (Thanks to Steven T.)
51 // - Added performance counters
52 // - Added priority to the threads in the pool
53 //
54 // 13 Feb 2006 - Changes:
55 // - Added a call to the dispose of the Performance Counter so
56 // their won't be a Performance Counter leak.
57 // - Added exception catch in case the Performance Counters cannot
58 // be created.
59 //
60 // 17 May 2008 - Changes:
61 // - Changed the dispose behavior and removed the Finalizers.
62 // - Enabled the change of the MaxThreads and MinThreads at run time.
63 // - Enabled the change of the Concurrency of a IWorkItemsGroup at run
64 // time If the IWorkItemsGroup is a SmartThreadPool then the Concurrency
65 // refers to the MaxThreads.
66 // - Improved the cancel behavior.
67 // - Added events for thread creation and termination.
68 // - Fixed the HttpContext context capture.
69 // - Changed internal collections so they use generic collections
70 // - Added IsIdle flag to the SmartThreadPool and IWorkItemsGroup
71 // - Added support for WinCE
72 // - Added support for Action<T> and Func<T>
73 //
74 // 07 April 2009 - Changes:
75 // - Added support for Silverlight and Mono
76 // - Added Join, Choice, and Pipe to SmartThreadPool.
77 // - Added local performance counters (for Mono, Silverlight, and WindowsCE)
78 // - Changed duration measures from DateTime.Now to Stopwatch.
79 // - Queues changed from System.Collections.Queue to System.Collections.Generic.LinkedList<T>.
80 //
81 // 21 December 2009 - Changes:
82 // - Added work item timeout (passive)
83 //
84 // 20 August 2012 - Changes:
85 // - Added set name to threads
86 // - Fixed the WorkItemsQueue.Dequeue.
87 // Replaced while (!Monitor.TryEnter(this)); with lock(this) { ... }
88 // - Fixed SmartThreadPool.Pipe
89 // - Added IsBackground option to threads
90 // - Added ApartmentState to threads
91 // - Fixed thread creation when queuing many work items at the same time.
92 //
93 // 24 August 2012 - Changes:
94 // - Enabled cancel abort after cancel. See: http://smartthreadpool.codeplex.com/discussions/345937 by alecswan
95 // - Added option to set MaxStackSize of threads
96  
97 #endregion
98  
99 using System;
100 using System.Security;
101 using System.Threading;
102 using System.Collections;
103 using System.Collections.Generic;
104 using System.Diagnostics;
105 using System.Runtime.CompilerServices;
106  
107 using Amib.Threading.Internal;
108  
109 namespace Amib.Threading
110 {
111 #region SmartThreadPool class
112 /// <summary>
113 /// Smart thread pool class.
114 /// </summary>
115 public partial class SmartThreadPool : WorkItemsGroupBase, IDisposable
116 {
117 #region Public Default Constants
118  
119 /// <summary>
120 /// Default minimum number of threads the thread pool contains. (0)
121 /// </summary>
122 public const int DefaultMinWorkerThreads = 0;
123  
124 /// <summary>
125 /// Default maximum number of threads the thread pool contains. (25)
126 /// </summary>
127 public const int DefaultMaxWorkerThreads = 25;
128  
129 /// <summary>
130 /// Default idle timeout in milliseconds. (One minute)
131 /// </summary>
132 public const int DefaultIdleTimeout = 60*1000; // One minute
133  
134 /// <summary>
135 /// Indicate to copy the security context of the caller and then use it in the call. (false)
136 /// </summary>
137 public const bool DefaultUseCallerCallContext = false;
138  
139 /// <summary>
140 /// Indicate to copy the HTTP context of the caller and then use it in the call. (false)
141 /// </summary>
142 public const bool DefaultUseCallerHttpContext = false;
143  
144 /// <summary>
145 /// Indicate to dispose of the state objects if they support the IDispose interface. (false)
146 /// </summary>
147 public const bool DefaultDisposeOfStateObjects = false;
148  
149 /// <summary>
150 /// The default option to run the post execute (CallToPostExecute.Always)
151 /// </summary>
152 public const CallToPostExecute DefaultCallToPostExecute = CallToPostExecute.Always;
153  
154 /// <summary>
155 /// The default post execute method to run. (None)
156 /// When null it means not to call it.
157 /// </summary>
158 public static readonly PostExecuteWorkItemCallback DefaultPostExecuteWorkItemCallback;
159  
160 /// <summary>
161 /// The default work item priority (WorkItemPriority.Normal)
162 /// </summary>
163 public const WorkItemPriority DefaultWorkItemPriority = WorkItemPriority.Normal;
164  
165 /// <summary>
166 /// The default is to work on work items as soon as they arrive
167 /// and not to wait for the start. (false)
168 /// </summary>
169 public const bool DefaultStartSuspended = false;
170  
171 /// <summary>
172 /// The default name to use for the performance counters instance. (null)
173 /// </summary>
174 public static readonly string DefaultPerformanceCounterInstanceName;
175  
176 #if !(WINDOWS_PHONE)
177  
178 /// <summary>
179 /// The default thread priority (ThreadPriority.Normal)
180 /// </summary>
181 public const ThreadPriority DefaultThreadPriority = ThreadPriority.Normal;
182 #endif
183 /// <summary>
184 /// The default thread pool name. (SmartThreadPool)
185 /// </summary>
186 public const string DefaultThreadPoolName = "SmartThreadPool";
187  
188 /// <summary>
189 /// The default Max Stack Size. (SmartThreadPool)
190 /// </summary>
191 public static readonly int? DefaultMaxStackSize = null;
192  
193 /// <summary>
194 /// The default fill state with params. (false)
195 /// It is relevant only to QueueWorkItem of Action&lt;...&gt;/Func&lt;...&gt;
196 /// </summary>
197 public const bool DefaultFillStateWithArgs = false;
198  
199 /// <summary>
200 /// The default thread backgroundness. (true)
201 /// </summary>
202 public const bool DefaultAreThreadsBackground = true;
203  
204 #if !(_SILVERLIGHT) && !(WINDOWS_PHONE)
205 /// <summary>
206 /// The default apartment state of a thread in the thread pool.
207 /// The default is ApartmentState.Unknown which means the STP will not
208 /// set the apartment of the thread. It will use the .NET default.
209 /// </summary>
210 public const ApartmentState DefaultApartmentState = ApartmentState.Unknown;
211 #endif
212  
213 #endregion
214  
215 #region Member Variables
216  
217 /// <summary>
218 /// Dictionary of all the threads in the thread pool.
219 /// </summary>
220 private readonly SynchronizedDictionary<Thread, ThreadEntry> _workerThreads = new SynchronizedDictionary<Thread, ThreadEntry>();
221  
222 /// <summary>
223 /// Queue of work items.
224 /// </summary>
225 private readonly WorkItemsQueue _workItemsQueue = new WorkItemsQueue();
226  
227 /// <summary>
228 /// Count the work items handled.
229 /// Used by the performance counter.
230 /// </summary>
231 private int _workItemsProcessed;
232  
233 /// <summary>
234 /// Number of threads that currently work (not idle).
235 /// </summary>
236 private int _inUseWorkerThreads;
237  
238 /// <summary>
239 /// Stores a copy of the original STPStartInfo.
240 /// It is used to change the MinThread and MaxThreads
241 /// </summary>
242 private STPStartInfo _stpStartInfo;
243  
244 /// <summary>
245 /// Total number of work items that are stored in the work items queue
246 /// plus the work items that the threads in the pool are working on.
247 /// </summary>
248 private int _currentWorkItemsCount;
249  
250 /// <summary>
251 /// Signaled when the thread pool is idle, i.e. no thread is busy
252 /// and the work items queue is empty
253 /// </summary>
254 //private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);
255 private ManualResetEvent _isIdleWaitHandle = EventWaitHandleFactory.CreateManualResetEvent(true);
256  
257 /// <summary>
258 /// An event to signal all the threads to quit immediately.
259 /// </summary>
260 //private ManualResetEvent _shuttingDownEvent = new ManualResetEvent(false);
261 private ManualResetEvent _shuttingDownEvent = EventWaitHandleFactory.CreateManualResetEvent(false);
262  
263 /// <summary>
264 /// A flag to indicate if the Smart Thread Pool is now suspended.
265 /// </summary>
266 private bool _isSuspended;
267  
268 /// <summary>
269 /// A flag to indicate the threads to quit.
270 /// </summary>
271 private bool _shutdown;
272  
273 /// <summary>
274 /// Counts the threads created in the pool.
275 /// It is used to name the threads.
276 /// </summary>
277 private int _threadCounter;
278  
279 /// <summary>
280 /// Indicate that the SmartThreadPool has been disposed
281 /// </summary>
282 private bool _isDisposed;
283  
284 /// <summary>
285 /// Holds all the WorkItemsGroup instaces that have at least one
286 /// work item int the SmartThreadPool
287 /// This variable is used in case of Shutdown
288 /// </summary>
289 private readonly SynchronizedDictionary<IWorkItemsGroup, IWorkItemsGroup> _workItemsGroups = new SynchronizedDictionary<IWorkItemsGroup, IWorkItemsGroup>();
290  
291 /// <summary>
292 /// A common object for all the work items int the STP
293 /// so we can mark them to cancel in O(1)
294 /// </summary>
295 private CanceledWorkItemsGroup _canceledSmartThreadPool = new CanceledWorkItemsGroup();
296  
297 /// <summary>
298 /// Windows STP performance counters
299 /// </summary>
300 private ISTPInstancePerformanceCounters _windowsPCs = NullSTPInstancePerformanceCounters.Instance;
301  
302 /// <summary>
303 /// Local STP performance counters
304 /// </summary>
305 private ISTPInstancePerformanceCounters _localPCs = NullSTPInstancePerformanceCounters.Instance;
306  
307  
308 #if (WINDOWS_PHONE)
309 private static readonly Dictionary<int, ThreadEntry> _threadEntries = new Dictionary<int, ThreadEntry>();
310 #elif (_WINDOWS_CE)
311 private static LocalDataStoreSlot _threadEntrySlot = Thread.AllocateDataSlot();
312 #else
313 [ThreadStatic]
314 private static ThreadEntry _threadEntry;
315  
316 #endif
317  
318 /// <summary>
319 /// An event to call after a thread is created, but before
320 /// it's first use.
321 /// </summary>
322 private event ThreadInitializationHandler _onThreadInitialization;
323  
324 /// <summary>
325 /// An event to call when a thread is about to exit, after
326 /// it is no longer belong to the pool.
327 /// </summary>
328 private event ThreadTerminationHandler _onThreadTermination;
329  
330 #endregion
331  
332 #region Per thread properties
333  
334 /// <summary>
335 /// A reference to the current work item a thread from the thread pool
336 /// is executing.
337 /// </summary>
338 internal static ThreadEntry CurrentThreadEntry
339 {
340 #if (WINDOWS_PHONE)
341 get
342 {
343 lock(_threadEntries)
344 {
345 ThreadEntry threadEntry;
346 if (_threadEntries.TryGetValue(Thread.CurrentThread.ManagedThreadId, out threadEntry))
347 {
348 return threadEntry;
349 }
350 }
351 return null;
352 }
353 set
354 {
355 lock(_threadEntries)
356 {
357 _threadEntries[Thread.CurrentThread.ManagedThreadId] = value;
358 }
359 }
360 #elif (_WINDOWS_CE)
361 get
362 {
363 //Thread.CurrentThread.ManagedThreadId
364 return Thread.GetData(_threadEntrySlot) as ThreadEntry;
365 }
366 set
367 {
368 Thread.SetData(_threadEntrySlot, value);
369 }
370 #else
371 get
372 {
373 return _threadEntry;
374 }
375 set
376 {
377 _threadEntry = value;
378 }
379 #endif
380 }
381 #endregion
382  
383 #region Construction and Finalization
384  
385 /// <summary>
386 /// Constructor
387 /// </summary>
388 public SmartThreadPool()
389 {
390 _stpStartInfo = new STPStartInfo();
391 Initialize();
392 }
393  
394 /// <summary>
395 /// Constructor
396 /// </summary>
397 /// <param name="idleTimeout">Idle timeout in milliseconds</param>
398 public SmartThreadPool(int idleTimeout)
399 {
400 _stpStartInfo = new STPStartInfo
401 {
402 IdleTimeout = idleTimeout,
403 };
404 Initialize();
405 }
406  
407 /// <summary>
408 /// Constructor
409 /// </summary>
410 /// <param name="idleTimeout">Idle timeout in milliseconds</param>
411 /// <param name="maxWorkerThreads">Upper limit of threads in the pool</param>
412 public SmartThreadPool(
413 int idleTimeout,
414 int maxWorkerThreads)
415 {
416 _stpStartInfo = new STPStartInfo
417 {
418 IdleTimeout = idleTimeout,
419 MaxWorkerThreads = maxWorkerThreads,
420 };
421 Initialize();
422 }
423  
424 /// <summary>
425 /// Constructor
426 /// </summary>
427 /// <param name="idleTimeout">Idle timeout in milliseconds</param>
428 /// <param name="maxWorkerThreads">Upper limit of threads in the pool</param>
429 /// <param name="minWorkerThreads">Lower limit of threads in the pool</param>
430 public SmartThreadPool(
431 int idleTimeout,
432 int maxWorkerThreads,
433 int minWorkerThreads)
434 {
435 _stpStartInfo = new STPStartInfo
436 {
437 IdleTimeout = idleTimeout,
438 MaxWorkerThreads = maxWorkerThreads,
439 MinWorkerThreads = minWorkerThreads,
440 };
441 Initialize();
442 }
443  
444 /// <summary>
445 /// Constructor
446 /// </summary>
447 /// <param name="stpStartInfo">A SmartThreadPool configuration that overrides the default behavior</param>
448 public SmartThreadPool(STPStartInfo stpStartInfo)
449 {
450 _stpStartInfo = new STPStartInfo(stpStartInfo);
451 Initialize();
452 }
453  
454 private void Initialize()
455 {
456 Name = _stpStartInfo.ThreadPoolName;
457 ValidateSTPStartInfo();
458  
459 // _stpStartInfoRW stores a read/write copy of the STPStartInfo.
460 // Actually only MaxWorkerThreads and MinWorkerThreads are overwritten
461  
462 _isSuspended = _stpStartInfo.StartSuspended;
463  
464 #if (_WINDOWS_CE) || (_SILVERLIGHT) || (_MONO) || (WINDOWS_PHONE)
465 if (null != _stpStartInfo.PerformanceCounterInstanceName)
466 {
467 throw new NotSupportedException("Performance counters are not implemented for Compact Framework/Silverlight/Mono, instead use StpStartInfo.EnableLocalPerformanceCounters");
468 }
469 #else
470 if (null != _stpStartInfo.PerformanceCounterInstanceName)
471 {
472 try
473 {
474 _windowsPCs = new STPInstancePerformanceCounters(_stpStartInfo.PerformanceCounterInstanceName);
475 }
476 catch (Exception e)
477 {
478 Debug.WriteLine("Unable to create Performance Counters: " + e);
479 _windowsPCs = NullSTPInstancePerformanceCounters.Instance;
480 }
481 }
482 #endif
483  
484 if (_stpStartInfo.EnableLocalPerformanceCounters)
485 {
486 _localPCs = new LocalSTPInstancePerformanceCounters();
487 }
488  
489 // If the STP is not started suspended then start the threads.
490 if (!_isSuspended)
491 {
492 StartOptimalNumberOfThreads();
493 }
494 }
495  
496 private void StartOptimalNumberOfThreads()
497 {
498 int threadsCount = Math.Max(_workItemsQueue.Count, _stpStartInfo.MinWorkerThreads);
499 threadsCount = Math.Min(threadsCount, _stpStartInfo.MaxWorkerThreads);
500 threadsCount -= _workerThreads.Count;
501 if (threadsCount > 0)
502 {
503 StartThreads(threadsCount);
504 }
505 }
506  
507 private void ValidateSTPStartInfo()
508 {
509 if (_stpStartInfo.MinWorkerThreads < 0)
510 {
511 throw new ArgumentOutOfRangeException(
512 "MinWorkerThreads", "MinWorkerThreads cannot be negative");
513 }
514  
515 if (_stpStartInfo.MaxWorkerThreads <= 0)
516 {
517 throw new ArgumentOutOfRangeException(
518 "MaxWorkerThreads", "MaxWorkerThreads must be greater than zero");
519 }
520  
521 if (_stpStartInfo.MinWorkerThreads > _stpStartInfo.MaxWorkerThreads)
522 {
523 throw new ArgumentOutOfRangeException(
524 "MinWorkerThreads, maxWorkerThreads",
525 "MaxWorkerThreads must be greater or equal to MinWorkerThreads");
526 }
527 }
528  
529 private static void ValidateCallback(Delegate callback)
530 {
531 if(callback.GetInvocationList().Length > 1)
532 {
533 throw new NotSupportedException("SmartThreadPool doesn't support delegates chains");
534 }
535 }
536  
537 #endregion
538  
539 #region Thread Processing
540  
541 /// <summary>
542 /// Waits on the queue for a work item, shutdown, or timeout.
543 /// </summary>
544 /// <returns>
545 /// Returns the WaitingCallback or null in case of timeout or shutdown.
546 /// </returns>
547 private WorkItem Dequeue()
548 {
549 WorkItem workItem =
550 _workItemsQueue.DequeueWorkItem(_stpStartInfo.IdleTimeout, _shuttingDownEvent);
551  
552 return workItem;
553 }
554  
555 /// <summary>
556 /// Put a new work item in the queue
557 /// </summary>
558 /// <param name="workItem">A work item to queue</param>
559 internal override void Enqueue(WorkItem workItem)
560 {
561 // Make sure the workItem is not null
562 Debug.Assert(null != workItem);
563  
564 IncrementWorkItemsCount();
565  
566 workItem.CanceledSmartThreadPool = _canceledSmartThreadPool;
567 _workItemsQueue.EnqueueWorkItem(workItem);
568 workItem.WorkItemIsQueued();
569  
570 // If all the threads are busy then try to create a new one
571 if (_currentWorkItemsCount > _workerThreads.Count)
572 {
573 StartThreads(1);
574 }
575 }
576  
577 private void IncrementWorkItemsCount()
578 {
579 _windowsPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
580 _localPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
581  
582 int count = Interlocked.Increment(ref _currentWorkItemsCount);
583 //Trace.WriteLine("WorkItemsCount = " + _currentWorkItemsCount.ToString());
584 if (count == 1)
585 {
586 IsIdle = false;
587 _isIdleWaitHandle.Reset();
588 }
589 }
590  
591 private void DecrementWorkItemsCount()
592 {
593 int count = Interlocked.Decrement(ref _currentWorkItemsCount);
594 //Trace.WriteLine("WorkItemsCount = " + _currentWorkItemsCount.ToString());
595 if (count == 0)
596 {
597 IsIdle = true;
598 _isIdleWaitHandle.Set();
599 }
600  
601 Interlocked.Increment(ref _workItemsProcessed);
602  
603 if (!_shutdown)
604 {
605 // The counter counts even if the work item was cancelled
606 _windowsPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
607 _localPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed);
608 }
609  
610 }
611  
612 internal void RegisterWorkItemsGroup(IWorkItemsGroup workItemsGroup)
613 {
614 _workItemsGroups[workItemsGroup] = workItemsGroup;
615 }
616  
617 internal void UnregisterWorkItemsGroup(IWorkItemsGroup workItemsGroup)
618 {
619 if (_workItemsGroups.Contains(workItemsGroup))
620 {
621 _workItemsGroups.Remove(workItemsGroup);
622 }
623 }
624  
625 /// <summary>
626 /// Inform that the current thread is about to quit or quiting.
627 /// The same thread may call this method more than once.
628 /// </summary>
629 private void InformCompleted()
630 {
631 // There is no need to lock the two methods together
632 // since only the current thread removes itself
633 // and the _workerThreads is a synchronized dictionary
634 if (_workerThreads.Contains(Thread.CurrentThread))
635 {
636 _workerThreads.Remove(Thread.CurrentThread);
637 _windowsPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
638 _localPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
639 }
640 }
641  
642 /// <summary>
643 /// Starts new threads
644 /// </summary>
645 /// <param name="threadsCount">The number of threads to start</param>
646 private void StartThreads(int threadsCount)
647 {
648 if (_isSuspended)
649 {
650 return;
651 }
652  
653 lock(_workerThreads.SyncRoot)
654 {
655 // Don't start threads on shut down
656 if (_shutdown)
657 {
658 return;
659 }
660  
661 for(int i = 0; i < threadsCount; ++i)
662 {
663 // Don't create more threads then the upper limit
664 if (_workerThreads.Count >= _stpStartInfo.MaxWorkerThreads)
665 {
666 return;
667 }
668  
669 // Create a new thread
670  
671 #if (_SILVERLIGHT) || (WINDOWS_PHONE)
672 Thread workerThread = new Thread(ProcessQueuedItems);
673 #else
674 Thread workerThread =
675 _stpStartInfo.MaxStackSize.HasValue
676 ? new Thread(ProcessQueuedItems, _stpStartInfo.MaxStackSize.Value)
677 : new Thread(ProcessQueuedItems);
678 #endif
679 // Configure the new thread and start it
680 workerThread.Name = "STP " + Name + " Thread #" + _threadCounter;
681 workerThread.IsBackground = _stpStartInfo.AreThreadsBackground;
682  
683 #if !(_SILVERLIGHT) && !(_WINDOWS_CE) && !(WINDOWS_PHONE)
684 if (_stpStartInfo.ApartmentState != ApartmentState.Unknown)
685 {
686 workerThread.SetApartmentState(_stpStartInfo.ApartmentState);
687 }
688 #endif
689  
690 #if !(_SILVERLIGHT) && !(WINDOWS_PHONE)
691 workerThread.Priority = _stpStartInfo.ThreadPriority;
692 #endif
693 workerThread.Start();
694 ++_threadCounter;
695  
696 // Add it to the dictionary and update its creation time.
697 _workerThreads[workerThread] = new ThreadEntry(this);
698  
699 _windowsPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
700 _localPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads);
701 }
702 }
703 }
704  
705 /// <summary>
706 /// A worker thread method that processes work items from the work items queue.
707 /// </summary>
708 private void ProcessQueuedItems()
709 {
710 // Keep the entry of the dictionary as thread's variable to avoid the synchronization locks
711 // of the dictionary.
712 CurrentThreadEntry = _workerThreads[Thread.CurrentThread];
713  
714 FireOnThreadInitialization();
715  
716 try
717 {
718 bool bInUseWorkerThreadsWasIncremented = false;
719  
720 // Process until shutdown.
721 while(!_shutdown)
722 {
723 // Update the last time this thread was seen alive.
724 // It's good for debugging.
725 CurrentThreadEntry.IAmAlive();
726  
727 // The following block handles the when the MaxWorkerThreads has been
728 // incremented by the user at run-time.
729 // Double lock for quit.
730 if (_workerThreads.Count > _stpStartInfo.MaxWorkerThreads)
731 {
732 lock (_workerThreads.SyncRoot)
733 {
734 if (_workerThreads.Count > _stpStartInfo.MaxWorkerThreads)
735 {
736 // Inform that the thread is quiting and then quit.
737 // This method must be called within this lock or else
738 // more threads will quit and the thread pool will go
739 // below the lower limit.
740 InformCompleted();
741 break;
742 }
743 }
744 }
745  
746 // Wait for a work item, shutdown, or timeout
747 WorkItem workItem = Dequeue();
748  
749 // Update the last time this thread was seen alive.
750 // It's good for debugging.
751 CurrentThreadEntry.IAmAlive();
752  
753 // On timeout or shut down.
754 if (null == workItem)
755 {
756 // Double lock for quit.
757 if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads)
758 {
759 lock(_workerThreads.SyncRoot)
760 {
761 if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads)
762 {
763 // Inform that the thread is quiting and then quit.
764 // This method must be called within this lock or else
765 // more threads will quit and the thread pool will go
766 // below the lower limit.
767 InformCompleted();
768 break;
769 }
770 }
771 }
772 }
773  
774 // If we didn't quit then skip to the next iteration.
775 if (null == workItem)
776 {
777 continue;
778 }
779  
780 try
781 {
782 // Initialize the value to false
783 bInUseWorkerThreadsWasIncremented = false;
784  
785 // Set the Current Work Item of the thread.
786 // Store the Current Work Item before the workItem.StartingWorkItem() is called,
787 // so WorkItem.Cancel can work when the work item is between InQueue and InProgress
788 // states.
789 // If the work item has been cancelled BEFORE the workItem.StartingWorkItem()
790 // (work item is in InQueue state) then workItem.StartingWorkItem() will return false.
791 // If the work item has been cancelled AFTER the workItem.StartingWorkItem() then
792 // (work item is in InProgress state) then the thread will be aborted
793 CurrentThreadEntry.CurrentWorkItem = workItem;
794  
795 // Change the state of the work item to 'in progress' if possible.
796 // We do it here so if the work item has been canceled we won't
797 // increment the _inUseWorkerThreads.
798 // The cancel mechanism doesn't delete items from the queue,
799 // it marks the work item as canceled, and when the work item
800 // is dequeued, we just skip it.
801 // If the post execute of work item is set to always or to
802 // call when the work item is canceled then the StartingWorkItem()
803 // will return true, so the post execute can run.
804 if (!workItem.StartingWorkItem())
805 {
806 continue;
807 }
808  
809 // Execute the callback. Make sure to accurately
810 // record how many callbacks are currently executing.
811 int inUseWorkerThreads = Interlocked.Increment(ref _inUseWorkerThreads);
812 _windowsPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads);
813 _localPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads);
814  
815 // Mark that the _inUseWorkerThreads incremented, so in the finally{}
816 // statement we will decrement it correctly.
817 bInUseWorkerThreadsWasIncremented = true;
818  
819 workItem.FireWorkItemStarted();
820  
821 ExecuteWorkItem(workItem);
822 }
823 catch(Exception ex)
824 {
825 ex.GetHashCode();
826 // Do nothing
827 }
828 finally
829 {
830 workItem.DisposeOfState();
831  
832 // Set the CurrentWorkItem to null, since we
833 // no longer run user's code.
834 CurrentThreadEntry.CurrentWorkItem = null;
835  
836 // Decrement the _inUseWorkerThreads only if we had
837 // incremented it. Note the cancelled work items don't
838 // increment _inUseWorkerThreads.
839 if (bInUseWorkerThreadsWasIncremented)
840 {
841 int inUseWorkerThreads = Interlocked.Decrement(ref _inUseWorkerThreads);
842 _windowsPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads);
843 _localPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads);
844 }
845  
846 // Notify that the work item has been completed.
847 // WorkItemsGroup may enqueue their next work item.
848 workItem.FireWorkItemCompleted();
849  
850 // Decrement the number of work items here so the idle
851 // ManualResetEvent won't fluctuate.
852 DecrementWorkItemsCount();
853 }
854 }
855 }
856 catch(ThreadAbortException tae)
857 {
858 tae.GetHashCode();
859 // Handle the abort exception gracfully.
860 #if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE)
861 Thread.ResetAbort();
862 #endif
863 }
864 catch(Exception e)
865 {
866 Debug.Assert(null != e);
867 }
868 finally
869 {
870 InformCompleted();
871 FireOnThreadTermination();
872 }
873 }
874  
875 private void ExecuteWorkItem(WorkItem workItem)
876 {
877 _windowsPCs.SampleWorkItemsWaitTime(workItem.WaitingTime);
878 _localPCs.SampleWorkItemsWaitTime(workItem.WaitingTime);
879 try
880 {
881 workItem.Execute();
882 }
883 finally
884 {
885 _windowsPCs.SampleWorkItemsProcessTime(workItem.ProcessTime);
886 _localPCs.SampleWorkItemsProcessTime(workItem.ProcessTime);
887 }
888 }
889  
890  
891 #endregion
892  
893 #region Public Methods
894  
895 private void ValidateWaitForIdle()
896 {
897 if (null != CurrentThreadEntry && CurrentThreadEntry.AssociatedSmartThreadPool == this)
898 {
899 throw new NotSupportedException(
900 "WaitForIdle cannot be called from a thread on its SmartThreadPool, it causes a deadlock");
901 }
902 }
903  
904 internal static void ValidateWorkItemsGroupWaitForIdle(IWorkItemsGroup workItemsGroup)
905 {
906 if (null == CurrentThreadEntry)
907 {
908 return;
909 }
910  
911 WorkItem workItem = CurrentThreadEntry.CurrentWorkItem;
912 ValidateWorkItemsGroupWaitForIdleImpl(workItemsGroup, workItem);
913 if ((null != workItemsGroup) &&
914 (null != workItem) &&
915 CurrentThreadEntry.CurrentWorkItem.WasQueuedBy(workItemsGroup))
916 {
917 throw new NotSupportedException("WaitForIdle cannot be called from a thread on its SmartThreadPool, it causes a deadlock");
918 }
919 }
920  
921 [MethodImpl(MethodImplOptions.NoInlining)]
922 private static void ValidateWorkItemsGroupWaitForIdleImpl(IWorkItemsGroup workItemsGroup, WorkItem workItem)
923 {
924 if ((null != workItemsGroup) &&
925 (null != workItem) &&
926 workItem.WasQueuedBy(workItemsGroup))
927 {
928 throw new NotSupportedException("WaitForIdle cannot be called from a thread on its SmartThreadPool, it causes a deadlock");
929 }
930 }
931  
932 /// <summary>
933 /// Force the SmartThreadPool to shutdown
934 /// </summary>
935 public void Shutdown()
936 {
937 Shutdown(true, 0);
938 }
939  
940 /// <summary>
941 /// Force the SmartThreadPool to shutdown with timeout
942 /// </summary>
943 public void Shutdown(bool forceAbort, TimeSpan timeout)
944 {
945 Shutdown(forceAbort, (int)timeout.TotalMilliseconds);
946 }
947  
948 /// <summary>
949 /// Empties the queue of work items and abort the threads in the pool.
950 /// </summary>
951 public void Shutdown(bool forceAbort, int millisecondsTimeout)
952 {
953 ValidateNotDisposed();
954  
955 ISTPInstancePerformanceCounters pcs = _windowsPCs;
956  
957 if (NullSTPInstancePerformanceCounters.Instance != _windowsPCs)
958 {
959 // Set the _pcs to "null" to stop updating the performance
960 // counters
961 _windowsPCs = NullSTPInstancePerformanceCounters.Instance;
962  
963 pcs.Dispose();
964 }
965  
966 Thread [] threads;
967 lock(_workerThreads.SyncRoot)
968 {
969 // Shutdown the work items queue
970 _workItemsQueue.Dispose();
971  
972 // Signal the threads to exit
973 _shutdown = true;
974 _shuttingDownEvent.Set();
975  
976 // Make a copy of the threads' references in the pool
977 threads = new Thread [_workerThreads.Count];
978 _workerThreads.Keys.CopyTo(threads, 0);
979 }
980  
981 int millisecondsLeft = millisecondsTimeout;
982 Stopwatch stopwatch = Stopwatch.StartNew();
983 //DateTime start = DateTime.UtcNow;
984 bool waitInfinitely = (Timeout.Infinite == millisecondsTimeout);
985 bool timeout = false;
986  
987 // Each iteration we update the time left for the timeout.
988 foreach(Thread thread in threads)
989 {
990 // Join don't work with negative numbers
991 if (!waitInfinitely && (millisecondsLeft < 0))
992 {
993 timeout = true;
994 break;
995 }
996  
997 // Wait for the thread to terminate
998 bool success = thread.Join(millisecondsLeft);
999 if(!success)
1000 {
1001 timeout = true;
1002 break;
1003 }
1004  
1005 if(!waitInfinitely)
1006 {
1007 // Update the time left to wait
1008 //TimeSpan ts = DateTime.UtcNow - start;
1009 millisecondsLeft = millisecondsTimeout - (int)stopwatch.ElapsedMilliseconds;
1010 }
1011 }
1012  
1013 if (timeout && forceAbort)
1014 {
1015 // Abort the threads in the pool
1016 foreach(Thread thread in threads)
1017 {
1018  
1019 if ((thread != null)
1020 #if !(_WINDOWS_CE)
1021 && thread.IsAlive
1022 #endif
1023 )
1024 {
1025 try
1026 {
1027 thread.Abort(); // Shutdown
1028 }
1029 catch(SecurityException e)
1030 {
1031 e.GetHashCode();
1032 }
1033 catch(ThreadStateException ex)
1034 {
1035 ex.GetHashCode();
1036 // In case the thread has been terminated
1037 // after the check if it is alive.
1038 }
1039 }
1040 }
1041 }
1042 }
1043  
1044 /// <summary>
1045 /// Wait for all work items to complete
1046 /// </summary>
1047 /// <param name="waitableResults">Array of work item result objects</param>
1048 /// <returns>
1049 /// true when every work item in workItemResults has completed; otherwise false.
1050 /// </returns>
1051 public static bool WaitAll(
1052 IWaitableResult [] waitableResults)
1053 {
1054 return WaitAll(waitableResults, Timeout.Infinite, true);
1055 }
1056  
1057 /// <summary>
1058 /// Wait for all work items to complete
1059 /// </summary>
1060 /// <param name="waitableResults">Array of work item result objects</param>
1061 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param>
1062 /// <param name="exitContext">
1063 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1064 /// </param>
1065 /// <returns>
1066 /// true when every work item in workItemResults has completed; otherwise false.
1067 /// </returns>
1068 public static bool WaitAll(
1069 IWaitableResult [] waitableResults,
1070 TimeSpan timeout,
1071 bool exitContext)
1072 {
1073 return WaitAll(waitableResults, (int)timeout.TotalMilliseconds, exitContext);
1074 }
1075  
1076 /// <summary>
1077 /// Wait for all work items to complete
1078 /// </summary>
1079 /// <param name="waitableResults">Array of work item result objects</param>
1080 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param>
1081 /// <param name="exitContext">
1082 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1083 /// </param>
1084 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
1085 /// <returns>
1086 /// true when every work item in workItemResults has completed; otherwise false.
1087 /// </returns>
1088 public static bool WaitAll(
1089 IWaitableResult[] waitableResults,
1090 TimeSpan timeout,
1091 bool exitContext,
1092 WaitHandle cancelWaitHandle)
1093 {
1094 return WaitAll(waitableResults, (int)timeout.TotalMilliseconds, exitContext, cancelWaitHandle);
1095 }
1096  
1097 /// <summary>
1098 /// Wait for all work items to complete
1099 /// </summary>
1100 /// <param name="waitableResults">Array of work item result objects</param>
1101 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
1102 /// <param name="exitContext">
1103 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1104 /// </param>
1105 /// <returns>
1106 /// true when every work item in workItemResults has completed; otherwise false.
1107 /// </returns>
1108 public static bool WaitAll(
1109 IWaitableResult [] waitableResults,
1110 int millisecondsTimeout,
1111 bool exitContext)
1112 {
1113 return WorkItem.WaitAll(waitableResults, millisecondsTimeout, exitContext, null);
1114 }
1115  
1116 /// <summary>
1117 /// Wait for all work items to complete
1118 /// </summary>
1119 /// <param name="waitableResults">Array of work item result objects</param>
1120 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
1121 /// <param name="exitContext">
1122 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1123 /// </param>
1124 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
1125 /// <returns>
1126 /// true when every work item in workItemResults has completed; otherwise false.
1127 /// </returns>
1128 public static bool WaitAll(
1129 IWaitableResult[] waitableResults,
1130 int millisecondsTimeout,
1131 bool exitContext,
1132 WaitHandle cancelWaitHandle)
1133 {
1134 return WorkItem.WaitAll(waitableResults, millisecondsTimeout, exitContext, cancelWaitHandle);
1135 }
1136  
1137  
1138 /// <summary>
1139 /// Waits for any of the work items in the specified array to complete, cancel, or timeout
1140 /// </summary>
1141 /// <param name="waitableResults">Array of work item result objects</param>
1142 /// <returns>
1143 /// The array index of the work item result that satisfied the wait, or WaitTimeout if any of the work items has been canceled.
1144 /// </returns>
1145 public static int WaitAny(
1146 IWaitableResult [] waitableResults)
1147 {
1148 return WaitAny(waitableResults, Timeout.Infinite, true);
1149 }
1150  
1151 /// <summary>
1152 /// Waits for any of the work items in the specified array to complete, cancel, or timeout
1153 /// </summary>
1154 /// <param name="waitableResults">Array of work item result objects</param>
1155 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param>
1156 /// <param name="exitContext">
1157 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1158 /// </param>
1159 /// <returns>
1160 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
1161 /// </returns>
1162 public static int WaitAny(
1163 IWaitableResult[] waitableResults,
1164 TimeSpan timeout,
1165 bool exitContext)
1166 {
1167 return WaitAny(waitableResults, (int)timeout.TotalMilliseconds, exitContext);
1168 }
1169  
1170 /// <summary>
1171 /// Waits for any of the work items in the specified array to complete, cancel, or timeout
1172 /// </summary>
1173 /// <param name="waitableResults">Array of work item result objects</param>
1174 /// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param>
1175 /// <param name="exitContext">
1176 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1177 /// </param>
1178 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
1179 /// <returns>
1180 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
1181 /// </returns>
1182 public static int WaitAny(
1183 IWaitableResult [] waitableResults,
1184 TimeSpan timeout,
1185 bool exitContext,
1186 WaitHandle cancelWaitHandle)
1187 {
1188 return WaitAny(waitableResults, (int)timeout.TotalMilliseconds, exitContext, cancelWaitHandle);
1189 }
1190  
1191 /// <summary>
1192 /// Waits for any of the work items in the specified array to complete, cancel, or timeout
1193 /// </summary>
1194 /// <param name="waitableResults">Array of work item result objects</param>
1195 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
1196 /// <param name="exitContext">
1197 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1198 /// </param>
1199 /// <returns>
1200 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
1201 /// </returns>
1202 public static int WaitAny(
1203 IWaitableResult [] waitableResults,
1204 int millisecondsTimeout,
1205 bool exitContext)
1206 {
1207 return WorkItem.WaitAny(waitableResults, millisecondsTimeout, exitContext, null);
1208 }
1209  
1210 /// <summary>
1211 /// Waits for any of the work items in the specified array to complete, cancel, or timeout
1212 /// </summary>
1213 /// <param name="waitableResults">Array of work item result objects</param>
1214 /// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
1215 /// <param name="exitContext">
1216 /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
1217 /// </param>
1218 /// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
1219 /// <returns>
1220 /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
1221 /// </returns>
1222 public static int WaitAny(
1223 IWaitableResult [] waitableResults,
1224 int millisecondsTimeout,
1225 bool exitContext,
1226 WaitHandle cancelWaitHandle)
1227 {
1228 return WorkItem.WaitAny(waitableResults, millisecondsTimeout, exitContext, cancelWaitHandle);
1229 }
1230  
1231 /// <summary>
1232 /// Creates a new WorkItemsGroup.
1233 /// </summary>
1234 /// <param name="concurrency">The number of work items that can be run concurrently</param>
1235 /// <returns>A reference to the WorkItemsGroup</returns>
1236 public IWorkItemsGroup CreateWorkItemsGroup(int concurrency)
1237 {
1238 IWorkItemsGroup workItemsGroup = new WorkItemsGroup(this, concurrency, _stpStartInfo);
1239 return workItemsGroup;
1240 }
1241  
1242 /// <summary>
1243 /// Creates a new WorkItemsGroup.
1244 /// </summary>
1245 /// <param name="concurrency">The number of work items that can be run concurrently</param>
1246 /// <param name="wigStartInfo">A WorkItemsGroup configuration that overrides the default behavior</param>
1247 /// <returns>A reference to the WorkItemsGroup</returns>
1248 public IWorkItemsGroup CreateWorkItemsGroup(int concurrency, WIGStartInfo wigStartInfo)
1249 {
1250 IWorkItemsGroup workItemsGroup = new WorkItemsGroup(this, concurrency, wigStartInfo);
1251 return workItemsGroup;
1252 }
1253  
1254 #region Fire Thread's Events
1255  
1256 private void FireOnThreadInitialization()
1257 {
1258 if (null != _onThreadInitialization)
1259 {
1260 foreach (ThreadInitializationHandler tih in _onThreadInitialization.GetInvocationList())
1261 {
1262 try
1263 {
1264 tih();
1265 }
1266 catch (Exception e)
1267 {
1268 e.GetHashCode();
1269 Debug.Assert(false);
1270 throw;
1271 }
1272 }
1273 }
1274 }
1275  
1276 private void FireOnThreadTermination()
1277 {
1278 if (null != _onThreadTermination)
1279 {
1280 foreach (ThreadTerminationHandler tth in _onThreadTermination.GetInvocationList())
1281 {
1282 try
1283 {
1284 tth();
1285 }
1286 catch (Exception e)
1287 {
1288 e.GetHashCode();
1289 Debug.Assert(false);
1290 throw;
1291 }
1292 }
1293 }
1294 }
1295  
1296 #endregion
1297  
1298 /// <summary>
1299 /// This event is fired when a thread is created.
1300 /// Use it to initialize a thread before the work items use it.
1301 /// </summary>
1302 public event ThreadInitializationHandler OnThreadInitialization
1303 {
1304 add { _onThreadInitialization += value; }
1305 remove { _onThreadInitialization -= value; }
1306 }
1307  
1308 /// <summary>
1309 /// This event is fired when a thread is terminating.
1310 /// Use it for cleanup.
1311 /// </summary>
1312 public event ThreadTerminationHandler OnThreadTermination
1313 {
1314 add { _onThreadTermination += value; }
1315 remove { _onThreadTermination -= value; }
1316 }
1317  
1318  
1319 internal void CancelAbortWorkItemsGroup(WorkItemsGroup wig)
1320 {
1321 foreach (ThreadEntry threadEntry in _workerThreads.Values)
1322 {
1323 WorkItem workItem = threadEntry.CurrentWorkItem;
1324 if (null != workItem &&
1325 workItem.WasQueuedBy(wig) &&
1326 !workItem.IsCanceled)
1327 {
1328 threadEntry.CurrentWorkItem.GetWorkItemResult().Cancel(true);
1329 }
1330 }
1331 }
1332  
1333  
1334  
1335 #endregion
1336  
1337 #region Properties
1338  
1339 /// <summary>
1340 /// Get/Set the lower limit of threads in the pool.
1341 /// </summary>
1342 public int MinThreads
1343 {
1344 get
1345 {
1346 ValidateNotDisposed();
1347 return _stpStartInfo.MinWorkerThreads;
1348 }
1349 set
1350 {
1351 Debug.Assert(value >= 0);
1352 Debug.Assert(value <= _stpStartInfo.MaxWorkerThreads);
1353 if (_stpStartInfo.MaxWorkerThreads < value)
1354 {
1355 _stpStartInfo.MaxWorkerThreads = value;
1356 }
1357 _stpStartInfo.MinWorkerThreads = value;
1358 StartOptimalNumberOfThreads();
1359 }
1360 }
1361  
1362 /// <summary>
1363 /// Get/Set the upper limit of threads in the pool.
1364 /// </summary>
1365 public int MaxThreads
1366 {
1367 get
1368 {
1369 ValidateNotDisposed();
1370 return _stpStartInfo.MaxWorkerThreads;
1371 }
1372  
1373 set
1374 {
1375 Debug.Assert(value > 0);
1376 Debug.Assert(value >= _stpStartInfo.MinWorkerThreads);
1377 if (_stpStartInfo.MinWorkerThreads > value)
1378 {
1379 _stpStartInfo.MinWorkerThreads = value;
1380 }
1381 _stpStartInfo.MaxWorkerThreads = value;
1382 StartOptimalNumberOfThreads();
1383 }
1384 }
1385 /// <summary>
1386 /// Get the number of threads in the thread pool.
1387 /// Should be between the lower and the upper limits.
1388 /// </summary>
1389 public int ActiveThreads
1390 {
1391 get
1392 {
1393 ValidateNotDisposed();
1394 return _workerThreads.Count;
1395 }
1396 }
1397  
1398 /// <summary>
1399 /// Get the number of busy (not idle) threads in the thread pool.
1400 /// </summary>
1401 public int InUseThreads
1402 {
1403 get
1404 {
1405 ValidateNotDisposed();
1406 return _inUseWorkerThreads;
1407 }
1408 }
1409  
1410 /// <summary>
1411 /// Returns true if the current running work item has been cancelled.
1412 /// Must be used within the work item's callback method.
1413 /// The work item should sample this value in order to know if it
1414 /// needs to quit before its completion.
1415 /// </summary>
1416 public static bool IsWorkItemCanceled
1417 {
1418 get
1419 {
1420 return CurrentThreadEntry.CurrentWorkItem.IsCanceled;
1421 }
1422 }
1423  
1424 /// <summary>
1425 /// Checks if the work item has been cancelled, and if yes then abort the thread.
1426 /// Can be used with Cancel and timeout
1427 /// </summary>
1428 public static void AbortOnWorkItemCancel()
1429 {
1430 if (IsWorkItemCanceled)
1431 {
1432 Thread.CurrentThread.Abort();
1433 }
1434 }
1435  
1436 /// <summary>
1437 /// Thread Pool start information (readonly)
1438 /// </summary>
1439 public STPStartInfo STPStartInfo
1440 {
1441 get
1442 {
1443 return _stpStartInfo.AsReadOnly();
1444 }
1445 }
1446  
1447 public bool IsShuttingdown
1448 {
1449 get { return _shutdown; }
1450 }
1451  
1452 /// <summary>
1453 /// Return the local calculated performance counters
1454 /// Available only if STPStartInfo.EnableLocalPerformanceCounters is true.
1455 /// </summary>
1456 public ISTPPerformanceCountersReader PerformanceCountersReader
1457 {
1458 get { return (ISTPPerformanceCountersReader)_localPCs; }
1459 }
1460  
1461 #endregion
1462  
1463 #region IDisposable Members
1464  
1465 public void Dispose()
1466 {
1467 if (!_isDisposed)
1468 {
1469 if (!_shutdown)
1470 {
1471 Shutdown();
1472 }
1473  
1474 if (null != _shuttingDownEvent)
1475 {
1476 _shuttingDownEvent.Close();
1477 _shuttingDownEvent = null;
1478 }
1479 _workerThreads.Clear();
1480  
1481 if (null != _isIdleWaitHandle)
1482 {
1483 _isIdleWaitHandle.Close();
1484 _isIdleWaitHandle = null;
1485 }
1486  
1487 _isDisposed = true;
1488 }
1489 }
1490  
1491 private void ValidateNotDisposed()
1492 {
1493 if(_isDisposed)
1494 {
1495 throw new ObjectDisposedException(GetType().ToString(), "The SmartThreadPool has been shutdown");
1496 }
1497 }
1498 #endregion
1499  
1500 #region WorkItemsGroupBase Overrides
1501  
1502 /// <summary>
1503 /// Get/Set the maximum number of work items that execute cocurrency on the thread pool
1504 /// </summary>
1505 public override int Concurrency
1506 {
1507 get { return MaxThreads; }
1508 set { MaxThreads = value; }
1509 }
1510  
1511 /// <summary>
1512 /// Get the number of work items in the queue.
1513 /// </summary>
1514 public override int WaitingCallbacks
1515 {
1516 get
1517 {
1518 ValidateNotDisposed();
1519 return _workItemsQueue.Count;
1520 }
1521 }
1522  
1523 /// <summary>
1524 /// Get an array with all the state objects of the currently running items.
1525 /// The array represents a snap shot and impact performance.
1526 /// </summary>
1527 public override object[] GetStates()
1528 {
1529 object[] states = _workItemsQueue.GetStates();
1530 return states;
1531 }
1532  
1533 /// <summary>
1534 /// WorkItemsGroup start information (readonly)
1535 /// </summary>
1536 public override WIGStartInfo WIGStartInfo
1537 {
1538 get { return _stpStartInfo.AsReadOnly(); }
1539 }
1540  
1541 /// <summary>
1542 /// Start the thread pool if it was started suspended.
1543 /// If it is already running, this method is ignored.
1544 /// </summary>
1545 public override void Start()
1546 {
1547 if (!_isSuspended)
1548 {
1549 return;
1550 }
1551 _isSuspended = false;
1552  
1553 ICollection workItemsGroups = _workItemsGroups.Values;
1554 foreach (WorkItemsGroup workItemsGroup in workItemsGroups)
1555 {
1556 workItemsGroup.OnSTPIsStarting();
1557 }
1558  
1559 StartOptimalNumberOfThreads();
1560 }
1561  
1562 /// <summary>
1563 /// Cancel all work items using thread abortion
1564 /// </summary>
1565 /// <param name="abortExecution">True to stop work items by raising ThreadAbortException</param>
1566 public override void Cancel(bool abortExecution)
1567 {
1568 _canceledSmartThreadPool.IsCanceled = true;
1569 _canceledSmartThreadPool = new CanceledWorkItemsGroup();
1570  
1571 ICollection workItemsGroups = _workItemsGroups.Values;
1572 foreach (WorkItemsGroup workItemsGroup in workItemsGroups)
1573 {
1574 workItemsGroup.Cancel(abortExecution);
1575 }
1576  
1577 if (abortExecution)
1578 {
1579 foreach (ThreadEntry threadEntry in _workerThreads.Values)
1580 {
1581 WorkItem workItem = threadEntry.CurrentWorkItem;
1582 if (null != workItem &&
1583 threadEntry.AssociatedSmartThreadPool == this &&
1584 !workItem.IsCanceled)
1585 {
1586 threadEntry.CurrentWorkItem.GetWorkItemResult().Cancel(true);
1587 }
1588 }
1589 }
1590 }
1591  
1592 /// <summary>
1593 /// Wait for the thread pool to be idle
1594 /// </summary>
1595 public override bool WaitForIdle(int millisecondsTimeout)
1596 {
1597 ValidateWaitForIdle();
1598 return STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, false);
1599 }
1600  
1601 /// <summary>
1602 /// This event is fired when all work items are completed.
1603 /// (When IsIdle changes to true)
1604 /// This event only work on WorkItemsGroup. On SmartThreadPool
1605 /// it throws the NotImplementedException.
1606 /// </summary>
1607 public override event WorkItemsGroupIdleHandler OnIdle
1608 {
1609 add
1610 {
1611 throw new NotImplementedException("This event is not implemented in the SmartThreadPool class. Please create a WorkItemsGroup in order to use this feature.");
1612 //_onIdle += value;
1613 }
1614 remove
1615 {
1616 throw new NotImplementedException("This event is not implemented in the SmartThreadPool class. Please create a WorkItemsGroup in order to use this feature.");
1617 //_onIdle -= value;
1618 }
1619 }
1620  
1621 internal override void PreQueueWorkItem()
1622 {
1623 ValidateNotDisposed();
1624 }
1625  
1626 #endregion
1627  
1628 #region Join, Choice, Pipe, etc.
1629  
1630 /// <summary>
1631 /// Executes all actions in parallel.
1632 /// Returns when they all finish.
1633 /// </summary>
1634 /// <param name="actions">Actions to execute</param>
1635 public void Join(IEnumerable<Action> actions)
1636 {
1637 WIGStartInfo wigStartInfo = new WIGStartInfo { StartSuspended = true };
1638 IWorkItemsGroup workItemsGroup = CreateWorkItemsGroup(int.MaxValue, wigStartInfo);
1639 foreach (Action action in actions)
1640 {
1641 workItemsGroup.QueueWorkItem(action);
1642 }
1643 workItemsGroup.Start();
1644 workItemsGroup.WaitForIdle();
1645 }
1646  
1647 /// <summary>
1648 /// Executes all actions in parallel.
1649 /// Returns when they all finish.
1650 /// </summary>
1651 /// <param name="actions">Actions to execute</param>
1652 public void Join(params Action[] actions)
1653 {
1654 Join((IEnumerable<Action>)actions);
1655 }
1656  
1657 private class ChoiceIndex
1658 {
1659 public int _index = -1;
1660 }
1661  
1662 /// <summary>
1663 /// Executes all actions in parallel
1664 /// Returns when the first one completes
1665 /// </summary>
1666 /// <param name="actions">Actions to execute</param>
1667 public int Choice(IEnumerable<Action> actions)
1668 {
1669 WIGStartInfo wigStartInfo = new WIGStartInfo { StartSuspended = true };
1670 IWorkItemsGroup workItemsGroup = CreateWorkItemsGroup(int.MaxValue, wigStartInfo);
1671  
1672 ManualResetEvent anActionCompleted = new ManualResetEvent(false);
1673  
1674 ChoiceIndex choiceIndex = new ChoiceIndex();
1675  
1676 int i = 0;
1677 foreach (Action action in actions)
1678 {
1679 Action act = action;
1680 int value = i;
1681 workItemsGroup.QueueWorkItem(() => { act(); Interlocked.CompareExchange(ref choiceIndex._index, value, -1); anActionCompleted.Set(); });
1682 ++i;
1683 }
1684 workItemsGroup.Start();
1685 anActionCompleted.WaitOne();
1686  
1687 return choiceIndex._index;
1688 }
1689  
1690 /// <summary>
1691 /// Executes all actions in parallel
1692 /// Returns when the first one completes
1693 /// </summary>
1694 /// <param name="actions">Actions to execute</param>
1695 public int Choice(params Action[] actions)
1696 {
1697 return Choice((IEnumerable<Action>)actions);
1698 }
1699  
1700 /// <summary>
1701 /// Executes actions in sequence asynchronously.
1702 /// Returns immediately.
1703 /// </summary>
1704 /// <param name="pipeState">A state context that passes </param>
1705 /// <param name="actions">Actions to execute in the order they should run</param>
1706 public void Pipe<T>(T pipeState, IEnumerable<Action<T>> actions)
1707 {
1708 WIGStartInfo wigStartInfo = new WIGStartInfo { StartSuspended = true };
1709 IWorkItemsGroup workItemsGroup = CreateWorkItemsGroup(1, wigStartInfo);
1710 foreach (Action<T> action in actions)
1711 {
1712 Action<T> act = action;
1713 workItemsGroup.QueueWorkItem(() => act(pipeState));
1714 }
1715 workItemsGroup.Start();
1716 workItemsGroup.WaitForIdle();
1717 }
1718  
1719 /// <summary>
1720 /// Executes actions in sequence asynchronously.
1721 /// Returns immediately.
1722 /// </summary>
1723 /// <param name="pipeState"></param>
1724 /// <param name="actions">Actions to execute in the order they should run</param>
1725 public void Pipe<T>(T pipeState, params Action<T>[] actions)
1726 {
1727 Pipe(pipeState, (IEnumerable<Action<T>>)actions);
1728 }
1729 #endregion
1730 }
1731 #endregion
1732 }