Loading...
Searching...
No Matches
parallelExecutorDataVector.h
1//
2// Copyright 2025 Pixar
3//
4// Licensed under the terms set forth in the LICENSE.txt file available at
5// https://openusd.org/license.
6//
7#ifndef PXR_EXEC_VDF_PARALLEL_EXECUTOR_DATA_VECTOR_H
8#define PXR_EXEC_VDF_PARALLEL_EXECUTOR_DATA_VECTOR_H
9
10#include "pxr/pxr.h"
11
12#include "pxr/exec/vdf/api.h"
15#include "pxr/exec/vdf/output.h"
16#include "pxr/exec/vdf/types.h"
17
18#include "pxr/base/work/zeroAllocator.h"
19
20#include <tbb/concurrent_vector.h>
21#include <tbb/spin_mutex.h>
22
23#include <atomic>
24#include <climits>
25#include <cstdint>
26#include <memory>
27
28PXR_NAMESPACE_OPEN_SCOPE
29
30class VdfNetwork;
31
41{
42public:
43
46 using DataHandle = size_t;
47
50 static const size_t InvalidHandle = size_t(-1);
51
55 _numSegments(0),
56 _numTouched(0)
57 {}
58
61 VDF_API
63
69 VDF_API
70 void Resize(const VdfNetwork &network);
71
77 inline DataHandle GetOrCreateDataHandle(const VdfId outputId) const;
78
83 inline DataHandle GetDataHandle(const VdfId outputId) const;
84
91 const uint8_t idx = _outputData[handle].bufferIndices.GetPrivateIndex();
92 return &_bufferData[idx][handle];
93 }
94
101 const uint8_t idx = _outputData[handle].bufferIndices.GetScratchIndex();
102 return &_bufferData[idx][handle];
103 }
104
111 const uint8_t idx = _outputData[handle].bufferIndices.GetPublicIndex();
112 return &_bufferData[idx][handle];
113 }
114
122 void PublishPrivateBufferData(const DataHandle handle) const {
123 _outputData[handle].bufferIndices.PublishPrivateIndex();
124 }
125
133 void PublishScratchBufferData(const DataHandle handle) const {
134 _outputData[handle].bufferIndices.PublishScratchIndex();
135 }
136
145 const DataHandle handle) const {
146 return _inboxes[handle].Get();
147 }
148
162 const DataHandle handle,
163 VdfVector *value,
164 const VdfMask &mask) {
165 return _inboxes[handle].Take(value, mask);
166 }
167
176 _inboxes[handle].Reset();
177 }
178
185 VdfExecutorInvalidationData *GetInvalidationData(
186 const DataHandle handle) const {
187 return &_invalidationData[handle];
188 }
189
197 const DataHandle handle) const {
198 return _outputData[handle].invalidationTimestamp;
199 }
200
209 const DataHandle handle,
210 VdfInvalidationTimestamp timestamp) {
211 _outputData[handle].invalidationTimestamp = timestamp;
212 }
213
217 bool IsTouched(const VdfId outputId) const {
218 const VdfIndex outputIdx = VdfOutput::GetIndexFromId(outputId);
219 const uint32_t idx = outputIdx / _TouchedWordBits;
220 const uint64_t bit =
221 UINT64_C(1) << (outputIdx & (_TouchedWordBits - 1));
222 return
223 idx < _numTouched.load(std::memory_order_acquire) &&
224 (_touched[idx].load(std::memory_order_relaxed) & bit) != 0;
225 }
226
230 void Touch(const VdfId outputId) {
231 // Lazily initialize the touched array, if necessary.
232 if (ARCH_UNLIKELY(_numTouched.load(std::memory_order_acquire) == 0)) {
233 _GrowTouched();
234 }
235
236 const VdfIndex outputIdx = VdfOutput::GetIndexFromId(outputId);
237 const uint32_t idx = outputIdx / _TouchedWordBits;
238 const uint64_t bit =
239 UINT64_C(1) << (outputIdx & (_TouchedWordBits - 1));
240 if ((_touched[idx].load(std::memory_order_relaxed) & bit) == 0) {
241 _touched[idx].fetch_or(bit);
242 }
243 }
244
251 bool Untouch(const VdfId outputId) {
252 const VdfIndex outputIdx = VdfOutput::GetIndexFromId(outputId);
253 const uint32_t idx = outputIdx / _TouchedWordBits;
254 const uint64_t bit =
255 UINT64_C(1) << (outputIdx & (_TouchedWordBits - 1));
256 if (idx < _numTouched.load(std::memory_order_acquire) &&
257 (_touched[idx].load(std::memory_order_relaxed) & bit) != 0) {
258 return (_touched[idx].fetch_and(~bit) & bit) != 0;
259 }
260 return false;
261 }
262
268 size_t GetNumData() const {
269 return _outputData.size();
270 }
271
276 VDF_API
277 void Reset(const DataHandle handle, const VdfId outputId) const;
278
284 VDF_API
285 void Clear();
286
287private:
288
// An 8-bit field containing indices into the executor buffer data array.
//
// Three 2-bit indices (private, scratch, public) are packed into a single
// atomic byte. "Publishing" exchanges the public index with either the
// private or the scratch index.
//
class _BufferIndices
{
public:

