Loading...
Searching...
No Matches
parallelExecutorEngine.h
Go to the documentation of this file.
1//
2// Copyright 2025 Pixar
3//
4// Licensed under the terms set forth in the LICENSE.txt file available at
5// https://openusd.org/license.
6//
7#ifndef PXR_EXEC_VDF_PARALLEL_EXECUTOR_ENGINE_H
8#define PXR_EXEC_VDF_PARALLEL_EXECUTOR_ENGINE_H
9
11
12#include "pxr/pxr.h"
13
15
17#include "pxr/base/work/taskGraph.h"
18
19#include <tbb/concurrent_unordered_map.h>
20
21#include <atomic>
22#include <memory>
23
24PXR_NAMESPACE_OPEN_SCOPE
25
26// Forward declare the speculation executor engine with equivalent traits to
27// this executor engine.
28template <typename> class VdfParallelSpeculationExecutorEngine;
29
39template < typename DataManagerType >
42 VdfParallelExecutorEngine<DataManagerType>,
43 DataManagerType>
44{
45public:
46
51 typedef
54
57 typedef
60 DataManagerType>
62
66 const VdfExecutorInterface &executor,
67 DataManagerType *dataManager) :
68 Base(executor, dataManager)
69 {}
70
71private:
72
73 // Befriend the base class so that it has access to the private methods
74 // used for static polymorphism.
75 //
76 friend Base;
77
78 // This executor engine does not do cycle detection.
79 //
80 bool _DetectCycle(
81 const VdfEvaluationState &state,
82 const VdfNode &node);
83
84 // This executor engine supports touching.
85 //
86 void _Touch(const VdfOutput &output);
87
88 // Finalize the output before publishing any buffers.
89 //
90 void _FinalizeOutput(
91 const VdfEvaluationState &state,
92 const VdfOutput &output,
93 const VdfSchedule::OutputId outputId,
94 const typename Base::_DataHandle dataHandle,
95 const VdfScheduleTaskIndex invocationIndex,
96 const VdfOutput *passToOutput);
97
98 // Finalize state after evaluation completed.
99 //
100 void _FinalizeEvaluation();
101
102 // Lock the buffer for a given output for mung buffer locking.
103 //
104 void _LockBuffer(
105 const VdfEvaluationState &state,
106 const VdfSchedule::OutputId outputId,
107 const typename Base::_DataHandle dataHandle,
108 const VdfScheduleTaskIndex invocationIndex);
109
110 // Publish all locked buffers. This method must not be invoked before all
111 // outputs with locked buffers have had their private/scratch buffers published.
112 //
113 void _PublishLockedBuffers();
114
115 // Accumulates the values and masks locked at a single output until they are
116 // published. Holds the VdfVector through a raw owning pointer (_value).
117 class _LockedData
118 {
119 public:
120 // Constructor. Allocates the vector from spec, resizes it for the bits of
121 // mask, and allocates an array of numTasks default-constructed masks.
122 _LockedData(
123 const VdfOutputSpec &spec,
124 const VdfMask &mask,
125 const size_t numTasks);
126
127 // Destructor. Frees the vector, unless TransferOwnership() already handed
128 // it to a buffer and reset _value to nullptr.
129 ~_LockedData() {
130 delete _value;
131 }
132
133 // Merge the entries of value selected by mask into this buffer, and
134 // record mask in the next free slot of _masks.
135 void Merge(
136 const VdfVector &value,
137 const VdfMask &mask);
138
139 // Publish the locked data to destination by merging into, or handing the
140 // vector over to, its executor cache. Does nothing if nothing was merged.
141 void TransferOwnership(VdfExecutorBufferData *destination);
142
143 private:
144 // The data locked at this output. Owned by this object until
145 // TransferOwnership() passes it on, after which it is nullptr.
146 VdfVector *_value;
147
148 // An array of masks, one for each compute task having merged data
149 // into _value. Allocated with numTasks slots; Merge() does not bounds-check.
150 //
151 // XXX: This should be a small vector to optimize for size = 1.
152 //
153 std::unique_ptr<VdfMask[]> _masks;
154
155 // The number of entries in the _masks array, which are currently occupied.
156 // NOTE(review): Merge() bumps this before writing the slot; presumably
157 // readers only run after all merges have completed - confirm.
158 std::atomic<size_t> _num;
159 };
160
161 // Task-graph task (WorkTaskGraph::BaseTask) that publishes the locked data
162 // of one output to that output's public buffer.
163 class _PublishLockedDataTask : public WorkTaskGraph::BaseTask
164 {
165 public:
166 // Constructor. Note that the task takes ownership of lockedData (it is
167 // stored in a std::unique_ptr) and frees it when the task is destroyed.
168 // dataHandle identifies the output whose public buffer is written.
169 _PublishLockedDataTask(
170 DataManagerType *dataManager,
171 const typename Base::_DataHandle dataHandle,
172 _LockedData *lockedData) :
173 _dataManager(dataManager),
174 _dataHandle(dataHandle),
175 _lockedData(lockedData)
176 {}
177
178 // Task execution entry point. Transfers the locked data to the public
179 // buffer for _dataHandle and always returns nullptr (no follow-up task).
180 WorkTaskGraph::BaseTask *execute() override;
181
182 private:
183 DataManagerType *_dataManager; // Used to look up the public buffer.
184 const typename Base::_DataHandle _dataHandle; // Output to publish to.
185 std::unique_ptr<_LockedData> _lockedData; // Owned locked data.
186 };
187
188 // Insert a new entry into the locked data map.
189 //
190 _LockedData *_InsertLockedData(
191 const VdfEvaluationState &state,
192 const VdfSchedule::OutputId outputId,
193 const typename Base::_DataHandle dataHandle);
194
195 // A map from output data handle to data locked at the output.
196 //
197 typedef
198 tbb::concurrent_unordered_map<typename Base::_DataHandle, _LockedData *>
199 _LockedDataMap;
200 _LockedDataMap _lockedDataMap;
201};
202
204
205template < typename DataManagerType >
206inline bool
208 const VdfEvaluationState &state,
209 const VdfNode &node)
210{
211 return false;
212}
213
214template < typename DataManagerType >
215void
217 const VdfOutput &output)
218{
219 Base::_dataManager->DataManagerType::Base::Touch(output);
220}
221
222template < typename DataManagerType >
223void
225 const VdfEvaluationState &state,
226 const VdfOutput &output,
227 const VdfSchedule::OutputId outputId,
228 const typename Base::_DataHandle dataHandle,
229 const VdfScheduleTaskIndex invocationIndex,
230 const VdfOutput *passToOutput)
231{
232 // Does this buffer require locking?
233 if (passToOutput && Base::_dataManager->HasInvalidationTimestampMismatch(
234 dataHandle,
235 Base::_dataManager->GetDataHandle(passToOutput->GetId()))) {
236 _LockBuffer(state, outputId, dataHandle, invocationIndex);
237 }
238}
239
240template < typename DataManagerType >
241void
243{
244 _PublishLockedBuffers();
245}
246
247template < typename DataManagerType >
248void
250 const VdfEvaluationState &state,
251 const VdfSchedule::OutputId outputId,
252 const typename Base::_DataHandle dataHandle,
253 const VdfScheduleTaskIndex invocationIndex)
254{
255 PEE_TRACE_SCOPE("VdfParallelExecutorEngine::_LockBuffer");
256
257 // Get the locked data structure for the given output as identified by
258 // its data handle. If no locked data exists for that output, insert a
259 // new one into the map.
260 typename _LockedDataMap::iterator it = _lockedDataMap.find(dataHandle);
261 _LockedData *lockedData = it == _lockedDataMap.end()
262 ? _InsertLockedData(state, outputId, dataHandle)
263 : it->second;
264
265 // Get the private buffer data. This is the buffer that we want to lock.
266 VdfExecutorBufferData *privateBuffer =
267 Base::_dataManager->GetPrivateBufferData(dataHandle);
268
269 // Retrieve the lock mask based on whether the buffer is being locked for
270 // a node with multiple invocations, or just a single invocation. The
271 // lock mask is the relevant request mask in either case.
272 const VdfSchedule &schedule = state.GetSchedule();
273 const VdfMask &lockMask = !VdfScheduleTaskIsInvalid(invocationIndex)
274 ? schedule.GetRequestMask(invocationIndex)
275 : schedule.GetRequestMask(outputId);
276
277 // Merge the private buffer data into the locked data structure.
278 lockedData->Merge(*privateBuffer->GetExecutorCache(), lockMask);
279}
280
281template < typename DataManagerType >
282void
284{
285 // Bail out if there is no locked data to publish.
286 if (_lockedDataMap.empty()) {
287 return;
288 }
289
290 PEE_TRACE_SCOPE("VdfParallelExecutorEngine::_PublishLockedBuffers");
291
292 Base::_isolatingDispatcher.Run(
293 [&lockedDataMap = _lockedDataMap, &taskGraph = Base::_taskGraph,
294 &dataManager = Base::_dataManager] {
295 // For each entry in the locked data map, spawn a new task to publish
296 // the locked data.
297 for (const auto &data : lockedDataMap) {
298 // Allocate a new task responsible for publishing the data. Note
299 // that the _PublishLockedDataTask will take ownership of the locked
300 // data structure, and is responsible for deallocating it.
301 _PublishLockedDataTask * const task =
302 taskGraph.template AllocateTask<_PublishLockedDataTask>(
303 dataManager, data.first, data.second);
304 taskGraph.RunTask(task);
305 }
306
307 // Clear the map, while the data is still being published.
308 lockedDataMap.clear();
309
310 // Wait for all the publishing to complete.
311 taskGraph.Wait();
312 });
313
314 Base::_isolatingDispatcher.Wait();
315}
316
317template < typename DataManagerType >
320 const VdfEvaluationState &state,
321 const VdfSchedule::OutputId outputId,
322 const typename Base::_DataHandle dataHandle)
323{
324 // Get the output and node.
325 const VdfSchedule &schedule = state.GetSchedule();
326 const VdfOutput &output = *schedule.GetOutput(outputId);
327 const VdfNode &node = output.GetNode();
328
329 // Get all the compute tasks for this node.
330 VdfSchedule::TaskIdRange tasks = schedule.GetComputeTaskIds(node);
331
332 // Construct a new instance of the locked data structure.
333 _LockedData *newData = new _LockedData(
334 output.GetSpec(), schedule.GetRequestMask(outputId), tasks.size());
335
336 // Attempt to insert the newly allocated locked data structure into the
337 // map. Note that this will fail if a parallel thread got to inserting
338 // an instance for the same output data handle first.
339 std::pair<typename _LockedDataMap::iterator, bool> res =
340 _lockedDataMap.insert(std::make_pair(dataHandle, newData));
341
342 // If the insertion failed, because another thread got to inserting the
343 // data first, we need to free the new locked data structure instance we
344 // just allocated.
345 if (!res.second) {
346 delete newData;
347 }
348
349 // The insertion succeeded if this is the first time that this output is
350 // being locked during this round of evaluation. In that case, and if the
351 // node has more than a single compute task, make sure that all compute
352 // tasks will be run. Otherwise, cache hits on other outputs could result in
353 // some of these compute tasks not being invoked, and cause the locked
354 // buffer to be incomplete.
355 else if (tasks.size() > 1) {
356 Base::_taskGraph.RunTask(
357 Base::_taskGraph.template AllocateTask<
358 typename Base::_ComputeAllTask>(this, state, node));
359 }
360
361 // Return a pointer to the locked data structure instance (either the newly
362 // inserted one, or an existing one).
363 return res.first->second;
364}
365
366template < typename DataManagerType >
369{
370 // We are going to publish the locked data to the public buffer.
371 VdfExecutorBufferData *publicBuffer =
372 _dataManager->GetPublicBufferData(_dataHandle);
373
374 // Transfer ownership of the locked data to the public buffer.
375 _lockedData->TransferOwnership(publicBuffer);
376
377 // No scheduler bypass.
378 return nullptr;
379}
380
381template < typename DataManagerType >
383 const VdfOutputSpec &spec,
384 const VdfMask &mask,
385 const size_t numTasks) :
386 _num(0)
387{
388 // Allocate a new VdfVector for the given output.
389 _value = spec.AllocateCache();
390
391 // Make sure the VdfVector is appropriately sized in order to accommodate
392 // all the data as indicated by the mask.
393 spec.ResizeCache(_value, mask.GetBits());
394
395 // Allocate an array of masks - one for each task. Note that not all tasks
396 // will necessarily end up locking any data, but pre-sizing the array for
397 // the worst case allows us to remain lockless.
398 _masks.reset(new VdfMask[numTasks]);
399}
400
401template < typename DataManagerType >
402void
404 const VdfVector &value,
405 const VdfMask &mask)
406{
407 // Merge all the data from the source vector into the internal vector.
408 _value->Merge(value, mask);
409
410 // Store the mask to denote the data entries we just copied. We will
411 // accumulate all the masks when we later publish the locked buffer.
412 const size_t maskIdx = _num.fetch_add(1, std::memory_order_release);
413 _masks[maskIdx] = mask;
414}
415
416template < typename DataManagerType >
417void
419 VdfExecutorBufferData *destination)
420{
421 // If there is no data locked at the output, bail out early.
422 const size_t num = _num.load(std::memory_order_acquire);
423 if (num == 0) {
424 return;
425 }
426
427 // If there is only a single mask locked at this output, and the
428 // destination buffer does not have its own cache, we can straight up
429 // transfer ownership to the destination. This is the fast path.
430 if (num == 1 && !destination->GetExecutorCache()) {
431 destination->SetExecutorCacheMask(_masks[0]);
432 destination->TakeOwnership(_value);
433 _value = nullptr;
434 return;
435 }
436
437 // Accumulate all the masks locked for this output.
438 VdfMask::Bits unionBits(_masks[0].GetBits());
439 for (size_t i = 1; i < num; ++i) {
440 unionBits |= _masks[i].GetBits();
441 }
442
443 // If the destination buffer already has a cache, we need to merge the
444 // locked cache into the existing cache, and append the locked mask to
445 // the cache mask. The destination buffer does not take ownership of the
446 // local VdfVector in this case.
447 if (VdfVector *value = destination->GetExecutorCache()) {
448 value->Merge(*_value, unionBits);
449 unionBits |= destination->GetExecutorCacheMask().GetBits();
450 }
451
452 // If the destination buffer has no cache, we can simply transfer ownership
453 // of the local VdfVector. Don't forget to reset the local VdfVector
454 // pointer to prevent double-freeing the memory (by this class's destructor
455 // and by the VdfExecutorBufferData owner).
456 else {
457 destination->TakeOwnership(_value);
458 _value = nullptr;
459 }
460
461 // Finally, apply the executor cache mask using the accumulated mask.
462 destination->SetExecutorCacheMask(VdfMask(unionBits));
463}
464
465PXR_NAMESPACE_CLOSE_SCOPE
466
467#endif
Fast, compressed bit array which is capable of performing logical operations without first decompress...
This object holds state that remains persistent during one round of network evaluation.
const VdfSchedule & GetSchedule() const
The schedule used for evaluation.
This object is responsible for storing the executor buffer data, comprised of the executor cache vect...
const VdfMask & GetExecutorCacheMask() const
Get the available mask.
VdfVector * GetExecutorCache() const
Returns the executor cache stored at this buffer data instance.
void TakeOwnership(VdfVector *v)
Assumes ownership of the given vector.
void SetExecutorCacheMask(const VdfMask &mask)
Sets the available mask.
Abstract base class for classes that execute a VdfNetwork to compute a requested set of values.
A VdfMask is placed on connections to specify the data flowing through them.
Definition: mask.h:37
VdfMask::Bits const & GetBits() const
Get this mask's content as compressed bits (VdfMask::Bits).
Definition: mask.h:556
This is the base class for all nodes in a VdfNetwork.
Definition: node.h:53
A VdfOutput represents an output on a node.
Definition: output.h:32
const VdfNode & GetNode() const
Returns the owning node for this output.
Definition: output.h:57
VdfId GetId() const
The unique id of this output.
Definition: output.h:100
VDF_API const VdfOutputSpec & GetSpec() const
Returns the connector specification object for this output.
A VdfOutputSpec describes an output connector.
Definition: outputSpec.h:40
VDF_API VdfVector * AllocateCache() const
Allocate a new VdfVector with this spec's type.
VDF_API void ResizeCache(VdfVector *vector, const VdfMask::Bits &bits) const
Resize an existing VdfVector to accommodate all the data set in the bits.
The base class for all parallel executor engines.
A generic, but fully-featured parallel executor engine, deriving from VdfParallelExecutorEngineBase.
VdfParallelExecutorEngine(const VdfExecutorInterface &executor, DataManagerType *dataManager)
Constructor.
VdfParallelExecutorEngineBase< VdfParallelExecutorEngine< DataManagerType >, DataManagerType > Base
Base class.
VdfParallelSpeculationExecutorEngine< DataManagerType > SpeculationExecutorEngine
The equivalent speculation executor engine.
An executor engine used for parallel speculation node evaluation, deriving from VdfParallelExecutorEn...
Minimal iterator range that the schedule returns instances of, in order to facilitate iterating over ...
Definition: schedule.h:47
An OutputId is a small key object that, once obtained for a particular VdfOutput, can be used to quer...
Definition: schedule.h:91
Contains a specification of how to execute a particular VdfNetwork.
Definition: schedule.h:41
VDF_API const VdfMask & GetRequestMask(const OutputId &outputId) const
Returns the request mask associated with the given OutputId.
const TaskIdRange GetComputeTaskIds(const VdfNode &node) const
Returns a range of ids describing compute tasks associated with the given node.
Definition: schedule.h:402
VDF_API const VdfOutput * GetOutput(const OutputId &outputId) const
Returns the scheduled VdfOutput associated with the given OutputId.
This class is used to abstract away knowledge of the cache data used for each node.
Definition: vector.h:56
VDF_API void Merge(const VdfVector &rhs, const VdfMask::Bits &bits)
Merges the contents of rhs into this vector.
Base class for a parallel task that emulates tbb::task (deprecated in the oneTBB version upgrade....
Definition: taskGraph.h:106
bool VdfScheduleTaskIsInvalid(uint32_t task)
Returns true if the given task index or id is invalid.
Definition: scheduleTasks.h:41
uint32_t VdfScheduleTaskIndex
Type describing a task index.
Definition: scheduleTasks.h:29