7#ifndef PXR_BASE_WORK_LOOPS_H
8#define PXR_BASE_WORK_LOOPS_H
12#include "pxr/base/work/api.h"
14#include "pxr/base/work/impl.h"
22PXR_NAMESPACE_OPEN_SCOPE
24using Work_ErrorTransports = tbb::concurrent_vector<TfErrorTransport>;
27class Work_LoopsTaskWrapper
30 Work_LoopsTaskWrapper(
32 Work_ErrorTransports *errors)
36 template <
typename ... Args>
37 void operator()(Args&&... args)
const {
39 _callback(std::forward<Args>(args)...);
42 _errors->grow_by(1)->
swap(transport);
48 Work_ErrorTransports *_errors;
52class Work_MallocTagsLoopsTaskWrapper
55 Work_MallocTagsLoopsTaskWrapper(
57 Work_ErrorTransports *errors)
60 , _mallocTagStack(
TfMallocTag::GetCurrentStackState()) {}
62 template <
typename ... Args>
63 void operator()(Args&&... args)
const {
66 _callback(std::forward<Args>(args)...);
69 _errors->grow_by(1)->
swap(transport);
75 Work_ErrorTransports *_errors;
80class Work_LoopsForEachTaskWrapper
83 Work_LoopsForEachTaskWrapper(
85 Work_ErrorTransports *errors)
89 template <
typename Arg>
90 void operator()(Arg &&arg)
const {
92 _callback(std::forward<Arg>(arg));
95 _errors->grow_by(1)->
swap(transport);
101 Work_ErrorTransports *_errors;
105class Work_MallocTagsLoopsForEachTaskWrapper
108 Work_MallocTagsLoopsForEachTaskWrapper(
110 Work_ErrorTransports *errors)
111 : _callback(callback)
113 , _mallocTagStack(
TfMallocTag::GetCurrentStackState()) {}
115 template <
typename Arg>
116 void operator()(Arg &&arg)
const {
119 _callback(std::forward<Arg>(arg));
122 _errors->grow_by(1)->
swap(transport);
128 Work_ErrorTransports *_errors;
148 std::forward<Fn>(fn)(0, n);
166template <
typename Fn>
175 PXR_WORK_IMPL_NAMESPACE_USING_DIRECTIVE;
176 Work_ErrorTransports errorTransports;
178 Work_MallocTagsLoopsTaskWrapper<Fn>
179 task(std::forward<Fn>(callback), &errorTransports);
180 WorkImpl_ParallelForN(n, task, grainSize);
183 Work_LoopsTaskWrapper<Fn>
184 task(std::forward<Fn>(callback), &errorTransports);
185 WorkImpl_ParallelForN(n, task, grainSize);
188 for (
auto &et: errorTransports) {
208template <
typename Fn>
228template <
typename RangeType,
typename Fn>
234 PXR_WORK_IMPL_NAMESPACE_USING_DIRECTIVE;
238#if defined WORK_IMPL_HAS_PARALLEL_FOR_TBB_RANGE
239 Work_ErrorTransports errorTransports;
241 Work_MallocTagsLoopsTaskWrapper<Fn>
242 task(std::forward<Fn>(callback), &errorTransports);
243 WorkImpl_ParallelForTBBRange(range, task);
246 Work_LoopsTaskWrapper<Fn>
247 task(std::forward<Fn>(callback), &errorTransports);
248 WorkImpl_ParallelForTBBRange(range, task);
250 for (
auto &et: errorTransports) {
263 : _dispatcher(dispatcher)
264 , _range(std::move(range))
265 , _callback(callback) {}
267 void operator()()
const {
270 RangeType &leftRange = _range;
271 while (leftRange.is_divisible()) {
272 RangeType rightRange(leftRange, tbb::split());
273 _dispatcher.Run(_RangeTask(
274 _dispatcher, std::move(rightRange), _callback));
279 if (!leftRange.empty()) {
280 std::invoke(_callback, leftRange);
286 mutable RangeType _range;
291 RangeType range = range;
292 dispatcher.Run(_RangeTask(
293 dispatcher, range, std::forward<Fn>(callback)));
297 std::forward<Fn>(callback)(range);
313template <
typename InputIterator,
typename Fn>
316 InputIterator first, InputIterator last, Fn &&fn)
319 PXR_WORK_IMPL_NAMESPACE_USING_DIRECTIVE;
320 Work_ErrorTransports errorTransports;
322 Work_MallocTagsLoopsForEachTaskWrapper<Fn>
323 task(std::forward<Fn>(fn), &errorTransports);
324 WorkImpl_ParallelForEach(first, last, task);
327 Work_LoopsForEachTaskWrapper<Fn>
328 task(std::forward<Fn>(fn), &errorTransports);
329 WorkImpl_ParallelForEach(first, last, task);
331 for (
auto &et: errorTransports) {
335 std::for_each(first, last, std::forward<Fn>(fn));
339PXR_NAMESPACE_CLOSE_SCOPE
Class used to record the end of the error-list.
TfErrorTransport Transport() const
Remove all errors in this mark fom the error system and return them in a TfErrorTransport.
bool IsClean() const
Return true if no new errors were posted in this thread since the last call to SetMark(),...
A facility for transporting errors from thread to thread.
void swap(TfErrorTransport &other)
Swap this TfErrorTransport's content with other.
An object that represents a snapshot of a thread's TfMallocTag stack state.
Top-down memory tagging system.
static bool IsInitialized()
Return true if the tagging system is active.
A work dispatcher runs concurrent tasks.
void WorkParallelForTBBRange(const RangeType &range, Fn &&callback)
WorkParallelForTBBRange(const RangeType &r, Fn &&callback)
void WorkParallelForEach(InputIterator first, InputIterator last, Fn &&fn)
WorkParallelForEach(Iterator first, Iterator last, CallbackType callback)
void WorkParallelForN(size_t n, Fn &&callback, size_t grainSize)
WorkParallelForN(size_t n, CallbackType callback, size_t grainSize = 1)
void WorkSerialForN(size_t n, Fn &&fn)
WorkSerialForN(size_t n, CallbackType callback)
WORK_API bool WorkHasConcurrency()
Return true if WorkGetPhysicalConcurrencyLimit() returns a number greater than 1 and PXR_WORK_THREAD_...