    // Default constructor. Starts out with the default index assignment
    // established by Reset().
    //
    _BufferIndices() {
        Reset();
    }

    // Reset the buffer indices to their defaults:
    // private = 0, scratch = 1, public = 2.
    //
    void Reset() {
        _indices =
            (0 << _privateOffset) |
            (1 << _scratchOffset) |
            (2 << _publicOffset);
    }

    // Returns the private index.
    //
    uint8_t GetPrivateIndex() const {
        return _Extract(_privateMask, _privateOffset);
    }

    // Returns the scratch index.
    //
    uint8_t GetScratchIndex() const {
        return _Extract(_scratchMask, _scratchOffset);
    }

    // Returns the public index.
    //
    uint8_t GetPublicIndex() const {
        return _Extract(_publicMask, _publicOffset);
    }

    // Swaps the private buffer index with the public buffer index.
    //
    // The result of the compare-exchange is not checked: if the stored
    // indices changed between the load and the exchange, this call leaves
    // them untouched.
    // NOTE(review): presumably racing publishers request the same swap, so
    // losing the race is harmless (retrying would swap back) -- confirm.
    //
    void PublishPrivateIndex() {
        // Distance between the public and the private field.
        constexpr unsigned shift = _publicOffset - _privateOffset;

        uint8_t indices = _indices.load(std::memory_order_relaxed);
        const uint8_t newIndices = static_cast<uint8_t>(
            // Scratch index stays.
            (indices & _scratchMask) |
            // Public index replaces the private index.
            ((indices & _publicMask) >> shift) |
            // Private index replaces the public index.
            ((indices & _privateMask) << shift));
        _indices.compare_exchange_strong(indices, newIndices);
    }

    // Swaps the scratch buffer index with the public buffer index.
    //
    // The result of the compare-exchange is not checked: if the stored
    // indices changed between the load and the exchange, this call leaves
    // them untouched.
    // NOTE(review): presumably racing publishers request the same swap, so
    // losing the race is harmless (retrying would swap back) -- confirm.
    //
    void PublishScratchIndex() {
        // Distance between the public and the scratch field. This is
        // derived from both offsets instead of reusing _scratchOffset,
        // which only happens to equal that distance for the current layout.
        constexpr unsigned shift = _publicOffset - _scratchOffset;

        uint8_t indices = _indices.load(std::memory_order_relaxed);
        const uint8_t newIndices = static_cast<uint8_t>(
            // Private index stays.
            (indices & _privateMask) |
            // Public index replaces the scratch index.
            ((indices & _publicMask) >> shift) |
            // Scratch index replaces the public index.
            ((indices & _scratchMask) << shift));
        _indices.compare_exchange_strong(indices, newIndices);
    }

private:

