44 #ifndef KOKKOS_TASKSCHEDULER_HPP 45 #define KOKKOS_TASKSCHEDULER_HPP 49 #include <Kokkos_Macros.hpp> 50 #if defined( KOKKOS_ENABLE_TASKDAG ) 52 #include <Kokkos_Core_fwd.hpp> 55 #include <Kokkos_MemoryPool.hpp> 56 #include <impl/Kokkos_Tags.hpp> 64 template<
typename Arg1 =
void ,
typename Arg2 =
void >
67 template<
typename Space >
70 template<
typename Space >
71 void wait( TaskScheduler< Space >
const & );
73 template<
typename Space >
74 struct is_scheduler :
public std::false_type {};
76 template<
typename Space >
77 struct is_scheduler< TaskScheduler< Space > > :
public std::true_type {};
81 #include <impl/Kokkos_TaskQueue.hpp> 103 template<
typename Space ,
typename ResultType ,
typename FunctorType >
120 template<
typename Arg1 ,
typename Arg2 >
124 template<
typename >
friend class TaskScheduler ;
125 template<
typename ,
typename >
friend class Future ;
126 template<
typename ,
typename ,
typename >
friend class Impl::TaskBase ;
128 enum { Arg1_is_space = Kokkos::is_space< Arg1 >::value };
129 enum { Arg2_is_space = Kokkos::is_space< Arg2 >::value };
130 enum { Arg1_is_value = ! Arg1_is_space &&
131 ! std::is_same< Arg1 , void >::value };
132 enum { Arg2_is_value = ! Arg2_is_space &&
133 ! std::is_same< Arg2 , void >::value };
135 static_assert( ! ( Arg1_is_space && Arg2_is_space )
136 ,
"Future cannot be given two spaces" );
138 static_assert( ! ( Arg1_is_value && Arg2_is_value )
139 ,
"Future cannot be given two value types" );
142 typename std::conditional< Arg1_is_value , Arg1 ,
143 typename std::conditional< Arg2_is_value , Arg2 ,
void 147 typename std::conditional< Arg1_is_space , Arg1 ,
148 typename std::conditional< Arg2_is_space , Arg2 ,
void 151 using task_base = Impl::TaskBase< void , void , void > ;
152 using queue_type = Impl::TaskQueue< Space > ;
156 KOKKOS_INLINE_FUNCTION
explicit 157 Future( task_base * task ) : m_task(0)
158 {
if ( task ) queue_type::assign( & m_task , task ); }
164 using execution_space =
typename Space::execution_space ;
165 using value_type = ValueType ;
169 KOKKOS_INLINE_FUNCTION
170 bool is_null()
const {
return 0 == m_task ; }
172 KOKKOS_INLINE_FUNCTION
173 int reference_count()
const 174 {
return 0 != m_task ? m_task->reference_count() : 0 ; }
178 KOKKOS_INLINE_FUNCTION
180 {
if ( m_task ) queue_type::assign( & m_task , (task_base*)0 ); }
184 KOKKOS_INLINE_FUNCTION
185 ~Future() { clear(); }
189 KOKKOS_INLINE_FUNCTION
190 constexpr Future() noexcept : m_task(0) {}
192 KOKKOS_INLINE_FUNCTION
193 Future( Future && rhs )
194 : m_task( rhs.m_task ) { rhs.m_task = 0 ; }
196 KOKKOS_INLINE_FUNCTION
197 Future(
const Future & rhs )
199 {
if ( rhs.m_task ) queue_type::assign( & m_task , rhs.m_task ); }
201 KOKKOS_INLINE_FUNCTION
202 Future & operator = ( Future && rhs )
205 m_task = rhs.m_task ;
210 KOKKOS_INLINE_FUNCTION
211 Future & operator = (
const Future & rhs )
213 if ( m_task || rhs.m_task ) queue_type::assign( & m_task , rhs.m_task );
219 template<
class A1 ,
class A2 >
220 KOKKOS_INLINE_FUNCTION
221 Future( Future<A1,A2> && rhs )
222 : m_task( rhs.m_task )
225 ( std::is_same< Space , void >::value ||
226 std::is_same< Space ,
typename Future<A1,A2>::Space >::value
227 ,
"Assigned Futures must have the same space" );
230 ( std::is_same< value_type , void >::value ||
231 std::is_same< value_type ,
typename Future<A1,A2>::value_type >::value
232 ,
"Assigned Futures must have the same value_type" );
237 template<
class A1 ,
class A2 >
238 KOKKOS_INLINE_FUNCTION
239 Future(
const Future<A1,A2> & rhs )
243 ( std::is_same< Space , void >::value ||
244 std::is_same< Space ,
typename Future<A1,A2>::Space >::value
245 ,
"Assigned Futures must have the same space" );
248 ( std::is_same< value_type , void >::value ||
249 std::is_same< value_type ,
typename Future<A1,A2>::value_type >::value
250 ,
"Assigned Futures must have the same value_type" );
252 if ( rhs.m_task ) queue_type::assign( & m_task , rhs.m_task );
255 template<
class A1 ,
class A2 >
256 KOKKOS_INLINE_FUNCTION
257 Future & operator = (
const Future<A1,A2> & rhs )
260 ( std::is_same< Space , void >::value ||
261 std::is_same< Space ,
typename Future<A1,A2>::Space >::value
262 ,
"Assigned Futures must have the same space" );
265 ( std::is_same< value_type , void >::value ||
266 std::is_same< value_type ,
typename Future<A1,A2>::value_type >::value
267 ,
"Assigned Futures must have the same value_type" );
269 if ( m_task || rhs.m_task ) queue_type::assign( & m_task , rhs.m_task );
273 template<
class A1 ,
class A2 >
274 KOKKOS_INLINE_FUNCTION
275 Future & operator = ( Future<A1,A2> && rhs )
278 ( std::is_same< Space , void >::value ||
279 std::is_same< Space ,
typename Future<A1,A2>::Space >::value
280 ,
"Assigned Futures must have the same space" );
283 ( std::is_same< value_type , void >::value ||
284 std::is_same< value_type ,
typename Future<A1,A2>::value_type >::value
285 ,
"Assigned Futures must have the same value_type" );
288 m_task = rhs.m_task ;
295 KOKKOS_INLINE_FUNCTION
296 int is_ready() const noexcept
297 {
return ( 0 == m_task ) || ( ((task_base*) task_base::LockTag) == m_task->m_wait ); }
299 KOKKOS_INLINE_FUNCTION
300 const typename Impl::TaskResult< ValueType >::reference_type
304 Kokkos::abort(
"Kokkos:::Future::get ERROR: is_null()");
306 return Impl::TaskResult< ValueType >::get( m_task );
311 template<
typename ,
typename ExecSpace =
void >
312 struct is_future :
public std::false_type {};
314 template<
typename Arg1 ,
typename Arg2 ,
typename ExecSpace >
315 struct is_future< Future<Arg1,Arg2> , ExecSpace >
316 :
public std::integral_constant
318 ( std::is_same< ExecSpace , void >::value ||
319 std::is_same< ExecSpace
320 , typename Future<Arg1,Arg2>::execution_space >::value )
330 enum class TaskPriority : int { High = 0
344 template<
int TaskEnum ,
typename DepFutureType >
345 struct TaskPolicyData
347 using execution_space =
typename DepFutureType::execution_space ;
348 using scheduler_type = TaskScheduler< execution_space > ;
350 enum :
int { m_task_type = TaskEnum };
352 scheduler_type
const * m_scheduler ;
353 DepFutureType
const m_dependence ;
356 TaskPolicyData() = delete ;
357 TaskPolicyData( TaskPolicyData && ) = default ;
358 TaskPolicyData( TaskPolicyData
const & ) = default ;
359 TaskPolicyData & operator = ( TaskPolicyData && ) = default ;
360 TaskPolicyData & operator = ( TaskPolicyData
const & ) = default ;
362 KOKKOS_INLINE_FUNCTION
363 TaskPolicyData( DepFutureType
const & arg_future
364 , Kokkos::TaskPriority
const & arg_priority )
366 , m_dependence( arg_future )
367 , m_priority( static_cast<int>( arg_priority ) )
370 KOKKOS_INLINE_FUNCTION
371 TaskPolicyData( scheduler_type
const & arg_scheduler
372 , Kokkos::TaskPriority
const & arg_priority )
373 : m_scheduler( & arg_scheduler )
375 , m_priority( static_cast<int>( arg_priority ) )
378 KOKKOS_INLINE_FUNCTION
379 TaskPolicyData( scheduler_type
const & arg_scheduler
380 , DepFutureType
const & arg_future
381 , Kokkos::TaskPriority
const & arg_priority )
382 : m_scheduler( & arg_scheduler )
383 , m_dependence( arg_future )
384 , m_priority( static_cast<int>( arg_priority ) )
396 template<
typename ExecSpace >
401 using track_type = Kokkos::Impl::SharedAllocationTracker ;
402 using queue_type = Kokkos::Impl::TaskQueue< ExecSpace > ;
403 using task_base = Impl::TaskBase< void , void , void > ;
406 queue_type * m_queue ;
412 using execution_space = ExecSpace ;
413 using memory_space =
typename queue_type::memory_space ;
414 using memory_pool =
typename queue_type::memory_pool ;
416 typename Kokkos::Impl::TaskQueueSpecialization< ExecSpace >::member_type ;
418 KOKKOS_INLINE_FUNCTION
419 TaskScheduler() : m_track(), m_queue(0) {}
421 KOKKOS_INLINE_FUNCTION
422 TaskScheduler( TaskScheduler && rhs )
423 : m_track( rhs.m_track ), m_queue( rhs.m_queue ) {}
425 KOKKOS_INLINE_FUNCTION
426 TaskScheduler( TaskScheduler
const & rhs )
427 : m_track( rhs.m_track ), m_queue( rhs.m_queue ) {}
429 KOKKOS_INLINE_FUNCTION
430 TaskScheduler & operator = ( TaskScheduler && rhs )
431 { m_track = rhs.m_track ; m_queue = rhs.m_queue ;
return *this ; }
433 KOKKOS_INLINE_FUNCTION
434 TaskScheduler & operator = ( TaskScheduler
const & rhs )
435 { m_track = rhs.m_track ; m_queue = rhs.m_queue ;
return *this ; }
437 TaskScheduler( memory_pool
const & arg_memory_pool )
441 typedef Kokkos::Impl::SharedAllocationRecord
442 < memory_space ,
typename queue_type::Destroy >
445 record_type * record =
446 record_type::allocate( memory_space()
451 m_queue =
new( record->data() ) queue_type( arg_memory_pool );
453 record->m_destroy.m_queue = m_queue ;
455 m_track.assign_allocated_record_to_uninitialized( record );
458 TaskScheduler( memory_space
const & arg_memory_space
459 ,
size_t const mempool_capacity
460 ,
unsigned const mempool_min_block_size
461 ,
unsigned const mempool_max_block_size
462 ,
unsigned const mempool_superblock_size
464 : TaskScheduler( memory_pool( arg_memory_space
466 , mempool_min_block_size
467 , mempool_max_block_size
468 , mempool_superblock_size ) )
473 KOKKOS_INLINE_FUNCTION
474 memory_pool * memory() const noexcept
475 {
return m_queue ? &( m_queue->m_memory ) : (memory_pool*) 0 ; }
479 template<
typename FunctorType >
481 size_t spawn_allocation_size()
const 482 {
return m_queue->template spawn_allocation_size< FunctorType >(); }
486 size_t when_all_allocation_size(
int narg )
const 487 {
return m_queue->when_all_allocation_size( narg ); }
491 template<
int TaskEnum ,
typename DepFutureType ,
typename FunctorType >
492 KOKKOS_FUNCTION
static 494 spawn( Impl::TaskPolicyData<TaskEnum,DepFutureType>
const & arg_policy
495 ,
typename task_base::function_type arg_function
496 , FunctorType && arg_functor
499 using value_type =
typename FunctorType::value_type ;
500 using future_type = Future< value_type , execution_space > ;
501 using task_type = Impl::TaskBase< execution_space
505 queue_type *
const queue =
506 arg_policy.m_scheduler ? arg_policy.m_scheduler->m_queue : (
507 arg_policy.m_dependence.m_task
508 ?
static_cast<queue_type*
>(arg_policy.m_dependence.m_task->m_queue)
512 Kokkos::abort(
"Kokkos spawn requires scheduler or non-null Future");
515 if ( arg_policy.m_dependence.m_task != 0 &&
516 arg_policy.m_dependence.m_task->m_queue != queue ) {
517 Kokkos::abort(
"Kokkos spawn given incompatible scheduler and Future");
524 queue->iff_single_thread_recursive_execute();
532 const size_t alloc_size =
533 queue->template spawn_allocation_size< FunctorType >();
536 reinterpret_cast< task_type *
>(queue->allocate(alloc_size) );
544 new ( f.m_task ) task_type( std::move(arg_functor) );
546 f.m_task->m_apply = arg_function ;
547 f.m_task->m_queue = queue ;
548 f.m_task->m_next = arg_policy.m_dependence.m_task ;
549 f.m_task->m_ref_count = 2 ;
550 f.m_task->m_alloc_size = alloc_size ;
551 f.m_task->m_task_type = arg_policy.m_task_type ;
552 f.m_task->m_priority = arg_policy.m_priority ;
554 Kokkos::memory_fence();
561 queue->schedule_runnable( f.m_task );
569 template<
typename FunctorType ,
typename A1 ,
typename A2 >
570 KOKKOS_FUNCTION
static 572 respawn( FunctorType * arg_self
573 , Future<A1,A2>
const & arg_dependence
574 , TaskPriority
const & arg_priority
579 using value_type =
typename FunctorType::value_type ;
580 using task_type = Impl::TaskBase< execution_space
584 task_type *
const task =
static_cast< task_type *
>( arg_self );
586 task->m_priority =
static_cast<int>(arg_priority);
588 task->add_dependence( arg_dependence.m_task );
593 template<
typename FunctorType >
594 KOKKOS_FUNCTION
static 596 respawn( FunctorType * arg_self
597 , TaskScheduler
const &
598 , TaskPriority
const & arg_priority
603 using value_type =
typename FunctorType::value_type ;
604 using task_type = Impl::TaskBase< execution_space
608 task_type *
const task =
static_cast< task_type *
>( arg_self );
610 task->m_priority =
static_cast<int>(arg_priority);
612 task->add_dependence( (task_base*) 0 );
621 template<
typename A1 ,
typename A2 >
622 KOKKOS_FUNCTION
static 623 Future< execution_space >
624 when_all( Future< A1 , A2 >
const arg[] ,
int narg )
626 using future_type = Future< execution_space > ;
627 using task_base = Kokkos::Impl::TaskBase< void , void , void > ;
633 queue_type * queue = 0 ;
635 for (
int i = 0 ; i < narg ; ++i ) {
636 task_base *
const t = arg[i].m_task ;
639 Kokkos::atomic_increment( &(t->m_ref_count) );
641 queue =
static_cast< queue_type *
>( t->m_queue );
643 else if ( queue != static_cast< queue_type * >( t->m_queue ) ) {
644 Kokkos::abort(
"Kokkos when_all Futures must be in the same scheduler" );
651 size_t const alloc_size = queue->when_all_allocation_size( narg );
654 reinterpret_cast< task_base *
>( queue->allocate( alloc_size ) );
662 new( f.m_task ) task_base();
664 f.m_task->m_queue = queue ;
665 f.m_task->m_ref_count = 2 ;
666 f.m_task->m_alloc_size = alloc_size ;
667 f.m_task->m_dep_count = narg ;
668 f.m_task->m_task_type = task_base::Aggregate ;
672 task_base *
volatile *
const dep =
673 f.m_task->aggregate_dependences();
675 for (
int i = 0 ; i < narg ; ++i ) { dep[i] = arg[i].m_task ; }
677 Kokkos::memory_fence();
679 queue->schedule_aggregate( f.m_task );
690 Future< execution_space >
691 when_all(
int narg , F
const func )
693 using input_type = decltype( func(0) );
694 using future_type = Future< execution_space > ;
695 using task_base = Kokkos::Impl::TaskBase< void , void , void > ;
697 static_assert( is_future< input_type >::value
698 ,
"Functor must return a Kokkos::Future" );
702 if ( 0 == narg )
return f ;
704 size_t const alloc_size = m_queue->when_all_allocation_size( narg );
707 reinterpret_cast< task_base *
>( m_queue->allocate( alloc_size ) );
715 new( f.m_task ) task_base();
717 f.m_task->m_queue = m_queue ;
718 f.m_task->m_ref_count = 2 ;
719 f.m_task->m_alloc_size = alloc_size ;
720 f.m_task->m_dep_count = narg ;
721 f.m_task->m_task_type = task_base::Aggregate ;
725 task_base *
volatile *
const dep =
726 f.m_task->aggregate_dependences();
728 for (
int i = 0 ; i < narg ; ++i ) {
729 const input_type arg_f = func(i);
730 if ( 0 != arg_f.m_task ) {
732 if ( m_queue != static_cast< queue_type * >( arg_f.m_task->m_queue ) ) {
733 Kokkos::abort(
"Kokkos when_all Futures must be in the same scheduler" );
736 Kokkos::atomic_increment( &(arg_f.m_task->m_ref_count) );
737 dep[i] = arg_f.m_task ;
741 Kokkos::memory_fence();
743 m_queue->schedule_aggregate( f.m_task );
751 KOKKOS_INLINE_FUNCTION
752 int allocation_capacity() const noexcept
753 {
return m_queue->m_memory.capacity(); }
755 KOKKOS_INLINE_FUNCTION
756 int allocated_task_count() const noexcept
757 {
return m_queue->m_count_alloc ; }
759 KOKKOS_INLINE_FUNCTION
760 int allocated_task_count_max() const noexcept
761 {
return m_queue->m_max_alloc ; }
763 KOKKOS_INLINE_FUNCTION
764 long allocated_task_count_accum() const noexcept
765 {
return m_queue->m_accum_alloc ; }
769 template<
typename S >
771 void Kokkos::wait( Kokkos::TaskScheduler< S >
const & );
785 template<
typename T >
786 Kokkos::Impl::TaskPolicyData
787 < Kokkos::Impl::TaskBase<void,void,void>::TaskTeam
788 ,
typename std::conditional< Kokkos::is_future< T >::value , T ,
791 KOKKOS_INLINE_FUNCTION
792 TaskTeam( T
const & arg
793 , TaskPriority
const & arg_priority = TaskPriority::Regular
796 static_assert( Kokkos::is_future<T>::value ||
797 Kokkos::is_scheduler<T>::value
798 ,
"Kokkos TaskTeam argument must be Future or TaskScheduler" );
801 Kokkos::Impl::TaskPolicyData
802 < Kokkos::Impl::TaskBase<void,void,void>::TaskTeam
803 ,
typename std::conditional< Kokkos::is_future< T >::value , T ,
805 >( arg , arg_priority );
808 template<
typename E ,
typename F >
810 TaskPolicyData< Kokkos::Impl::TaskBase<void,void,void>::TaskTeam , F >
811 KOKKOS_INLINE_FUNCTION
812 TaskTeam( TaskScheduler<E>
const & arg_scheduler
813 , F
const & arg_future
814 ,
typename std::enable_if< Kokkos::is_future<F>::value ,
815 TaskPriority >::type
const & arg_priority = TaskPriority::Regular
819 Kokkos::Impl::TaskPolicyData
820 < Kokkos::Impl::TaskBase<void,void,void>::TaskTeam , F >
821 ( arg_scheduler , arg_future , arg_priority );
826 template<
typename T >
827 Kokkos::Impl::TaskPolicyData
828 < Kokkos::Impl::TaskBase<void,void,void>::TaskSingle
829 ,
typename std::conditional< Kokkos::is_future< T >::value , T ,
832 KOKKOS_INLINE_FUNCTION
833 TaskSingle( T
const & arg
834 , TaskPriority
const & arg_priority = TaskPriority::Regular
837 static_assert( Kokkos::is_future<T>::value ||
838 Kokkos::is_scheduler<T>::value
839 ,
"Kokkos TaskSingle argument must be Future or TaskScheduler" );
842 Kokkos::Impl::TaskPolicyData
843 < Kokkos::Impl::TaskBase<void,void,void>::TaskSingle
844 ,
typename std::conditional< Kokkos::is_future< T >::value , T ,
846 >( arg , arg_priority );
849 template<
typename E ,
typename F >
851 TaskPolicyData< Kokkos::Impl::TaskBase<void,void,void>::TaskSingle , F >
852 KOKKOS_INLINE_FUNCTION
853 TaskSingle( TaskScheduler<E>
const & arg_scheduler
854 , F
const & arg_future
855 ,
typename std::enable_if< Kokkos::is_future<F>::value ,
856 TaskPriority >::type
const & arg_priority = TaskPriority::Regular
860 Kokkos::Impl::TaskPolicyData
861 < Kokkos::Impl::TaskBase<void,void,void>::TaskSingle , F >
862 ( arg_scheduler , arg_future , arg_priority );
873 template<
int TaskEnum
874 ,
typename DepFutureType
875 ,
typename FunctorType >
876 Future<
typename FunctorType::value_type
877 ,
typename DepFutureType::execution_space >
878 host_spawn( Impl::TaskPolicyData<TaskEnum,DepFutureType>
const & arg_policy
879 , FunctorType && arg_functor
882 using exec_space =
typename DepFutureType::execution_space ;
883 using scheduler = TaskScheduler< exec_space > ;
885 typedef Impl::TaskBase< exec_space
886 ,
typename FunctorType::value_type
890 static_assert( TaskEnum == task_type::TaskTeam ||
891 TaskEnum == task_type::TaskSingle
892 ,
"Kokkos host_spawn requires TaskTeam or TaskSingle" );
896 typename task_type::function_type
const ptr =
897 Kokkos::Impl::TaskQueueSpecialization< exec_space >::
898 template get_function_pointer< task_type >();
900 return scheduler::spawn( arg_policy , ptr , std::move(arg_functor) );
909 template<
int TaskEnum
910 ,
typename DepFutureType
911 ,
typename FunctorType >
912 Future<
typename FunctorType::value_type
913 ,
typename DepFutureType::execution_space >
914 KOKKOS_INLINE_FUNCTION
915 task_spawn( Impl::TaskPolicyData<TaskEnum,DepFutureType>
const & arg_policy
916 , FunctorType && arg_functor
919 using exec_space =
typename DepFutureType::execution_space ;
920 using scheduler = TaskScheduler< exec_space > ;
922 typedef Impl::TaskBase< exec_space
923 ,
typename FunctorType::value_type
927 #if defined( KOKKOS_ACTIVE_EXECUTION_MEMORY_SPACE_HOST ) && \ 928 defined( KOKKOS_ENABLE_CUDA ) 930 static_assert( ! std::is_same< Kokkos::Cuda , exec_space >::value
931 ,
"Error calling Kokkos::task_spawn for Cuda space within Host code" );
935 static_assert( TaskEnum == task_type::TaskTeam ||
936 TaskEnum == task_type::TaskSingle
937 ,
"Kokkos host_spawn requires TaskTeam or TaskSingle" );
939 typename task_type::function_type
const ptr = task_type::apply ;
941 return scheduler::spawn( arg_policy , ptr , std::move(arg_functor) );
949 template<
typename FunctorType ,
typename T >
951 KOKKOS_INLINE_FUNCTION
954 , TaskPriority
const & arg_priority = TaskPriority::Regular
957 static_assert( Kokkos::is_future<T>::value ||
958 Kokkos::is_scheduler<T>::value
959 ,
"Kokkos respawn argument must be Future or TaskScheduler" );
961 TaskScheduler< typename T::execution_space >::
962 respawn( arg_self , arg , arg_priority );
967 template<
typename A1 ,
typename A2 >
968 KOKKOS_INLINE_FUNCTION
969 Future< typename Future< A1 , A2 >::execution_space >
970 when_all( Future< A1 , A2 >
const arg[]
974 return TaskScheduler< typename Future<A1,A2>::execution_space >::
975 when_all( arg , narg );
981 template<
typename ExecSpace >
983 void wait( TaskScheduler< ExecSpace >
const & scheduler )
984 { scheduler.m_queue->execute(); }
Future< typename FunctorType::value_type, typename DepFutureType::execution_space > KOKKOS_INLINE_FUNCTION task_spawn(Impl::TaskPolicyData< TaskEnum, DepFutureType > const &arg_policy, FunctorType &&arg_functor)
A task spawns a task with options.
void KOKKOS_INLINE_FUNCTION respawn(FunctorType *arg_self, T const &arg, TaskPriority const &arg_priority=TaskPriority::Regular)
A task respawns itself with options.
Future< typename FunctorType::value_type, typename DepFutureType::execution_space > host_spawn(Impl::TaskPolicyData< TaskEnum, DepFutureType > const &arg_policy, FunctorType &&arg_functor)
A host control thread spawns a task with options.