Loading...
Searching...
No Matches
parallelExecutorEngineBase.h
Go to the documentation of this file.
1//
2// Copyright 2025 Pixar
3//
4// Licensed under the terms set forth in the LICENSE.txt file available at
5// https://openusd.org/license.
6//
7#ifndef PXR_EXEC_VDF_PARALLEL_EXECUTOR_ENGINE_BASE_H
8#define PXR_EXEC_VDF_PARALLEL_EXECUTOR_ENGINE_BASE_H
9
11
12#include "pxr/pxr.h"
13
22#include "pxr/exec/vdf/mask.h"
24#include "pxr/exec/vdf/node.h"
25#include "pxr/exec/vdf/output.h"
29#include "pxr/exec/vdf/vector.h"
30
34#include "pxr/base/work/loops.h"
36#include "pxr/base/work/taskGraph.h"
37
38#include <tbb/concurrent_vector.h>
39
40PXR_NAMESPACE_OPEN_SCOPE
41
42// Use this macro to enable tracing in the executor engine.
43#define PEE_TRACE_SCOPE(x)
44
57template < typename Derived, typename DataManager >
59{
61
62public:
66 const VdfParallelExecutorEngineBase &) = delete;
68 const VdfParallelExecutorEngineBase &) = delete;
69
70
74 const VdfExecutorInterface &executor,
75 DataManager *dataManager);
76
80
85 const VdfSchedule &schedule,
86 const VdfRequest &computeRequest,
87 VdfExecutorErrorLogger *errorLogger) {
89 schedule, computeRequest, errorLogger,
90 [](const VdfMaskedOutput &, size_t){});
91 }
92
98 template < typename Callback >
100 const VdfSchedule &schedule,
101 const VdfRequest &computeRequest,
102 VdfExecutorErrorLogger *errorLogger,
103 Callback &&callback);
104
105protected:
106 // The data handle type from the data manager implementation.
107 typedef typename DataManager::DataHandle _DataHandle;
108
109 // An integer type for storing the current per-task evaluation stage.
110 typedef uint32_t _EvaluationStage;
111
112 // A leaf task, i.e. the entry point for parallel evaluation.
113 template < typename Callback >
114 class _LeafTask : public WorkTaskGraph::BaseTask
115 {
116 public:
117 _LeafTask(
118 This *engine,
119 const VdfEvaluationState &state,
120 const VdfMaskedOutput &output,
121 const size_t requestedIndex,
122 Callback &callback) :
123 _engine(engine),
124 _state(state),
125 _output(output),
126 _requestedIndex(requestedIndex),
127 _callback(callback),
128 _evaluationStage(0)
129 {}
130
131 // Task execution entry point.
132 WorkTaskGraph::BaseTask * execute() override;
133
134 private:
135 This *_engine;
136 const VdfEvaluationState &_state;
137 const VdfMaskedOutput &_output;
138 const size_t _requestedIndex;
139 Callback &_callback;
140 _EvaluationStage _evaluationStage;
141 };
142
143 // A scheduled compute task.
144 class _ComputeTask : public WorkTaskGraph::BaseTask
145 {
146 public:
147 _ComputeTask(
148 This *engine,
149 const VdfEvaluationState &state,
150 const VdfNode &node,
151 VdfScheduleTaskId taskIndex) :
152 _engine(engine),
153 _state(state),
154 _node(node),
155 _taskIndex(taskIndex),
156 _evaluationStage(0)
157 {}
158
159 // Task execution entry point.
160 WorkTaskGraph::BaseTask *execute() override;
161
162 private:
163 This *_engine;
164 const VdfEvaluationState &_state;
165 const VdfNode &_node;
166 VdfScheduleTaskId _taskIndex;
167 _EvaluationStage _evaluationStage;
168 };
169
170 // A scheduled inputs task.
171 class _InputsTask : public WorkTaskGraph::BaseTask
172 {
173 public:
174 _InputsTask(
175 This *engine,
176 const VdfEvaluationState &state,
177 const VdfNode &node,
178 VdfScheduleTaskIndex taskIndex) :
179 _engine(engine),
180 _state(state),
181 _node(node),
182 _taskIndex(taskIndex),
183 _evaluationStage(0)
184 {}
185
186 // Task execution entry point.
187 WorkTaskGraph::BaseTask *execute();
188
189 private:
190 This *_engine;
191 const VdfEvaluationState &_state;
192 const VdfNode &_node;
193 VdfScheduleTaskIndex _taskIndex;
194 _EvaluationStage _evaluationStage;
195 };
196
197 // A scheduled keep task.
198 class _KeepTask : public WorkTaskGraph::BaseTask
199 {
200 public:
201 _KeepTask(
202 This *engine,
203 const VdfEvaluationState &state,
204 const VdfNode &node,
205 VdfScheduleTaskIndex taskIndex) :
206 _engine(engine),
207 _state(state),
208 _node(node),
209 _taskIndex(taskIndex),
210 _evaluationStage(0)
211 {}
212
213 // Task execution entry point.
214 WorkTaskGraph::BaseTask *execute();
215
216 private:
217 This *_engine;
218 const VdfEvaluationState &_state;
219 const VdfNode &_node;
220 VdfScheduleTaskIndex _taskIndex;
221 _EvaluationStage _evaluationStage;
222 };
223
224 // A touch-task for touching all outputs between a from-buffer source
225 // and a destination output.
226 class _TouchTask : public WorkTaskGraph::BaseTask
227 {
228 public:
229 _TouchTask(
230 This *engine,
231 const VdfOutput &dest,
232 const VdfOutput &source) :
233 _engine(engine),
234 _dest(dest),
235 _source(source)
236 {}
237
238 // Task execution entry point.
239 WorkTaskGraph::BaseTask *execute();
240
241 private:
242 This *_engine;
243 const VdfOutput &_dest;
244 const VdfOutput &_source;
245 };
246
247 // A task that invokes all compute tasks scheduled for a particular node.
248 class _ComputeAllTask : public WorkTaskGraph::BaseTask {
249 public:
250 _ComputeAllTask(
251 This *engine,
252 const VdfEvaluationState &state,
253 const VdfNode &node) :
254 _engine(engine),
255 _state(state),
256 _node(node),
257 _completed(false)
258 {}
259
260 WorkTaskGraph::BaseTask *execute();
261
262 private:
263 This *_engine;
264 const VdfEvaluationState &_state;
265 const VdfNode &_node;
266 bool _completed;
267 };
268
269 // Reset the engine's internal state. Every round of evaluation starts with
270 // clean state.
271 void _ResetState(const VdfSchedule &schedule);
272
273 // Run a single, requested output. If the output is uncached, this will
274 // reset the internal state (if not already done), and add the leaf task to
275 // the task list.
276 template < typename Callback >
277 void _RunOutput(
278 const VdfEvaluationState &state,
279 const VdfMaskedOutput &maskedOutput,
280 const size_t requestedIndex,
281 Callback &callback,
282 WorkTaskGraph::TaskList *taskList);
283
 284 // Spawn the task(s) requested for a given node. These are the tasks spawned
 285 // as entry points into evaluating the schedule. Remaining tasks will be
 286 // spawned as input dependencies to these requested tasks.
287 void _SpawnRequestedTasks(
288 const VdfEvaluationState &state,
289 const VdfNode &node,
290 WorkTaskGraph::BaseTask *successor,
291 WorkTaskGraph::BaseTask **bypass);
292
293 // Spawn a new task, or assign the task to the bypass output parameter,
294 // if no task has previously been assigned to bypass. The output
295 // parameter can later be used to drive scheduler bypassing in order to
296 // reduce scheduling overhead.
297 void _SpawnOrBypass(
299 WorkTaskGraph::BaseTask **bypass);
300
301 // The task execution entry point for the scheduled leaf tasks. These tasks
302 // are the main entry points to evaluation. The engine will spawn one leaf
303 // task for each uncached requested output. Returns true if the task is not
304 // done after returning, and must therefore be recycled for re-execution
305 // after all its input dependencies have been completed.
306 template < typename Callback >
307 bool _ProcessLeafTask(
309 const VdfEvaluationState &state,
310 const VdfMaskedOutput &maskedOutput,
311 const size_t requestedIndex,
312 Callback &callback,
313 _EvaluationStage *evaluationStage,
314 WorkTaskGraph::BaseTask **bypass);
315
316 // The task execution entry point for scheduled compute tasks. Returns
317 // true if the task is not done after returning, and must therefore be
318 // recycled for re-execution after all its input dependencies have been
319 // completed.
320 bool _ProcessComputeTask(
322 const VdfEvaluationState &state,
323 const VdfNode &node,
324 const VdfScheduleComputeTask &scheduleTask,
325 _EvaluationStage *evaluationStage,
326 WorkTaskGraph::BaseTask **bypass);
327
328 // The task execution entry point for scheduled inputs tasks. Returns
329 // true if the task is not done after returning, and must therefore be
330 // recycled for re-execution after all its input dependencies have been
331 // completed.
332 bool _ProcessInputsTask(
334 const VdfEvaluationState &state,
335 const VdfNode &node,
336 const VdfScheduleInputsTask &scheduleTask,
337 _EvaluationStage *evaluationStage,
338 WorkTaskGraph::BaseTask **bypass);
339
340 // The task execution entry point for scheduled keep tasks. Returns
341 // true if the task is not done after returning, and must therefore be
342 // recycled for re-execution after all its input dependencies have been
343 // completed.
344 bool _ProcessKeepTask(
346 const VdfEvaluationState &state,
347 const VdfNode &node,
348 _EvaluationStage *evaluationStage,
349 WorkTaskGraph::BaseTask **bypass);
350
351 // Invokes a keep task, as an input dependency to the successor task.
352 // Returns true if the successor must wait for completion of the newly
353 // invoked task. If this method returns false, the input dependency
354 // has already been fulfilled.
355 bool _InvokeKeepTask(
356 const VdfScheduleTaskIndex idx,
357 const VdfNode &node,
358 const VdfEvaluationState &state,
359 WorkTaskGraph::BaseTask *successor,
360 WorkTaskGraph::BaseTask **bypass);
361
362 // Invokes a touch task, touching all outputs between dest and source. The
363 // touching happens in the background. Only the root task synchronizes on
364 // this work.
365 void _InvokeTouchTask(
366 const VdfOutput &dest,
367 const VdfOutput &source);
368
369 // Invokes a compute task, as an input dependency to the successor task.
370 // Returns true if the successor must wait for completion of the newly
371 // invoked task. If this method returns false, the input dependency
372 // has already been fulfilled.
373 bool _InvokeComputeTask(
374 const VdfScheduleTaskId taskIndex,
375 const VdfEvaluationState &state,
376 const VdfNode &node,
377 WorkTaskGraph::BaseTask *successor,
378 WorkTaskGraph::BaseTask **bypass);
379
380 // Calls _InvokeComputeTask on an iterable range of tasks.
381 template < typename Iterable >
382 bool _InvokeComputeTasks(
383 const Iterable &tasks,
384 const VdfEvaluationState &state,
385 const VdfNode &node,
386 WorkTaskGraph::BaseTask *successor,
387 WorkTaskGraph::BaseTask **bypass);
388
389 // Check whether the output attached to the input dependency has already
390 // been cached.
391 bool _IsInputDependencyCached(
393 const VdfOutput &output,
394 const VdfMask &mask);
395
396 // Calls _InvokeComputeTask on a range of tasks specified by input.
397 // Alternatively, if input specifies a keep task, this method will invoke
398 // the keep task instead.
399 bool _InvokeComputeOrKeepTasks(
400 const VdfScheduleInputDependency &input,
401 const VdfEvaluationState &state,
402 WorkTaskGraph::BaseTask *successor,
403 WorkTaskGraph::BaseTask **bypass);
404
405 // Calls _InvokeComputeTask on a range of tasks providing values for the
406 // specified output. Alternatively, if the values for the specified output
407 // are being provided by a keep task, this method will invoke the keep task
408 // instead.
409 bool _InvokeComputeOrKeepTasks(
410 const VdfOutput &output,
411 const VdfEvaluationState &state,
412 WorkTaskGraph::BaseTask *successor,
413 WorkTaskGraph::BaseTask **bypass);
414
415 // Invokes all the compute tasks required to fulfill all prereq
416 // dependencies. Returns true if the successor must wait for completion of
417 // the newly invoked tasks. If this method returns false, the input
418 // dependencies have already been fulfilled.
419 bool _InvokePrereqInputs(
420 const VdfScheduleInputsTask &scheduleTask,
421 const VdfEvaluationState &state,
422 WorkTaskGraph::BaseTask *successor,
423 WorkTaskGraph::BaseTask **bypass);
424
425 // Invokes all the compute tasks required to fulfill all optional input
426 // dependencies (those dependent on the results of prereqs). Returns true
427 // if the successor must wait for completion of the newly invoked tasks. If
428 // this method returns false, the input dependencies have already been
429 // fulfilled.
430 bool _InvokeOptionalInputs(
431 const VdfScheduleInputsTask &scheduleTask,
432 const VdfEvaluationState &state,
433 const VdfNode &node,
434 WorkTaskGraph::BaseTask *successor,
435 WorkTaskGraph::BaseTask **bypass);
436
437 // Invokes all the compute tasks required to fulfill all required input
438 // dependencies (those not dependent on prereqs, and read/writes). Returns
439 // true if the successor must wait for completion of the newly invoked
440 // tasks. If this method returns false, the input dependencies have already
441 // been fulfilled.
442 bool _InvokeRequiredInputs(
443 const VdfScheduleComputeTask &scheduleTask,
444 const VdfEvaluationState &state,
445 WorkTaskGraph::BaseTask *successor,
446 WorkTaskGraph::BaseTask **bypass);
447
448 // Invokes an inputs task, as an input dependency to the successor task.
449 // Returns true if the successor must wait for completion of the newly
450 // invoked task. If this method returns false, the input dependency
451 // has already been fulfilled.
452 bool _InvokeInputsTask(
453 const VdfScheduleComputeTask &scheduleTask,
454 const VdfEvaluationState &state,
455 const VdfNode &node,
456 WorkTaskGraph::BaseTask *successor,
457 WorkTaskGraph::BaseTask **bypass);
458
459 // Invokes a task that prepares a node for execution, as an input
460 // dependency to the successor task. Returns true if the successor must
461 // wait for completion of the newly invoked task. If this method returns
462 // false, the input dependency has already been fulfilled.
463 bool _InvokePrepTask(
464 const VdfScheduleComputeTask &scheduleTask,
465 const VdfEvaluationState &state,
466 const VdfNode &node,
467 WorkTaskGraph::BaseTask *successor);
468
469 // Prepares a node for execution. Every node has to be prepared exactly
470 // once. Nodes with multiple invocations will be prepared by the first
471 // compute task that gets to the node preparation stage.
472 void _PrepareNode(
473 const VdfEvaluationState &state,
474 const VdfNode &node);
475
476 // Prepares an output for execution.
477 void _PrepareOutput(
478 const VdfSchedule &schedule,
479 const VdfSchedule::OutputId outputId);
480
481 // Create the cache for the scratch buffer. This will make sure the cache
 482 // can accommodate all the data denoted by mask.
