opensim-development – Rev 1

Subversion Repositories:
Rev:
using System;
using System.Threading;
using System.Runtime.CompilerServices;
using System.Diagnostics;

namespace Amib.Threading.Internal
{

        #region WorkItemsGroup class 

        /// <summary>
        /// Summary description for WorkItemsGroup.
        /// </summary>
        public class WorkItemsGroup : WorkItemsGroupBase
        {
                #region Private members

                private readonly object _lock = new object();

                /// <summary>
                /// A reference to the SmartThreadPool instance that created this 
                /// WorkItemsGroup.
                /// </summary>
                private readonly SmartThreadPool _stp;

                /// <summary>
                /// The OnIdle event
                /// </summary>
                private event WorkItemsGroupIdleHandler _onIdle;

        /// <summary>
        /// A flag to indicate if the Work Items Group is now suspended.
        /// </summary>
        private bool _isSuspended;

                /// <summary>
                /// Defines how many work items of this WorkItemsGroup can run at once.
                /// </summary>
                private int _concurrency;

                /// <summary>
                /// Priority queue to hold work items before they are passed 
                /// to the SmartThreadPool.
                /// </summary>
                private readonly PriorityQueue _workItemsQueue;

                /// <summary>
                /// Indicate how many work items are waiting in the SmartThreadPool
                /// queue.
                /// This value is used to apply the concurrency.
                /// </summary>
                private int _workItemsInStpQueue;

                /// <summary>
                /// Indicate how many work items are currently running in the SmartThreadPool.
                /// This value is used with the Cancel, to calculate if we can send new 
                /// work items to the STP.
                /// </summary>
                private int _workItemsExecutingInStp = 0;

                /// <summary>
                /// WorkItemsGroup start information
                /// </summary>
                private readonly WIGStartInfo _workItemsGroupStartInfo;

                /// <summary>
                /// Signaled when all of the WorkItemsGroup's work item completed.
                /// </summary>
        //private readonly ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);
        private readonly ManualResetEvent _isIdleWaitHandle = EventWaitHandleFactory.CreateManualResetEvent(true);

                /// <summary>
                /// A common object for all the work items that this work items group
                /// generate so we can mark them to cancel in O(1)
                /// </summary>
                private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup();

                #endregion 

                #region Construction

            public WorkItemsGroup(
                        SmartThreadPool stp, 
                        int concurrency, 
                        WIGStartInfo wigStartInfo)
                {
                        if (concurrency <= 0)
                        {
                                throw new ArgumentOutOfRangeException(
                    "concurrency",
#if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE)
                    concurrency,
#endif
 "concurrency must be greater than zero");
                        }
                        _stp = stp;
                        _concurrency = concurrency;
                        _workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo).AsReadOnly();
                        _workItemsQueue = new PriorityQueue();
                Name = "WorkItemsGroup";

                        // The _workItemsInStpQueue gets the number of currently executing work items,
                        // because once a work item is executing, it cannot be cancelled.
                        _workItemsInStpQueue = _workItemsExecutingInStp;

            _isSuspended = _workItemsGroupStartInfo.StartSuspended;
                }

                #endregion 

        #region WorkItemsGroupBase Overrides

        public override int Concurrency
        {
            get { return _concurrency; }
            set
            {
                Debug.Assert(value > 0);

                int diff = value - _concurrency;
                _concurrency = value;
                if (diff > 0)
                {
                    EnqueueToSTPNextNWorkItem(diff);
                }
            }
        }

        public override int WaitingCallbacks
        {
            get { return _workItemsQueue.Count; }
        }

        public override object[] GetStates()
        {
            lock (_lock)
            {
                object[] states = new object[_workItemsQueue.Count];
                int i = 0;
                foreach (WorkItem workItem in _workItemsQueue)
                {
                    states[i] = workItem.GetWorkItemResult().State;
                    ++i;
                }
                return states;
            }
        }

            /// <summary>
        /// WorkItemsGroup start information
        /// </summary>
        public override WIGStartInfo WIGStartInfo
        {
            get { return _workItemsGroupStartInfo; }
        }

            /// <summary>
            /// Start the Work Items Group if it was started suspended
            /// </summary>
            public override void Start()
            {
                // If the Work Items Group already started then quit
                if (!_isSuspended)
                {
                    return;
                }
                _isSuspended = false;
            
                EnqueueToSTPNextNWorkItem(Math.Min(_workItemsQueue.Count, _concurrency));
            }

            public override void Cancel(bool abortExecution)
            {
                lock (_lock)
                {
                    _canceledWorkItemsGroup.IsCanceled = true;
                    _workItemsQueue.Clear();
                    _workItemsInStpQueue = 0;
                    _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
                }

                if (abortExecution)
                {
                    _stp.CancelAbortWorkItemsGroup(this);
                }
            }

