Loading...
Searching...
No Matches
loops.h
Go to the documentation of this file.
1//
2// Copyright 2016 Pixar
3//
4// Licensed under the terms set forth in the LICENSE.txt file available at
5// https://openusd.org/license.
6//
7#ifndef PXR_BASE_WORK_LOOPS_H
8#define PXR_BASE_WORK_LOOPS_H
9
#include "pxr/pxr.h"
#include "pxr/base/work/api.h"
#include "pxr/base/work/dispatcher.h"
#include "pxr/base/work/impl.h"
#include "pxr/base/work/threadLimits.h"

#include "pxr/base/tf/errorMark.h"
#include "pxr/base/tf/errorTransport.h"
#include "pxr/base/tf/mallocTag.h"

#include <tbb/blocked_range.h>
#include <tbb/concurrent_vector.h>

#include <algorithm>
#include <functional>
#include <utility>

22PXR_NAMESPACE_OPEN_SCOPE
23
24using Work_ErrorTransports = tbb::concurrent_vector<TfErrorTransport>;
25
26template <class Fn>
27class Work_LoopsTaskWrapper
28{
29public:
30 Work_LoopsTaskWrapper(
31 Fn &&callback,
32 Work_ErrorTransports *errors)
33 : _callback(callback)
34 , _errors(errors) {}
35
36 template <typename ... Args>
37 void operator()(Args&&... args) const {
39 _callback(std::forward<Args>(args)...);
40 if (!m.IsClean()) {
41 TfErrorTransport transport = m.Transport();
42 _errors->grow_by(1)->swap(transport);
43 }
44 }
45
46private:
47 Fn & _callback;
48 Work_ErrorTransports *_errors;
49};
50
51template <class Fn>
52class Work_MallocTagsLoopsTaskWrapper
53{
54public:
55 Work_MallocTagsLoopsTaskWrapper(
56 Fn &&callback,
57 Work_ErrorTransports *errors)
58 : _callback(callback)
59 , _errors(errors)
60 , _mallocTagStack(TfMallocTag::GetCurrentStackState()) {}
61
62 template <typename ... Args>
63 void operator()(Args&&... args) const {
65 TfMallocTag::StackOverride ovr(_mallocTagStack);
66 _callback(std::forward<Args>(args)...);
67 if (!m.IsClean()) {
68 TfErrorTransport transport = m.Transport();
69 _errors->grow_by(1)->swap(transport);
70 }
71 }
72
73private:
74 Fn & _callback;
75 Work_ErrorTransports *_errors;
76 TfMallocTag::StackState _mallocTagStack;
77};
78
79template <class Fn>
80class Work_LoopsForEachTaskWrapper
81{
82public:
83 Work_LoopsForEachTaskWrapper(
84 Fn &&callback,
85 Work_ErrorTransports *errors)
86 : _callback(callback)
87 , _errors(errors) {}
88
89 template <typename Arg>
90 void operator()(Arg &&arg) const {
92 _callback(std::forward<Arg>(arg));
93 if (!m.IsClean()) {
94 TfErrorTransport transport = m.Transport();
95 _errors->grow_by(1)->swap(transport);
96 }
97 }
98
99private:
100 Fn & _callback;
101 Work_ErrorTransports *_errors;
102};
103
104template <class Fn>
105class Work_MallocTagsLoopsForEachTaskWrapper
106{
107public:
108 Work_MallocTagsLoopsForEachTaskWrapper(
109 Fn &&callback,
110 Work_ErrorTransports *errors)
111 : _callback(callback)
112 , _errors(errors)
113 , _mallocTagStack(TfMallocTag::GetCurrentStackState()) {}
114
115 template <typename Arg>
116 void operator()(Arg &&arg) const {
117 TfErrorMark m;
118 TfMallocTag::StackOverride ovr(_mallocTagStack);
119 _callback(std::forward<Arg>(arg));
120 if (!m.IsClean()) {
121 TfErrorTransport transport = m.Transport();
122 _errors->grow_by(1)->swap(transport);
123 }
124 }
125
126private:
127 Fn & _callback;
128 Work_ErrorTransports *_errors;
129 TfMallocTag::StackState _mallocTagStack;
130};
131
/// WorkSerialForN(size_t n, CallbackType callback)
///
/// Serial counterpart of WorkParallelForN: invokes \p fn exactly once on
/// the calling thread with the full range [0, n).
template<typename Fn>
void
WorkSerialForN(size_t n, Fn &&fn)
{
    // One invocation over the whole range is the serial equivalent of the
    // chunked parallel loop.
    std::forward<Fn>(fn)(0, n);
}
150
166template <typename Fn>
167void
168WorkParallelForN(size_t n, Fn &&callback, size_t grainSize)
169{
170 if (n == 0)
171 return;
172
173 // Don't bother with parallel_for, if concurrency is limited to 1.
174 if (WorkHasConcurrency()) {
175 PXR_WORK_IMPL_NAMESPACE_USING_DIRECTIVE;
176 Work_ErrorTransports errorTransports;
178 Work_MallocTagsLoopsTaskWrapper<Fn>
179 task(std::forward<Fn>(callback), &errorTransports);
180 WorkImpl_ParallelForN(n, task, grainSize);
181 }
182 else {
183 Work_LoopsTaskWrapper<Fn>
184 task(std::forward<Fn>(callback), &errorTransports);
185 WorkImpl_ParallelForN(n, task, grainSize);
186 }
187
188 for (auto &et: errorTransports) {
189 et.Post();
190 }
191 } else {
192 // If concurrency is limited to 1, execute serially.
193 WorkSerialForN(n, std::forward<Fn>(callback));
194 }
195}
196
/// WorkParallelForN(size_t n, CallbackType callback)
///
/// Convenience overload that runs \p callback in parallel over [0, n) with
/// the default grain size of 1.
template <typename Fn>
void
WorkParallelForN(size_t n, Fn &&callback)
{
    // Delegate to the grain-size overload.
    WorkParallelForN(n, std::forward<Fn>(callback), /* grainSize = */ 1);
}
214
228template <typename RangeType, typename Fn>
229void
230WorkParallelForTBBRange(const RangeType &range, Fn &&callback)
231{
232 // Don't bother with parallel_for, if concurrency is limited to 1.
233 if (WorkHasConcurrency()) {
234 PXR_WORK_IMPL_NAMESPACE_USING_DIRECTIVE;
235 // Use the work backend's ParallelForTBBRange if one exists
236 // otherwise use the default implementation below that builds off of the
237 // dispatcher.
238#if defined WORK_IMPL_HAS_PARALLEL_FOR_TBB_RANGE
239 Work_ErrorTransports errorTransports;
241 Work_MallocTagsLoopsTaskWrapper<Fn>
242 task(std::forward<Fn>(callback), &errorTransports);
243 WorkImpl_ParallelForTBBRange(range, task);
244 }
245 else {
246 Work_LoopsTaskWrapper<Fn>
247 task(std::forward<Fn>(callback), &errorTransports);
248 WorkImpl_ParallelForTBBRange(range, task);
249 }
250 for (auto &et: errorTransports) {
251 et.Post();
252 }
253#else
254 // The parallel task responsible for recursively sub-dividing the range
255 // and invoking the callback on the sub-ranges.
256 class _RangeTask
257 {
258 public:
259 _RangeTask(
260 WorkDispatcher &dispatcher,
261 RangeType &&range,
262 const Fn &callback)
263 : _dispatcher(dispatcher)
264 , _range(std::move(range))
265 , _callback(callback) {}
266
267 void operator()() const {
268 // Subdivide the given range until it is no longer divisible, and
269 // recursively spawn _RangeTasks for the right side of the split.
270 RangeType &leftRange = _range;
271 while (leftRange.is_divisible()) {
272 RangeType rightRange(leftRange, tbb::split());
273 _dispatcher.Run(_RangeTask(
274 _dispatcher, std::move(rightRange), _callback));
275 }
276
277 // If there are any more entries remaining in the left-most side
278 // of the given range, invoke the callback on the left-most range.
279 if (!leftRange.empty()) {
280 std::invoke(_callback, leftRange);
281 }
282 }
283
284 private:
285 WorkDispatcher &_dispatcher;
286 mutable RangeType _range;
287 const Fn &_callback;
288 };
289
290 WorkDispatcher dispatcher;
291 RangeType range = range;
292 dispatcher.Run(_RangeTask(
293 dispatcher, range, std::forward<Fn>(callback)));
294#endif
295 } else {
296 // If concurrency is limited to 1, execute serially.
297 std::forward<Fn>(callback)(range);
298 }
299}
300
313template <typename InputIterator, typename Fn>
314inline void
316 InputIterator first, InputIterator last, Fn &&fn)
317{
318 if (WorkHasConcurrency()) {
319 PXR_WORK_IMPL_NAMESPACE_USING_DIRECTIVE;
320 Work_ErrorTransports errorTransports;
322 Work_MallocTagsLoopsForEachTaskWrapper<Fn>
323 task(std::forward<Fn>(fn), &errorTransports);
324 WorkImpl_ParallelForEach(first, last, task);
325 }
326 else {
327 Work_LoopsForEachTaskWrapper<Fn>
328 task(std::forward<Fn>(fn), &errorTransports);
329 WorkImpl_ParallelForEach(first, last, task);
330 }
331 for (auto &et: errorTransports) {
332 et.Post();
333 }
334 } else {
335 std::for_each(first, last, std::forward<Fn>(fn));
336 }
337}
338
339PXR_NAMESPACE_CLOSE_SCOPE
340
341#endif // PXR_BASE_WORK_LOOPS_H
Class used to record the end of the error-list.
Definition: errorMark.h:48
TfErrorTransport Transport() const
Remove all errors in this mark from the error system and return them in a TfErrorTransport.
Definition: errorMark.h:109
bool IsClean() const
Return true if no new errors were posted in this thread since the last call to SetMark(),...
Definition: errorMark.h:82
A facility for transporting errors from thread to thread.
void swap(TfErrorTransport &other)
Swap this TfErrorTransport's content with other.
An object that represents a snapshot of a thread's TfMallocTag stack state.
Definition: mallocTag.h:336
Top-down memory tagging system.
Definition: mallocTag.h:34
static bool IsInitialized()
Return true if the tagging system is active.
Definition: mallocTag.h:189
A work dispatcher runs concurrent tasks.
Definition: dispatcher.h:223
void WorkParallelForTBBRange(const RangeType &range, Fn &&callback)
WorkParallelForTBBRange(const RangeType &r, Fn &&callback)
Definition: loops.h:230
void WorkParallelForEach(InputIterator first, InputIterator last, Fn &&fn)
WorkParallelForEach(Iterator first, Iterator last, CallbackType callback)
Definition: loops.h:315
void WorkParallelForN(size_t n, Fn &&callback, size_t grainSize)
WorkParallelForN(size_t n, CallbackType callback, size_t grainSize = 1)
Definition: loops.h:168
void WorkSerialForN(size_t n, Fn &&fn)
WorkSerialForN(size_t n, CallbackType callback)
Definition: loops.h:146
WORK_API bool WorkHasConcurrency()
Return true if WorkGetPhysicalConcurrencyLimit() returns a number greater than 1 and PXR_WORK_THREAD_...