FEAT 3
Finite Element Analysis Toolbox
synch_scal.hpp
// FEAT3: Finite Element Analysis Toolbox, Version 3
// Copyright (C) 2010 by Stefan Turek & the FEAT group
// FEAT3 is released under the GNU General Public License version 3,
// see the file 'copyright.txt' in the top level directory for details.

#pragma once

#include <kernel/base_header.hpp>
#include <kernel/util/dist.hpp>
#include <kernel/util/time_stamp.hpp>
#include <kernel/util/statistics.hpp>

#include <thread>
#include <functional>
#include <mutex>
#include <condition_variable>
namespace FEAT
{
  namespace Global
  {
    /**
     * \brief Ticket class for asynchronous global operations on scalars
     *
     * \tparam DT_
     * The data type of the scalar value that is to be synchronized.
     */
    template <typename DT_>
    class SynchScalarTicket
    {
    protected:
      /// buffer containing the received data
      DT_ _r;
      /// buffer containing the send data
      DT_ _x;
      /// specifies whether to compute the square-root of the reduction result
      bool _sqrt;
      /// holds the time of execution (TOE) of the MPI reduction call
      double _mpi_exec;
      /// holds the time of execution (TOE) of the MPI reduction wait
      double _mpi_wait;
#if defined(FEAT_HAVE_MPI) || defined(DOXYGEN)
#ifdef FEAT_MPI_THREAD_MULTIPLE
      /// mutex guarding the hand-shake with the reduction worker thread
      std::mutex _mutex;
      /// condition variable signaling that the iallreduce call has been issued
      std::condition_variable _cv_allreduce_called;
      /// flag indicating that the iallreduce call has been issued
      bool _flag_allreduce_called;
      /// worker thread that issues the iallreduce call and waits for its completion
      std::thread _thread;
#else // no MPI_THREAD_MULTIPLE
      /// our request for the corresponding iallreduce MPI call
      Dist::Request _req;
#endif // FEAT_MPI_THREAD_MULTIPLE
#endif // FEAT_HAVE_MPI || DOXYGEN
      /// signals whether wait() has already been called
      bool _finished;

    public:
      /// standard constructor
      SynchScalarTicket() :
        _r(DT_(0)),
        _x(DT_(0)),
        _sqrt(false),
        _mpi_exec(0.0),
        _mpi_wait(0.0),
#if defined(FEAT_HAVE_MPI) || defined(DOXYGEN)
#ifdef FEAT_MPI_THREAD_MULTIPLE
        _mutex(),
        _cv_allreduce_called(),
        _flag_allreduce_called(false),
        _thread(),
#else // no MPI_THREAD_MULTIPLE
        _req(),
#endif // FEAT_MPI_THREAD_MULTIPLE
#endif // FEAT_HAVE_MPI || DOXYGEN
        _finished(true)
      {
      }

      /**
       * \brief Constructor
       *
       * \param[in] x
       * The local value that is to be synchronized.
       *
       * \param[in] comm
       * The communicator to be used for the reduction.
       *
       * \param[in] op
       * The reduction operation to be applied.
       *
       * \param[in] sqrt
       * Specifies whether to apply the square-root onto the reduction result.
       */
#if defined(FEAT_HAVE_MPI) || defined(DOXYGEN)
      explicit SynchScalarTicket(DT_ x, const Dist::Comm& comm, const Dist::Operation& op, bool sqrt = false) :
        _r(DT_(0)),
        _x(x),
        _sqrt(sqrt),
        _mpi_exec(0.0),
        _mpi_wait(0.0),
#ifdef FEAT_MPI_THREAD_MULTIPLE
        _flag_allreduce_called(false),
        _thread(_wait_function, std::cref(_x), std::cref(comm), std::cref(op), std::ref(_r), std::ref(_mpi_exec), std::ref(_mpi_wait),
          std::ref(_mutex), std::ref(_cv_allreduce_called), std::ref(_flag_allreduce_called)),
#else
        _req(),
#endif
        _finished(false)
      {
#ifdef FEAT_MPI_THREAD_MULTIPLE
        // block until the worker thread has actually issued the iallreduce call
        std::unique_lock<std::mutex> l(_mutex);
        _cv_allreduce_called.wait(l, [this]() { return _flag_allreduce_called; });
#else // no FEAT_MPI_THREAD_MULTIPLE
        TimeStamp ts_start;
        _req = comm.iallreduce(&_x, &_r, std::size_t(1), op);
        Statistics::add_time_mpi_execute_reduction(ts_start.elapsed_now());
#endif // FEAT_MPI_THREAD_MULTIPLE
      }
#else // non-MPI version
      explicit SynchScalarTicket(const DT_& in, const Dist::Comm&, const Dist::Operation&, bool sqrt = false) :
        _r(in),
        _x(in),
        _sqrt(sqrt),
        _mpi_exec(0.0),
        _mpi_wait(0.0),
        _finished(false)
      {
      }
#endif // FEAT_HAVE_MPI || DOXYGEN

      /// Unwanted copy constructor: Do not implement!
      SynchScalarTicket(const SynchScalarTicket&) = delete;
      /// Unwanted copy assignment operator: Do not implement!
      SynchScalarTicket& operator=(const SynchScalarTicket&) = delete;

      /// move constructor
      SynchScalarTicket(SynchScalarTicket&& other) :
        _r(other._r),
        _x(other._x),
        _sqrt(other._sqrt),
        _mpi_exec(other._mpi_exec),
        _mpi_wait(other._mpi_wait),
#if defined(FEAT_HAVE_MPI) || defined(DOXYGEN)
#ifdef FEAT_MPI_THREAD_MULTIPLE
        _mutex(),
        _cv_allreduce_called(),
        _flag_allreduce_called(other._flag_allreduce_called),
        _thread(std::move(other._thread)),
#else // no MPI_THREAD_MULTIPLE
        _req(std::move(other._req)),
#endif // FEAT_MPI_THREAD_MULTIPLE
#endif // FEAT_HAVE_MPI || DOXYGEN
        _finished(other._finished)
      {
#ifdef FEAT_MPI_THREAD_MULTIPLE
        XASSERTM(other._mutex.try_lock(), "Other is still locked by another thread!");
        XASSERTM(other._flag_allreduce_called, "Allreduce not called!");
#endif // FEAT_MPI_THREAD_MULTIPLE
        other._finished = true;
      }

      /// move-assign operator
      SynchScalarTicket& operator=(SynchScalarTicket&& other)
      {
        if(this == &other)
          return *this;

        _r = other._r;
        _x = other._x;
        _sqrt = other._sqrt;
        _mpi_exec = other._mpi_exec;
        _mpi_wait = other._mpi_wait;
#if defined(FEAT_HAVE_MPI) || defined(DOXYGEN)
#ifdef FEAT_MPI_THREAD_MULTIPLE
        XASSERTM(other._mutex.try_lock(), "Other is still locked by another thread!");
        XASSERTM(other._flag_allreduce_called, "Allreduce not called!");
        _flag_allreduce_called = other._flag_allreduce_called;
        _thread = std::move(other._thread);
#else // no MPI_THREAD_MULTIPLE
        _req = std::move(other._req);
#endif // FEAT_MPI_THREAD_MULTIPLE
#endif // FEAT_HAVE_MPI || DOXYGEN
        _finished = other._finished;

        other._finished = true;
        return *this;
      }

      /**
       * \brief Retrieves the result of the reduction operation
       *
       * Blocks until the underlying asynchronous reduction has completed.
       *
       * \returns The reduction result, or its square-root if requested at construction.
       *
       * \note Each ticket can be waited for only once.
       */
      DT_ wait()
      {
        XASSERTM(!_finished, "ticket was already completed by a wait call");

#ifdef FEAT_HAVE_MPI
#ifdef FEAT_MPI_THREAD_MULTIPLE
        _thread.join();
        Statistics::add_time_mpi_execute_reduction(_mpi_exec);
        Statistics::add_time_mpi_wait_reduction(_mpi_wait);
#else // no FEAT_MPI_THREAD_MULTIPLE
        TimeStamp ts_start;
        _req.wait();
        Statistics::add_time_mpi_wait_reduction(ts_start.elapsed_now());
#endif // FEAT_MPI_THREAD_MULTIPLE
#endif // FEAT_HAVE_MPI
        _finished = true;
        return (_sqrt ? Math::sqrt(_r) : _r);
      }

      /// destructor; asserts that the ticket has been waited for
      ~SynchScalarTicket()
      {
        XASSERT(_finished);
      }

    private:
#ifdef FEAT_HAVE_MPI
      /// worker thread function: issues the iallreduce, signals the constructor, then waits for completion
      static void _wait_function(const DT_& x, const Dist::Comm& comm, const Dist::Operation& op, DT_& r, double& mpi_exec, double& mpi_wait,
        std::mutex& mutex, std::condition_variable& cv_allreduce_called, bool& flag_allreduce_called)
      {
        TimeStamp ts_start;
        std::unique_lock<std::mutex> l(mutex);
        // issue the non-blocking all-reduce and notify the waiting constructor
        Dist::Request req = comm.iallreduce(&x, &r, std::size_t(1), op);
        flag_allreduce_called = true;
        l.unlock();
        cv_allreduce_called.notify_all();
        mpi_exec = ts_start.elapsed_now();
        // block until the reduction has completed
        ts_start.stamp();
        req.wait();
        mpi_wait = ts_start.elapsed_now();
      }
#endif // FEAT_HAVE_MPI
    }; // class SynchScalarTicket
  } // namespace Global
} // namespace FEAT
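
A minimal usage sketch follows, assuming the header lives at kernel/global/synch_scal.hpp, an initialized Dist::Comm, and that the summation operation is exposed as Dist::op_sum; local_dot and the surrounding function are hypothetical placeholders. The ticket issues the non-blocking all-reduce in its constructor, so independent local work can overlap the communication before wait() delivers the reduced (and optionally square-rooted) result:

#include <kernel/global/synch_scal.hpp>

using namespace FEAT;

void example(const Dist::Comm& comm, double local_dot)
{
  // start the asynchronous sum-reduction of the local dot-product value;
  // 'true' requests the square-root of the sum, i.e. the global 2-norm
  Global::SynchScalarTicket<double> ticket(local_dot, comm, Dist::op_sum, true);

  // ... overlap independent local work with the running iallreduce here ...

  // block until the reduction has finished and fetch the result
  double global_norm = ticket.wait();
  (void)global_norm;
}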