Loading...
Searching...
No Matches
loops.h
Go to the documentation of this file.
//
// Copyright 2016 Pixar
//
// Licensed under the terms set forth in the LICENSE.txt file available at
// https://openusd.org/license.
//
7#ifndef PXR_BASE_WORK_LOOPS_H
8#define PXR_BASE_WORK_LOOPS_H
9
#include "pxr/pxr.h"
#include "pxr/base/work/api.h"
#include "pxr/base/tf/errorMark.h"
#include "pxr/base/work/impl.h"

#include <algorithm>
19
20PXR_NAMESPACE_OPEN_SCOPE
21
22
23using Work_ErrorTransports = tbb::concurrent_vector<TfErrorTransport>;
24
25template <class Fn>
26class Work_ErrorTransportTaskWrapper
27{
28public:
29 Work_ErrorTransportTaskWrapper(
30 Fn &&callback,
31 Work_ErrorTransports *errors)
32 : _callback(callback)
33 , _errors(errors) {}
34
35 template <typename ... Args>
36 void operator()(Args&&... args) const {
38 _callback(std::forward<Args>(args)...);
39 if (!m.IsClean()) {
40 TfErrorTransport transport = m.Transport();
41 _errors->grow_by(1)->swap(transport);
42 }
43 }
44
45private:
46 Fn & _callback;
47 Work_ErrorTransports *_errors;
48};
49
50template <class Fn>
51class Work_ErrorTransportForEachTaskWrapper
52{
53public:
54 Work_ErrorTransportForEachTaskWrapper(
55 Fn &&callback,
56 Work_ErrorTransports *errors)
57 : _callback(callback)
58 , _errors(errors) {}
59
60 template <typename Arg>
61 void operator()( Arg&& arg) const {
63 _callback(std::forward<Arg>(arg));
64 if (!m.IsClean()) {
65 TfErrorTransport transport = m.Transport();
66 _errors->grow_by(1)->swap(transport);
67 }
68 }
69
70private:
71 Fn & _callback;
72 Work_ErrorTransports *_errors;
73};
74
/// Serial counterpart of WorkParallelForN: invokes \p fn exactly once, on the
/// calling thread, with the complete index range [0, n).
///
/// There is no special case for n == 0; \p fn is still called, with the
/// empty range (0, 0).
///
/// \param n   One past the last index to process.
/// \param fn  Callable invoked as fn(begin, end). It is perfectly forwarded,
///            so an rvalue callable is invoked as an rvalue.
template <typename Fn>
void
WorkSerialForN(size_t n, Fn &&fn)
{
    std::forward<Fn>(fn)(0, n);
}
93
109template <typename Fn>
110void
111WorkParallelForN(size_t n, Fn &&callback, size_t grainSize)
112{
113 if (n == 0)
114 return;
115
116 // Don't bother with parallel_for, if concurrency is limited to 1.
117 if (WorkHasConcurrency()) {
118 PXR_WORK_IMPL_NAMESPACE_USING_DIRECTIVE;
119 Work_ErrorTransports errorTransports;
120 Work_ErrorTransportTaskWrapper<Fn> task(std::forward<Fn>(callback),
121 &errorTransports);
122 WorkImpl_ParallelForN(n, task, grainSize);
123 for (auto &et: errorTransports) {
124 et.Post();
125 }
126 } else {
127 // If concurrency is limited to 1, execute serially.
128 WorkSerialForN(n, std::forward<Fn>(callback));
129 }
130}
131
/// Convenience overload of WorkParallelForN that uses a grain size of 1.
///
/// \param n         One past the last index to process.
/// \param callback  Callable forwarded unchanged to the three-argument
///                  overload.
template <typename Fn>
void
WorkParallelForN(size_t n, Fn &&callback)
{
    WorkParallelForN(n, std::forward<Fn>(callback), 1);
}
149
163template <typename RangeType, typename Fn>
164void
165WorkParallelForTBBRange(const RangeType &range, Fn &&callback)
166{
167 // Don't bother with parallel_for, if concurrency is limited to 1.
168 if (WorkHasConcurrency()) {
169 PXR_WORK_IMPL_NAMESPACE_USING_DIRECTIVE;
170 // Use the work backend's ParallelForTBBRange if one exists
171 // otherwise use the default implementation below that builds off of the
172 // dispatcher.
173#if defined WORK_IMPL_HAS_PARALLEL_FOR_TBB_RANGE
174 Work_ErrorTransports errorTransports;
175 Work_ErrorTransportTaskWrapper<Fn> task(std::forward<Fn>(callback),
176 &errorTransports);
177 WorkImpl_ParallelForTBBRange(range, task);
178 for (auto &et: errorTransports) {
179 et.Post();
180 }
181#else
182 // The parallel task responsible for recursively sub-dividing the range
183 // and invoking the callback on the sub-ranges.
184 class _RangeTask
185 {
186 public:
187 _RangeTask(
188 WorkDispatcher &dispatcher,
189 RangeType &&range,
190 const Fn &callback)
191 : _dispatcher(dispatcher)
192 , _range(std::move(range))
193 , _callback(callback) {}
194
195 void operator()() const {
196 // Subdivide the given range until it is no longer divisible, and
197 // recursively spawn _RangeTasks for the right side of the split.
198 RangeType &leftRange = _range;
199 while (leftRange.is_divisible()) {
200 RangeType rightRange(leftRange, tbb::split());
201 _dispatcher.Run(_RangeTask(
202 _dispatcher, std::move(rightRange), _callback));
203 }
204
205 // If there are any more entries remaining in the left-most side
206 // of the given range, invoke the callback on the left-most range.
207 if (!leftRange.empty()) {
208 std::invoke(_callback, leftRange);
209 }
210 }
211
212 private:
213 WorkDispatcher &_dispatcher;
214 mutable RangeType _range;
215 const Fn &_callback;
216 };
217
218 WorkDispatcher dispatcher;
219 RangeType range = range;
220 dispatcher.Run(_RangeTask(
221 dispatcher, range, std::forward<Fn>(callback)));
222#endif
223 } else {
224 // If concurrency is limited to 1, execute serially.
225 std::forward<Fn>(callback)(range);
226 }
227}
228
241template <typename InputIterator, typename Fn>
242inline void
244 InputIterator first, InputIterator last, Fn &&fn)
245{
246 if (WorkHasConcurrency()) {
247 PXR_WORK_IMPL_NAMESPACE_USING_DIRECTIVE;
248 Work_ErrorTransports errorTransports;
249 Work_ErrorTransportForEachTaskWrapper<Fn>
250 task(std::forward<Fn>(fn), &errorTransports);
251 WorkImpl_ParallelForEach(first, last, task);
252 for (auto &et: errorTransports) {
253 et.Post();
254 }
255 } else {
256 std::for_each(first, last, std::forward<Fn>(fn));
257 }
258}
259
260PXR_NAMESPACE_CLOSE_SCOPE
261
262#endif // PXR_BASE_WORK_LOOPS_H
Class used to record the end of the error-list.
Definition: errorMark.h:48
TfErrorTransport Transport() const
Remove all errors in this mark from the error system and return them in a TfErrorTransport.
Definition: errorMark.h:109
bool IsClean() const
Return true if no new errors were posted in this thread since the last call to SetMark(),...
Definition: errorMark.h:82
A facility for transporting errors from thread to thread.
void swap(TfErrorTransport &other)
Swap this TfErrorTransport's content with other.
A work dispatcher runs concurrent tasks.
Definition: dispatcher.h:176
void WorkParallelForTBBRange(const RangeType &range, Fn &&callback)
WorkParallelForTBBRange(const RangeType &r, Fn &&callback)
Definition: loops.h:165
void WorkParallelForEach(InputIterator first, InputIterator last, Fn &&fn)
WorkParallelForEach(Iterator first, Iterator last, CallbackType callback)
Definition: loops.h:243
void WorkParallelForN(size_t n, Fn &&callback, size_t grainSize)
WorkParallelForN(size_t n, CallbackType callback, size_t grainSize = 1)
Definition: loops.h:111
void WorkSerialForN(size_t n, Fn &&fn)
WorkSerialForN(size_t n, CallbackType callback)
Definition: loops.h:89
WORK_API bool WorkHasConcurrency()
Return true if WorkGetPhysicalConcurrencyLimit() returns a number greater than 1 and PXR_WORK_THREAD_...