src/corelib/concurrent/qtconcurrentiteratekernel.h
changeset 0 1918ee327afb
child 4 3b1da2848fc7
equal deleted inserted replaced
-1:000000000000 0:1918ee327afb
       
     1 /****************************************************************************
       
     2 **
       
     3 ** Copyright (C) 2009 Nokia Corporation and/or its subsidiary(-ies).
       
     4 ** All rights reserved.
       
     5 ** Contact: Nokia Corporation (qt-info@nokia.com)
       
     6 **
       
     7 ** This file is part of the QtCore module of the Qt Toolkit.
       
     8 **
       
     9 ** $QT_BEGIN_LICENSE:LGPL$
       
    10 ** No Commercial Usage
       
    11 ** This file contains pre-release code and may not be distributed.
       
    12 ** You may use this file in accordance with the terms and conditions
       
    13 ** contained in the Technology Preview License Agreement accompanying
       
    14 ** this package.
       
    15 **
       
    16 ** GNU Lesser General Public License Usage
       
    17 ** Alternatively, this file may be used under the terms of the GNU Lesser
       
    18 ** General Public License version 2.1 as published by the Free Software
       
    19 ** Foundation and appearing in the file LICENSE.LGPL included in the
       
    20 ** packaging of this file.  Please review the following information to
       
    21 ** ensure the GNU Lesser General Public License version 2.1 requirements
       
    22 ** will be met: http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html.
       
    23 **
       
    24 ** In addition, as a special exception, Nokia gives you certain additional
       
    25 ** rights.  These rights are described in the Nokia Qt LGPL Exception
       
    26 ** version 1.1, included in the file LGPL_EXCEPTION.txt in this package.
       
    27 **
       
    28 ** If you have questions regarding the use of this file, please contact
       
    29 ** Nokia at qt-info@nokia.com.
       
    30 **
       
    31 **
       
    32 **
       
    33 **
       
    34 **
       
    35 **
       
    36 **
       
    37 **
       
    38 ** $QT_END_LICENSE$
       
    39 **
       
    40 ****************************************************************************/
       
    41 
       
    42 #ifndef QTCONCURRENT_ITERATEKERNEL_H
       
    43 #define QTCONCURRENT_ITERATEKERNEL_H
       
    44 
       
    45 #include <QtCore/qglobal.h>
       
    46 
       
    47 #ifndef QT_NO_CONCURRENT
       
    48 
       
    49 #include <QtCore/qatomic.h>
       
    50 #include <QtCore/qtconcurrentmedian.h>
       
    51 #include <QtCore/qtconcurrentthreadengine.h>
       
    52 
       
    53 #ifndef QT_NO_STL
       
    54 #  include <iterator>
       
    55 #endif
       
    56 
       
    57 QT_BEGIN_HEADER
       
    58 QT_BEGIN_NAMESPACE
       
    59 
       
    60 QT_MODULE(Core)
       
    61 
       
    62 #ifndef qdoc
       
    63 
       
    64 namespace QtConcurrent {
       
    65 
       
    66 #ifndef QT_NO_STL
       
    67     using std::advance;
       
    68 #else
       
    69     template <typename It, typename T>
       
    70     void advance(It &it, T value)
       
    71     {
       
    72         it+=value;
       
    73     }
       
    74 #endif
       
    75 
       
    76 /*
       
    77     The BlockSizeManager class manages how many iterations a thread should
       
    78     reserve and process at a time. This is done by measuring the time spent
       
    79     in the user code versus the control part code, and then increasing
       
     80     the block size if the ratio between them is too small. The block size
       
     81     management is done on the basis of the median of several timing measurements,
       
     82     and it is done individually for each thread.
       
    83 */
       
    84 class Q_CORE_EXPORT BlockSizeManager
       
    85 {
       
    86 public:
       
    87     BlockSizeManager(int iterationCount);
       
    88     void timeBeforeUser();
       
    89     void timeAfterUser();
       
    90     int blockSize();
       
    91 private:
       
    92     inline bool blockSizeMaxed()
       
    93     {
       
    94         return (m_blockSize >= maxBlockSize);
       
    95     }
       
    96 
       
    97     const int maxBlockSize;
       
    98     qint64 beforeUser;
       
    99     qint64 afterUser;
       
   100     Median<double> controlPartElapsed;
       
   101     Median<double> userPartElapsed;
       
   102     int m_blockSize;
       
   103 };
       
   104 
       
   105 template <typename T>
       
   106 class ResultReporter
       
   107 {
       
   108 public:
       
   109     ResultReporter(ThreadEngine<T> *_threadEngine)
       
   110     :threadEngine(_threadEngine)
       
   111     {
       
   112 
       
   113     }
       
   114 
       
   115     void reserveSpace(int resultCount)
       
   116     {
       
   117         currentResultCount = resultCount;
       
   118         vector.resize(qMax(resultCount, vector.count()));
       
   119     }
       
   120 
       
   121     void reportResults(int begin)
       
   122     {
       
   123         const int useVectorThreshold = 4; // Tunable parameter.
       
   124         if (currentResultCount > useVectorThreshold) {
       
   125             vector.resize(currentResultCount);
       
   126             threadEngine->reportResults(vector, begin);
       
   127         } else {
       
   128             for (int i = 0; i < currentResultCount; ++i)
       
   129                 threadEngine->reportResult(&vector.at(i), begin + i);
       
   130         }
       
   131     }
       
   132 
       
   133     inline T * getPointer()
       
   134     {
       
   135         return vector.data();
       
   136     }
       
   137 
       
   138     int currentResultCount;
       
   139     ThreadEngine<T> *threadEngine;
       
   140     QVector<T> vector;
       
   141 };
       
   142 
       
   143 template <>
       
   144 class ResultReporter<void>
       
   145 {
       
   146 public:
       
   147     inline ResultReporter(ThreadEngine<void> *) { }
       
   148     inline void reserveSpace(int) { };
       
   149     inline void reportResults(int) { };
       
   150     inline void * getPointer() { return 0; }
       
   151 };
       
   152 
       
   153 #ifndef QT_NO_STL
       
   154 inline bool selectIteration(std::bidirectional_iterator_tag)
       
   155 {
       
   156     return false; // while
       
   157 }
       
   158 
       
   159 inline bool selectIteration(std::forward_iterator_tag)
       
   160 {
       
   161     return false; // while
       
   162 }
       
   163 
       
   164 inline bool selectIteration(std::random_access_iterator_tag)
       
   165 {
       
   166     return true; // for
       
   167 }
       
   168 #else
       
   169 // no stl support, always use while iteration
       
   170 template <typename T>
       
   171 inline bool selectIteration(T)
       
   172 {
       
   173     return false; // while
       
   174 }
       
   175 #endif
       
   176 
       
   177 template <typename Iterator, typename T>
       
   178 class IterateKernel : public ThreadEngine<T>
       
   179 {
       
   180 public:
       
   181     typedef T ResultType;
       
   182 
       
   183     IterateKernel(Iterator _begin, Iterator _end)
       
   184 #if defined (QT_NO_STL)
       
   185         : begin(_begin), end(_end), current(_begin), currentIndex(0),
       
   186            forIteration(false), progressReportingEnabled(true)
       
   187 #elif !defined(QT_NO_PARTIAL_TEMPLATE_SPECIALIZATION)
       
   188         : begin(_begin), end(_end), current(_begin), currentIndex(0),
       
   189            forIteration(selectIteration(typename std::iterator_traits<Iterator>::iterator_category())), progressReportingEnabled(true)
       
   190 #else
       
   191         : begin(_begin), end(_end), currentIndex(0),
       
   192           forIteration(selectIteration(std::iterator_category(_begin))), progressReportingEnabled(true)
       
   193 #endif
       
   194     {
       
   195 #if defined (QT_NO_STL)
       
   196        iterationCount = 0;
       
   197 #else
       
   198         iterationCount =  forIteration ? std::distance(_begin, _end) : 0;
       
   199 
       
   200 #endif
       
   201     }
       
   202 
       
   203     virtual ~IterateKernel() { }
       
   204 
       
   205     virtual bool runIteration(Iterator it, int index , T *result)
       
   206         { Q_UNUSED(it); Q_UNUSED(index); Q_UNUSED(result); return false; }
       
   207     virtual bool runIterations(Iterator _begin, int beginIndex, int endIndex, T *results)
       
   208         { Q_UNUSED(_begin); Q_UNUSED(beginIndex); Q_UNUSED(endIndex); Q_UNUSED(results); return false; }
       
   209 
       
   210     void start()
       
   211     {
       
   212         progressReportingEnabled = this->isProgressReportingEnabled();
       
   213         if (progressReportingEnabled && iterationCount > 0)
       
   214             this->setProgressRange(0, iterationCount);
       
   215     }
       
   216 
       
   217     bool shouldStartThread()
       
   218     {
       
   219         if (forIteration)
       
   220             return (currentIndex < iterationCount) && !this->shouldThrottleThread();
       
   221         else // whileIteration
       
   222             return (iteratorThreads == 0);
       
   223     }
       
   224 
       
   225     ThreadFunctionResult threadFunction()
       
   226     {
       
   227         if (forIteration)
       
   228             return this->forThreadFunction();
       
   229         else // whileIteration
       
   230             return this->whileThreadFunction();
       
   231     }
       
   232 
       
   233     ThreadFunctionResult forThreadFunction()
       
   234     {
       
   235         BlockSizeManager blockSizeManager(iterationCount);
       
   236         ResultReporter<T> resultReporter(this);
       
   237 
       
   238         for(;;) {
       
   239             if (this->isCanceled())
       
   240                 break;
       
   241 
       
   242             const int currentBlockSize = blockSizeManager.blockSize();
       
   243 
       
   244             if (currentIndex >= iterationCount)
       
   245                 break;
       
   246 
       
   247             // Atomically reserve a block of iterationCount for this thread.
       
   248             const int beginIndex = currentIndex.fetchAndAddRelease(currentBlockSize);
       
   249             const int endIndex = qMin(beginIndex + currentBlockSize, iterationCount);
       
   250 
       
   251             if (beginIndex >= endIndex) {
       
   252                 // No more work
       
   253                 break;
       
   254             }
       
   255 
       
   256             this->waitForResume(); // (only waits if the qfuture is paused.)
       
   257 
       
   258             if (shouldStartThread())
       
   259                 this->startThread();
       
   260 
       
   261             const int finalBlockSize = endIndex - beginIndex; // block size adjusted for possible end-of-range
       
   262             resultReporter.reserveSpace(finalBlockSize);
       
   263 
       
   264             // Call user code with the current iteration range.
       
   265             blockSizeManager.timeBeforeUser();
       
   266             const bool resultsAvailable = this->runIterations(begin, beginIndex, endIndex, resultReporter.getPointer());
       
   267             blockSizeManager.timeAfterUser();
       
   268 
       
   269             if (resultsAvailable)
       
   270                 resultReporter.reportResults(beginIndex);
       
   271 
       
   272             // Report progress if progress reporting enabled.
       
   273             if (progressReportingEnabled) {
       
   274                 completed.fetchAndAddAcquire(finalBlockSize);
       
   275                 this->setProgressValue(this->completed);
       
   276             }
       
   277 
       
   278             if (this->shouldThrottleThread())
       
   279                 return ThrottleThread;
       
   280         }
       
   281         return ThreadFinished;
       
   282     }
       
   283 
       
   284     ThreadFunctionResult whileThreadFunction()
       
   285     {
       
   286         if (iteratorThreads.testAndSetAcquire(0, 1) == false)
       
   287             return ThreadFinished;
       
   288 
       
   289         ResultReporter<T> resultReporter(this);
       
   290         resultReporter.reserveSpace(1);
       
   291 
       
   292         while (current != end) {
       
   293             // The following two lines breaks support for input iterators according to
       
   294             // the sgi docs: dereferencing prev after calling ++current is not allowed
       
   295             // on input iterators. (prev is dereferenced inside user.runIteration())
       
   296             Iterator prev = current;
       
   297             ++current;
       
   298             int index = currentIndex.fetchAndAddRelaxed(1);
       
   299             iteratorThreads.testAndSetRelease(1, 0);
       
   300 
       
   301             this->waitForResume(); // (only waits if the qfuture is paused.)
       
   302 
       
   303             if (shouldStartThread())
       
   304                 this->startThread();
       
   305 
       
   306             const bool resultAavailable = this->runIteration(prev, index, resultReporter.getPointer());
       
   307             if (resultAavailable)
       
   308                 resultReporter.reportResults(index);
       
   309 
       
   310             if (this->shouldThrottleThread())
       
   311                 return ThrottleThread;
       
   312 
       
   313             if (iteratorThreads.testAndSetAcquire(0, 1) == false)
       
   314                 return ThreadFinished;
       
   315         }
       
   316 
       
   317         return ThreadFinished;
       
   318     }
       
   319 
       
   320 
       
   321 public:
       
   322     const Iterator begin;
       
   323     const Iterator end;
       
   324     Iterator current;
       
   325     QAtomicInt currentIndex;
       
   326     bool forIteration;
       
   327     QAtomicInt iteratorThreads;
       
   328     int iterationCount;
       
   329 
       
   330     bool progressReportingEnabled;
       
   331     QAtomicInt completed;
       
   332 };
       
   333 
       
   334 } // namespace QtConcurrent
       
   335 
       
   336 #endif //qdoc
       
   337 
       
   338 QT_END_NAMESPACE
       
   339 QT_END_HEADER
       
   340 
       
   341 #endif // QT_NO_CONCURRENT
       
   342 
       
   343 #endif