483 void _CreateScratchCache(
484 const VdfOutput &output,
485 const _DataHandle dataHandle,
486 const VdfMask &mask,
487 VdfExecutorBufferData *scratchBuffer);
488
489 // Evaluate a node by either invoking its Compute() method, or passing
490 // through all data.
491 void _EvaluateNode(
492 const VdfScheduleComputeTask &scheduleTask,
493 const VdfEvaluationState &state,
494 const VdfNode &node,
495 WorkTaskGraph::BaseTask *successor);
496
497 // Compute a node by invoking its Compute() method.
498 void _ComputeNode(
499 const VdfScheduleComputeTask &scheduleTask,
500 const VdfEvaluationState &state,
501 const VdfNode &node);
502
503 // Pass all the read/write data through the node.
504 void _PassThroughNode(
505 const VdfScheduleComputeTask &scheduleTask,
506 const VdfEvaluationState &state,
507 const VdfNode &node);
508
509 // Process an output after execution.
510 void _ProcessOutput(
511 const VdfScheduleComputeTask &scheduleTask,
512 const VdfEvaluationState &state,
513 const VdfOutput &output,
514 const VdfSchedule::OutputId outputId,
515 const _DataHandle dataHandle,
516 const bool hasAssociatedInput,
517 VdfExecutorBufferData *privateBuffer);
518
 519 // Prepares a read/write buffer by ensuring that the private data is
520 // available at the output.
521 void _PrepareReadWriteBuffer(
522 const VdfOutput &output,
523 const VdfSchedule::OutputId outputId,
524 const VdfMask &mask,
525 const VdfSchedule &schedule,
526 VdfExecutorBufferData *privateBuffer);
527
528 // Pass a read/write buffer from the source output to the destination
529 // output, or copy the data if required.
530 void _PassOrCopyBuffer(
531 const VdfOutput &output,
532 const VdfOutput &source,
533 const VdfMask &inputMask,
534 const VdfSchedule &schedule,
535 VdfExecutorBufferData *privateBuffer);
536
537 // Pass a read/write buffer from the source buffer to the destination
538 // buffer.
539 //
540 void _PassBuffer(
541 VdfExecutorBufferData *fromBuffer,
542 VdfExecutorBufferData *toBuffer) const;
543
544 // Copy a read/write buffer from the source output to the destination
545 // output.
546 void _CopyBuffer(
547 const VdfOutput &output,
548 const VdfOutput &source,
549 const VdfMask &fromMask,
550 VdfExecutorBufferData *toData) const;
551
552 // Publish the data in the scratch buffers of this node.
553 void _PublishScratchBuffers(
554 const VdfSchedule &schedule,
555 const VdfNode &node);
556
557 // Copies all of the publicly available data missing from \p haveMask into
558 // the scratch buffer and extends the executor cache mask. Returns a pointer
559 // to the destination vector if any data was copied.
560 VdfVector *_AbsorbPublicBuffer(
561 const VdfOutput &output,
562 const _DataHandle dataHandle,
563 const VdfMask &haveMask);
564
565 // Detects interruption by querying the executor interruption API and
566 // calling into the derived engine to do cycle detection. Sets the
567 // interruption flag if interruption (or a cycle) has been detected.
568 bool _DetectInterruption(
569 const VdfEvaluationState &state,
570 const VdfNode &node);
571
572 // Returns true if the interruption flag (as determined by
573 // _DetectInterruption()) has been set.
574 bool _HasDetectedInterruption() const;
575
576 // Create an error transport out of an error mark to enable transferring
577 // the errors to the calling thread later on.
578 void _TransportErrors(const TfErrorMark &errorMark);
579
580 // Post all the transported errors on the calling thread.
581 void _PostTransportedErrors();
582
 583 // Returns a reference to the derived class for static polymorphism.
 // (CRTP hook: this base engine invokes derived-class customizations such
 // as _FinalizeEvaluation() and _Touch() through _Self().)
 584 Derived &_Self() {
 585 return *static_cast<Derived *>(this);
 586 }