    // Loads the packed indices (acquire) and returns the 2-bit field
    // selected by mask, shifted down by offset.
    //
    uint8_t _Extract(const uint8_t mask, const uint8_t offset) const {
        return static_cast<uint8_t>(
            (_indices.load(std::memory_order_acquire) & mask) >> offset);
    }

    // The bit offsets into the indices bitset.
    //
    constexpr static uint8_t _privateOffset = 0;
    constexpr static uint8_t _scratchOffset = 2;
    constexpr static uint8_t _publicOffset = 4;

    // The bitmasks for each entry in the indices bitset. Two bits per
    // buffer index.
    //
    constexpr static uint8_t _privateMask = 0x03;
    constexpr static uint8_t _scratchMask = 0x0C;
    constexpr static uint8_t _publicMask = 0x30;

    // The publish methods rely on the public field being the highest one,
    // and on every mask covering exactly the two bits at its offset.
    //
    static_assert(_privateOffset < _scratchOffset &&
                  _scratchOffset < _publicOffset,
                  "Unexpected ordering of the buffer index fields.");
    static_assert(_privateMask == (0x03 << _privateOffset) &&
                  _scratchMask == (0x03 << _scratchOffset) &&
                  _publicMask == (0x03 << _publicOffset),
                  "Buffer index masks must match their offsets.");

