7#ifndef PXR_EXEC_VDF_PARALLEL_EXECUTOR_DATA_VECTOR_H
8#define PXR_EXEC_VDF_PARALLEL_EXECUTOR_DATA_VECTOR_H
12#include "pxr/exec/vdf/api.h"
18#include "pxr/base/work/zeroAllocator.h"
20#include <tbb/concurrent_vector.h>
21#include <tbb/spin_mutex.h>
28PXR_NAMESPACE_OPEN_SCOPE
91 const uint8_t idx = _outputData[handle].bufferIndices.GetPrivateIndex();
92 return &_bufferData[idx][handle];
101 const uint8_t idx = _outputData[handle].bufferIndices.GetScratchIndex();
102 return &_bufferData[idx][handle];
111 const uint8_t idx = _outputData[handle].bufferIndices.GetPublicIndex();
112 return &_bufferData[idx][handle];
123 _outputData[handle].bufferIndices.PublishPrivateIndex();
134 _outputData[handle].bufferIndices.PublishScratchIndex();
146 return _inboxes[handle].Get();
165 return _inboxes[handle].Take(value, mask);
176 _inboxes[handle].Reset();
187 return &_invalidationData[handle];
198 return _outputData[handle].invalidationTimestamp;
211 _outputData[handle].invalidationTimestamp = timestamp;
219 const uint32_t idx = outputIdx / _TouchedWordBits;
221 UINT64_C(1) << (outputIdx & (_TouchedWordBits - 1));
223 idx < _numTouched.load(std::memory_order_acquire) &&
224 (_touched[idx].load(std::memory_order_relaxed) & bit) != 0;
232 if (ARCH_UNLIKELY(_numTouched.load(std::memory_order_acquire) == 0)) {
237 const uint32_t idx = outputIdx / _TouchedWordBits;
239 UINT64_C(1) << (outputIdx & (_TouchedWordBits - 1));
240 if ((_touched[idx].load(std::memory_order_relaxed) & bit) == 0) {
241 _touched[idx].fetch_or(bit);
253 const uint32_t idx = outputIdx / _TouchedWordBits;
255 UINT64_C(1) << (outputIdx & (_TouchedWordBits - 1));
256 if (idx < _numTouched.load(std::memory_order_acquire) &&
257 (_touched[idx].load(std::memory_order_relaxed) & bit) != 0) {
258 return (_touched[idx].fetch_and(~bit) & bit) != 0;
269 return _outputData.size();
305 (0 << _privateOffset) |
306 (1 << _scratchOffset) |
307 (2 << _publicOffset);
312 const uint8_t GetPrivateIndex()
const {
313 return (_indices.load(std::memory_order_acquire) & _privateMask) >>
319 const uint8_t GetScratchIndex()
const {
320 return (_indices.load(std::memory_order_acquire) & _scratchMask) >>
326 const uint8_t GetPublicIndex()
const {
327 return (_indices.load(std::memory_order_acquire) & _publicMask) >>
333 void PublishPrivateIndex() {
334 uint8_t indices = _indices.load(std::memory_order_relaxed);
337 (indices & _scratchMask) |
339 ((indices >> _publicOffset) & _privateMask) |
341 ((indices & _privateMask) << _publicOffset);
342 _indices.compare_exchange_strong(indices, newIndices);
347 void PublishScratchIndex() {
348 uint8_t indices = _indices.load(std::memory_order_relaxed);
351 (indices & _privateMask) |
353 ((indices >> _scratchOffset) & _scratchMask) |
355 ((indices & _scratchMask) << _scratchOffset);
356 _indices.compare_exchange_strong(indices, newIndices);
363 constexpr static uint8_t _privateOffset = 0;
364 constexpr static uint8_t _scratchOffset = 2;
365 constexpr static uint8_t _publicOffset = 4;
370 constexpr static uint8_t _privateMask = 0x03;
371 constexpr static uint8_t _scratchMask = 0x0C;
372 constexpr static uint8_t _publicMask = 0x30;
376 std::atomic<uint8_t> _indices;
387 _OutputData(
const _OutputData &) =
delete;
388 _OutputData &operator=(
const _OutputData &) =
delete;
392 explicit _OutputData(
const VdfId oid) {
398 void Reset(
const VdfId oid) {
400 bufferIndices.Reset();
403 invalidationTimestamp =
404 VdfExecutorInvalidationData::InitialInvalidationTimestamp;
408 outputId.store(oid, std::memory_order_release);
413 std::atomic<VdfId> outputId;
417 _BufferIndices bufferIndices;
440 std::atomic<uint8_t> constructionChecksum;
461 _Inbox() : _buffer() {}
467 _buffer.load(std::memory_order_acquire);
485 return _buffer.load(std::memory_order_acquire);
491 if (_buffer.load(std::memory_order_relaxed)) {
492 delete _buffer.exchange(
nullptr);
500 std::atomic<VdfExecutorBufferData *> _buffer;
505 using _LocationsSegment = std::atomic<int> *;
508 static_assert(
sizeof(_OutputData) == 16,
509 "_OutputData expected to be 16 bytes in size.");
520 _LocationsSegment _CreateSegment(
size_t segmentIndex)
const;
527 const VdfId outputId,
529 std::atomic<int> *newLocation)
const;
535 int _CreateData(
const VdfId outputId)
const;
541 const VdfId outputId,
543 std::atomic<int> *newLocation)
const;
548 int _WaitForLocation(
550 std::atomic<int> *newLocation)
const;
554 constexpr static size_t _NumBuffers = 3;
558 constexpr static size_t _InitialDataNum = 1024;
563 constexpr static int _LocationInvalid = -1;
564 constexpr static int _LocationPending = -2;
568 constexpr static uint32_t _TouchedWordBits =
sizeof(uint64_t) * CHAR_BIT;
573 constexpr static size_t _SegmentSize = 4096;
580 std::unique_ptr<std::atomic<_LocationsSegment>[]> _locations;
585 std::atomic<size_t> _numTouched;
586 std::unique_ptr<std::atomic<uint64_t>[]> _touched;
587 tbb::spin_mutex _touchedMutex;
591 using _OutputDataVector =
592 tbb::concurrent_vector<_OutputData, WorkZeroAllocator<_OutputData>>;
593 mutable _OutputDataVector _outputData;
597 using _BufferDataVector = tbb::concurrent_vector<VdfExecutorBufferData>;
598 mutable _BufferDataVector _bufferData[_NumBuffers];
602 using _InboxVector = tbb::concurrent_vector<_Inbox>;
603 mutable _InboxVector _inboxes;
607 using _InvalidationDataVector =
608 tbb::concurrent_vector<VdfExecutorInvalidationData>;
609 mutable _InvalidationDataVector _invalidationData;
617 const VdfId outputId)
const
623 const size_t segmentIndex = outputIndex / _SegmentSize;
627 _LocationsSegment segment =
628 _locations[segmentIndex].load(std::memory_order_acquire);
632 segment = _CreateSegment(segmentIndex);
636 const size_t segmentOffset = outputIndex & (_SegmentSize - 1);
637 std::atomic<int> *location = &segment[segmentOffset];
638 const int currentLocation = location->load(std::memory_order_acquire);
641 if (currentLocation < 0) {
642 return _CreateLocation(outputId, currentLocation, location);
647 else if (outputId != _outputData[currentLocation].outputId.load(
648 std::memory_order_acquire)) {
649 _ResetLocation(outputId, currentLocation, location);
653 return currentLocation;
663 const size_t segmentIndex = outputIndex / _SegmentSize;
666 if (segmentIndex >= _numSegments) {
671 _LocationsSegment segment =
672 _locations[segmentIndex].load(std::memory_order_acquire);
683 const size_t segmentOffset = outputIndex & (_SegmentSize - 1);
684 std::atomic<int> *
const location = &segment[segmentOffset];
685 int currentLocation = location->load(std::memory_order_acquire);
690 if (ARCH_UNLIKELY(currentLocation == _LocationPending)) {
691 currentLocation = _WaitForLocation(currentLocation, location);
695 currentLocation >= 0 &&
696 _outputData[currentLocation].outputId.load(
697 std::memory_order_acquire) == outputId
702PXR_NAMESPACE_CLOSE_SCOPE
This is a data container for executor data managers that uses data stored in vectors indexed by outpu...
VdfExecutorBufferData * GetScratchBufferData(const DataHandle handle) const
Returns the VdfExecutorBufferData associated with the given handle.
size_t GetNumData() const
Returns the number of outputs that have data associated with them.
VDF_API void Resize(const VdfNetwork &network)
Resize the data manager to accommodate the given network.
void PublishScratchBufferData(const DataHandle handle) const
Publishes the scratch VdfExecutorBufferData, and retains the previously public VdfExecutorBufferData.
size_t DataHandle
The data handle type is an index into the internal data vector.
VdfInvalidationTimestamp GetInvalidationTimestamp(const DataHandle handle) const
Returns the VdfInvalidationTimestamp associated with the given handle.
DataHandle GetDataHandle(const VdfId outputId) const
Returns an existing data handle for the given output.
Vdf_ParallelExecutorDataVector()
Constructor.
void ResetTransferredBufferData(const DataHandle handle)
Resets the transferred buffer associated with the given handle.
static const size_t InvalidHandle
This sentinel index denotes an invalid handle.
VdfExecutorInvalidationData * GetInvalidationData(const DataHandle handle) const
Returns the VdfExecutorInvalidationData associated with the given handle.
void SetInvalidationTimestamp(const DataHandle handle, VdfInvalidationTimestamp timestamp)
Sets the invalidation timestamp for the give data handle.
VDF_API ~Vdf_ParallelExecutorDataVector()
Destructor.
bool Untouch(const VdfId outputId)
Marks the data at the given handle as not having been touched by evaluation.
VdfExecutorBufferData * GetPrivateBufferData(const DataHandle handle) const
Returns the VdfExecutorBufferData associated with the given handle.
void PublishPrivateBufferData(const DataHandle handle) const
Publishes the private VdfExecutorBufferData, and retains the previously public VdfExecutorBufferData.
VdfExecutorBufferData * GetTransferredBufferData(const DataHandle handle) const
Returns the transferred VdfExecutorBufferData associated with the given handle.
DataHandle GetOrCreateDataHandle(const VdfId outputId) const
Returns an existing data handle, or creates a new one for the given output.
bool IsTouched(const VdfId outputId) const
Returns true if the data at the given output has been touched by evaluation.
VDF_API void Reset(const DataHandle handle, const VdfId outputId) const
Resets the output data at the given data handle to a newly constructed state.
VdfExecutorBufferData * GetPublicBufferData(const DataHandle handle) const
Returns the VdfExecutorBufferData associated with the given handle.
VDF_API void Clear()
Clears all the data from this manager.
bool TransferBufferData(const DataHandle handle, VdfVector *value, const VdfMask &mask)
Transfers ownership of the value to the output associated with handle.
void Touch(const VdfId outputId)
Marks the data at the given output as having been touched by evaluation.
This object is responsible for storing the executor buffer data, comprised of the executor cache vect...
A VdfMask is placed on connections to specify the data flowing through them.
A VdfNetwork is a collection of VdfNodes and their connections.
static VdfIndex GetIndexFromId(const VdfId id)
Get the output index from the output id.
This class is used to abstract away knowledge of the cache data used for each node.
uint64_t VdfId
The unique identifier type for Vdf objects.
unsigned int VdfInvalidationTimestamp
Type of the timestamp that identifies the most recent round of invalidation.
uint32_t VdfIndex
The index type for Vdf objects.
#define TF_DEV_AXIOM(cond)
The same as TF_AXIOM, but compiled only in dev builds.