Kokkos Core Kernels Package  Version of the Day
Kokkos_WorkGraphPolicy.hpp
1 /*
2 //@HEADER
3 // ************************************************************************
4 //
5 // Kokkos v. 3.0
6 // Copyright (2020) National Technology & Engineering
7 // Solutions of Sandia, LLC (NTESS).
8 //
9 // Under the terms of Contract DE-NA0003525 with NTESS,
10 // the U.S. Government retains certain rights in this software.
11 //
12 // Redistribution and use in source and binary forms, with or without
13 // modification, are permitted provided that the following conditions are
14 // met:
15 //
16 // 1. Redistributions of source code must retain the above copyright
17 // notice, this list of conditions and the following disclaimer.
18 //
19 // 2. Redistributions in binary form must reproduce the above copyright
20 // notice, this list of conditions and the following disclaimer in the
21 // documentation and/or other materials provided with the distribution.
22 //
23 // 3. Neither the name of the Corporation nor the names of the
24 // contributors may be used to endorse or promote products derived from
25 // this software without specific prior written permission.
26 //
27 // THIS SOFTWARE IS PROVIDED BY NTESS "AS IS" AND ANY
28 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
29 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
30 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL NTESS OR THE
31 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
32 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
33 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
34 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
35 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
36 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
37 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38 //
39 // Questions? Contact Christian R. Trott (crtrott@sandia.gov)
40 //
41 // ************************************************************************
42 //@HEADER
43 */
44 
45 #ifndef KOKKOS_WORKGRAPHPOLICY_HPP
46 #define KOKKOS_WORKGRAPHPOLICY_HPP
47 
48 #include <impl/Kokkos_AnalyzePolicy.hpp>
49 #include <Kokkos_Crs.hpp>
50 
namespace Kokkos {
namespace Impl {

// Forward declaration only: this header never defines WorkGraphExec.
// NOTE(review): the definition presumably comes from the per-back-end
// *_WorkGraphPolicy.hpp headers included at the end of this file - confirm.
template <class functor_type, class execution_space, class... policy_args>
class WorkGraphExec;

}  // namespace Impl
}  // namespace Kokkos
59 
60 namespace Kokkos {
61 
62 template <class... Properties>
63 class WorkGraphPolicy : public Kokkos::Impl::PolicyTraits<Properties...> {
64  public:
65  using execution_policy = WorkGraphPolicy<Properties...>;
66  using self_type = WorkGraphPolicy<Properties...>;
67  using traits = Kokkos::Impl::PolicyTraits<Properties...>;
68  using index_type = typename traits::index_type;
69  using member_type = index_type;
70  using execution_space = typename traits::execution_space;
71  using memory_space = typename execution_space::memory_space;
73 
74  enum : std::int32_t {
75  END_TOKEN = -1,
76  BEGIN_TOKEN = -2,
77  COMPLETED_TOKEN = -3
78  };
79 
80  private:
82 
83  // Let N = m_graph.numRows(), the total work
84  // m_queue[ 0 .. N-1] = the ready queue
85  // m_queue[ N .. 2*N-1] = the waiting queue counts
86  // m_queue[2*N .. 2*N+2] = the ready queue hints
87 
88  graph_type const m_graph;
89  ints_type m_queue;
90 
91  KOKKOS_INLINE_FUNCTION
92  void push_work(const std::int32_t w) const noexcept {
93  const std::int32_t N = m_graph.numRows();
94 
95  std::int32_t volatile* const ready_queue = &m_queue[0];
96  std::int32_t volatile* const end_hint = &m_queue[2 * N + 1];
97 
98  // Push work to end of queue
99  const std::int32_t j = atomic_fetch_add(end_hint, 1);
100 
101  if ((N <= j) || (END_TOKEN != atomic_exchange(ready_queue + j, w))) {
102  // ERROR: past the end of queue or did not replace END_TOKEN
103  Kokkos::abort("WorkGraphPolicy push_work error");
104  }
105 
106  memory_fence();
107  }
108 
109  public:
124  KOKKOS_INLINE_FUNCTION
125  std::int32_t pop_work() const noexcept {
126  const std::int32_t N = m_graph.numRows();
127 
128  std::int32_t volatile* const ready_queue = &m_queue[0];
129  std::int32_t volatile* const begin_hint = &m_queue[2 * N];
130 
131  // begin hint is guaranteed to be less than or equal to
132  // actual begin location in the queue.
133 
134  for (std::int32_t i = *begin_hint; i < N; ++i) {
135  const std::int32_t w = ready_queue[i];
136 
137  if (w == END_TOKEN) {
138  return END_TOKEN;
139  }
140 
141  if ((w != BEGIN_TOKEN) &&
142  (w == atomic_compare_exchange(ready_queue + i, w,
143  (std::int32_t)BEGIN_TOKEN))) {
144  // Attempt to claim ready work index succeeded,
145  // update the hint and return work index
146  atomic_increment(begin_hint);
147  return w;
148  }
149  // arrive here when ready_queue[i] == BEGIN_TOKEN
150  }
151 
152  return COMPLETED_TOKEN;
153  }
154 
155  KOKKOS_INLINE_FUNCTION
156  void completed_work(std::int32_t w) const noexcept {
157  Kokkos::memory_fence();
158 
159  // Make sure the completed work function's memory accesses are flushed.
160 
161  const std::int32_t N = m_graph.numRows();
162 
163  std::int32_t volatile* const count_queue = &m_queue[N];
164 
165  const std::int32_t B = m_graph.row_map(w);
166  const std::int32_t E = m_graph.row_map(w + 1);
167 
168  for (std::int32_t i = B; i < E; ++i) {
169  const std::int32_t j = m_graph.entries(i);
170  if (1 == atomic_fetch_add(count_queue + j, -1)) {
171  push_work(j);
172  }
173  }
174  }
175 
176  struct TagInit {};
177  struct TagCount {};
178  struct TagReady {};
179 
186  KOKKOS_INLINE_FUNCTION
187  void operator()(const TagInit, int i) const noexcept {
188  m_queue[i] = i < m_graph.numRows() ? END_TOKEN : 0;
189  }
190 
191  KOKKOS_INLINE_FUNCTION
192  void operator()(const TagCount, int i) const noexcept {
193  std::int32_t volatile* const count_queue = &m_queue[m_graph.numRows()];
194 
195  atomic_increment(count_queue + m_graph.entries[i]);
196  }
197 
198  KOKKOS_INLINE_FUNCTION
199  void operator()(const TagReady, int w) const noexcept {
200  std::int32_t const* const count_queue = &m_queue[m_graph.numRows()];
201 
202  if (0 == count_queue[w]) push_work(w);
203  }
204 
205  execution_space space() const { return execution_space(); }
206 
207  WorkGraphPolicy(const graph_type& arg_graph)
208  : m_graph(arg_graph),
209  m_queue(view_alloc("queue", WithoutInitializing),
210  arg_graph.numRows() * 2 + 2) {
211  { // Initialize
212  using policy_type = RangePolicy<std::int32_t, execution_space, TagInit>;
214  const closure_type closure(*this, policy_type(0, m_queue.size()));
215  closure.execute();
216  execution_space().fence();
217  }
218 
219  { // execute-after counts
220  using policy_type = RangePolicy<std::int32_t, execution_space, TagCount>;
222  const closure_type closure(*this, policy_type(0, m_graph.entries.size()));
223  closure.execute();
224  execution_space().fence();
225  }
226 
227  { // Scheduling ready tasks
228  using policy_type = RangePolicy<std::int32_t, execution_space, TagReady>;
230  const closure_type closure(*this, policy_type(0, m_graph.numRows()));
231  closure.execute();
232  execution_space().fence();
233  }
234  }
235 };
236 
237 } // namespace Kokkos
238 
239 #ifdef KOKKOS_ENABLE_SERIAL
240 #include "impl/Kokkos_Serial_WorkGraphPolicy.hpp"
241 #endif
242 
243 #ifdef KOKKOS_ENABLE_OPENMP
244 #include "OpenMP/Kokkos_OpenMP_WorkGraphPolicy.hpp"
245 #endif
246 
247 #ifdef KOKKOS_ENABLE_CUDA
248 #include "Cuda/Kokkos_Cuda_WorkGraphPolicy.hpp"
249 #endif
250 
251 #ifdef KOKKOS_ENABLE_HIP
252 #include "HIP/Kokkos_HIP_WorkGraphPolicy.hpp"
253 #endif
254 
255 #ifdef KOKKOS_ENABLE_THREADS
256 #include "Threads/Kokkos_Threads_WorkGraphPolicy.hpp"
257 #endif
258 
259 #ifdef KOKKOS_ENABLE_HPX
260 #include "HPX/Kokkos_HPX_WorkGraphPolicy.hpp"
261 #endif
262 
263 #endif /* #define KOKKOS_WORKGRAPHPOLICY_HPP */
Compressed row storage array.
Definition: Kokkos_Crs.hpp:86
Implementation of the ParallelFor operator that has a partial specialization for the device...
Definition: dummy.cpp:3