    // The buffer indices packed into a bitset.
    //
    std::atomic<uint8_t> _indices;
};
378
379 // The generic output data stored for each entry in this container. Note,
380 // the memory underpinning this structure must be zero-initialized. This
381 // is required for the "constructed flags" to work properly.
382 //
383 struct _OutputData
384 {
385 // Noncopyable.
386 //
387 _OutputData(const _OutputData &) = delete;
388 _OutputData &operator=(const _OutputData &) = delete;
389
390 // Constructs the output data for the output with id oid, via Reset().
391 //
392 explicit _OutputData(const VdfId oid) {
393 Reset(oid);
394 }
395
396 // Reset the output data to its default constructed state.
397 //
398 void Reset(const VdfId oid) {
399 // Set the default buffer indices.
400 bufferIndices.Reset();
401
402 // Reset the invalidation timestamp.
403 invalidationTimestamp =
404 VdfExecutorInvalidationData::InitialInvalidationTimestamp;
405
406 // Enforce release semantics on the outputId to synchronize the
407 // non-atomic and dependent atomic write above. This store must remain
408 // the last statement in Reset().
409 outputId.store(oid, std::memory_order_release);
410 }
411 // The output id.
412 //
413 std::atomic<VdfId> outputId;
414
415 // The buffer indices.
416 //
417 _BufferIndices bufferIndices;
418
419 // A zero-initialized checksum to synchronize the corresponding data
420 // construction. The checksum will be incremented after construction of
421 // each piece of data. Once the checksum has reached a specific value,
422 // all data is guaranteed to be constructed.
423 //
424 // The correctness of the checksum implementation requires that this
425 // member be initialized to zero prior to becoming visible to other
426 // threads. Due to the design of concurrent_vector, zero_allocator is
427 // used to ensure that the storage is zeroed but the lifetime of the
428 // _OutputData object is not guaranteed to begin before other threads
429 // are able to observe its entry in the vector. Aside from this
430 // undefined behavior, there is another issue. As of C++20,
431 // std::atomic's default constructor does value-initialization. This
432 // zeroes the checksum again *after* other threads may have already
433 // incremented its value. Placing the checksum in a union and
434 // omitting any explicit initialization in the _OutputData constructor
435 // dodges this specific re-zeroing problem. However, the workaround
436 // cannot solve the fundamental issue of object lifetime outlined
437 // above.
438 //
439 union {
440 std::atomic<uint8_t> constructionChecksum;
441 };
442
443 // The invalidation timestamp. We store this information in the
444 // generic data vector in order to make it available during evaluation
445 // (mung buffer locking).
446 //
447 VdfInvalidationTimestamp invalidationTimestamp;
448
449 };
450
451 // A simple container that contains output values that had their ownership
452 // transferred into this data manager. The operations provided on this
453 // class are thread-safe.
454 //
455 class _Inbox
456 {
457 public:
458
459 // Constructor.
460 //
461 _Inbox() : _buffer() {}
462
463 // Destructor.
464 //
465 ~_Inbox() {
466 VdfExecutorBufferData *buffer =
467 _buffer.load(std::memory_order_acquire);
468 if (buffer) {
469 delete buffer;
470 }
471 }
472
473 // Takes ownership of the \p value, and returns \c true if ownership
474 // has successfully been transferred. Note that this instance will
475 // assume responsibility of lifetime management over \p value if this
476 // operation succeeds. Otherwise, the call site will maintain this
477 // responsibility.
478 //
479 VDF_API
480 bool Take(VdfVector *value, const VdfMask &mask);
481
482 // Gets the current value.
483 //
484 VdfExecutorBufferData *Get() const {
485 return _buffer.load(std::memory_order_acquire);
486 }
487
488 // Clears out the inbox.
489 //
490 void Reset() {
491 if (_buffer.load(std::memory_order_relaxed)) {
492 delete _buffer.exchange(nullptr);
493 }
494 }
495
496 private:
497
498 // The buffer. May be nullptr, if this inbox is empty.
499 //
500 std::atomic<VdfExecutorBufferData *> _buffer;
501
502 };
503
504 // Type of each segment in the locations array. An array of atomic integers.
505 using _LocationsSegment = std::atomic<int> *;
506
507 // Expected to be 16 bytes in size.
508 static_assert(sizeof(_OutputData) == 16,
509 "_OutputData expected to be 16 bytes in size.");
510
511 // Resize the touched bitset to accommodate touching any output currently
512 // in the network. It is safe to call this concurrently.
513 //
514 VDF_API
515 void _GrowTouched();
516
517 // Atomically create a new locations segment and return its pointer.
518 //
519 VDF_API
520 _LocationsSegment _CreateSegment(size_t segmentIndex) const;
521
522 // Create a new location index and its corresponding data for the output
523 // with the given output id.
524 //
525 VDF_API
526 int _CreateLocation(
527 const VdfId outputId,
528 int currentLocation,
529 std::atomic<int> *newLocation) const;
530
531 // Pushes a new data entry into the internal vectors for the output with
532 // the given outputId.
533 //
534 VDF_API
535 int _CreateData(const VdfId outputId) const;
536
537 // Reset the data stored at the location.
538 //
539 VDF_API
540 void _ResetLocation(
541 const VdfId outputId,
542 int currentLocation,
543 std::atomic<int> *newLocation) const;
544
545 // Wait for the location entry to become available.
546 //
547 VDF_API
548 int _WaitForLocation(
549 int currentLocation,
550 std::atomic<int> *newLocation) const;
551
552 // The number of output buffers (public, private, scratch).
553 //
554 constexpr static size_t _NumBuffers = 3;
555
556 // The initial number of entries reserved in the data vectors.
557 //
558 constexpr static size_t _InitialDataNum = 1024;
559
560 // Sentinels for invalid (not yet created) and pending (currently being
561 // created) location indices.
562 //
563 constexpr static int _LocationInvalid = -1;
564 constexpr static int _LocationPending = -2;
565
566 // The number of bits in a word of the touched array.
567 //
568 constexpr static uint32_t _TouchedWordBits = sizeof(uint64_t) * CHAR_BIT;
569
570 // The size of a segment in the segmented locations array. Must be a power
571 // of two.
572 //
573 constexpr static size_t _SegmentSize = 4096;
574
575 // The locations array, mapping from output index to output data,
576 // evaluation data and invalidation data index. The array is segmented, and
577 // segments will be lazily allocated.
578 //
579 size_t _numSegments;
580 std::unique_ptr<std::atomic<_LocationsSegment>[]> _locations;
581
582 // The touched bitset. This data needs to be stored outside of _OutputData
583 // to allow us to set bits concurrently from speculation node evaluation.
584 //
585 std::atomic<size_t> _numTouched;
586 std::unique_ptr<std::atomic<uint64_t>[]> _touched;
587 tbb::spin_mutex _touchedMutex;
588
589 // The output data.
590 //
591 using _OutputDataVector =
592 tbb::concurrent_vector<_OutputData, WorkZeroAllocator<_OutputData>>;
593 mutable _OutputDataVector _outputData;
594
595 // The arrays of buffer data corresponding with the output data.
596 //
597 using _BufferDataVector = tbb::concurrent_vector<VdfExecutorBufferData>;
598 mutable _BufferDataVector _bufferData[_NumBuffers];
599
600 // The array of inboxes corresponding with the output data.
601 //
602 using _InboxVector = tbb::concurrent_vector<_Inbox>;
603 mutable _InboxVector _inboxes;
604
605 // The invalidation specific data corresponding with the output data.
606 //
607 using _InvalidationDataVector =
608 tbb::concurrent_vector<VdfExecutorInvalidationData>;
609 mutable _InvalidationDataVector _invalidationData;
610
611};
612
614
617 const VdfId outputId) const
618{
619 // Get the output index.
620 const VdfIndex outputIndex = VdfOutput::GetIndexFromId(outputId);
621
622 // Compute the index to the segment.
623 const size_t segmentIndex = outputIndex / _SegmentSize;
624 TF_DEV_AXIOM(segmentIndex < _numSegments);
625
626 // Retrieve the segment.
627 _LocationsSegment segment =
628 _locations[segmentIndex].load(std::memory_order_acquire);
629
630 // Allocate the segment, if required.
631 if (!segment) {
632 segment = _CreateSegment(segmentIndex);
633 }
634
635 // Using the output index, look up the location in the data vector.
636 const size_t segmentOffset = outputIndex & (_SegmentSize - 1);
637 std::atomic<int> *location = &segment[segmentOffset];
638 const int currentLocation = location->load(std::memory_order_acquire);
639
640 // Create a new entry, if the location is still uninitialized.
641 if (currentLocation < 0) {
642 return _CreateLocation(outputId, currentLocation, location);
643 }
644
645 // Make sure the output id matches at the location, and reset the entry
646 // if there is a mismatch.
647 else if (outputId != _outputData[currentLocation].outputId.load(
648 std::memory_order_acquire)) {
649 _ResetLocation(outputId, currentLocation, location);
650 }
651
652 // Return the existing location.
653 return currentLocation;
654}
655
658{
659 // Get the output index.
660 const VdfIndex outputIndex = VdfOutput::GetIndexFromId(outputId);
661
662 // Compute the index to the segment.
663 const size_t segmentIndex = outputIndex / _SegmentSize;
664
665 // If the index is out of bounds, we can bail out right away.
666 if (segmentIndex >= _numSegments) {
667 return InvalidHandle;
668 }
669
670 // Retrieve the segment.
671 _LocationsSegment segment =
672 _locations[segmentIndex].load(std::memory_order_acquire);
673
674 // If the segment is not allocated, we can bail out right away.
675 if (!segment) {
676 return InvalidHandle;
677 }
678
679 // If the location points to a valid entry in the data vector, and the
680 // data at that index matches the output id, we can return the data.
681 // Otherwise, the location may either be garbage, or the output version
682 // may have changed.
683 const size_t segmentOffset = outputIndex & (_SegmentSize - 1);
684 std::atomic<int> * const location = &segment[segmentOffset];
685 int currentLocation = location->load(std::memory_order_acquire);
686
687 // Because of the ABA problems that can occur when we call
688 // _ResetLocation, it's possible to observe _LocationPending
689 // as the location for this output. In this case, we have to wait for the location to become available.
690 if (ARCH_UNLIKELY(currentLocation == _LocationPending)) {
691 currentLocation = _WaitForLocation(currentLocation, location);
692 }
693
694 return
695 currentLocation >= 0 &&
696 _outputData[currentLocation].outputId.load(
697 std::memory_order_acquire) == outputId
698 ? currentLocation
700}
701
702PXR_NAMESPACE_CLOSE_SCOPE
703
704#endif
This is a data container for executor data managers that uses data stored in vectors indexed by outpu...
VdfExecutorBufferData * GetScratchBufferData(const DataHandle handle) const
Returns the VdfExecutorBufferData associated with the given handle.
size_t GetNumData() const
Returns the number of outputs that have data associated with them.
VDF_API void Resize(const VdfNetwork &network)
Resize the data manager to accommodate the given network.
void PublishScratchBufferData(const DataHandle handle) const
Publishes the scratch VdfExecutorBufferData, and retains the previously public VdfExecutorBufferData.
size_t DataHandle
The data handle type is an index into the internal data vector.
VdfInvalidationTimestamp GetInvalidationTimestamp(const DataHandle handle) const
Returns the VdfInvalidationTimestamp associated with the given handle.
DataHandle GetDataHandle(const VdfId outputId) const
Returns an existing data handle for the given output.
void ResetTransferredBufferData(const DataHandle handle)
Resets the transferred buffer associated with the given handle.
static const size_t InvalidHandle
This sentinel index denotes an invalid handle.
VdfExecutorInvalidationData * GetInvalidationData(const DataHandle handle) const
Returns the VdfExecutorInvalidationData associated with the given handle.
void SetInvalidationTimestamp(const DataHandle handle, VdfInvalidationTimestamp timestamp)
Sets the invalidation timestamp for the given data handle.
VDF_API ~Vdf_ParallelExecutorDataVector()
Destructor.
bool Untouch(const VdfId outputId)
Marks the data at the given output as not having been touched by evaluation.
VdfExecutorBufferData * GetPrivateBufferData(const DataHandle handle) const
Returns the VdfExecutorBufferData associated with the given handle.
void PublishPrivateBufferData(const DataHandle handle) const
Publishes the private VdfExecutorBufferData, and retains the previously public VdfExecutorBufferData.
VdfExecutorBufferData * GetTransferredBufferData(const DataHandle handle) const
Returns the transferred VdfExecutorBufferData associated with the given handle.
DataHandle GetOrCreateDataHandle(const VdfId outputId) const
Returns an existing data handle, or creates a new one for the given output.
bool IsTouched(const VdfId outputId) const
Returns true if the data at the given output has been touched by evaluation.
VDF_API void Reset(const DataHandle handle, const VdfId outputId) const
Resets the output data at the given data handle to a newly constructed state.
VdfExecutorBufferData * GetPublicBufferData(const DataHandle handle) const
Returns the VdfExecutorBufferData associated with the given handle.
VDF_API void Clear()
Clears all the data from this manager.
bool TransferBufferData(const DataHandle handle, VdfVector *value, const VdfMask &mask)
Transfers ownership of the value to the output associated with handle.
void Touch(const VdfId outputId)
Marks the data at the given output as having been touched by evaluation.
This object is responsible for storing the executor buffer data, comprised of the executor cache vect...
A VdfMask is placed on connections to specify the data flowing through them.
Definition: mask.h:37
A VdfNetwork is a collection of VdfNodes and their connections.
Definition: network.h:60
static VdfIndex GetIndexFromId(const VdfId id)
Get the output index from the output id.
Definition: output.h:109
This class is used to abstract away knowledge of the cache data used for each node.
Definition: vector.h:56
uint64_t VdfId
The unique identifier type for Vdf objects.
Definition: types.h:107
unsigned int VdfInvalidationTimestamp
Type of the timestamp that identifies the most recent round of invalidation.
Definition: types.h:74
uint32_t VdfIndex
The index type for Vdf objects.
Definition: types.h:110
#define TF_DEV_AXIOM(cond)
The same as TF_AXIOM, but compiled only in dev builds.
Definition: diagnostic.h:205