All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
dispatcher.h
Go to the documentation of this file.
1//
2// Copyright 2016 Pixar
3//
4// Licensed under the terms set forth in the LICENSE.txt file available at
5// https://openusd.org/license.
6//
7#ifndef PXR_BASE_WORK_DISPATCHER_H
8#define PXR_BASE_WORK_DISPATCHER_H
9
11
12#include "pxr/pxr.h"
14#include "pxr/base/work/api.h"
15
18
19// Blocked range is not used in this file, but this header happens to pull in
20// the TBB version header in a way that works in all TBB versions.
21#include <tbb/blocked_range.h>
22#include <tbb/concurrent_vector.h>
23#if TBB_INTERFACE_VERSION_MAJOR >= 12
24#include <tbb/task_group.h>
25#else
26#include <tbb/task.h>
27#endif
28
29#include <functional>
30#include <type_traits>
31#include <utility>
32
33PXR_NAMESPACE_OPEN_SCOPE
34
66{
67public:
/// Construct a new dispatcher.
69 WORK_API WorkDispatcher();
70
/// Wait() for any pending tasks to complete, then destroy the dispatcher.
72 WORK_API ~WorkDispatcher() noexcept;
73
74 WorkDispatcher(WorkDispatcher const &) = delete;
75 WorkDispatcher &operator=(WorkDispatcher const &) = delete;
76
77#ifdef doxygen
78
/// Add work for the dispatcher to run.
///
/// This signature is pseudo-code that is only visible when the `doxygen`
/// macro is defined; the compilable overloads live in the `#else` branch of
/// the same conditional.
89 template <class Callable, class A1, class A2, ... class AN>
90 void Run(Callable &&c, A1 &&a1, A2 &&a2, ... AN &&aN);
91
92#else // doxygen
93
94 template <class Callable>
95 inline void Run(Callable &&c) {
96#if TBB_INTERFACE_VERSION_MAJOR >= 12
97 _taskGroup.run(_InvokerTask<typename std::remove_reference<Callable>::type>(std::forward<Callable>(c), &_errors));
98#else
99 _rootTask->spawn(_MakeInvokerTask(std::forward<Callable>(c)));
100#endif
101 }
102
// Overload for a callable plus one or more arguments. The callable and its
// arguments are packaged into a single nullary callable with std::bind,
// which is then passed to the single-argument Run() overload.
template <class Callable, class A0, class ... Args>
inline void Run(Callable &&c, A0 &&a0, Args&&... args) {
    auto bound = std::bind(std::forward<Callable>(c),
                           std::forward<A0>(a0),
                           std::forward<Args>(args)...);
    Run(std::move(bound));
}
109
110#endif // doxygen
111
/// Block until the work started by Run() completes.
113 WORK_API void Wait();
114
/// Cancel remaining work and return immediately.
125 WORK_API void Cancel();
126
127private:
128 typedef tbb::concurrent_vector<TfErrorTransport> _ErrorTransports;
129
130 // Function invoker helper that wraps the invocation with an ErrorMark so we
131 // can transmit errors that occur back to the thread that Wait()s for tasks
132 // to complete.
133#if TBB_INTERFACE_VERSION_MAJOR >= 12
134 template <class Fn>
135 struct _InvokerTask {
136 explicit _InvokerTask(Fn &&fn, _ErrorTransports *err)
137 : _fn(std::move(fn)), _errors(err) {}
138
139 explicit _InvokerTask(Fn const &fn, _ErrorTransports *err)
140 : _fn(fn), _errors(err) {}
141
142 // Ensure only moves happen, no copies.
143 _InvokerTask(_InvokerTask &&other) = default;
144 _InvokerTask(const _InvokerTask &other) = delete;
145 _InvokerTask &operator=(const _InvokerTask &other) = delete;
146
147 void operator()() const {
148 TfErrorMark m;
149 _fn();
150 if (!m.IsClean())
151 WorkDispatcher::_TransportErrors(m, _errors);
152 }
153 private:
154 Fn _fn;
155 _ErrorTransports *_errors;
156 };
157#else
158 template <class Fn>
159 struct _InvokerTask : public tbb::task {
160 explicit _InvokerTask(Fn &&fn, _ErrorTransports *err)
161 : _fn(std::move(fn)), _errors(err) {}
162
163 explicit _InvokerTask(Fn const &fn, _ErrorTransports *err)
164 : _fn(fn), _errors(err) {}
165
166 virtual tbb::task* execute() {
167 TfErrorMark m;
168 // In anticipation of OneTBB, ensure that _fn meets OneTBB's
169 // requirement that a task's call operator must be const.
170 const_cast<_InvokerTask const *>(this)->_fn();
171 if (!m.IsClean())
172 WorkDispatcher::_TransportErrors(m, _errors);
173 return NULL;
174 }
175 private:
176 Fn _fn;
177 _ErrorTransports *_errors;
178 };
179
180 // Make an _InvokerTask instance, letting the function template deduce Fn.
181 template <class Fn>
182 _InvokerTask<typename std::remove_reference<Fn>::type>&
183 _MakeInvokerTask(Fn &&fn) {
184 return *new( _rootTask->allocate_additional_child_of(*_rootTask) )
185 _InvokerTask<typename std::remove_reference<Fn>::type>(
186 std::forward<Fn>(fn), &_errors);
187 }
188#endif
189
190 // Helper function that removes errors from \p m and stores them in a new
191 // entry in \p errors.
192 WORK_API static void
193 _TransportErrors(const TfErrorMark &m, _ErrorTransports *errors);
194
195 // Task group context to run tasks in.
196 tbb::task_group_context _context;
197#if TBB_INTERFACE_VERSION_MAJOR >= 12
198 // Custom task group that lets us implement thread safe concurrent wait.
199 class _TaskGroup : public tbb::task_group {
200 public:
201 _TaskGroup(tbb::task_group_context& ctx) : tbb::task_group(ctx) {}
202 inline tbb::detail::d1::wait_context& _GetInternalWaitContext();
203 };
204
205 _TaskGroup _taskGroup;
206#else
207 // Root task that allows us to cancel tasks invoked directly by this
208 // dispatcher.
209 tbb::empty_task* _rootTask;
210#endif
211
212 // The error transports we use to transmit errors in other threads back to
213 // this thread.
214 _ErrorTransports _errors;
215
216 // Concurrent calls to Wait() have to serialize certain cleanup operations.
217 std::atomic_flag _waitCleanupFlag;
218};
219
// Adapter for tasks whose call operator is not const. The callable is held
// in a mutable member, so it can be invoked through this wrapper's const
// call operator.
template <class Fn>
struct Work_DeprecatedMutableTask {
    // Take over an rvalue callable by moving it into the wrapper.
    explicit Work_DeprecatedMutableTask(Fn &&fn)
        : _fn(std::move(fn))
    {}

