/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

/* Modifications Copyright (c) Microsoft. */

#pragma once
#include <string>
#include <vector>
#include <functional>
#include <memory>
#include "core/common/common.h"
#include "core/platform/env.h"

#include <functional>
#include <memory>

// ORT thread pool overview
// ------------------------
//
// The ORT thread pool implementation is split into two layers.  This
// file provides the high-level component.  See the accompanying
// comments in EigenNonBlockingThreadPool.h for the low-level
// component.
//
// threadpool.h defines the user-facing functions for use in
// operators.  The main abstraction are parallel loops
// (ThreadPool::TryParallelFor*), although we also support scheduling
// of asynchronous tasks (ThreadPool::Schedule), and the construction
// of multi-loop parallel sections (ThreadPool::ParallelSection).
//
// This high-level API is accessed via static methods on the
// ThreadPool class.  These methods map the operations onto one of
// three low-level implementations: (#1) direct execution of the
// operations if there is no thread pool configured, (#2) execution of
// the operations using the modified Eigen threadpool, (#3) execution
// of the operations using OpenMP.  Option #1 enables execution in
// simple settings without needing threads.  Option #2 is the
// preferred approach for use in settings with parallelism.
//
// The high-level part of the thread pool is responsible for:
//
// - Exposing the desired degree of parallelism to user code, and to
//   libraries such as MLAS.  This lets the libraries tailor the
//   extent to which they parallelize work.
//
// - Handling trivial cases (such as directly running parallel loops
//   with only a single iteration, or with no iterations at all).
//
// - Deciding how to divide work efficiently between the threads
//   available.
//
//   The ThreadPool::TryParallelFor methods do this based on cost
//   estimates supplied by the caller, and are designed to support
//   loops with small amounts of work per iteration.  The loop body is
//   supplied as a function taking a [start,end) range of iterations
//   to execute (avoiding the need for per-iteration std::function
//   calls, or a reliance upon inlining to avoid those calls).
//
//   ThreadPool::TrySimpleParallelFor uses a simpler single-iteration
//   API based on the assumption that the caller has divided work to
//   an appropriate granularity.
//
// - When used with the Eigen-based thread pool, the implementation of
//   all of the loops maps down onto
//   ThreadPool::ParallelForFixedBlockSizeScheduling.  This method
//   takes the degree of parallelism (d_of_p) and work distribution
//   block size (from the cost-based heuristics), and creates a set of
//   tasks in the underlying thread pool (via
//   ThreadPool::RunInParallel).
//
//   These tasks then run a loop which picks off batches of iterations
//   from the user's code.  The distribution of these batches is
//   handled dynamically via LoopCounter::ClaimIterations.  This
//   dynamic balancing behavior helps make performance robust to any
//   variability in the execution time across iterations, and to
//   situations such as multiple loops running concurrently on the
//   same thread pool.
//
// - When running a series of loops inside a parallel section, the
//   LoopCounter also helps obtain affinity between these loops (i.e.,
//   iteration X of one loop will tend to run on the same thread that
//   ran iteration X of prior loops).  This locality helps improve hit
//   rates in per-core caches across the series of short loops used in
//   operators like GRU.
//
// There are some known areas for exploration here:
//
// - The cost-based heuristics were developed prior to recent changes
//   to the thread pool.  The heuristics seem to work well, but we
//   should revisit the tuning periodically.
//
// - Can we unify the APIs for the different kinds of parallel loop?
//
//   In particular, we may be able to replace the current use of
//   TryBatchParallelFor with appropriate costs for each call site,
//   and then use TryParallelFor.  This would allow for more dynamic
//   re-balancing of work between threads than the current
//   ThreadPool::PartitionWork function provides.
//
// - Given the extensive modifications to the original Eigen code, should
//   we separate that out as a new class and remove the dependence on
//   other Eigen components?

// This file uses the PIMPL idiom to avoid including the Eigen headers here
namespace Eigen {
class Allocator;
class ThreadPoolInterface;
}  // namespace Eigen

namespace onnxruntime {

struct TensorOpCost {
  double bytes_loaded;
  double bytes_stored;
  double compute_cycles;
};


namespace concurrency {

template <typename Environment>
class ThreadPoolTempl;

class ExtendedThreadPoolInterface;
class LoopCounter;
class ThreadPoolParallelSection;

class ThreadPool {
 public:
#ifdef _WIN32
  using NAME_CHAR_TYPE = wchar_t;
#else
  using NAME_CHAR_TYPE = char;
#endif
  // Constructs a pool for running with "degree_of_parallelism" threads with the
  // specified "name". env->StartThread() is used to create individual threads
  // with the given ThreadOptions. If "low_latency_hint" is true the thread pool
  // implementation may use it as a hint that lower latency is preferred at the
  // cost of higher CPU usage, e.g. by letting one or more idle threads spin
  // wait. Conversely, if the threadpool is used to schedule high-latency
  // operations like I/O the hint should be set to false.
  //
  // REQUIRES: degree_of_parallelism > 0
  ThreadPool(Env* env,
             const ThreadOptions& thread_options,
             const NAME_CHAR_TYPE* name,
             int degree_of_parallelism,
             bool low_latency_hint);

  // Waits until all scheduled work has finished and then destroys the
  // set of threads.
  ~ThreadPool();

  // Start and end a multi-loop parallel section.  Parallel loops can
  // be executed directly (without using this API), but entering a
  // parallel section allows the runtime system to amortize loop
  // entry/exit costs over multiple loops, and allows it to promote
  // affinity between corresponding iterations of different loops.
  //
  // Multi-loop sections would typically be used in cases where a
  // series of loops executes without much code in between them, and
  // where it is impractical to refactor code into a single loop.  For
  // instance:
  //
  // {
  //   onnxruntime::concurrency::ThreadPool::ParallelSection ps(tp);
  //   for (int x = 0; x < seq_len; x++) {
  //     TrySimpleParallelFor(tp, 16, [&](std::ptrdiff_t idx) { ... });
  //   }
  // }
  //
  // The parallel section is entered via the constructor of
  // ThreadPool::ParallelSection, and exited via the destructor.
  // Currently, thread-local state is used to track whether or not the
  // current thread is inside a parallel section.  In contrast to
  // handling parallel section objects explicitly in user code, this
  // approach allows code such as MLAS to operate with/without the use
  // of parallel sections.
  //
  // Parallel sections are only implemented with the Eigen threadpool.
  // They have no effect when using OpenMP.
  //
  // Parallel sections may not be nested, and may not be used inside
  // parallel loops.

  class ParallelSection {
  public:
    explicit ParallelSection(ThreadPool *tp);
    ~ParallelSection();

  private:
    friend class ThreadPool;

    // Owning reference for the underlying ThreadPoolParallelSection
    // which implements the thread management.  We use an explicit
    // deleter here so that the definition of
    // ThreadPoolParallelSection does not need to be available at this
    // point, avoiding a dependence on the Eigen headers.
    std::unique_ptr<ThreadPoolParallelSection, void(*)(ThreadPoolParallelSection*)>
      ps_{nullptr, [](ThreadPoolParallelSection*){}};
#ifndef _OPENMP
    ThreadPool *tp_;
#endif
    ORT_DISALLOW_COPY_ASSIGNMENT_AND_MOVE(ParallelSection);

    // Non-owning reference to the current thread's parallel section
    // (or nullptr outside parallel sections).
    static thread_local ParallelSection *current_parallel_section;
    static_assert(std::is_trivially_destructible<decltype(current_parallel_section)>::value,
                  "Per-thread state should be trivially destructible");
  };

  // Schedules fn() for execution in the pool of threads.  The function may run
  // synchronously if it cannot be enqueued.  This will occur if the thread pool's
  // degree-of-parallelism is 1, but it may also occur for implementation-dependent
  // reasons such as if queues used for buffering work are full.
  static void Schedule(ThreadPool* tp,
                       std::function<void()> fn) {
    if (tp) {
      tp->Schedule(fn);
    } else {
      fn();
    }
  }
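
  // For example, a minimal sketch (`tp` may be nullptr, in which case the
  // task runs synchronously in the caller):
  //
  //   onnxruntime::concurrency::ThreadPool::Schedule(tp, [] {
  //     // Work that can run asynchronously with respect to the caller.
  //   });
  //
  // Schedule() does not return a handle for waiting on completion; callers
  // needing fork-join behavior should use the parallel loop methods below.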

  // ParallelFor shards the "total" units of work, assuming each unit of work
  // has roughly "cost_per_unit" cost, in cycles. Each unit of work is
  // indexed 0, 1, ..., total - 1. Each shard contains 1 or more units of work
  // and the total cost of each shard is roughly the same.
  //
  // "cost_per_unit" is an estimate of the number of CPU cycles (or nanoseconds
  // if not CPU-bound) to complete a unit of work. Overestimating creates too
  // many shards and CPU time will be dominated by per-shard overhead, such as
  // Context creation. Underestimating may not fully make use of the specified
  // parallelism, and may also cause inefficiencies due to load balancing
  // issues and stragglers.

  static void TryParallelFor(ThreadPool* tp, std::ptrdiff_t total, double cost_per_unit,
                             const std::function<void(std::ptrdiff_t first, std::ptrdiff_t last)>& fn) {
    TryParallelFor(tp, total, TensorOpCost{0, 0, static_cast<double>(cost_per_unit)}, fn);
  }

  static void TryParallelFor(ThreadPool* tp, std::ptrdiff_t total, const TensorOpCost& cost_per_unit,
                             const std::function<void(std::ptrdiff_t first, std::ptrdiff_t last)>& fn);
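
  // For example, a minimal sketch of a cost-annotated loop adding two float
  // arrays of length `n` (the arrays `a`, `b`, `c` and the cost figures are
  // illustrative assumptions, not tuned values):
  //
  //   TensorOpCost cost{2.0 * sizeof(float),  // bytes loaded per iteration
  //                     sizeof(float),        // bytes stored per iteration
  //                     1.0};                 // compute cycles per iteration
  //   ThreadPool::TryParallelFor(tp, n, cost,
  //       [&](std::ptrdiff_t first, std::ptrdiff_t last) {
  //         for (std::ptrdiff_t i = first; i < last; ++i) {
  //           c[i] = a[i] + b[i];
  //         }
  //       });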

  // Directly schedules the 'total' tasks to the underlying threadpool, without
  // the recursive cost-based splitting used by TryParallelFor.

  inline static void TrySimpleParallelFor(ThreadPool* tp, std::ptrdiff_t total,
                                          const std::function<void(std::ptrdiff_t)>& fn) {
#ifdef _OPENMP
    ORT_UNUSED_PARAMETER(tp);
#pragma omp parallel for
    for (std::ptrdiff_t i = 0; i < total; ++i) {
      fn(i);
    }
#else
    if (tp != nullptr) {
      tp->SimpleParallelFor(total, fn);
    } else {
      for (std::ptrdiff_t i = 0; i < total; ++i) {
        // In many cases, fn can be inlined here.
        fn(i);
      }
    }
#endif
  }
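
  // For example, a sketch where the caller has already divided the work into
  // coarse-grained, roughly equal-cost chunks (ProcessChunk is a hypothetical
  // per-chunk function):
  //
  //   ThreadPool::TrySimpleParallelFor(tp, num_chunks,
  //       [&](std::ptrdiff_t chunk) { ProcessChunk(chunk); });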

  /**
   * Tries to call the given function in parallel, with calls split into (num_batches) batches.
   *\param num_batches If it is less than or equal to zero, it will be replaced with the smaller of `total` and
   * DegreeOfParallelism().
   *\param fn A std::function or STL-style functor with the signature "void f(std::ptrdiff_t);"
   * Pitfall: The caller should cap `num_batches` to a reasonable value based on the cost of `fn` and the value of
   * `total`. For example, if fn is as simple as: int sum=0; fn = [&](int i){sum += i;} and `total` is 100, then
   * num_batches should be just 1.
   **/
  template <typename F>
  inline static void TryBatchParallelFor(ThreadPool* tp, std::ptrdiff_t total, F&& fn, std::ptrdiff_t num_batches) {
#ifdef _OPENMP
    ORT_UNUSED_PARAMETER(tp);
    ORT_UNUSED_PARAMETER(num_batches);
#pragma omp parallel for
    for (std::ptrdiff_t i = 0; i < total; ++i) {
      fn(i);
    }
#else
    if (tp == nullptr) {
      for (std::ptrdiff_t i = 0; i < total; ++i) {
        // In many cases, fn can be inlined here.
        fn(i);
      }
      return;
    }
    if (total <= 0)
      return;

    if (total == 1) {
      fn(0);
      return;
    }

    if (num_batches <= 0) {
      num_batches = std::min<std::ptrdiff_t>(total, DegreeOfParallelism(tp));
    }

    if (num_batches <= 1) {
      for (std::ptrdiff_t i = 0; i < total; i++) {
        fn(i);
      }
      return;
    }

    tp->SimpleParallelFor(num_batches, [&](std::ptrdiff_t batch_index) {
      auto work = PartitionWork(batch_index, num_batches, total);
      for (std::ptrdiff_t i = work.start; i < work.end; i++) {
        fn(i);
      }
    });
#endif
  }
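
  // For example, a sketch processing `total` independent items in at most
  // DegreeOfParallelism(tp) batches (passing 0 for num_batches requests that
  // default; `results`, `inputs` and Transform are hypothetical):
  //
  //   ThreadPool::TryBatchParallelFor(tp, total,
  //       [&](std::ptrdiff_t i) { results[i] = Transform(inputs[i]); },
  //       0);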

  struct WorkInfo {
    std::ptrdiff_t start;
    std::ptrdiff_t end;
  };

  /** Calculate the start and end offsets for a batch.
      @remarks Based on MlasPartitionWork
  */
  static WorkInfo PartitionWork(std::ptrdiff_t batch_idx, std::ptrdiff_t num_batches, std::ptrdiff_t total_work) {
    const std::ptrdiff_t work_per_batch = total_work / num_batches;
    const std::ptrdiff_t work_per_batch_extra = total_work % num_batches;

    WorkInfo info;
    if (batch_idx < work_per_batch_extra) {
      info.start = (work_per_batch + 1) * batch_idx;
      info.end = info.start + work_per_batch + 1;
    } else {
      info.start = work_per_batch * batch_idx + work_per_batch_extra;
      info.end = info.start + work_per_batch;
    }

    return info;
  }
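
  // For example, PartitionWork(1, 3, 10) divides 10 units of work into three
  // batches of sizes 4, 3 and 3, and returns the range [4, 7) for batch 1.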

  //......................................................................
  //
  // The following static methods take into account whether OpenMP is
  // enabled/disabled, and whether the thread pool pointer is nullptr
  // (indicating sequential execution).

  // Provide a hint to the caller for whether or not to parallelize
  // work.  This lets a caller switch to a sequential version of an
  // algorithm rather than using calls via the ParallelFor functions.

  static bool ShouldParallelize(const ThreadPool* tp);

  // Return the degree of parallelism that code should assume when using the thread pool.
  // It decouples the degree of parallelism for use with the thread pool from
  // the implementation choice of whether this matches the number of threads created in
  // the pool.
  //
  // Currently, a loop with degree-of-parallelism N is supported by a pool of N-1 threads
  // working in combination with the thread initiating the loop.
  static int DegreeOfParallelism(const ThreadPool* tp);
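
  // For example, a sketch of a caller choosing between sequential and parallel
  // code paths, and sizing per-thread scratch space (RunSequential and the
  // scratch vector are hypothetical):
  //
  //   if (!ThreadPool::ShouldParallelize(tp)) {
  //     RunSequential();
  //   } else {
  //     scratch.resize(ThreadPool::DegreeOfParallelism(tp));
  //     // ... run one of the parallel loops above ...
  //   }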

  ORT_DISALLOW_COPY_AND_ASSIGNMENT(ThreadPool);

 private:
  friend class LoopCounter;

  // Returns the number of threads created in the pool.  This may be different from the
  // value returned by DegreeOfParallelism to code using the pool.
  int NumThreads() const;

  // Returns current thread id between 0 and NumThreads() - 1, if called from a
  // thread in the pool. Returns -1 otherwise.
  int CurrentThreadId() const;

  // Runs fn with a degree-of-parallelism of up to n, enlisting the thread pool
  // for help.  The degree-of-parallelism includes the caller, and so if n==1
  // then the function will run directly in the caller.  The fork-join
  // synchronization is handled in the thread pool, and so any state captured
  // by fn() is safe from concurrent access once RunInParallel returns.
  void RunInParallel(std::function<void(unsigned idx)> fn, unsigned n);

  // Divides the work represented by the range [0, total) into k shards, where
  // k = ceil(total / block_size).
  // Calls fn(i*block_size, (i+1)*block_size) from the ith shard (0 <= i < k).
  // Each shard may be executed on a different thread in parallel, depending on
  // the number of threads available in the pool.
  // When (i+1)*block_size > total, fn(i*block_size, total) is called instead.
  // Requires 0 < block_size <= total.
  void ParallelForFixedBlockSizeScheduling(std::ptrdiff_t total, std::ptrdiff_t block_size,
                                           const std::function<void(std::ptrdiff_t, std::ptrdiff_t)>& fn);

  // Return whether or not the calling thread should run a loop of
  // num_iterations divided into chunks of block_size in parallel.  If not,
  // the caller should run the loop sequentially.
  bool ShouldParallelizeLoop(const std::ptrdiff_t num_iterations,
                             const std::ptrdiff_t block_size = 1) const;

  // Internal (non-static) parallel loop methods.  Unlike the public static methods,
  // these do not handle the cases of OpenMP builds, or builds without a threadpool.
  void ParallelFor(std::ptrdiff_t total, double cost_per_unit,
                   const std::function<void(std::ptrdiff_t first, std::ptrdiff_t last)>& fn);

  void ParallelFor(std::ptrdiff_t total, const TensorOpCost& cost_per_unit,
                   const std::function<void(std::ptrdiff_t first, std::ptrdiff_t last)>& fn);

  void SimpleParallelFor(std::ptrdiff_t total, const std::function<void(std::ptrdiff_t)>& fn);

  void Schedule(std::function<void()> fn);

  ThreadOptions thread_options_;

  // If a thread pool is created with degree_of_parallelism != 1 then an underlying
  // EigenThreadPool is used to create OS threads and handle work distribution to them.
  // If degree_of_parallelism == 1 then underlying_threadpool_ is left as nullptr
  // and parallel work is run directly by the caller.
  ExtendedThreadPoolInterface* underlying_threadpool_ = nullptr;

  // If the Eigen-based pool is used, extended_eigen_threadpool_ owns it and
  // underlying_threadpool_ points to the same object.
  std::unique_ptr<ThreadPoolTempl<Env> > extended_eigen_threadpool_;
};

}  // namespace concurrency
}  // namespace onnxruntime