9#include <kernel/util/dist.hpp> 
   10#include <kernel/util/time_stamp.hpp> 
   11#include <kernel/util/statistics.hpp> 
   16#include <condition_variable> 
   29    template <
typename DT_>
 
   43#if defined(FEAT_HAVE_MPI) || defined(DOXYGEN) 
   44#ifdef FEAT_MPI_THREAD_MULTIPLE 
   48      std::condition_variable _cv_allreduce_called;
 
   50      bool _flag_allreduce_called;
 
   69#if defined(FEAT_HAVE_MPI) || defined(DOXYGEN)
 
   70#ifdef FEAT_MPI_THREAD_MULTIPLE
 
   72        _cv_allreduce_called(),
 
   73        _flag_allreduce_called(false),
 
   98#if defined(FEAT_HAVE_MPI) || defined(DOXYGEN) 
  105#ifdef FEAT_MPI_THREAD_MULTIPLE
 
  106        _flag_allreduce_called(false),
 
  107        _thread(_wait_function, std::cref(
_x), std::cref(comm), std::cref(op), std::ref(
_r), std::ref(
_mpi_exec), std::ref(
_mpi_wait),
 
  108            std::ref(_mutex), std::ref(_cv_allreduce_called), std::ref(_flag_allreduce_called)),
 
  114#ifdef FEAT_MPI_THREAD_MULTIPLE 
  115        std::unique_lock<std::mutex> l(_mutex);
 
  116        _cv_allreduce_called.wait(l, [
this]() {
return _flag_allreduce_called == 
true; });
 
  120        Statistics::add_time_mpi_execute_reduction(ts_start.
elapsed_now());
 
  145#if defined(FEAT_HAVE_MPI) || defined(DOXYGEN)
 
  146#ifdef FEAT_MPI_THREAD_MULTIPLE
 
  148        _cv_allreduce_called(),
 
  149        _flag_allreduce_called(other._flag_allreduce_called),
 
  150        _thread(std::move(other._thread)),
 
  157        #ifdef FEAT_MPI_THREAD_MULTIPLE 
  158        XASSERTM(other._mutex.try_lock(), 
"Other is still locked by other thread!");
 
  159        XASSERTM(other._flag_allreduce_called, 
"Allreduce not called!");
 
  161        other._finished = 
true;
 
  175#if defined(FEAT_HAVE_MPI) || defined(DOXYGEN) 
  176#ifdef FEAT_MPI_THREAD_MULTIPLE 
  177        XASSERTM(other._mutex.try_lock(), 
"Other is still locked by other thread!");
 
  178        XASSERTM(other._flag_allreduce_called, 
"Allreduce not called!");
 
  179        _flag_allreduce_called = other._flag_allreduce_called;
 
  180        _thread = std::move(other._thread);
 
  182        _req = std::move(other._req);
 
  187        other._finished = 
true;
 
  203#ifdef FEAT_MPI_THREAD_MULTIPLE 
  205        Statistics::add_time_mpi_execute_reduction(
_mpi_exec);
 
  206        Statistics::add_time_mpi_wait_reduction(
_mpi_wait);
 
  210        Statistics::add_time_mpi_wait_reduction(ts_start.
elapsed_now());
 
  225      static void _wait_function(
const DT_ & x, 
const Dist::Comm& comm, 
const Dist::Operation & op, DT_ & r, 
double & mpi_exec, 
double & mpi_wait, std::mutex & mutex,
 
  226          std::condition_variable & cv_allreduce_called, 
bool & flag_allreduce_called)
 
  229        std::unique_lock<std::mutex> l(mutex);
 
  231        flag_allreduce_called = 
true;
 
  233        cv_allreduce_called.notify_all();
 
#define XASSERT(expr)
Assertion macro definition.
#define XASSERTM(expr, msg)
Assertion macro definition with custom message.
Request iallreduce(const void *sendbuf, void *recvbuf, std::size_t count, const Datatype &datatype, const Operation &op) const
Nonblocking All-Reduce.
Communication Request class.
bool wait(Status &status)
Blocks until the request is fulfilled (or null).
Ticket class for asynchronous global operations on scalars.
SynchScalarTicket()
standard constructor
SynchScalarTicket(DT_ x, const Dist::Comm &comm, const Dist::Operation &op, bool sqrt=false)
Constructor.
DT_ _r
buffer containing the received data
SynchScalarTicket(const SynchScalarTicket &)=delete
Unwanted copy constructor: Do not implement!
~SynchScalarTicket()
Destructor.
bool _finished
signals whether wait was already called
SynchScalarTicket(SynchScalarTicket &&other)
move constructor
Dist::Request _req
Our request for the corresponding iallreduce mpi call.
double _mpi_wait
holds our MPI reduction wait time (toe)
double _mpi_exec
holds our MPI reduction execution time (toe)
SynchScalarTicket & operator=(const SynchScalarTicket &)=delete
Unwanted copy assignment operator: Do not implement!
SynchScalarTicket & operator=(SynchScalarTicket &&other)
move-assign operator
bool _sqrt
whether to compute the square root of the result
DT_ _x
buffer containing the send data
TimeStamp & stamp()
Stamps the current time-stamp.
double elapsed_now() const
Calculates the time elapsed between the time stamp and now.
T_ sqrt(T_ x)
Returns the square-root of a value.
Communication Operation class.