9#include <kernel/util/dist.hpp>
10#include <kernel/util/time_stamp.hpp>
11#include <kernel/util/statistics.hpp>
16#include <condition_variable>
29 template <
typename DT_>
43#if defined(FEAT_HAVE_MPI) || defined(DOXYGEN)
44#ifdef FEAT_MPI_THREAD_MULTIPLE
48 std::condition_variable _cv_allreduce_called;
50 bool _flag_allreduce_called;
69#if defined(FEAT_HAVE_MPI) || defined(DOXYGEN)
70#ifdef FEAT_MPI_THREAD_MULTIPLE
72 _cv_allreduce_called(),
73 _flag_allreduce_called(false),
98#if defined(FEAT_HAVE_MPI) || defined(DOXYGEN)
105#ifdef FEAT_MPI_THREAD_MULTIPLE
106 _flag_allreduce_called(false),
107 _thread(_wait_function, std::cref(
_x), std::cref(comm), std::cref(op), std::ref(
_r), std::ref(
_mpi_exec), std::ref(
_mpi_wait),
108 std::ref(_mutex), std::ref(_cv_allreduce_called), std::ref(_flag_allreduce_called)),
114#ifdef FEAT_MPI_THREAD_MULTIPLE
115 std::unique_lock<std::mutex> l(_mutex);
116 _cv_allreduce_called.wait(l, [
this]() {
return _flag_allreduce_called ==
true; });
120 Statistics::add_time_mpi_execute_reduction(ts_start.
elapsed_now());
145#if defined(FEAT_HAVE_MPI) || defined(DOXYGEN)
146#ifdef FEAT_MPI_THREAD_MULTIPLE
148 _cv_allreduce_called(),
149 _flag_allreduce_called(other._flag_allreduce_called),
150 _thread(std::move(other._thread)),
157 #ifdef FEAT_MPI_THREAD_MULTIPLE
158 XASSERTM(other._mutex.try_lock(),
"Other is still locked by other thread!");
159 XASSERTM(other._flag_allreduce_called,
"Allreduce not called!");
161 other._finished =
true;
175#if defined(FEAT_HAVE_MPI) || defined(DOXYGEN)
176#ifdef FEAT_MPI_THREAD_MULTIPLE
177 XASSERTM(other._mutex.try_lock(),
"Other is still locked by other thread!");
178 XASSERTM(other._flag_allreduce_called,
"Allreduce not called!");
179 _flag_allreduce_called = other._flag_allreduce_called;
180 _thread = std::move(other._thread);
182 _req = std::move(other._req);
187 other._finished =
true;
203#ifdef FEAT_MPI_THREAD_MULTIPLE
205 Statistics::add_time_mpi_execute_reduction(
_mpi_exec);
206 Statistics::add_time_mpi_wait_reduction(
_mpi_wait);
210 Statistics::add_time_mpi_wait_reduction(ts_start.
elapsed_now());
225 static void _wait_function(
const DT_ & x,
const Dist::Comm& comm,
const Dist::Operation & op, DT_ & r,
double & mpi_exec,
double & mpi_wait, std::mutex & mutex,
226 std::condition_variable & cv_allreduce_called,
bool & flag_allreduce_called)
229 std::unique_lock<std::mutex> l(mutex);
231 flag_allreduce_called =
true;
233 cv_allreduce_called.notify_all();
#define XASSERT(expr)
Assertion macro definition.
#define XASSERTM(expr, msg)
Assertion macro definition with custom message.
Request iallreduce(const void *sendbuf, void *recvbuf, std::size_t count, const Datatype &datatype, const Operation &op) const
Nonblocking All-Reduce.
Communication Request class.
bool wait(Status &status)
Blocks until the request is fulfilled (or null).
Ticket class for asynchronous global operations on scalars.
SynchScalarTicket()
standard constructor
SynchScalarTicket(DT_ x, const Dist::Comm &comm, const Dist::Operation &op, bool sqrt=false)
Constructor.
DT_ _r
buffer containing the received data
SynchScalarTicket(const SynchScalarTicket &)=delete
Unwanted copy constructor: Do not implement!
~SynchScalarTicket()
Destructor.
bool _finished
Signals whether wait() was already called.
SynchScalarTicket(SynchScalarTicket &&other)
move constructor
Dist::Request _req
Our request for the corresponding iallreduce mpi call.
double _mpi_wait
Holds the elapsed time (TOE, time of execution) of our MPI reduction wait.
double _mpi_exec
Holds the elapsed time (TOE, time of execution) of our MPI execution.
SynchScalarTicket & operator=(const SynchScalarTicket &)=delete
Unwanted copy assignment operator: Do not implement!
SynchScalarTicket & operator=(SynchScalarTicket &&other)
move-assign operator
bool _sqrt
should we compute the sqrt of the result
DT_ _x
buffer containing the send data
TimeStamp & stamp()
Sets this time stamp to the current time.
double elapsed_now() const
Calculates the time elapsed between the time stamp and now.
T_ sqrt(T_ x)
Returns the square-root of a value.
Communication Operation class.