587
588 // The executor that uses this engine.
589 const VdfExecutorInterface &_executor;
590
591 // The data manager populated by this engine.
592 DataManager *_dataManager;
593
594 // A task graph for dynamically adding and spawning tasks during execution.
595 WorkTaskGraph _taskGraph;
596
597 // A dispatcher for running tasks within an isolated region.
598 WorkIsolatingDispatcher _isolatingDispatcher;
599
600 // Keep track of which unique input dependencies have had their cached
601 // state checked.
602 std::unique_ptr<std::atomic<uint8_t>[]> _dependencyState;
603
604 // The structures that orchestrate synchronization for the different task
605 // types.
606 //
607 // XXX: We should explore folding all these into a single instance.
608 std::atomic<bool> _resetState;
609 VdfParallelTaskSync _computeTasks;
610 VdfParallelTaskSync _inputsTasks;
611 VdfParallelTaskSync _prepTasks;
612 VdfParallelTaskSync _keepTasks;
613
614 // Keep a record of errors to post to the calling thread.
615 tbb::concurrent_vector<TfErrorTransport> _errors;
616
617 // Stores the interruption signal as determined by _DetectInterruption.
618 std::atomic<bool> _isInterrupted;
619};
620
622
623template < typename Derived, typename DataManager >
// Constructor: stores the owning executor and its data manager (both must
// outlive this engine) and wires every task-synchronization structure to
// the engine's task graph. _resetState and _isInterrupted start out false.
// NOTE(review): listing lines 624-625 (the constructor's qualified
// signature) are missing from this dump — confirm against the original
// header.
626 const VdfExecutorInterface &executor,
627 DataManager *dataManager) :
628 _executor(executor),
629 _dataManager(dataManager),
630 _resetState(),
631 _computeTasks(&_taskGraph),
632 _inputsTasks(&_taskGraph),
633 _prepTasks(&_taskGraph),
634 _keepTasks(&_taskGraph),
635 _isInterrupted()
636{
637}
638
639template < typename Derived, typename DataManager >
// Destructor: empty — all members release their resources via RAII.
// NOTE(review): listing lines 640-641 (the destructor's qualified
// signature) are missing from this dump.
642{
643}
644
645template < typename Derived, typename DataManager >
646template < typename Callback >
647void
// NOTE(review): listing line 648 (the qualified name, presumably
// "VdfParallelExecutorEngineBase<Derived, DataManager>::RunSchedule(") is
// missing from this dump.
649 const VdfSchedule &schedule,
650 const VdfRequest &computeRequest,
651 VdfExecutorErrorLogger *errorLogger,
652 Callback &&callback)
653{
654 TRACE_SCOPE("VdfParallelExecutorEngineBase::RunSchedule");
655
656 // Release the python GIL before creating and running parallel work.
657 TF_PY_ALLOW_THREADS_IN_SCOPE();
658
659 // Make sure the data manager is appropriately sized.
660 _dataManager->Resize(*schedule.GetNetwork());
661
662 // Indicate that the internal state has not yet been reset.
663 _resetState.store(false, std::memory_order_relaxed);
664
665 // The persistent evaluation state.
666 VdfEvaluationState state(_executor, schedule, errorLogger);
667
668 // Build an indexed view on top of the compute request. We will use this
669 // view for random access into the compute request in a parallel for-loop.
670 VdfRequest::IndexedView view(computeRequest);
671
672 // Perform all the work of spawning and waiting on tasks with isolated
673 // parallelism, in order to prevent evaluation tasks from being stolen in
674 // unrelated loops.
// NOTE(review): listing line 675 is missing — it presumably declared the
// local "engine" pointer captured by the lambdas below (e.g. a pointer to
// this engine). Confirm against the original header.
676 _isolatingDispatcher.Run([engine, &state, &view, &callback] {
677 // Collect all the leaf tasks, which are the entry point for evaluation.
678 // We will later spawn all these tasks together.
679 WorkTaskGraph::TaskLists taskLists;
680
681 // Run all the outputs in parallel. This will reset the internal state,
682 // if necessary, and collect all the leaf tasks for uncached outputs.
// NOTE(review): listing line 683 is missing — presumably the parallel-for
// invocation (a WorkParallelForN-style call) that takes the range size and
// the lambda below. Confirm against the original header.
684 view.GetSize(),
685 [engine, &state, &view, &callback, &taskLists]
686 (size_t b, size_t e) {
687 WorkTaskGraph::TaskList *taskList = &taskLists.local();
688 for (size_t i = b; i != e; ++i) {
689 if (const VdfMaskedOutput *maskedOutput = view.Get(i)) {
690 engine->_RunOutput(
691 state, *maskedOutput, i, callback, taskList);
692 }
693 }
694 });
695
696 // Now, spawn all the leaf tasks for uncached outputs. We need to first
697 // check the cache for all requested outputs, before even running the
698 // first uncached one. Otherwise, we could get cache hits for outputs
699 // that were just computed, failing to invoke the callback.
700 engine->_taskGraph.RunLists(taskLists);
701
702 // Now, wait for all the tasks to complete.
703 {
704 TRACE_SCOPE(
705 "VdfParallelExecutorEngineBase::RunSchedule "
706 "(wait for parallel tasks)");
707 engine->_taskGraph.Wait();
708 }
709 });
710
711 _isolatingDispatcher.Wait();
712
713 // Allow the derived executor engine to finalize state after evaluation
714 // completed.
715 _Self()._FinalizeEvaluation();
716
717 // Reset the interruption signal.
718 _isInterrupted.store(false, std::memory_order_relaxed);
719
720 // Post all transported errors on the calling thread.
721 _PostTransportedErrors();
722}
723
724template < typename Derived, typename DataManager >
725void
727 const VdfSchedule &schedule)
728{
729 TRACE_FUNCTION();
730
731 // Each input dependency is uniquely indexed in the schedule, and each
732 // input dependency may be required by more than a single node / invocation.
733 // We only check state of each input dependency once, cache the result,
734 // and then re-use that cache for subsequent lookups.
735 const size_t numUniqueDeps = schedule.GetNumUniqueInputDependencies();
736 _dependencyState.reset(new std::atomic<uint8_t>[numUniqueDeps]);
737 char *const dependencyState =
738 reinterpret_cast<char*>(_dependencyState.get());
739 memset(dependencyState, 0,
740 sizeof(std::atomic<uint8_t>) * numUniqueDeps);
741
742 // Reset the task synchronization structures for all the different types
743 // of tasks.
744 _computeTasks.Reset(schedule.GetNumComputeTasks());
745 _inputsTasks.Reset(schedule.GetNumInputsTasks());
746 _prepTasks.Reset(schedule.GetNumPrepTasks());
747 _keepTasks.Reset(schedule.GetNumKeepTasks());
748}
749
750template < typename Derived, typename DataManager >
751template < typename Callback >
752void
// NOTE(review): listing line 753 (the qualified name, presumably
// "VdfParallelExecutorEngineBase<Derived, DataManager>::_RunOutput(") is
// missing from this dump.
754 const VdfEvaluationState &state,
755 const VdfMaskedOutput &maskedOutput,
756 const size_t requestedIndex,
757 Callback &callback,
758 WorkTaskGraph::TaskList *taskList)
759{
760 // The output and mask for the output to run.
761 const VdfOutput &output = *maskedOutput.GetOutput();
762 const VdfMask &mask = maskedOutput.GetMask();
763
764 // Check whether the output already has a value cached. If that's the case
765 // we do not need to run the output, but we must invoke the callback to
766 // notify the client side that evaluation of the requested output has
767 // completed.
768 if (_executor.GetOutputValue(output, mask)) {
769 callback(maskedOutput, requestedIndex);
770 return;
771 }
772
773 // If the output is uncached we need to eventually run its leaf task. This
774 // means that we need the internal state to be reset. Attempt to do that
775 // now, if it hasn't already happened.
// Exactly one thread wins the compare-exchange and performs the reset; all
// others observe _resetState == true and skip it.
776 bool isReset = _resetState.load(std::memory_order_relaxed);
777 if (!isReset && _resetState.compare_exchange_strong(isReset, true)) {
778 _ResetState(state.GetSchedule());
779 }
780
781 // Then allocate a leaf task and add it to the task list. We will spawn it
782 // later along with all other leaf tasks.
// NOTE(review): listing line 783 is missing — it must have declared the
// "task" variable used below (presumably "_LeafTask<Callback> *task =").
// Confirm against the original header.
784 _taskGraph.AllocateTask< _LeafTask<Callback> >(
785 this, state, maskedOutput, requestedIndex, callback);
786 taskList->push_back(task);
787}
788
789template < typename Derived, typename DataManager >
790template < typename Callback >
// Leaf-task entry point: drives _ProcessLeafTask and recycles this task as
// a continuation until the requested output has been produced.
// NOTE(review): listing lines 791-793 (the return type and qualified name,
// presumably "WorkTaskGraph::BaseTask *" /
// "VdfParallelExecutorEngineBase<Derived, DataManager>::
//  _LeafTask<Callback>::execute()") are missing from this dump.
794{
795 // Bump the ref count to 1, because as child tasks finish executing before
796 // returning from this function, we don't want this task to get re-executed
797 // prematurely.
798 AddChildReference();
799
800 // Dedicate one task for scheduler bypass to reduce scheduling overhead.
801 WorkTaskGraph::BaseTask *bypass = nullptr;
802
803 // Process the scheduled task, and recycle this task for re-execution if
804 // requested. Note that this will implicitly decrement the ref count.
805 if (_engine->_ProcessLeafTask(
806 this, _state, _output, _requestedIndex, _callback, &_evaluationStage,
807 &bypass)) {
808 _RecycleAsContinuation();
809 }
810
811 // If the task is done and does not require re-execution we will have to
812 // manually decrement the task's ref count here in order to undo the
813 // increment above.
814 else {
815 RemoveChildReference();
816 }
817
818 // Return a task for scheduler bypassing, if any.
819 return bypass;
820}
821
822template < typename Derived, typename DataManager >
// Compute-task entry point: processes one scheduled node invocation,
// recycling as a continuation while input dependencies are outstanding, and
// transporting any Tf errors back to the calling thread.
// NOTE(review): listing lines 823-824 (the return type and qualified name
// of _ComputeTask::execute) are missing from this dump.
825{
826 // Create an error mark, so that we can later detect if any errors have
827 // been posted, and transport them to the calling thread.
828 TfErrorMark errorMark;
829
830 // Bump the ref count to 1, because as child tasks finish executing before
831 // returning from this function, we don't want this task to get re-executed
832 // prematurely.
833 AddChildReference();
834
835 // Dedicate one task for scheduler bypass to reduce scheduling overhead.
836 WorkTaskGraph::BaseTask *bypass = nullptr;
837
838 // Get the scheduled task.
839 const VdfScheduleComputeTask &scheduleTask =
840 _state.GetSchedule().GetComputeTask(_taskIndex);
841
842 // Process the scheduled task, and recycle this task for re-execution if
843 // requested. Note that this will implicitly decrement the ref count.
844 if (_engine->_ProcessComputeTask(
845 this, _state, _node, scheduleTask, &_evaluationStage, &bypass)) {
846 _RecycleAsContinuation();
847 }
848
849 // If the task is done and does not require re-execution, mark it as done.
850 // If the task is not being recycled, we will have to manually decrement
851 // its ref count.
852 else {
853 _engine->_computeTasks.MarkDone(_taskIndex);
854 RemoveChildReference();
855 }
856
857 // If any errors have been recorded, transport them so that they can later
858 // be posted to the calling thread.
859 if (!errorMark.IsClean()) {
860 _engine->_TransportErrors(errorMark);
861 }
862
863 // Return a task for scheduler bypassing, if any.
864 return bypass;
865}
866
867template < typename Derived, typename DataManager >
// Inputs-task entry point: fulfills a node's scheduled input dependencies,
// recycling as a continuation while they are outstanding.
// NOTE(review): listing lines 868-869 (the return type and qualified name
// of _InputsTask::execute) are missing from this dump.
870{
871 // Bump the ref count to 1, because as child tasks finish executing before
872 // returning from this function, we don't want this task to get re-executed
873 // prematurely.
874 AddChildReference();
875
876 // Dedicate one task for scheduler bypass to reduce scheduling overhead.
877 WorkTaskGraph::BaseTask *bypass = nullptr;
878
879 // Get the scheduled task.
880 const VdfScheduleInputsTask &scheduleTask =
881 _state.GetSchedule().GetInputsTask(_taskIndex);
882
883 // Process the scheduled task, and recycle this task for re-execution if
884 // requested. Note that this will implicitly decrement the ref count.
885 if (_engine->_ProcessInputsTask(
886 this, _state, _node, scheduleTask, &_evaluationStage, &bypass)) {
887 _RecycleAsContinuation();
888 }
889
890 // If the task is done and does not require re-execution, mark it as done.
891 // We will have to manually decrement the task's ref count here.
892 else {
893 _engine->_inputsTasks.MarkDone(_taskIndex);
894 RemoveChildReference();
895 }
896
897 // Return a task for scheduler bypassing, if any.
898 return bypass;
899}
900
901template < typename Derived, typename DataManager >
// Keep-task entry point: publishes kept data for a node, recycling as a
// continuation while its dependencies are outstanding.
// NOTE(review): listing lines 902-903 (the return type and qualified name
// of _KeepTask::execute) are missing from this dump.
904{
905 // Bump the ref count to 1, because as child tasks finish executing before
906 // returning from this function, we don't want this task to get re-executed
907 // prematurely.
908 AddChildReference();
909
910 // Dedicate one task for scheduler bypass to reduce scheduling overhead.
911 WorkTaskGraph::BaseTask *bypass = nullptr;
912
913 // Process the scheduled task, and recycle this task for re-execution if
914 // requested. Note that this will implicitly decrement the ref count.
915 if (_engine->_ProcessKeepTask(
916 this, _state, _node, &_evaluationStage, &bypass)) {
917 _RecycleAsContinuation();
918 }
919
920 // If the task is done and does not require re-execution, mark it as done.
921 // We will have to manually decrement the task's ref count here.
922 else {
923 _engine->_keepTasks.MarkDone(_taskIndex);
924 RemoveChildReference();
925 }
926
927 // Return a task for scheduler bypassing, if any.
928 return bypass;
929}
930
931template < typename Derived, typename DataManager >
// Touch-task entry point: walks the associated-source-output chain from
// _dest toward _source, touching each output via the derived engine.
// NOTE(review): listing lines 932-933 (the return type and qualified name
// of _TouchTask::execute) are missing from this dump.
934{
935 // Touch all the output buffers between the source output and the
936 // destination output, not including the source output itself.
// (The walk also excludes _dest: it starts at _dest's associated source.)
937 const VdfOutput *output = VdfGetAssociatedSourceOutput(_dest);
938 while (output && output != &_source) {
939 _engine->_Self()._Touch(*output);
940 output = VdfGetAssociatedSourceOutput(*output);
941 }
942
943 // No scheduler bypass.
944 return nullptr;
945}
946
947template < typename Derived, typename DataManager >
// Compute-all-task entry point: invokes every compute task scheduled for
// the node, then recycles itself purely to synchronize on their completion.
// NOTE(review): listing lines 948-949 (the return type and qualified name
// of _ComputeAllTask::execute) are missing from this dump.
950{
951 if (_completed) {
952 return nullptr;
953 }
954
955 // Bump the ref count to 1, because as child tasks finish executing before
956 // returning from this function, we don't want this task to get re-executed
957 // prematurely.
958 AddChildReference();
959
960 // Invoke all the compute tasks associated with the given node.
961 const bool invoked = _engine->_InvokeComputeTasks(
962 _state.GetSchedule().GetComputeTaskIds(_node),
963 _state, _node, this, nullptr);
964
965 // If any compute tasks were invoked, recycle this task for re-execution.
966 // This task will not perform any work upon re-execution, but we use its
967 // ref count to synchronize completion of all the compute tasks.
968 // Note that recycling will implicitly decrement the ref count.
969 if (invoked) {
970 _RecycleAsContinuation();
971 _completed = true;
972 }
973
974 // If the task is done and does not require re-execution, manually decrement
975 // the ref count here.
976 else {
977 RemoveChildReference();
978 }
979
980 // No scheduler bypass.
981 return nullptr;
982}
983
984template < typename Derived, typename DataManager >
985void
// NOTE(review): listing line 986 (the qualified name, presumably
// "VdfParallelExecutorEngineBase<Derived, DataManager>::
//  _SpawnRequestedTasks(") is missing from this dump.
987 const VdfEvaluationState &state,
988 const VdfNode &node,
989 WorkTaskGraph::BaseTask *successor,
// NOTE(review): listing line 990 is missing — it must have declared the
// final "WorkTaskGraph::BaseTask **bypass)" parameter used below.
991{
992 // Get the compute tasks associated with the requested node.
993 const VdfSchedule &schedule = state.GetSchedule();
994 VdfSchedule::TaskIdRange tasks = schedule.GetComputeTaskIds(node);
995
996 // Note that we only actually spawn requested tasks, if the task indices
997 // have been claimed successfully. If the task has already been claimed as
998 // an input dependency, then the root task will already synchronize on its
999 // completion. Otherwise, if the task has already been completed, there
1000 // isn't anything more to do.
1001
1002 // If this node has just a single compute task, it can't possibly have a
1003 // keep task. Otherwise, check if the node has a keep task. If so, we need
1004 // to make sure to spawn the keep task, such that the kept data (the
1005 // requested data) will be published.
1006 if (tasks.size() > 1) {
1007 const VdfScheduleTaskIndex keepTaskIndex =
1008 schedule.GetKeepTaskIndex(node);
1009 if (!VdfScheduleTaskIsInvalid(keepTaskIndex)) {
1010 if (_keepTasks.Claim(keepTaskIndex, successor) ==
// NOTE(review): listing line 1011 is missing — presumably the claim-state
// operand this comparison tests (a VdfParallelTaskSync claim result)
// followed by ") {". Confirm against the original header.
1012 _KeepTask *task =
1013 successor->AllocateChild<_KeepTask>(
1014 this, state, node, keepTaskIndex);
1015 _SpawnOrBypass(task, bypass);
1016 }
1017 return;
1018 }
1019 }
1020
1021 // If there is no keep task, spawn all of the node's compute tasks.
1022 for (const VdfScheduleTaskId computeTaskIndex : tasks) {
1023 if (_computeTasks.Claim(computeTaskIndex, successor) ==
// NOTE(review): listing line 1024 is missing — presumably the same
// claim-state operand as above, followed by ") {".
1025 _ComputeTask *task =
1026 successor->AllocateChild<_ComputeTask>(
1027 this, state, node, computeTaskIndex);
1028 _SpawnOrBypass(task, bypass);
1029 }
1030 }
1031}
1032
// Either runs the given task through the task graph immediately, or stashes
// it in *bypass so the caller can return it as a scheduler-bypass
// continuation instead of spawning it.
1033template < typename Derived, typename DataManager >
1034void
// NOTE(review): listing lines 1035-1036 missing — qualified member name
// (this is the definition of _SpawnOrBypass) and its first parameter,
// presumably "WorkTaskGraph::BaseTask *task," (cf. its use below).
1037    WorkTaskGraph::BaseTask **bypass)
1038{
1039    // If bypass has already been assigned a value, spawn the specified task.
1040    // Otherwise, assign the task to bypass, and later use it to drive the
1041    // scheduler bypass optimization.
1042
1043    if (!bypass || *bypass) {
1044        _taskGraph.RunTask(task);
1045    } else {
1046        *bypass = task;
1047    }
1048}
1049
// Drives a leaf (request) task through its two stages: first spawn the tasks
// that produce the requested output, then — on re-execution, once those have
// completed — invoke the caller-supplied callback. Returns true when the
// task must be re-executed after its children complete.
1050template < typename Derived, typename DataManager >
1051template < typename Callback >
1052bool
// NOTE(review): listing lines 1053-1054 missing — qualified member name and
// first parameter (a task pointer named "task", cf. its use at line 1076).
1055    const VdfEvaluationState &state,
1056    const VdfMaskedOutput &maskedOutput,
1057    const size_t requestedIndex,
1058    Callback &callback,
1059    _EvaluationStage *evaluationStage,
1060    WorkTaskGraph::BaseTask **bypass)
1061{
1062    // The evaluation stages this task can be in.
1063    enum {
1064        EvaluationStageSpawn,
1065        EvaluationStageCallback
1066    };
1067
1068    // Handle the current evaluation stage.
1069    switch (*evaluationStage) {
1070
1071    // Spawn all the requested tasks, and recycle this task for
1072    // re-evaluation. Once the requested tasks have been completed, we will
1073    // re-run this task in the callback stage.
1074    case EvaluationStageSpawn: {
1075        const VdfNode &node = maskedOutput.GetOutput()->GetNode();
1076        _SpawnRequestedTasks(state, node, task, bypass);
1077        *evaluationStage = EvaluationStageCallback;
1078        return true;
1079    }
1080
1081    // Invoke the callback. This will happen once the requested tasks have
1082    // run and the output cache has been populated.
1083    case EvaluationStageCallback: {
1084        callback(maskedOutput, requestedIndex);
1085    }
1086    }
1087
1088    return false;
1089}
1090
// Drives a compute task through its three stages: resolve input
// dependencies, prepare the node's outputs, then evaluate the node. The
// switch deliberately falls through from stage to stage when no child tasks
// were spawned. Returns true when the task must be re-executed after its
// input/prep dependencies complete; false when done or interrupted.
1091template < typename Derived, typename DataManager >
1092bool
// NOTE(review): listing lines 1093-1094 missing — qualified member name and
// first parameter (a task pointer named "task", cf. its uses below).
1095    const VdfEvaluationState &state,
1096    const VdfNode &node,
1097    const VdfScheduleComputeTask &scheduleTask,
1098    _EvaluationStage *evaluationStage,
1099    WorkTaskGraph::BaseTask **bypass)
1100{
1101    // The evaluation stages this task can be in.
1102    enum {
1103        EvaluationStageInputs,
1104        EvaluationStagePrepNode,
1105        EvaluationStageEvaluateNode
1106    };
1107
1108    // Handle the current evaluation stage.
1109    switch (*evaluationStage) {
1110
1111    // Input dependencies.
1112    case EvaluationStageInputs: {
1113        // Handle interruption detection during the first stage of
1114        // evaluation, and bail out if interruption has been detected. This
1115        // covers the outbound path (finding inputs) of the traversal.
1116        if (_DetectInterruption(state, node)) {
1117            return false;
1118        }
1119
1120        // Log execution stats for required input dependencies.
// NOTE(review): listing line 1121 missing — presumably the opening of a
// scoped stats-event construction (same dropped pattern at 1194/1730/1830).
1122        _executor.GetExecutionStats(),
1123        node, VdfExecutionStats::NodeRequiredInputsEvent);
1124
1125        // Invoke the required reads and the inputs task, if applicable.
1126        const bool invokedRequireds =
1127            _InvokeRequiredInputs(scheduleTask, state, task, bypass);
1128        const bool invokedInputsTask =
1129            _InvokeInputsTask(scheduleTask, state, node, task, bypass);
1130
1131        // If we just invoked any requireds, or an inputs task: Re-execute
1132        // this task once the input dependencies have been fulfilled.
1133        if (invokedRequireds || invokedInputsTask) {
1134            *evaluationStage = EvaluationStagePrepNode;
1135            return true;
1136        }
1137    }
1138
1139    // Node preparation.
1140    case EvaluationStagePrepNode: {
1141        // Also detect interruption before actually prepping and running the
1142        // node. If interruption has been detected, there is no need to
1143        // prep or evaluate this node. This covers the inbound path
1144        // (evaluating nodes once inputs are available) of the traversal.
1145        if (_DetectInterruption(state, node)) {
1146            return false;
1147        }
1148
1149        // If we did in fact invoke a separate prep task: Re-execute this
1150        // task once the prep task has been completed.
1151        if (_InvokePrepTask(scheduleTask, state, node, task)) {
1152            *evaluationStage = EvaluationStageEvaluateNode;
1153            return true;
1154        }
1155    }
1156
1157    // Node (invocation) evaluation.
1158    case EvaluationStageEvaluateNode: {
1159        // We really only want to evaluate this node if no interruption has
1160        // been detected. Otherwise, we would be trying to dereference
1161        // output buffers, which may not available due to bailing out from
1162        // interruption.
1163        if (_HasDetectedInterruption()) {
1164            return false;
1165        }
1166
1167        // Evaluate the node, i.e. compute or pass through.
1168        _EvaluateNode(scheduleTask, state, node, task);
1169    }
1170    }
1171
1172    // No more re-execution required: We are done!
1173    return false;
1174}
1175
// Drives an inputs task through its stages: first invoke prereq input
// dependencies, then — once prereq values are available — query the node's
// required-inputs predicate and invoke the optional inputs. Returns true
// when the task must be re-executed after the spawned dependencies complete.
1176template < typename Derived, typename DataManager >
1177bool
// NOTE(review): listing lines 1178-1179 missing — qualified member name and
// first parameter (a task pointer named "task", cf. its uses below).
1180    const VdfEvaluationState &state,
1181    const VdfNode &node,
1182    const VdfScheduleInputsTask &scheduleTask,
1183    _EvaluationStage *evaluationStage,
1184    WorkTaskGraph::BaseTask **bypass)
1185{
1186    // The evaluation stages this task can be in.
1187    enum {
1188        EvaluationStagePrereqs,
1189        EvaluationStageOptionals,
1190        EvaluationStageDone
1191    };
1192
1193    // Log execution stats for the inputs task.
// NOTE(review): listing line 1194 missing — presumably the opening of a
// scoped stats-event construction (same dropped pattern at 1121/1730/1830).
1195        _executor.GetExecutionStats(),
1196        node, VdfExecutionStats::NodeInputsTaskEvent);
1197
1198    // Handle the current evaluation stage.
1199    switch (*evaluationStage) {
1200
1201    // Prereq inputs.
1202    case EvaluationStagePrereqs: {
1203        // If we did in fact invoke any compute tasks for prereqs:
1204        // Re-execute this task once the input dependencies has been
1205        // fulfilled.
1206        if (_InvokePrereqInputs(
1207                scheduleTask, state, task, bypass)) {
1208            *evaluationStage = EvaluationStageOptionals;
1209            return true;
1210        }
1211    }
1212
1213    // Optional inputs (those dependent on prereq values).
1214    case EvaluationStageOptionals: {
1215        // If interruption has been detected, we have to bail from this
1216        // task. This is to prevent us from reading prereq input values,
1217        // which may have ended in interruption (and therefore are not
1218        // available for reading), when determining which optional inputs
1219        // to run.
1220        if (_HasDetectedInterruption()) {
1221            return false;
1222        }
1223
1224        // If we did in fact invoke any compute tasks for optionals:
1225        // Re-execute this task once the input dependencies has been
1226        // fulfilled.
1227        if (_InvokeOptionalInputs(
1228                scheduleTask, state, node, task, bypass)) {
1229            *evaluationStage = EvaluationStageDone;
1230            return true;
1231        }
1232    }
1233    }
1234
1235    // No more re-execution required: We are done!
1236    return false;
1237}
1238
// Drives a keep task through its stages: first invoke every compute task
// that contributes to the kept buffer, then — once those have completed —
// publish the scratch buffers containing the kept data. Returns true when
// the task must be re-executed after the spawned compute tasks complete.
1239template < typename Derived, typename DataManager >
1240bool
// NOTE(review): listing lines 1241-1242 missing — qualified member name and
// first parameter (a task pointer named "task", cf. its use at line 1278).
1243    const VdfEvaluationState &state,
1244    const VdfNode &node,
1245    _EvaluationStage *evaluationStage,
1246    WorkTaskGraph::BaseTask **bypass)
1247{
1248    // The evaluation stages this task can be in.
1249    enum {
1250        EvaluationStageKeep,
1251        EvaluationStagePublish
1252    };
1253
1254    // Get the current schedule. We'll need it for all possible evaluation
1255    // stages below.
1256    const VdfSchedule &schedule = state.GetSchedule();
1257
1258    // Handle the current evaluation stage.
1259    switch (*evaluationStage) {
1260
1261    // Run all tasks contributing to the kept buffer.
1262    case EvaluationStageKeep: {
1263        VdfSchedule::TaskIdRange tasks = schedule.GetComputeTaskIds(node);
1264        TF_DEV_AXIOM(!tasks.empty());
1265
1266        // Look at all the compute tasks associated with the node keeping
1267        // the data. There should be at least one contributing to the kept
1268        // buffer.
1269        bool invoked = false;
1270        for (const VdfScheduleTaskId taskId : tasks) {
1271            const VdfScheduleComputeTask &computeTask =
1272                schedule.GetComputeTask(taskId);
1273
1274            // If this compute task contributes to the kept buffer, invoke
1275            // it, and remember that we just invoked a task.
1276            if (computeTask.flags.hasKeep) {
1277                invoked |= _InvokeComputeTask(
1278                    taskId, state, node, task, bypass);
1279            }
1280        }
1281
1282        // If we invoked at least one task, we'll re-execute this task
1283        // once all the input dependencies have been fulfilled.
1284        if (invoked) {
1285            *evaluationStage = EvaluationStagePublish;
1286            return true;
1287        }
1288    }
1289
1290    // Publish the kept buffers.
1291    case EvaluationStagePublish: {
1292        // Make sure not to publish anything after interruption.
1293        if (_HasDetectedInterruption()) {
1294            return false;
1295        }
1296
1297        // Publish the scratch buffers now containing the kept data.
1298        _PublishScratchBuffers(schedule, node);
1299    }
1300    }
1301
1302    return false;
1303}
1304
// Claims and, if newly claimed, spawns the keep task with the given index.
// Returns true if the successor must wait on the task (claimed or pending),
// false if the task had already completed.
1305template < typename Derived, typename DataManager >
1306bool
// NOTE(review): listing line 1307 missing — qualified member name; this is
// the definition of _InvokeKeepTask (cf. the call site at listing line 1466).
1308    const VdfScheduleTaskIndex idx,
1309    const VdfNode &node,
1310    const VdfEvaluationState &state,
1311    WorkTaskGraph::BaseTask *successor,
1312    WorkTaskGraph::BaseTask **bypass)
1313{
1314    // Attempt to claim the keep task.
1315    VdfParallelTaskSync::State claimState = _keepTasks.Claim(idx, successor);
1316
1317    // If the task has been claimed successfully, i.e. we are the first to claim
1318    // it as an input dependency, go ahead and spawn a corresponding TBB task.
1319    if (claimState == VdfParallelTaskSync::State::Claimed) {
1320        _KeepTask *task = successor->AllocateChild<_KeepTask>(
1321            this, state, node, idx);
1322        _SpawnOrBypass(task, bypass);
1323    }
1324
1325    // If the task isn't done already (i.e. we just claimed it, or were
1326    // instructed to wait for its completion) return false.
1327    return claimState != VdfParallelTaskSync::State::Done;
1328}
1329
// Spawns a background touch task propagating from source to dest. No caller
// synchronizes on it directly; only the root task awaits its completion.
1330template < typename Derived, typename DataManager >
1331void
// NOTE(review): listing line 1332 missing — qualified member name (the
// function that allocates and runs a _TouchTask; exact name unknown).
1333    const VdfOutput &dest,
1334    const VdfOutput &source)
1335{
1336    // Allocate a new touch task and spawn it. Note that only the root task has
1337    // to wait for completion of this task, since this is purely background
1338    // work.
1339
1340    _TouchTask *task = _taskGraph.AllocateTask<_TouchTask>(
1341        this, dest, source);
1342    _taskGraph.RunTask(task);
1343}
1344
// Claims and, if newly claimed, spawns the compute task with the given id.
// Returns true if the successor must wait on the task (claimed or pending),
// false if the task had already completed.
1345template < typename Derived, typename DataManager >
1346bool
// NOTE(review): listing line 1347 missing — qualified member name; this is
// the definition of _InvokeComputeTask (cf. call sites at 1277/1385).
1348    const VdfScheduleTaskId taskIndex,
1349    const VdfEvaluationState &state,
1350    const VdfNode &node,
1351    WorkTaskGraph::BaseTask *successor,
1352    WorkTaskGraph::BaseTask **bypass)
1353{
1354    // Attempt to claim the compute task.
// NOTE(review): listing line 1355 missing — presumably the declared type of
// claimState, "VdfParallelTaskSync::State" (cf. listing line 1315).
1356    claimState = _computeTasks.Claim(taskIndex, successor);
1357
1358    // If the task has been claimed successfully, i.e. we are the first to claim
1359    // it as an input dependency, go ahead and spawn a corresponding TBB task.
1360    if (claimState == VdfParallelTaskSync::State::Claimed) {
1361        _ComputeTask *task =
1362            successor->AllocateChild<_ComputeTask>(
1363                this, state, node, taskIndex);
1364        _SpawnOrBypass(task, bypass);
1365    }
1366
1367    // If the task isn't done already (i.e. we just claimed it, or were
1368    // instructed to wait for its completion) return false.
1369    return claimState != VdfParallelTaskSync::State::Done;
1370}
1371
// Invokes _InvokeComputeTask for every task id in the given iterable range.
// Returns true if at least one task was invoked (i.e. must be awaited).
1372template < typename Derived, typename DataManager >
1373template < typename Iterable >
1374bool
// NOTE(review): listing line 1375 missing — qualified member name; this is
// the definition of _InvokeComputeTasks (cf. the call site at line 961).
1376    const Iterable &tasks,
1377    const VdfEvaluationState &state,
1378    const VdfNode &node,
1379    WorkTaskGraph::BaseTask *successor,
1380    WorkTaskGraph::BaseTask **bypass)
1381{
1382    // Invoke all compute tasks within the iterable range.
1383    bool invoked = false;
1384    for (const VdfScheduleTaskId taskId : tasks) {
1385        invoked |= _InvokeComputeTask(taskId, state, node, successor, bypass);
1386    }
1387
1388    // Return true if any tasks have been invoked.
1389    return invoked;
1390}
1391
// Determines — once per input dependency, memoized in _dependencyState —
// whether the dependency's output data is already available in the executor
// caches. The first caller to decide wins the CAS; later callers read the
// memoized answer, keeping all invocations of a node consistent.
1392template < typename Derived, typename DataManager >
1393bool
// NOTE(review): listing lines 1394-1395 missing — qualified member name
// (this is _IsInputDependencyCached, cf. the call site at line 1446) and its
// first parameter, presumably the index used below ("const size_t
// uniqueIndex," or similar — TODO confirm).
1396    const VdfOutput &output,
1397    const VdfMask &mask)
1398{
1399    enum {
1400        StateUndecided,
1401        StateCached,
1402        StateUncached
1403    };
1404
1405    // Figure out what state this dependency is currently in.
1406    std::atomic<uint8_t> *state = &_dependencyState[uniqueIndex];
1407    uint8_t currentState = state->load(std::memory_order_relaxed);
1408
1409    // If we haven't yet decided whether this dependency has been cached or not,
1410    // check now.
1411    if (currentState == StateUndecided) {
1412        // Determine the cache state.
1413        const bool isCached = _executor.GetOutputValue(output, mask);
1414        const uint8_t newState = isCached ? StateCached : StateUncached;
1415
1416        // Store the new state, but only if it has not changed (e.g. updated by
1417        // a different thread) in the meantime. If the CAS below fails,
1418        // currentState will be updated with the new state.
1419        if (state->compare_exchange_strong(currentState, newState)) {
1420            return isCached;
1421        }
1422    }
1423
1424    // Return true if the dependency has been cached.
1425    return currentState == StateCached;
1426}
1427
// Invokes the compute tasks (or, failing those, the keep task) that fulfill
// the given scheduled input dependency, unless its data is already cached.
// Returns true if any task was invoked (i.e. must be awaited).
1428template < typename Derived, typename DataManager >
1429bool
// NOTE(review): listing line 1430 missing — qualified member name; this is
// the input-dependency overload of _InvokeComputeOrKeepTasks (cf. call
// sites at 1540/1587/1633).
1431    const VdfScheduleInputDependency &input,
1432    const VdfEvaluationState &state,
1433    WorkTaskGraph::BaseTask *successor,
1434    WorkTaskGraph::BaseTask **bypass)
1435{
1436    // Check if the input dependency has already been fulfilled by looking up
1437    // the relevant output data in the executor caches. If the data is there,
1438    // we don't need to worry about invoking any tasks. Note, that if we decide
1439    // to invoke the corresponding task, we commit to running all the tasks for
1440    // all the invocations of the node! That's why we cache the result of
1441    // determining the output cache state the first time. This avoids a
1442    // correctness problem where the parent executor publishes the requested
1443    // output data after at least one invocations has already been invoked,
1444    // and subsequent invocations would then fail to run, because the data is
1445    // now available.
1446    if (_IsInputDependencyCached(input.uniqueIndex, input.output, input.mask)) {
1447        return false;
1448    }
1449
1450    // Get the current schedule.
1451    const VdfSchedule &schedule = state.GetSchedule();
1452
1453    // Get an iterable range of compute tasks for this input dependency.
1454    VdfSchedule::TaskIdRange tasks = schedule.GetComputeTaskIds(input);
1455
1456    // Retrieve the node ad the source end of the input dependency.
1457    const VdfNode &node = input.output.GetNode();
1458
1459    // Invoke the relevant compute tasks, if any.
1460    bool invoked = _InvokeComputeTasks(tasks, state, node, successor, bypass);
1461
1462    // If there are no compute tasks, and the dependency is instead for a keep
1463    // task, invoke that keep task instead.
1464    const VdfScheduleTaskIndex keepTask = input.computeOrKeepTaskId;
1465    if (input.computeTaskNum == 0 && !VdfScheduleTaskIsInvalid(keepTask)) {
1466        invoked |= _InvokeKeepTask(keepTask, node, state, successor, bypass);
1467    }
1468
1469    // Return true if any tasks have been invoked.
1470    return invoked;
1471}
1472
// Output overload: invokes the compute tasks (or keep task) of the node
// that owns the given output, unless the output is unscheduled or its
// requested data is already cached. Returns true if any task was invoked.
1473template < typename Derived, typename DataManager >
1474bool
// NOTE(review): listing line 1475 missing — qualified member name; this is
// the VdfOutput overload of _InvokeComputeOrKeepTasks (cf. the call site at
// listing line 1604).
1476    const VdfOutput &output,
1477    const VdfEvaluationState &state,
1478    WorkTaskGraph::BaseTask *successor,
1479    WorkTaskGraph::BaseTask **bypass)
1480{
1481    // Get the current schedule.
1482    const VdfSchedule &schedule = state.GetSchedule();
1483
1484    // If the output is not scheduled, there is no need to invoke a task.
1485    VdfSchedule::OutputId oid = schedule.GetOutputId(output);
1486    if (!oid.IsValid()) {
1487        return false;
1488    }
1489
1490    // Is the output already cached? If that's the case there is no need to
1491    // invoke any tasks.
1492    const VdfMask &requestMask = schedule.GetRequestMask(oid);
1493    if (_executor.GetOutputValue(output, requestMask)) {
1494        return false;
1495    }
1496
1497    // Retrieve the node at the source end of the input dependency.
1498    const VdfNode &node = output.GetNode();
1499
1500    // Get an iterable range of tasks for this input dependency.
1501    VdfSchedule::TaskIdRange tasks = schedule.GetComputeTaskIds(node);
1502
1503    // Invoke all the dependent tasks.
1504    bool invoked = _InvokeComputeTasks(tasks, state, node, successor, bypass);
1505
1506    // If there are no compute tasks, and the dependency is instead for a keep
1507    // task, invoke that keep task instead.
1508    const VdfScheduleTaskIndex keepTask = schedule.GetKeepTaskIndex(node);
1509    if (!VdfScheduleTaskIsInvalid(keepTask)) {
1510        invoked |= _InvokeKeepTask(keepTask, node, state, successor, bypass);
1511    }
1512
1513    // Return true if any tasks have been invoked.
1514    return invoked;
1515}
1516
// Invokes the compute/keep tasks for all of the inputs task's prereq input
// dependencies. Returns true if any task was invoked (i.e. must be awaited).
1517template < typename Derived, typename DataManager >
1518bool
// NOTE(review): listing line 1519 missing — qualified member name; per the
// trace scope below this is the definition of _InvokePrereqInputs.
1520    const VdfScheduleInputsTask &scheduleTask,
1521    const VdfEvaluationState &state,
1522    WorkTaskGraph::BaseTask *successor,
1523    WorkTaskGraph::BaseTask **bypass)
1524{
1525    PEE_TRACE_SCOPE("VdfParallelExecutorEngineBase::_InvokePrereqInputs");
1526
1527    // If there are no prereqs dependencies, bail out.
1528    if (!scheduleTask.prereqsNum) {
1529        return false;
1530    }
1531
1532    // Get a range of input dependencies to fulfill to satisfy the prereqs.
// NOTE(review): listing line 1533 missing — the declaration of the "prereqs"
// range iterated below (its exact type was dropped by the extraction).
1534        state.GetSchedule().GetPrereqInputDependencies(scheduleTask);
1535
1536    // Iterate over all the prereq dependencies, and invoke the relevant
1537    // compute and/or keep tasks.
1538    bool invoked = false;
1539    for (const VdfScheduleInputDependency &i : prereqs) {
1540        invoked |= _InvokeComputeOrKeepTasks(i, state, successor, bypass);
1541    }
1542
1543    // Return true if any tasks have been invoked.
1544    return invoked;
1545}
1546
// Invokes the tasks for the node's optional input dependencies: all of them
// when the node requires all reads, or — via task inversion over scheduled
// inputs — only those the node's required-inputs predicate selects.
// Returns true if any task was invoked (i.e. must be awaited).
1547template < typename Derived, typename DataManager >
1548bool
// NOTE(review): listing line 1549 missing — qualified member name; per the
// trace scope below this is the definition of _InvokeOptionalInputs.
1550    const VdfScheduleInputsTask &scheduleTask,
1551    const VdfEvaluationState &state,
1552    const VdfNode &node,
1553    WorkTaskGraph::BaseTask *successor,
1554    WorkTaskGraph::BaseTask **bypass)
1555{
1556    PEE_TRACE_SCOPE("VdfParallelExecutorEngineBase::_InvokeOptionalInputs");
1557
1558    // If there are no dependencies, bail out.
1559    if (!scheduleTask.optionalsNum) {
1560        return false;
1561    }
1562
1563    // Get the schedule from the state.
1564    const VdfSchedule &schedule = state.GetSchedule();
1565
1566    // Get the read dependencies from the schedule.
// NOTE(review): listing line 1567 missing — the declaration of the "inputs"
// range iterated below (its exact type was dropped by the extraction).
1568        schedule.GetOptionalInputDependencies(scheduleTask);
1569
1570    // Ask the node for its required inputs.
1571    VdfRequiredInputsPredicate inputsPredicate =
1572        node.GetRequiredInputsPredicate(VdfContext(state, node));
1573
1574    // If the node does not require any inputs, bail out.
1575    if (!inputsPredicate.HasRequiredReads()) {
1576        return false;
1577    }
1578
1579    // Have any tasks been invoked?
1580    bool invoked = false;
1581
1582    // If all inputs are required, simply invoke tasks for each one of the
1583    // required input dependencies. We do not need to do any task inversion in
1584    // this case, which is great.
1585    if (inputsPredicate.RequiresAllReads()) {
1586        for (const VdfScheduleInputDependency &i : inputs) {
1587            invoked |= _InvokeComputeOrKeepTasks(i, state, successor, bypass);
1588        }
1589    }
1590
1591    // If only a subset of the inputs is required, we need to invert the
1592    // required inputs into compute tasks, and invoke those.
1593    else {
1594        PEE_TRACE_SCOPE("Task Inversion");
1595
1596        // Find all the compute tasks for all the source outputs on all
1597        // connections on required inputs. Then, invoke those tasks. Note, that
1598        // the schedule will only contain compute tasks for nodes that have
1599        // also been scheduled, so there is no need to check if a source output
1600        // has been scheduled, here.
1601        for (const VdfScheduleInput &scheduleInput : schedule.GetInputs(node)) {
1602            if (inputsPredicate.IsRequiredRead(*scheduleInput.input)) {
1603                invoked |= _InvokeComputeOrKeepTasks(
1604                    *scheduleInput.source, state, successor, bypass);
1605            }
1606        }
1607    }
1608
1609    // Return true if any compute tasks have been invoked.
1610    return invoked;
1611}
1612
// Invokes the compute/keep tasks for all of the compute task's required
// input dependencies. Returns true if any task was invoked.
1613template < typename Derived, typename DataManager >
1614bool
// NOTE(review): listing line 1615 missing — qualified member name; per the
// trace scope below this is the definition of _InvokeRequiredInputs.
1616    const VdfScheduleComputeTask &scheduleTask,
1617    const VdfEvaluationState &state,
1618    WorkTaskGraph::BaseTask *successor,
1619    WorkTaskGraph::BaseTask **bypass)
1620{
1621    PEE_TRACE_SCOPE("VdfParallelExecutorEngineBase::_InvokeRequiredInputs");
1622
1623    // Get the current schedule.
1624    const VdfSchedule &schedule = state.GetSchedule();
1625
1626    // Get an iterable range of required input dependencies for this task.
// NOTE(review): listing line 1627 missing — the declaration of the
// "requireds" range iterated below (its exact type was dropped).
1628        schedule.GetRequiredInputDependencies(scheduleTask);
1629
1630    // Invoke the compute tasks for all required input dependencies.
1631    bool invoked = false;
1632    for (const VdfScheduleInputDependency &i : requireds) {
1633        invoked |= _InvokeComputeOrKeepTasks(i, state, successor, bypass);
1634    }
1635
1636    // Returns true if any compute tasks have been invoked.
1637    return invoked;
1638}
1639
// Claims and, if newly claimed, spawns the inputs task associated with the
// given compute task. Returns true if the successor must wait on the task,
// false if the compute task has no inputs task or it already completed.
1640template < typename Derived, typename DataManager >
1641bool
// NOTE(review): listing line 1642 missing — qualified member name; per the
// trace scope below this is the definition of _InvokeInputsTask.
1643    const VdfScheduleComputeTask &scheduleTask,
1644    const VdfEvaluationState &state,
1645    const VdfNode &node,
1646    WorkTaskGraph::BaseTask *successor,
1647    WorkTaskGraph::BaseTask **bypass)
1648{
1649    PEE_TRACE_SCOPE("VdfParallelExecutorEngineBase::_InvokeInputsTask");
1650
1651    // Check if this compute task has a valid inputs task, and bail out
1652    // if that's not the case.
1653    const VdfScheduleTaskIndex inputsTaskIndex = scheduleTask.inputsTaskIndex;
1654    if (VdfScheduleTaskIsInvalid(inputsTaskIndex)) {
1655        return false;
1656    }
1657
1658    // Attempt to claim the inputs task.
1659    VdfParallelTaskSync::State claimState =
1660        _inputsTasks.Claim(inputsTaskIndex, successor);
1661
1662    // If the inputs task has been successfully claimed, i.e. we are the first
1663    // to claim this task, go ahead an allocate and spawn a TBB task.
1664    if (claimState == VdfParallelTaskSync::State::Claimed) {
1665        _InputsTask *task =
1666            successor->AllocateChild<_InputsTask>(
1667                this, state, node, inputsTaskIndex);
1668        _SpawnOrBypass(task, bypass);
1669    }
1670
1671    // If the task isn't done already (i.e. we just claimed it, or were
1672    // instructed to wait for its completion) return false.
1673    return claimState != VdfParallelTaskSync::State::Done;
1674}
1675
// Performs or synchronizes node preparation. With no scheduled prep task,
// prepares inline (single claimant). With a prep task, the first claimant
// prepares inline and marks it done; later claimants may have to wait.
// Returns true only when the successor must wait for a pending prep task.
1676template < typename Derived, typename DataManager >
1677bool
// NOTE(review): listing line 1678 missing — qualified member name; per the
// trace scope below this is the definition of _InvokePrepTask.
1679    const VdfScheduleComputeTask &scheduleTask,
1680    const VdfEvaluationState &state,
1681    const VdfNode &node,
1682    WorkTaskGraph::BaseTask *successor)
1683{
1684    PEE_TRACE_SCOPE("VdfParallelExecutorEngineBase::_InvokePrepTask");
1685
1686    // Check if this compute task has a valid prep task. If it does not have
1687    // a valid prep task, we still have to prepare the node. However, since
1688    // there is no separate task for node preparation, we know that there is
1689    // only one claimant for this task, and we can therefore simply call into
1690    // _PrepareNode. It's not necessary to update and synchronization
1691    // structure at this point, and we can also return false, because no
1692    // task has been invoked,
1693    const VdfScheduleTaskIndex prepTaskIndex = scheduleTask.prepTaskIndex;
1694    if (VdfScheduleTaskIsInvalid(prepTaskIndex)) {
1695        _PrepareNode(state, node);
1696        return false;
1697    }
1698
1699    PEE_TRACE_SCOPE("VdfParallelExecutorEngineBase::_InvokePrepTask (task)");
1700
1701    // If there is a separate task for node preparation, attempt to claim it.
1702    VdfParallelTaskSync::State claimState =
1703        _prepTasks.Claim(prepTaskIndex, successor);
1704
1705    // If the prep task has been successfully claimed, i.e. we are the first
1706    // to claim this task, go ahead and do the preparation. Note, that it's
1707    // not necessary to actually do the invocation in a separate task. We can
1708    // return false here, because no task was
1709    if (claimState == VdfParallelTaskSync::State::Claimed) {
1710        _PrepareNode(state, node);
1711        _prepTasks.MarkDone(prepTaskIndex);
1712        return false;
1713    }
1714
1715    // If we were instructed to wait for this task to complete, return true.
1716    // Otherwise the task had already been completed, and we don't need to
1717    // synchronize on it.
1718    return claimState == VdfParallelTaskSync::State::Wait;
1719}
1720
// Prepares every scheduled output of the given node for evaluation by
// delegating to _PrepareOutput.
1721template < typename Derived, typename DataManager >
1722void
// NOTE(review): listing line 1723 missing — qualified member name; per the
// trace scope below this is the definition of _PrepareNode.
1724    const VdfEvaluationState &state,
1725    const VdfNode &node)
1726{
1727    PEE_TRACE_SCOPE("VdfParallelExecutorEngineBase::_PrepareNode");
1728
1729    // Log execution stats for node preparation.
// NOTE(review): listing line 1730 missing — presumably the opening of a
// scoped stats-event construction (same dropped pattern at 1121/1194/1830).
1731        _executor.GetExecutionStats(),
1732        node, VdfExecutionStats::NodePrepareEvent);
1733
1734    // Prepare each one of the scheduled outputs.
1735    const VdfSchedule &schedule = state.GetSchedule();
1736    VDF_FOR_EACH_SCHEDULED_OUTPUT_ID(outputId, schedule, node) {
1737        _PrepareOutput(schedule, outputId);
1738    }
1739}
1740
// Prepares a single scheduled output: touches it (via the derived engine),
// resets the private buffer to the request mask, pre-passes read/write
// buffers for associated outputs, and pre-sizes the scratch buffer for any
// kept data so later writes need no (thread-unsafe) resize.
1741template < typename Derived, typename DataManager >
1742void
// NOTE(review): listing line 1743 missing — qualified member name; this is
// the definition of _PrepareOutput (cf. the call site at listing line 1737).
1744    const VdfSchedule &schedule,
1745    const VdfSchedule::OutputId outputId)
1746{
1747    // Get the VdfOutput for this scheduled output.
1748    const VdfOutput &output = *schedule.GetOutput(outputId);
1749
1750    // Mark the output as having been touched during evaluation. We defer this
1751    // work to the derived class, because the executor engine may or may not
1752    // be required to actually do any touching.
1753    _Self()._Touch(output);
1754
1755    // Retrieve the data handle.
1756    _DataHandle dataHandle =
1757        _dataManager->GetOrCreateDataHandle(output.GetId());
1758
1759    // Reset the private buffer, and assign the request mask.
1760    const VdfMask &requestMask = schedule.GetRequestMask(outputId);
1761    VdfExecutorBufferData *privateBuffer =
1762        _dataManager->GetPrivateBufferData(dataHandle);
1763    privateBuffer->ResetExecutorCache(requestMask);
1764
1765    // For associated outputs, make sure the private data is available, before
1766    // we start writing to it from multiple threads. This will make sure that
1767    // the buffer has been passed or copied down from the source output.
1768    if (output.GetAssociatedInput()) {
1769        _PrepareReadWriteBuffer(
1770            output, outputId, requestMask, schedule, privateBuffer);
1771    }
1772
1773    // Reset the scratch buffer, and assign the keep mask, if any.
1774    const VdfMask &keepMask = schedule.GetKeepMask(outputId);
1775    VdfExecutorBufferData *scratchBuffer =
1776        _dataManager->GetScratchBufferData(dataHandle);
1777    scratchBuffer->ResetExecutorCache(keepMask);
1778
1779    // Make sure the scratch buffer is available, and sized appropriately
1780    // to accommodate all the kept data, without having to resize the
1781    // buffer (which would not be thread-safe). We will subsequently be
1782    // populating this scratch buffer, and that may happen from multiple
1783    // threads!
1784    if (!keepMask.IsEmpty()) {
1785        _CreateScratchCache(output, dataHandle, keepMask, scratchBuffer);
1786    }
1787}
1788
// Allocates the scratch cache for an output, sized by the keep mask alone
// when no public data exists, or by the union of keep mask and public mask
// when public data will later be absorbed into the scratch buffer.
1789template < typename Derived, typename DataManager >
1790void
// NOTE(review): listing line 1791 missing — qualified member name; this is
// the definition of _CreateScratchCache (cf. the call site at line 1785).
1792    const VdfOutput &output,
1793    const _DataHandle dataHandle,
1794    const VdfMask &mask,
1795    VdfExecutorBufferData *scratchBuffer)
1796{
1797    VdfExecutorBufferData *publicBuffer =
1798        _dataManager->GetPublicBufferData(dataHandle);
1799    const VdfMask &publicMask = publicBuffer->GetExecutorCacheMask();
1800
1801    // If there is no public data at the output, the size of the scratch cache
1802    // is determined by the mask alone.
1803    if (publicMask.IsEmpty() || publicMask.IsAllZeros()) {
1804        _dataManager->CreateOutputCache(output, scratchBuffer, mask.GetBits());
1805    }
1806
1807    // If there is public data at the output, we are later going to absorb that
1808    // data into the scratch cache. Hence, we will make sure that the buffer is
1809    // sized to accomodate both the specified mask, and the publicMask.
1810    else {
1811        VdfMask::Bits unionBits(
1812            mask.GetSize(),
1813            std::min(mask.GetFirstSet(), publicMask.GetFirstSet()),
1814            std::max(mask.GetLastSet(), publicMask.GetLastSet()));
1815        _dataManager->CreateOutputCache(output, scratchBuffer, unionBits);
1816    }
1817}
1818
// Evaluates a node for a compute task: computes it when the scheduled task
// is affective, otherwise passes its data through.
1819template < typename Derived, typename DataManager >
1820void
// NOTE(review): listing line 1821 missing — qualified member name; per the
// trace scope below this is the definition of _EvaluateNode.
1822    const VdfScheduleComputeTask &scheduleTask,
1823    const VdfEvaluationState &state,
1824    const VdfNode &node,
1825    WorkTaskGraph::BaseTask *successor)
1826{
1827    PEE_TRACE_SCOPE("VdfParallelExecutorEngineBase::_EvaluateNode");
1828
1829    // Log execution stats for node evaluation.
// NOTE(review): listing line 1830 missing — presumably the opening of a
// scoped stats-event construction (same dropped pattern at 1121/1194/1730).
1831        _executor.GetExecutionStats(),
1832        node, VdfExecutionStats::NodeEvaluateEvent);
1833
1834    // Compute the node, if it is affective.
1835    if (scheduleTask.flags.isAffective) {
1836        _ComputeNode(scheduleTask, state, node);
1837    }
1838
1839    // If the node is not affective, make sure that all its data has been
1840    // passed through.
1841    else {
1842        _PassThroughNode(scheduleTask, state, node);
1843    }
1844}
1845
// Runs the node's Compute() callback for this invocation, then — if no
// interruption occurred — finalizes every scheduled output: warns and fills
// a default value where the callback produced none, and processes each
// output (keeping and publishing its data).
1846template < typename Derived, typename DataManager >
1847void
// NOTE(review): listing line 1848 missing — qualified member name; per the
// trace scope below this is the definition of _ComputeNode.
1849    const VdfScheduleComputeTask &scheduleTask,
1850    const VdfEvaluationState &state,
1851    const VdfNode &node)
1852{
1853    PEE_TRACE_SCOPE("VdfParallelExecutorEngineBase::_ComputeNode");
1854
1855    // Log an event indicating this node has been computed.
1856    if (VdfExecutionStats *stats = _executor.GetExecutionStats()) {
1857        stats->LogTimestamp(VdfExecutionStats::NodeDidComputeEvent, node);
1858    }
1859
1860    // Execute the node callback. Make sure to also pass the invocation index,
1861    // to the VdfContext. The node may not have multiple invocations, i.e. the
1862    // invocation index may be VdfScheduleTaskInvalid.
1863    node.Compute(VdfContext(state, node, scheduleTask.invocationIndex));
1864
1865    // If interruption occurred while the callback was running, the data
1866    // produced by the callback may not all be correct. If this happens, we
1867    // want to avoid processing any of the outputs since doing so may publish
1868    // results to the buffers.
1869    if (_DetectInterruption(state, node)) {
1870        return;
1871    }
1872
1873    // We need to finalize all the scheduled outputs. This will take care of
1874    // populating scratch buffers with kept data, as well as publishing any
1875    // output data, for example.
1876    const VdfSchedule &schedule = state.GetSchedule();
1877    VDF_FOR_EACH_SCHEDULED_OUTPUT_ID(outputId, schedule, node) {
1878        const VdfOutput &output = *schedule.GetOutput(outputId);
1879
1880        // Retrieve the data handle for this output.
1881        _DataHandle dataHandle = _dataManager->GetDataHandle(output.GetId());
1882        TF_DEV_AXIOM(_dataManager->IsValidDataHandle(dataHandle));
1883
1884        // Get the private executor buffer.
1885        VdfExecutorBufferData *privateBuffer =
1886            _dataManager->GetPrivateBufferData(dataHandle);
1887
1888        // Check to see if the node did indeed produce values for this output.
1889        // The node callback is expected to produce buffers for all the
1890        // scheduled outputs. By definition, read/write outputs will always
1891        // have produced a value, even if that value was just an unmodified
1892        // pass-through.
1893        if (!privateBuffer->GetExecutorCache()) {
1894            // No output value: Spit out a warning.
1895            TF_WARN(
1896                "No value set for output " + output.GetDebugName() +
1897                " of type " + output.GetSpec().GetType().GetTypeName() +
1898                " named " + output.GetName().GetString());
1899
1900            // Fill the output with a default value.
// NOTE(review): listing line 1901 missing — the opening of the call that
// fills the default value (its callee name was dropped by the extraction).
1902                output.GetSpec().GetType(),
1903                schedule.GetRequestMask(outputId).GetSize(),
1904                _dataManager->GetOrCreateOutputValueForWriting(
1905                    output, dataHandle));
1906        }
1907
1908        // Make sure the output has been processed. This will take care of
1909        // keeping all the relevant data, as well as publishing buffers for
1910        // consumption by dependents.
1911        const bool hasAssociatedInput = output.GetAssociatedInput();
1912        _ProcessOutput(
1913            scheduleTask, state, output, outputId, dataHandle,
1914            hasAssociatedInput, privateBuffer);
1915    }
1916}
1917
// Finalizes a non-affective node without computing it: processes every
// scheduled output so its data is kept and published for dependents.
1918template < typename Derived, typename DataManager >
1919void
// NOTE(review): listing line 1920 missing — qualified member name; per the
// trace scope below this is the definition of _PassThroughNode.
1921    const VdfScheduleComputeTask &scheduleTask,
1922    const VdfEvaluationState &state,
1923    const VdfNode &node)
1924{
1925    PEE_TRACE_SCOPE("VdfParallelExecutorEngineBase::_PassThroughNode");
1926
1927    // Iterate over all the scheduled outputs on a node, and make sure that
1928    // they have been properly processed.
1929    const VdfSchedule &schedule = state.GetSchedule();
1930    VDF_FOR_EACH_SCHEDULED_OUTPUT_ID(outputId, schedule, node) {
1931        const VdfOutput &output = *schedule.GetOutput(outputId);
1932
1933        // Retrieve the data handle for this output.
1934        _DataHandle dataHandle = _dataManager->GetDataHandle(output.GetId());
1935        TF_DEV_AXIOM(_dataManager->IsValidDataHandle(dataHandle));
1936
1937        // Get the private executor buffer.
1938        VdfExecutorBufferData *privateBuffer =
1939            _dataManager->GetPrivateBufferData(dataHandle);
1940
1941        // Make sure the output has been processed. This will take care of
1942        // keeping all the relevant data, as well as publishing buffers for
1943        // consumption by dependents.
1944        const bool hasAssociatedInput = output.GetAssociatedInput();
1945        _ProcessOutput(
1946            scheduleTask, state, output, outputId, dataHandle,
1947            hasAssociatedInput, privateBuffer);
1948    }
1949}
1950
1951template < typename Derived, typename DataManager >
1952void
1954 const VdfScheduleComputeTask &scheduleTask,
1955 const VdfEvaluationState &state,
1956 const VdfOutput &output,
1957 const VdfSchedule::OutputId outputId,
1958 const _DataHandle dataHandle,
1959 const bool hasAssociatedInput,
1960 VdfExecutorBufferData *privateBuffer)
1961{
1962 // Is this a node have multiple invocations? If the invocation index is set
1963 // to VdfScheduleTaskInvalid, the node does only have one invocations.
1964 const VdfScheduleTaskIndex invocationIndex = scheduleTask.invocationIndex;
1965 const bool hasMultipleInvocations =
1966 !VdfScheduleTaskIsInvalid(invocationIndex);
1967
1968 // Does this output pass its buffer?
1969 const VdfSchedule &schedule = state.GetSchedule();
1970 const VdfOutput *passToOutput = schedule.GetPassToOutput(outputId);
1971
1972 // Allow the derived engine to finalize the output data before
1973 // publishing any buffers.
1974 _Self()._FinalizeOutput(
1975 state, output, outputId, dataHandle, invocationIndex, passToOutput);
1976
1977 // If this output does not pass its buffer, we need to make sure to
1978 // publish the entire private buffer to make it available for all
1979 // dependents.
1980 if (!passToOutput) {
1981 // Can't publish here, if there are multiple invocations scheduled
1982 // for the same node. We should never schedule multiple invocations for
1983 // nodes that don't pass their buffers.
1984 TF_DEV_AXIOM(!hasMultipleInvocations);
1985
1986 // Absorb any publicly available data, which is not also available in
1987 // the private buffer. Note that the missing data will be written to
1988 // the scratch buffer. The private buffer may still be in use by other
1989 // node invocations, and doing the merging is a potentially destructive
1990 // (i.e. racy) operation.
1991 const VdfMask &privateMask = privateBuffer->GetExecutorCacheMask();
1992 VdfVector *scratchBuffer =
1993 _AbsorbPublicBuffer(output, dataHandle, privateMask);
1994
1995 // If publicly available data has been absorbed into the scratch buffer,
1996 // also copy the private buffer there, and then publish the whole
1997 // shebang.
1998 if (scratchBuffer) {
1999 scratchBuffer->Merge(
2000 *privateBuffer->GetExecutorCache(), privateMask);
2001 _dataManager->PublishScratchBufferData(dataHandle);
2002 }
2003
2004 // If no data has been written to the scratch buffer, we can simply
2005 // publish the private buffer.
2006 else {
2007 _dataManager->PublishPrivateBufferData(dataHandle);
2008 }
2009 }
2010
2011 // We are passing this buffer, so let's see if we need to keep anything.
2012 else {
2013 // Get the scratch buffer data.
2014 VdfExecutorBufferData *scratchBuffer =
2015 _dataManager->GetScratchBufferData(dataHandle);
2016
2017 // If a scratch buffer has been prepared for this output, then make
2018 // sure to keep the relevant data currently in the private buffer.
2019 if (VdfVector *scratchValue = scratchBuffer->GetExecutorCache()) {
2020 // Get the keep mask. If the node has multiple invocations, this
2021 // should be the keep mask relevant to the current invocation.
2022 const VdfMask &keepMask = hasMultipleInvocations
2023 ? schedule.GetKeepMask(invocationIndex)
2024 : schedule.GetKeepMask(outputId);
2025
2026 // Merge the relevant data into the scratch buffer. Note that the
2027 // scratch buffer must be appropriately sized to accommodate all the
2028 // data. Otherwise, Merge will expand the buffer, which is not
2029 // thread-safe. Making sure that the buffer is appropriately sized
2030 // is the responsibility of node preparation.
2031 {
2032 PEE_TRACE_SCOPE(
2033 "VdfParallelExecutorEngineBase::_FinalizeOutput (keep)");
2034 scratchValue->Merge(
2035 *privateBuffer->GetExecutorCache(), keepMask);
2036 }
2037
2038 // If this is not a node invocation, publish the scratch buffer
2039 // right here. This way, we can avoid creating a separate keep task
2040 // for any node that has only one compute task in the first place.
2041 if (!hasMultipleInvocations) {
2042 _AbsorbPublicBuffer(
2043 output, dataHandle, scratchBuffer->GetExecutorCacheMask());
2044 _dataManager->PublishScratchBufferData(dataHandle);
2045 }
2046 }
2047 }
2048}
2049
// NOTE(review): The extraction dropped original line 2052, which carried the
// qualified method name of this definition. From the parameters and the
// calls below, this routine appears to prepare the private buffer for an
// output by passing/copying from a from-buffer source or from the single
// associated-input source, falling back to creating a fresh cache — confirm
// the actual name against the class declaration earlier in this header.
2050 template < typename Derived, typename DataManager >
2051 void
2053 const VdfOutput &output,
2054 const VdfSchedule::OutputId outputId,
2055 const VdfMask &mask,
2056 const VdfSchedule &schedule,
2057 VdfExecutorBufferData *privateBuffer)
2058 {
2059 // If there is a from-buffer output, pass straight from the from-buffer
2060 // source. Also make sure to touch any output in between, but we can do
2061 // that in a separate, background task.
2062 if (const VdfOutput *source = schedule.GetFromBufferOutput(outputId)) {
2063 _PassOrCopyBuffer(output, *source, mask, schedule, privateBuffer);
2064 _InvokeTouchTask(output, *source);
2065 return;
2066 }
2067
2068 // XXX: Don't do this connection nonsense here. All this information can
2069 // be stored in the schedule.
2070
// NOTE(review): input is dereferenced unconditionally below — assumes this
// is only ever invoked for outputs with an associated input; confirm with
// the callers.
2071 const VdfInput *input = output.GetAssociatedInput();
2072 const size_t numInputNodes = input->GetNumConnections();
2073
2074 // If there is exactly one input, we can pass or copy that buffer down.
2075 if (numInputNodes == 1 && !(*input)[0].GetMask().IsAllZeros()) {
2076 const VdfOutput &source = (*input)[0].GetSourceOutput();
2077 _PassOrCopyBuffer(output, source, mask, schedule, privateBuffer);
2078 return;
2079 }
2080
2081 // If we have no inputs, a buffer cannot be passed. Instead, create a
2082 // brand new one.
2083 _dataManager->CreateOutputCache(output, privateBuffer);
2084 }
2085
2086template < typename Derived, typename DataManager >
2087void
2089 const VdfOutput &output,
2090 const VdfOutput &source,
2091 const VdfMask &inputMask,
2092 const VdfSchedule &schedule,
2093 VdfExecutorBufferData *privateBuffer)
2094{
2095 // Decide whether to pass or copy the buffer from the source output.
2096 bool passBuffer = false;
2097
2098 // If the source data handle is valid...
2099 _DataHandle sourceHandle = _dataManager->GetDataHandle(source.GetId());
2100 if (_dataManager->IsValidDataHandle(sourceHandle)) {
2101
2102 // ... and the destination is the pass-to output of the source ...
2103 const VdfSchedule::OutputId sourceOid = schedule.GetOutputId(source);
2104 if (schedule.GetPassToOutput(sourceOid) == &output) {
2105
2106 // ... and the cache lookup resulted in a cache miss (i.e. the
2107 // output value had to be computed by evaluating the corresponding
2108 // compute tasks.) Pass the buffer down from the source output,
2109 // instead of copying it.
2110 const VdfScheduleInputDependencyUniqueIndex uniqueIndex =
2111 schedule.GetUniqueIndex(sourceOid);
2112 TF_DEV_AXIOM(uniqueIndex != VdfScheduleTaskInvalid);
2113 passBuffer = !_IsInputDependencyCached(
2114 uniqueIndex, source, inputMask);
2115 }
2116 }
2117
2118 // Pass the buffer from the source output. This is the fast path.
2119 if (passBuffer) {
2120 VdfExecutorBufferData *sourcePrivateBuffer =
2121 _dataManager->GetPrivateBufferData(sourceHandle);
2122 _PassBuffer(sourcePrivateBuffer, privateBuffer);
2123 }
2124
2125 // Copy the buffer instead.
2126 else {
2127 _CopyBuffer(output, source, inputMask, privateBuffer);
2128 }
2129}
2130
2131template < typename Derived, typename DataManager >
2132void
2134 VdfExecutorBufferData *fromBuffer,
2135 VdfExecutorBufferData *toBuffer) const
2136{
2137 VdfVector *sourceValue = fromBuffer->GetExecutorCache();
2138 TF_DEV_AXIOM(sourceValue);
2139
2140 // Pass the data along. Assume ownership of the source vector
2141 // and relinquish the ownership at the source private buffer.
2142 toBuffer->TakeOwnership(sourceValue);
2143 fromBuffer->YieldOwnership();
2144}
2145
2146template < typename Derived, typename DataManager >
2147void
2149 const VdfOutput &output,
2150 const VdfOutput &source,
2151 const VdfMask &fromMask,
2152 VdfExecutorBufferData *toBuffer) const
2153{
2154 PEE_TRACE_SCOPE("VdfParallelExecutorEngineBase::_CopyBuffer");
2155
2156 // Note that we must look up the data through the executor, instead of the
2157 // data manager, because we may have initially received a cache hit by
2158 // looking up the executor. The data may live on the parent executor, for
2159 // example, instead of the local data manager.
2160 const VdfVector *sourceVector = _executor.GetOutputValue(source, fromMask);
2161 if (!sourceVector) {
2162 VDF_FATAL_ERROR(
2163 source.GetNode(), "No cache for output " + source.GetDebugName());
2164 }
2165
2166 // Create a new output cache at the destination output, and copy all the
2167 // data from the source output.
2168 VdfVector *destValue = _dataManager->CreateOutputCache(output, toBuffer);
2169 destValue->Copy(*sourceVector, fromMask);
2170}
2171
// NOTE(review): The extraction dropped original line 2174, which carried the
// qualified method name of this definition. Given that it publishes any
// populated scratch buffers for all of a node's scheduled outputs, this is
// presumably the body of the node's keep task — confirm the actual name
// against the class declaration earlier in this header.
2172 template < typename Derived, typename DataManager >
2173 void
2175 const VdfSchedule &schedule,
2176 const VdfNode &node)
2177 {
2178 // Iterate over all the outputs scheduled on this node.
2179 VDF_FOR_EACH_SCHEDULED_OUTPUT_ID(outputId, schedule, node) {
2180 const VdfOutput &output = *schedule.GetOutput(outputId);
2181
2182 // Get the data handle for this output.
2183 const _DataHandle dataHandle =
2184 _dataManager->GetDataHandle(output.GetId());
2185 TF_DEV_AXIOM(_dataManager->IsValidDataHandle(dataHandle));
2186
2187 // Retrieve the scratch buffer.
2188 VdfExecutorBufferData *scratchBuffer =
2189 _dataManager->GetScratchBufferData(dataHandle);
2190
2191 // If the scratch buffer contains any data, absorb the public data still
2192 // living on this output, and publish the whole shebang.
2193 if (const VdfVector* value = scratchBuffer->GetExecutorCache()) {
2194 _AbsorbPublicBuffer(
2195 output, dataHandle, scratchBuffer->GetExecutorCacheMask());
2196 _dataManager->PublishScratchBufferData(dataHandle);
2197 }
2198 }
2199 }
2200
2201template < typename Derived, typename DataManager >
2202VdfVector *
2204 const VdfOutput &output,
2205 const _DataHandle dataHandle,
2206 const VdfMask &haveMask)
2207{
2208 // Get the public buffer value and mask.
2209 const VdfExecutorBufferData *publicBuffer =
2210 _dataManager->GetPublicBufferData(dataHandle);
2211 const VdfVector *publicValue = publicBuffer->GetExecutorCache();
2212 const VdfMask &publicMask = publicBuffer->GetExecutorCacheMask();
2213
2214 // If there is no public data available, or all that data is already
2215 // included in the destination mask, bail out.
2216 if (!publicValue || publicMask.IsEmpty() || publicMask == haveMask) {
2217 return nullptr;
2218 }
2219
2220 // Determine the mask of data to copy from the public buffer, and bail out
2221 // if there is no data to copy.
2222 const VdfMask::Bits mergeBits = publicMask.GetBits() - haveMask.GetBits();
2223 if (mergeBits.AreAllUnset()) {
2224 return nullptr;
2225 }
2226
2227 // The destination buffer is the scratch buffer.
2228 VdfExecutorBufferData *scratchBuffer =
2229 _dataManager->GetScratchBufferData(dataHandle);
2230
2231 // Let's make sure the scratch buffer has an executor cache to write into,
2232 // and create a new one if it doesn't.
2233 VdfVector *scratchValue = scratchBuffer->GetExecutorCache();
2234 const VdfMask extendedMask = publicMask | haveMask;
2235 if (!scratchValue) {
2236 scratchValue = _dataManager->CreateOutputCache(
2237 output, scratchBuffer, extendedMask.GetBits());
2238 }
2239
2240 // Merge the public value into the scratch buffer. We only merge the missing
2241 // elements, in order to avoid redundant copies. Also make sure that the
2242 // cache mask has been properly extended.
2243 scratchValue->Merge(*publicValue, mergeBits);
2244 scratchBuffer->SetExecutorCacheMask(extendedMask);
2245 return scratchValue;
2246}
2247
// NOTE(review): The extraction dropped original line 2250, which carried the
// qualified method name of this definition. The body combines cycle
// detection with executor-interruption polling and latches the result into
// _isInterrupted — confirm the actual name against the class declaration
// earlier in this header.
2248 template < typename Derived, typename DataManager >
2249 bool
2251 const VdfEvaluationState &state,
2252 const VdfNode &node)
2253 {
2254 // First, call into the derived engine to detect any cycles. If the engine
2255 // gets trapped in a cycle we need to interrupt the engine, so that we
2256 // do not get stuck in an infinite loop.
2257 const bool hasCycle = _Self()._DetectCycle(state, node);
2258
2259 // If either a cycle has been detected, or the interruption API on the
2260 // executor returns that the executor has been interrupted, we need to set
2261 // the internal interruption flag. _HasDetectedInterruption() will then be
2262 // queried at various stages of evaluation, which allows us to gracefully
2263 // wind down the engine.
// Relaxed ordering: the flag is a monotonic best-effort signal, no other
// data is published through it.
2264 if (hasCycle || _executor.HasBeenInterrupted()) {
2265 _isInterrupted.store(true, std::memory_order_relaxed);
2266 return true;
2267 }
2268
2269 // This will return true if the interruption flag has previously been set.
2270 return _HasDetectedInterruption();
2271 }
2272
2273template < typename Derived, typename DataManager >
2274bool
2277{
2278 return _isInterrupted.load(std::memory_order_relaxed);
2279}
2280
// NOTE(review): The extraction dropped original line 2283, which carried the
// qualified method name of this definition. It moves all errors recorded
// since errorMark out of the calling thread's error system and stashes them
// in the concurrent _errors container for later posting — confirm the
// actual name against the class declaration earlier in this header.
2281 template < typename Derived, typename DataManager >
2282 void
2284 const TfErrorMark &errorMark)
2285 {
// Transport() removes the marked errors from this thread's error list;
// grow_by(1) thread-safely appends one slot to the tbb::concurrent_vector
// and returns an iterator to it, into which the transport is swapped.
2286 TfErrorTransport transport = errorMark.Transport();
2287 _errors.grow_by(1)->swap(transport);
2288 }
2289
// NOTE(review): The extraction dropped original line 2292, which carried the
// qualified method name of this definition. It re-posts all errors
// accumulated in _errors during parallel evaluation onto the calling
// thread, then empties the container — presumably invoked once evaluation
// has completed; confirm the actual name against the class declaration
// earlier in this header.
2290 template < typename Derived, typename DataManager >
2291 void
2293 {
2294 if (_errors.empty()) {
2295 return;
2296 }
2297
2298 // Post all the transported errors on the calling thread.
2299 for (TfErrorTransport &errorTransport : _errors) {
2300 errorTransport.Post();
2301 }
2302
2303 // Clear the transported errors container.
2304 _errors.clear();
2305 }
2306
2307PXR_NAMESPACE_CLOSE_SCOPE
2308
2309#endif
Fast, compressed bit array which is capable of performing logical operations without first decompress...
bool AreAllUnset() const
Returns true, if all the bits in this bit array are unset.
Class used to record the end of the error-list.
Definition: errorMark.h:48
TfErrorTransport Transport() const
Remove all errors in this mark from the error system and return them in a TfErrorTransport.
Definition: errorMark.h:109
bool IsClean() const
Return true if no new errors were posted in this thread since the last call to SetMark(),...
Definition: errorMark.h:82
A facility for transporting errors from thread to thread.
void swap(TfErrorTransport &other)
Swap this TfErrorTransport's content with other.
std::string const & GetString() const
Return the string that this token represents.
Definition: token.h:190
TF_API const std::string & GetTypeName() const
Return the machine-independent name for this type.
A context is the parameter bundle passed to callbacks of computations.
Definition: context.h:40
This object holds state that remains persistent during one round of network evaluation.
const VdfSchedule & GetSchedule() const
The schedule used for evaluation.
Execution stats profiling event logger.
static VDF_API void FillVector(TfType type, size_t numElements, VdfVector *vector)
Fills vector with the fallback value registered for the given type.
This object is responsible for storing the executor buffer data, comprised of the executor cache vect...
void ResetExecutorCache(const VdfMask &mask)
Reset the executor cache without releasing any memory and set the executor cache mask to mask.
const VdfMask & GetExecutorCacheMask() const
Get the available mask.
VdfVector * GetExecutorCache() const
Returns the executor cache stored at this buffer data instance.
void TakeOwnership(VdfVector *v)
Assumes ownership of the given vector.
void SetExecutorCacheMask(const VdfMask &mask)
Sets the available mask.
void YieldOwnership()
Yields ownership of the internal vector, i.e.
A client may instantiate an object of this class and set it in an executor, to collect errors that ma...
Abstract base class for classes that execute a VdfNetwork to compute a requested set of values.
A VdfInput is used to connect a VdfNode to one or more VdfNodes' outputs.
Definition: input.h:36
size_t GetNumConnections() const
Returns the number of connections for this input.
Definition: input.h:58
A VdfMask is placed on connections to specify the data flowing through them.
Definition: mask.h:37
size_t GetSize() const
Returns the size of the mask.
Definition: mask.h:158
bool IsAllZeros() const
Returns true if this mask has all entries unset.
Definition: mask.h:206
size_t GetLastSet() const
Returns the last set bit in the mask.
Definition: mask.h:236
bool IsEmpty() const
Returns true if this mask is empty, i.e.
Definition: mask.h:168
size_t GetFirstSet() const
Returns the first set bit in the mask.
Definition: mask.h:226
VdfMask::Bits const & GetBits() const
Get this mask's content as CtCompressedfBits.
Definition: mask.h:556
Class to hold on to an externally owned output and a mask.
Definition: maskedOutput.h:32
VdfOutput * GetOutput() const
Returns the VdfOutput.
Definition: maskedOutput.h:52
const VdfMask & GetMask() const
Returns the VdfMask.
Definition: maskedOutput.h:64
This is the base class for all nodes in a VdfNetwork.
Definition: node.h:53
virtual VDF_API VdfRequiredInputsPredicate GetRequiredInputsPredicate(const VdfContext &context) const
Returns a predicate, determining whether a given input and its connections are required in order to f...
virtual void Compute(const VdfContext &context) const =0
This is the method called to perform computation.
A VdfOutput represents an output on a node.
Definition: output.h:32
const VdfNode & GetNode() const
Returns the owning node for this output.
Definition: output.h:57
VDF_API std::string GetDebugName() const
Returns the debug name for this output.
VdfId GetId() const
The unique id of this output.
Definition: output.h:100
VDF_API const VdfOutputSpec & GetSpec() const
Returns the connector specification object for this output.
VDF_API const TfToken & GetName() const
Returns the name of this output.
const VdfInput * GetAssociatedInput() const
Returns the in/out connector associated with this output.
Definition: output.h:76
TfType GetType() const
Returns the type of this spec.
Definition: outputSpec.h:60
The base class for all parallel executor engines.
VdfParallelExecutorEngineBase(const VdfExecutorInterface &executor, DataManager *dataManager)
Constructor.
void RunSchedule(const VdfSchedule &schedule, const VdfRequest &computeRequest, VdfExecutorErrorLogger *errorLogger)
Executes the given schedule with a computeRequest and an optional errorLogger.
virtual ~VdfParallelExecutorEngineBase()
Destructor.
VdfParallelExecutorEngineBase(const VdfParallelExecutorEngineBase &)=delete
Noncopyable.
void RunSchedule(const VdfSchedule &schedule, const VdfRequest &computeRequest, VdfExecutorErrorLogger *errorLogger, Callback &&callback)
Executes the given schedule with a computeRequest and an optional errorLogger.
Instances of this class are used to synchronize dynamic, acyclic task graphs, allowing tasks to claim...
State
The different states a task can be in.
@ Wait
The task is already done.
@ Claimed
The task is currently running, the claimant must wait for the task to complete.
This predicate determines whether a given input value is needed to fulfill the input dependencies req...
bool IsRequiredRead(const VdfInput &input) const
Is this input a required read? Note that read/writes as well as prerequisite inputs are not required ...
bool HasRequiredReads() const
Are any inputs required?
bool RequiresAllReads() const
Are all of the inputs required?
Minimal iterator range that the schedule returns instances of, in order to facilitate iterating over ...
Definition: schedule.h:47
An OutputId is a small key object that, once obtained for a particular VdfOutput, can be used to quer...
Definition: schedule.h:91
bool IsValid() const
Returns whether this OutputId can be used to make queries about an output's scheduling.
Definition: schedule.h:97
Contains a specification of how to execute a particular VdfNetwork.
Definition: schedule.h:41
const VdfNetwork * GetNetwork() const
Returns the network for this schedule.
Definition: schedule.h:178
InputDependencyRange GetRequiredInputDependencies(const VdfScheduleComputeTask &task) const
Returns an iterable range of required (i.e.
Definition: schedule.h:468
VDF_API OutputId GetOutputId(const VdfOutput &output) const
Returns a small, cheap OutputId, which can be passed to other Get* methods in this class to efficient...
VDF_API const VdfMask & GetRequestMask(const OutputId &outputId) const
Returns the request mask associated with the given OutputId.
size_t GetNumUniqueInputDependencies() const
Returns the number of unique input dependencies created for the scheduled task graph.
Definition: schedule.h:371
InputDependencyRange GetPrereqInputDependencies(const VdfScheduleInputsTask &task) const
Returns an iterable range of prereq input dependencies for the given inputs task.
Definition: schedule.h:448
VDF_API const VdfOutput * GetFromBufferOutput(const OutputId &outputId) const
Returns the "from buffer's" output associated with the given OutputId.
const VdfScheduleTaskIndex GetKeepTaskIndex(const VdfNode &node) const
Returns an index to the keep task associated with the given node.
Definition: schedule.h:422
const VdfScheduleComputeTask & GetComputeTask(const VdfScheduleTaskIndex index) const
Returns the compute task associated with the given task index.
Definition: schedule.h:431
size_t GetNumKeepTasks() const
Returns the total number of keep tasks in the schedule.
Definition: schedule.h:395
VDF_API InputsRange GetInputs(const VdfNode &node) const
Returns a range of inputs scheduled for the given node.
VDF_API const VdfMask & GetKeepMask(const OutputId &outputId) const
Returns the keep mask associated with the given OutputId.
VDF_API VdfScheduleInputDependencyUniqueIndex GetUniqueIndex(const OutputId outputId) const
Returns the unique index assigned to the output.
VDF_API const VdfOutput * GetPassToOutput(const OutputId &outputId) const
Returns the "pass to" output associated with the given OutputId.
const TaskIdRange GetComputeTaskIds(const VdfNode &node) const
Returns a range of ids describing compute tasks associated with the given node.
Definition: schedule.h:402
size_t GetNumInputsTasks() const
Returns the total number of inputs tasks in the schedule.
Definition: schedule.h:383
VDF_API const VdfOutput * GetOutput(const OutputId &outputId) const
Returns the scheduled VdfOutput associated with the given OutputId.
size_t GetNumPrepTasks() const
Returns the total number of prep tasks in the schedule.
Definition: schedule.h:389
InputDependencyRange GetOptionalInputDependencies(const VdfScheduleInputsTask &task) const
Returns an iterable range of optional (i.e.
Definition: schedule.h:458
size_t GetNumComputeTasks() const
Returns the total number of compute tasks in the schedule.
Definition: schedule.h:377
This class is used to abstract away knowledge of the cache data used for each node.
Definition: vector.h:56
void Copy(const VdfVector &rhs, const VdfMask &mask)
Copies the contents of rhs into this vector.
Definition: vector.h:274
VDF_API void Merge(const VdfVector &rhs, const VdfMask::Bits &bits)
Merges the contents of rhs into this vector.
The isolating work dispatcher is a specialization of WorkDispatcher, mirroring its public API and doc...
Base class for a parallel task that emulates tbb::task (deprecated in the oneTBB version upgrade....
Definition: taskGraph.h:106
F * AllocateChild(Args &&... args)
Construct a new subtask and increment the reference count of the calling task.
Definition: taskGraph.h:133
Instances of this class are used to spawn and wait on a directed graph of tasks, where tasks preserve...
Definition: taskGraph.h:37
WORK_API void RunLists(const TaskLists &taskLists)
Submit concurrent tasks accumulated in thread-local lists for execution.
tbb::enumerable_thread_specific< TaskList > TaskLists
Thread-local storage for allocated tasks to be spawned.
Definition: taskGraph.h:60
void Wait()
Wait on all the running tasks to complete.
Definition: taskGraph.h:88
std::vector< BaseTask * > TaskList
Container for allocated tasks to be spawned.
Definition: taskGraph.h:57
#define TF_DEV_AXIOM(cond)
The same as TF_AXIOM, but compiled only in dev builds.
Definition: diagnostic.h:205
#define TF_WARN(...)
Issue a warning, but continue execution.
Definition: diagnostic.h:132
void WorkParallelForN(size_t n, Fn &&callback, size_t grainSize)
WorkParallelForN(size_t n, CallbackType callback, size_t grainSize = 1)
Definition: loops.h:57
VDF_API const VdfOutput * VdfGetAssociatedSourceOutput(const VdfOutput &output)
Returns the output that is the source of the associated input of output, if any and NULL otherwise.
This class contains scheduling information for an input.
Definition: scheduleNode.h:73
uint32_t VdfScheduleInputDependencyUniqueIndex
A sequential index assigned to the unique output and mask combination of a VdfScheduleInputDependency...
bool VdfScheduleTaskIsInvalid(uint32_t task)
Returns true if the given task index or id is invalid.
Definition: scheduleTasks.h:41
uint32_t VdfScheduleTaskIndex
Type describing a task index.
Definition: scheduleTasks.h:29
uint32_t VdfScheduleTaskId
Type describing a task id.
Definition: scheduleTasks.h:25
A VdfScheduleComputeTask represents a unit of computation for the parallel evaluation engine.
Definition: scheduleTasks.h:64
Describes a single input dependency, i.e.
Structure describing an additional task used to run prereqs and reads concurrently with read/write in...
Scoped event that automatically logs when created and destroyed.
Scoped event that automatically pushes and pops malloc tags for the given VdfNode.