7#ifndef PXR_EXEC_VDF_PARALLEL_EXECUTOR_ENGINE_H
8#define PXR_EXEC_VDF_PARALLEL_EXECUTOR_ENGINE_H
17#include "pxr/base/work/taskGraph.h"
19#include <tbb/concurrent_unordered_map.h>
24PXR_NAMESPACE_OPEN_SCOPE
39template <
typename DataManagerType >
42 VdfParallelExecutorEngine<DataManagerType>,
67 DataManagerType *dataManager) :
68 Base(executor, dataManager)
94 const typename Base::_DataHandle dataHandle,
100 void _FinalizeEvaluation();
107 const typename Base::_DataHandle dataHandle,
113 void _PublishLockedBuffers();
125 const size_t numTasks);
153 std::unique_ptr<VdfMask[]> _masks;
158 std::atomic<size_t> _num;
169 _PublishLockedDataTask(
170 DataManagerType *dataManager,
171 const typename Base::_DataHandle dataHandle,
172 _LockedData *lockedData) :
173 _dataManager(dataManager),
174 _dataHandle(dataHandle),
175 _lockedData(lockedData)
183 DataManagerType *_dataManager;
184 const typename Base::_DataHandle _dataHandle;
185 std::unique_ptr<_LockedData> _lockedData;
190 _LockedData *_InsertLockedData(
193 const typename Base::_DataHandle dataHandle);
198 tbb::concurrent_unordered_map<typename Base::_DataHandle, _LockedData *>
200 _LockedDataMap _lockedDataMap;
205template <
typename DataManagerType >
214template <
typename DataManagerType >
219 Base::_dataManager->DataManagerType::Base::Touch(output);
222template <
typename DataManagerType >
228 const typename Base::_DataHandle dataHandle,
233 if (passToOutput && Base::_dataManager->HasInvalidationTimestampMismatch(
235 Base::_dataManager->GetDataHandle(passToOutput->
GetId()))) {
236 _LockBuffer(state, outputId, dataHandle, invocationIndex);
240template <
typename DataManagerType >
244 _PublishLockedBuffers();
247template <
typename DataManagerType >
252 const typename Base::_DataHandle dataHandle,
255 PEE_TRACE_SCOPE(
"VdfParallelExecutorEngine::_LockBuffer");
260 typename _LockedDataMap::iterator it = _lockedDataMap.find(dataHandle);
261 _LockedData *lockedData = it == _lockedDataMap.end()
262 ? _InsertLockedData(state, outputId, dataHandle)
267 Base::_dataManager->GetPrivateBufferData(dataHandle);
281template <
typename DataManagerType >
286 if (_lockedDataMap.empty()) {
290 PEE_TRACE_SCOPE(
"VdfParallelExecutorEngine::_PublishLockedBuffers");
292 Base::_isolatingDispatcher.Run(
293 [&lockedDataMap = _lockedDataMap, &taskGraph = Base::_taskGraph,
294 &dataManager = Base::_dataManager] {
297 for (
const auto &data : lockedDataMap) {
301 _PublishLockedDataTask *
const task =
302 taskGraph.template AllocateTask<_PublishLockedDataTask>(
303 dataManager, data.first, data.second);
304 taskGraph.RunTask(task);
308 lockedDataMap.clear();
314 Base::_isolatingDispatcher.Wait();
317template <
typename DataManagerType >
322 const typename Base::_DataHandle dataHandle)
333 _LockedData *newData =
new _LockedData(
339 std::pair<typename _LockedDataMap::iterator, bool> res =
340 _lockedDataMap.insert(std::make_pair(dataHandle, newData));
355 else if (tasks.size() > 1) {
356 Base::_taskGraph.RunTask(
357 Base::_taskGraph.
template AllocateTask<
358 typename Base::_ComputeAllTask>(
this, state, node));
363 return res.first->second;
366template <
typename DataManagerType >
372 _dataManager->GetPublicBufferData(_dataHandle);
375 _lockedData->TransferOwnership(publicBuffer);
381template <
typename DataManagerType >
385 const size_t numTasks) :
398 _masks.reset(
new VdfMask[numTasks]);
401template <
typename DataManagerType >
408 _value->Merge(value, mask);
412 const size_t maskIdx = _num.fetch_add(1, std::memory_order_release);
413 _masks[maskIdx] = mask;
416template <
typename DataManagerType >
422 const size_t num = _num.load(std::memory_order_acquire);
439 for (
size_t i = 1; i < num; ++i) {
440 unionBits |= _masks[i].GetBits();
448 value->
Merge(*_value, unionBits);
465PXR_NAMESPACE_CLOSE_SCOPE
Fast, compressed bit array which is capable of performing logical operations without first decompressing the internal data representation.
This object holds state that remains persistent during one round of network evaluation.
const VdfSchedule & GetSchedule() const
The schedule used for evaluation.
This object is responsible for storing the executor buffer data, comprised of the executor cache vector and the mask of available entries in that vector.
const VdfMask & GetExecutorCacheMask() const
Get the available mask.
VdfVector * GetExecutorCache() const
Returns the executor cache stored at this buffer data instance.
void TakeOwnership(VdfVector *v)
Assumes ownership of the given vector.
void SetExecutorCacheMask(const VdfMask &mask)
Sets the available mask.
Abstract base class for classes that execute a VdfNetwork to compute a requested set of values.
A VdfMask is placed on connections to specify the data flowing through them.
VdfMask::Bits const & GetBits() const
Get this mask's content as TfCompressedBits.
This is the base class for all nodes in a VdfNetwork.
A VdfOutput represents an output on a node.
const VdfNode & GetNode() const
Returns the owning node for this output.
VdfId GetId() const
The unique id of this output.
VDF_API const VdfOutputSpec & GetSpec() const
Returns the connector specification object for this output.
A VdfOutputSpec describes an output connector.
VDF_API VdfVector * AllocateCache() const
Allocate a new VdfVector with this spec's type.
VDF_API void ResizeCache(VdfVector *vector, const VdfMask::Bits &bits) const
Resize an existing VdfVector to accommodate all the data set in the bits.
The base class for all parallel executor engines.
A generic, but fully-featured parallel executor engine, deriving from VdfParallelExecutorEngineBase.
VdfParallelExecutorEngine(const VdfExecutorInterface &executor, DataManagerType *dataManager)
Constructor.
VdfParallelExecutorEngineBase< VdfParallelExecutorEngine< DataManagerType >, DataManagerType > Base
Base class.
VdfParallelSpeculationExecutorEngine< DataManagerType > SpeculationExecutorEngine
The equivalent speculation executor engine.
An executor engine used for parallel speculation node evaluation, deriving from VdfParallelExecutorEngineBase.
Minimal iterator range that the schedule returns instances of, in order to facilitate iterating over ...
An OutputId is a small key object that, once obtained for a particular VdfOutput, can be used to query the schedule for information about that output.
Contains a specification of how to execute a particular VdfNetwork.
VDF_API const VdfMask & GetRequestMask(const OutputId &outputId) const
Returns the request mask associated with the given OutputId.
const TaskIdRange GetComputeTaskIds(const VdfNode &node) const
Returns a range of ids describing compute tasks associated with the given node.
VDF_API const VdfOutput * GetOutput(const OutputId &outputId) const
Returns the scheduled VdfOutput associated with the given OutputId.
This class is used to abstract away knowledge of the cache data used for each node.
VDF_API void Merge(const VdfVector &rhs, const VdfMask::Bits &bits)
Merges the contents of rhs into this vector.
Base class for a parallel task that emulates tbb::task (deprecated in the oneTBB version upgrade).
bool VdfScheduleTaskIsInvalid(uint32_t task)
Returns true if the given task index or id is invalid.
uint32_t VdfScheduleTaskIndex
Type describing a task index.