Loading...
Searching...
No Matches
loops.h
Go to the documentation of this file.
1//
2// Copyright 2016 Pixar
3//
4// Licensed under the terms set forth in the LICENSE.txt file available at
5// https://openusd.org/license.
6//
7#ifndef PXR_BASE_WORK_LOOPS_H
8#define PXR_BASE_WORK_LOOPS_H
9
11#include "pxr/pxr.h"
12#include "pxr/base/work/api.h"
14#include "pxr/base/work/impl.h"
16
20
21#include <algorithm>
22
23PXR_NAMESPACE_OPEN_SCOPE
24
25using Work_DiagnosticTransports = tbb::concurrent_vector<TfDiagnosticTransport>;
26
27template <class Fn>
28class Work_LoopsTaskWrapper
29{
30public:
31 Work_LoopsTaskWrapper(
32 Fn &&callback,
33 Work_DiagnosticTransports *diagnostics)
34 : _callback(callback)
35 , _diagnostics(diagnostics) {}
36
37 template <typename ... Args>
38 void operator()(Args&&... args) const {
40 _callback(std::forward<Args>(args)...);
41 if (!trap.IsClean()) {
42 *_diagnostics->grow_by(1) = trap.Transport();
43 }
44 }
45
46private:
47 Fn & _callback;
48 Work_DiagnosticTransports *_diagnostics;
49};
50
51template <class Fn>
52class Work_MallocTagsLoopsTaskWrapper
53{
54public:
55 Work_MallocTagsLoopsTaskWrapper(
56 Fn &&callback,
57 Work_DiagnosticTransports *diagnostics)
58 : _callback(callback)
59 , _diagnostics(diagnostics)
60 , _mallocTagStack(TfMallocTag::GetCurrentStackState()) {}
61
62 template <typename ... Args>
63 void operator()(Args&&... args) const {
65 TfMallocTag::StackOverride ovr(_mallocTagStack);
66 _callback(std::forward<Args>(args)...);
67 if (!trap.IsClean()) {
68 *_diagnostics->grow_by(1) = trap.Transport();
69 }
70 }
71
72private:
73 Fn & _callback;
74 Work_DiagnosticTransports *_diagnostics;
75 TfMallocTag::StackState _mallocTagStack;
76};
77
78template <class Fn>
79class Work_LoopsForEachTaskWrapper
80{
81public:
82 Work_LoopsForEachTaskWrapper(
83 Fn &&callback,
84 Work_DiagnosticTransports *diagnostics)
85 : _callback(callback)
86 , _diagnostics(diagnostics) {}
87
88 template <typename Arg>
89 void operator()(Arg &&arg) const {
91 _callback(std::forward<Arg>(arg));
92 if (!trap.IsClean()) {
93 *_diagnostics->grow_by(1) = trap.Transport();
94 }
95 }
96
97private:
98 Fn & _callback;
99 Work_DiagnosticTransports *_diagnostics;
100};
101
102template <class Fn>
103class Work_MallocTagsLoopsForEachTaskWrapper
104{
105public:
106 Work_MallocTagsLoopsForEachTaskWrapper(
107 Fn &&callback,
108 Work_DiagnosticTransports *diagnostics)
109 : _callback(callback)
110 , _diagnostics(diagnostics)
111 , _mallocTagStack(TfMallocTag::GetCurrentStackState()) {}
112
113 template <typename Arg>
114 void operator()(Arg &&arg) const {
115 TfDiagnosticTrap trap;
116 TfMallocTag::StackOverride ovr(_mallocTagStack);
117 _callback(std::forward<Arg>(arg));
118 if (!trap.IsClean()) {
119 *_diagnostics->grow_by(1) = trap.Transport();
120 }
121 }
122
123private:
124 Fn & _callback;
125 Work_DiagnosticTransports *_diagnostics;
126 TfMallocTag::StackState _mallocTagStack;
127};
128
/// Serial counterpart of WorkParallelForN.
///
/// Calls \p fn exactly once on the calling thread with the bounds of the
/// whole half-open index range: `fn(0, n)`. No partitioning takes place.
template <typename Fn>
void
WorkSerialForN(size_t n, Fn &&fn)
{
    // Bind once, then forward, preserving the callable's value category.
    Fn &&body = std::forward<Fn>(fn);
    std::forward<Fn>(body)(0, n);
}
147
163template <typename Fn>
164void
165WorkParallelForN(size_t n, Fn &&callback, size_t grainSize)
166{
167 if (n == 0)
168 return;
169
170 // Don't bother with parallel_for, if concurrency is limited to 1.
171 if (WorkHasConcurrency()) {
172 PXR_WORK_IMPL_NAMESPACE_USING_DIRECTIVE;
173 Work_DiagnosticTransports diagnosticTransports;
175 Work_MallocTagsLoopsTaskWrapper<Fn>
176 task(std::forward<Fn>(callback), &diagnosticTransports);
177 WorkImpl_ParallelForN(n, task, grainSize);
178 }
179 else {
180 Work_LoopsTaskWrapper<Fn>
181 task(std::forward<Fn>(callback), &diagnosticTransports);
182 WorkImpl_ParallelForN(n, task, grainSize);
183 }
184
185 for (auto &dt: diagnosticTransports) {
186 dt.Post();
187 }
188 } else {
189 // If concurrency is limited to 1, execute serially.
190 WorkSerialForN(n, std::forward<Fn>(callback));
191 }
192}
193
/// Convenience overload of WorkParallelForN(n, callback, grainSize) that
/// uses a grain size of 1.
template <typename Fn>
void
WorkParallelForN(size_t n, Fn &&callback)
{
    constexpr size_t defaultGrainSize = 1;
    WorkParallelForN(n, std::forward<Fn>(callback), defaultGrainSize);
}
211
225template <typename RangeType, typename Fn>
226void
227WorkParallelForTBBRange(const RangeType &range, Fn &&callback)
228{
229 // Don't bother with parallel_for, if concurrency is limited to 1.
230 if (WorkHasConcurrency()) {
231 PXR_WORK_IMPL_NAMESPACE_USING_DIRECTIVE;
232 // Use the work backend's ParallelForTBBRange if one exists
233 // otherwise use the default implementation below that builds off of the
234 // dispatcher.
235#if defined WORK_IMPL_HAS_PARALLEL_FOR_TBB_RANGE
236 Work_DiagnosticTransports diagnosticTransports;
238 Work_MallocTagsLoopsTaskWrapper<Fn>
239 task(std::forward<Fn>(callback), &diagnosticTransports);
240 WorkImpl_ParallelForTBBRange(range, task);
241 }
242 else {
243 Work_LoopsTaskWrapper<Fn>
244 task(std::forward<Fn>(callback), &diagnosticTransports);
245 WorkImpl_ParallelForTBBRange(range, task);
246 }
247 for (auto &dt: diagnosticTransports) {
248 dt.Post();
249 }
250#else
251 // The parallel task responsible for recursively sub-dividing the range
252 // and invoking the callback on the sub-ranges.
253 class _RangeTask
254 {
255 public:
256 _RangeTask(
257 WorkDispatcher &dispatcher,
258 RangeType &&range,
259 const Fn &callback)
260 : _dispatcher(dispatcher)
261 , _range(std::move(range))
262 , _callback(callback) {}
263
264 void operator()() const {
265 // Subdivide the given range until it is no longer divisible, and
266 // recursively spawn _RangeTasks for the right side of the split.
267 RangeType &leftRange = _range;
268 while (leftRange.is_divisible()) {
269 RangeType rightRange(leftRange, tbb::split());
270 _dispatcher.Run(_RangeTask(
271 _dispatcher, std::move(rightRange), _callback));
272 }
273
274 // If there are any more entries remaining in the left-most side
275 // of the given range, invoke the callback on the left-most range.
276 if (!leftRange.empty()) {
277 std::invoke(_callback, leftRange);
278 }
279 }
280
281 private:
282 WorkDispatcher &_dispatcher;
283 mutable RangeType _range;
284 const Fn &_callback;
285 };
286
287 WorkDispatcher dispatcher;
288 RangeType range = range;
289 dispatcher.Run(_RangeTask(
290 dispatcher, range, std::forward<Fn>(callback)));
291#endif
292 } else {
293 // If concurrency is limited to 1, execute serially.
294 std::forward<Fn>(callback)(range);
295 }
296}
297
310template <typename InputIterator, typename Fn>
311inline void
313 InputIterator first, InputIterator last, Fn &&fn)
314{
315 if (WorkHasConcurrency()) {
316 PXR_WORK_IMPL_NAMESPACE_USING_DIRECTIVE;
317 Work_DiagnosticTransports diagnosticTransports;
319 Work_MallocTagsLoopsForEachTaskWrapper<Fn>
320 task(std::forward<Fn>(fn), &diagnosticTransports);
321 WorkImpl_ParallelForEach(first, last, task);
322 }
323 else {
324 Work_LoopsForEachTaskWrapper<Fn>
325 task(std::forward<Fn>(fn), &diagnosticTransports);
326 WorkImpl_ParallelForEach(first, last, task);
327 }
328 for (auto &dt: diagnosticTransports) {
329 dt.Post();
330 }
331 } else {
332 std::for_each(first, last, std::forward<Fn>(fn));
333 }
334}
335
336PXR_NAMESPACE_CLOSE_SCOPE
337
338#endif // PXR_BASE_WORK_LOOPS_H
A scoped, stack-based mechanism for intercepting and examining diagnostics issued on the current thread.
TF_API TfDiagnosticTransport Transport()
Move all accumulated diagnostics into a TfDiagnosticTransport, leaving this trap active but empty.
TF_API bool IsClean() const
Return true if no diagnostics have been captured.
An object that represents a snapshot of a thread's TfMallocTag stack state.
Definition: mallocTag.h:337
Top-down memory tagging system.
Definition: mallocTag.h:35
static bool IsInitialized()
Return true if the tagging system is active.
Definition: mallocTag.h:190
A work dispatcher runs concurrent tasks.
Definition: dispatcher.h:228
void WorkParallelForTBBRange(const RangeType &range, Fn &&callback)
WorkParallelForTBBRange(const RangeType &r, Fn &&callback)
Definition: loops.h:227
void WorkParallelForEach(InputIterator first, InputIterator last, Fn &&fn)
WorkParallelForEach(Iterator first, Iterator last, CallbackType callback)
Definition: loops.h:312
void WorkParallelForN(size_t n, Fn &&callback, size_t grainSize)
WorkParallelForN(size_t n, CallbackType callback, size_t grainSize = 1)
Definition: loops.h:165
void WorkSerialForN(size_t n, Fn &&fn)
WorkSerialForN(size_t n, CallbackType callback)
Definition: loops.h:143
WORK_API bool WorkHasConcurrency()
Return true if WorkGetPhysicalConcurrencyLimit() returns a number greater than 1 and PXR_WORK_THREAD_...