    // Take a copy of an lvalue callable.
    explicit Work_DeprecatedMutableTask(Fn const &fn)
        : _fn(fn)
    {}

    // Move-only: copy construction and copy assignment are disabled so
    // that the wrapped callable is only ever moved.
    Work_DeprecatedMutableTask(Work_DeprecatedMutableTask const &) = delete;
    Work_DeprecatedMutableTask &
    operator=(Work_DeprecatedMutableTask const &) = delete;
    Work_DeprecatedMutableTask(Work_DeprecatedMutableTask &&) = default;

    // Const on the outside; the mutable member lets the wrapped callable
    // change its own state.
    void operator()() const {
        _fn();
    }

private:
    mutable Fn _fn;
};
243
244// Wrapper function to convert non-const tasks to a Work_DeprecatedMutableTask.
245// When adding new tasks refrain from using this wrapper, instead ensure the
246// call operator of the task is const such that it is compatible with oneTBB.
247template <typename Fn>
248Work_DeprecatedMutableTask<typename std::remove_reference_t<Fn>>
249WorkMakeDeprecatedMutableTask(Fn &&fn) {
250 return Work_DeprecatedMutableTask<typename std::remove_reference_t<Fn>>
251 (std::forward<Fn>(fn));
252}
253
255
256PXR_NAMESPACE_CLOSE_SCOPE
257
258#endif // PXR_BASE_WORK_DISPATCHER_H
Class used to record the end of the error-list.
Definition: errorMark.h:48
bool IsClean() const
Return true if no new errors were posted in this thread since the last call to SetMark(),...
Definition: errorMark.h:82
A work dispatcher runs concurrent tasks.
Definition: dispatcher.h:66
WORK_API ~WorkDispatcher() noexcept
Wait() for any pending tasks to complete, then destroy the dispatcher.
WORK_API void Cancel()
Cancel remaining work and return immediately.
WORK_API void Wait()
Block until the work started by Run() completes.
WORK_API WorkDispatcher()
Construct a new dispatcher.
void Run(Callable &&c, A1 &&a1, A2 &&a2,... AN &&aN)
Add work for the dispatcher to run.
STL namespace.