7#ifndef PXR_BASE_WORK_LOOPS_H
8#define PXR_BASE_WORK_LOOPS_H
12#include "pxr/base/work/api.h"
14#include "pxr/base/work/impl.h"
23PXR_NAMESPACE_OPEN_SCOPE
25using Work_DiagnosticTransports = tbb::concurrent_vector<TfDiagnosticTransport>;
28class Work_LoopsTaskWrapper
31 Work_LoopsTaskWrapper(
33 Work_DiagnosticTransports *diagnostics)
35 , _diagnostics(diagnostics) {}
37 template <
typename ... Args>
38 void operator()(Args&&... args)
const {
40 _callback(std::forward<Args>(args)...);
42 *_diagnostics->grow_by(1) = trap.
Transport();
48 Work_DiagnosticTransports *_diagnostics;
52class Work_MallocTagsLoopsTaskWrapper
55 Work_MallocTagsLoopsTaskWrapper(
57 Work_DiagnosticTransports *diagnostics)
59 , _diagnostics(diagnostics)
60 , _mallocTagStack(
TfMallocTag::GetCurrentStackState()) {}
62 template <
typename ... Args>
63 void operator()(Args&&... args)
const {
66 _callback(std::forward<Args>(args)...);
68 *_diagnostics->grow_by(1) = trap.
Transport();
74 Work_DiagnosticTransports *_diagnostics;
79class Work_LoopsForEachTaskWrapper
82 Work_LoopsForEachTaskWrapper(
84 Work_DiagnosticTransports *diagnostics)
86 , _diagnostics(diagnostics) {}
88 template <
typename Arg>
89 void operator()(Arg &&arg)
const {
91 _callback(std::forward<Arg>(arg));
93 *_diagnostics->grow_by(1) = trap.
Transport();
99 Work_DiagnosticTransports *_diagnostics;
103class Work_MallocTagsLoopsForEachTaskWrapper
106 Work_MallocTagsLoopsForEachTaskWrapper(
108 Work_DiagnosticTransports *diagnostics)
109 : _callback(callback)
110 , _diagnostics(diagnostics)
111 , _mallocTagStack(
TfMallocTag::GetCurrentStackState()) {}
113 template <
typename Arg>
114 void operator()(Arg &&arg)
const {
117 _callback(std::forward<Arg>(arg));
119 *_diagnostics->grow_by(1) = trap.
Transport();
125 Work_DiagnosticTransports *_diagnostics;
145 std::forward<Fn>(fn)(0, n);
163template <
typename Fn>
172 PXR_WORK_IMPL_NAMESPACE_USING_DIRECTIVE;
173 Work_DiagnosticTransports diagnosticTransports;
175 Work_MallocTagsLoopsTaskWrapper<Fn>
176 task(std::forward<Fn>(callback), &diagnosticTransports);
177 WorkImpl_ParallelForN(n, task, grainSize);
180 Work_LoopsTaskWrapper<Fn>
181 task(std::forward<Fn>(callback), &diagnosticTransports);
182 WorkImpl_ParallelForN(n, task, grainSize);
185 for (
auto &dt: diagnosticTransports) {
205template <
typename Fn>
225template <
typename RangeType,
typename Fn>
231 PXR_WORK_IMPL_NAMESPACE_USING_DIRECTIVE;
235#if defined WORK_IMPL_HAS_PARALLEL_FOR_TBB_RANGE
236 Work_DiagnosticTransports diagnosticTransports;
238 Work_MallocTagsLoopsTaskWrapper<Fn>
239 task(std::forward<Fn>(callback), &diagnosticTransports);
240 WorkImpl_ParallelForTBBRange(range, task);
243 Work_LoopsTaskWrapper<Fn>
244 task(std::forward<Fn>(callback), &diagnosticTransports);
245 WorkImpl_ParallelForTBBRange(range, task);
247 for (
auto &dt: diagnosticTransports) {
260 : _dispatcher(dispatcher)
261 , _range(std::move(range))
262 , _callback(callback) {}
264 void operator()()
const {
267 RangeType &leftRange = _range;
268 while (leftRange.is_divisible()) {
269 RangeType rightRange(leftRange, tbb::split());
270 _dispatcher.Run(_RangeTask(
271 _dispatcher, std::move(rightRange), _callback));
276 if (!leftRange.empty()) {
277 std::invoke(_callback, leftRange);
283 mutable RangeType _range;
288 RangeType range = range;
289 dispatcher.Run(_RangeTask(
290 dispatcher, range, std::forward<Fn>(callback)));
294 std::forward<Fn>(callback)(range);
310template <
typename InputIterator,
typename Fn>
313 InputIterator first, InputIterator last, Fn &&fn)
316 PXR_WORK_IMPL_NAMESPACE_USING_DIRECTIVE;
317 Work_DiagnosticTransports diagnosticTransports;
319 Work_MallocTagsLoopsForEachTaskWrapper<Fn>
320 task(std::forward<Fn>(fn), &diagnosticTransports);
321 WorkImpl_ParallelForEach(first, last, task);
324 Work_LoopsForEachTaskWrapper<Fn>
325 task(std::forward<Fn>(fn), &diagnosticTransports);
326 WorkImpl_ParallelForEach(first, last, task);
328 for (
auto &dt: diagnosticTransports) {
332 std::for_each(first, last, std::forward<Fn>(fn));
336PXR_NAMESPACE_CLOSE_SCOPE
A scoped, stack-based mechanism for intercepting and examining diagnostics issued on the current thre...
TF_API TfDiagnosticTransport Transport()
Move all accumulated diagnostics into a TfDiagnosticTransport, leaving this trap active but empty.
TF_API bool IsClean() const
Return true if no diagnostics have been captured.
An object that represents a snapshot of a thread's TfMallocTag stack state.
Top-down memory tagging system.
static bool IsInitialized()
Return true if the tagging system is active.
A work dispatcher runs concurrent tasks.
void WorkParallelForTBBRange(const RangeType &range, Fn &&callback)
WorkParallelForTBBRange(const RangeType &r, Fn &&callback)
void WorkParallelForEach(InputIterator first, InputIterator last, Fn &&fn)
WorkParallelForEach(Iterator first, Iterator last, CallbackType callback)
void WorkParallelForN(size_t n, Fn &&callback, size_t grainSize)
WorkParallelForN(size_t n, CallbackType callback, size_t grainSize = 1)
void WorkSerialForN(size_t n, Fn &&fn)
WorkSerialForN(size_t n, CallbackType callback)
WORK_API bool WorkHasConcurrency()
Return true if WorkGetPhysicalConcurrencyLimit() returns a number greater than 1 and PXR_WORK_THREAD_...