            /// <summary>
        /// Wait for the thread pool to be idle
        /// </summary>
        public override bool WaitForIdle(int millisecondsTimeout)
        {
            SmartThreadPool.ValidateWorkItemsGroupWaitForIdle(this);
            return STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, false);
        }

            public override event WorkItemsGroupIdleHandler OnIdle
                {
                        add { _onIdle += value; }
                        remove { _onIdle -= value; }
                }

            #endregion 

                #region Private methods

            private void RegisterToWorkItemCompletion(IWorkItemResult wir)
                {
                        IInternalWorkItemResult iwir = (IInternalWorkItemResult)wir;
                        iwir.OnWorkItemStarted += OnWorkItemStartedCallback;
                        iwir.OnWorkItemCompleted += OnWorkItemCompletedCallback;
                }

            public void OnSTPIsStarting()
                {
            if (_isSuspended)
            {
                return;
            }
                        
            EnqueueToSTPNextNWorkItem(_concurrency);
                }

            public void EnqueueToSTPNextNWorkItem(int count)
        {
            for (int i = 0; i < count; ++i)
            {
                EnqueueToSTPNextWorkItem(null, false);
            }
        }

                private object FireOnIdle(object state)
                {
                        FireOnIdleImpl(_onIdle);
                        return null;
                }

                [MethodImpl(MethodImplOptions.NoInlining)]
                private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle)
                {
                        if(null == onIdle)
                        {
                                return;
                        }

                        Delegate[] delegates = onIdle.GetInvocationList();
                        foreach(WorkItemsGroupIdleHandler eh in delegates)
                        {
                                try
                                {
                                        eh(this);
                                }
                catch { }  // Suppress exceptions
                        }
                }

                private void OnWorkItemStartedCallback(WorkItem workItem)
                {
                        lock(_lock)
                        {
                                ++_workItemsExecutingInStp;
                        }
                }

                private void OnWorkItemCompletedCallback(WorkItem workItem)
                {
                        EnqueueToSTPNextWorkItem(null, true);
                }

        internal override void Enqueue(WorkItem workItem)
        {
            EnqueueToSTPNextWorkItem(workItem);
        }

            private void EnqueueToSTPNextWorkItem(WorkItem workItem)
                {
                        EnqueueToSTPNextWorkItem(workItem, false);
                }

                private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue)
                {
                        lock(_lock)
                        {
                                // Got here from OnWorkItemCompletedCallback()
                                if (decrementWorkItemsInStpQueue)
                                {
                                        --_workItemsInStpQueue;

                                        if(_workItemsInStpQueue < 0)
                                        {
                                                _workItemsInStpQueue = 0;
                                        }

                                        --_workItemsExecutingInStp;

                                        if(_workItemsExecutingInStp < 0)
                                        {
                                                _workItemsExecutingInStp = 0;
                                        }
                                }

                                // If the work item is not null then enqueue it
                                if (null != workItem)
                                {
                                        workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup;

                                        RegisterToWorkItemCompletion(workItem.GetWorkItemResult());
                                        _workItemsQueue.Enqueue(workItem);
                                        //_stp.IncrementWorkItemsCount();

                                        if ((1 == _workItemsQueue.Count) && 
                                                (0 == _workItemsInStpQueue))
                                        {
                                                _stp.RegisterWorkItemsGroup(this);
                        IsIdle = false;
                        _isIdleWaitHandle.Reset();
                                        }
                                }

                                // If the work items queue of the group is empty than quit
                                if (0 == _workItemsQueue.Count)
                                {
                                        if (0 == _workItemsInStpQueue)
                                        {
                                                _stp.UnregisterWorkItemsGroup(this);
                        IsIdle = true;
                        _isIdleWaitHandle.Set();
                        if (decrementWorkItemsInStpQueue && _onIdle != null && _onIdle.GetInvocationList().Length > 0)
                        {
                            _stp.QueueWorkItem(new WorkItemCallback(FireOnIdle));
                        }
                                        }
                                        return;
                                }

                if (!_isSuspended)
                                {
                                        if (_workItemsInStpQueue < _concurrency)
                                        {
                                                WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem;
                        try
                        {
                            _stp.Enqueue(nextWorkItem);
                        }
                        catch (ObjectDisposedException e)
                        {
                            e.GetHashCode();
                            // The STP has been shutdown
                        }

                                                ++_workItemsInStpQueue;
                                        }
                                }
                        }
                }

                #endregion
    }

        #endregion
}