7#include <kernel/util/dist_file_io.hpp>
17 const std::size_t p = pattern.find_first_of(
'*');
18 XASSERTM(p != pattern.npos,
"sequence filename template is missing rank wildcat pattern");
21 const std::size_t q = pattern.find_first_not_of(
'*', p);
24 const std::size_t n = (q != pattern.npos ? q - p : pattern.size() - p);
27 String filename(pattern.substr(std::size_t(0), p));
30 filename += pattern.substr(q);
39 std::ifstream ifs(filename, std::ios_base::in);
40 if(!ifs.is_open() || !ifs.good())
44 stream << ifs.rdbuf();
56 std::ifstream ifs(filename, std::ios_base::in|std::ios_base::binary);
57 if(!ifs.is_open() || !ifs.good())
73 std::ios_base::openmode mode = std::ios_base::out;
75 mode |= std::ios_base::trunc;
78 std::ofstream ofs(filename, mode);
79 if(!ofs.is_open() || !ofs.good())
83 ofs << stream.rdbuf();
92 std::ios_base::openmode mode = std::ios_base::out|std::ios_base::binary;
94 mode |= std::ios_base::trunc;
97 std::ofstream ofs(filename, mode);
98 if(!ofs.is_open() || !ofs.good())
113 if(comm.
rank() == root_rank)
125 void DistFileIO::read_common(BinaryStream& stream,
const String& filename,
const Dist::Comm& comm,
int root_rank)
128 if(comm.rank() == root_rank)
134 comm.bcast_binarystream(stream, root_rank);
137 stream.seekg(std::streamoff(0), std::ios_base::beg);
146 int rank = comm.rank();
147 int nprocs = comm.size();
150 String filename =
_rankname(pattern, rank);
153 for(
int i(0); i < nprocs; ++i)
167 int rank = comm.rank();
168 int nprocs = comm.size();
171 String filename =
_rankname(pattern, rank);
174 for(
int i(0); i < nprocs; ++i)
188 int rank = comm.rank();
189 int nprocs = comm.size();
192 String filename =
_rankname(pattern, rank);
195 for(
int i(0); i < nprocs; ++i)
209 int rank = comm.rank();
210 int nprocs = comm.size();
213 String filename =
_rankname(pattern, rank);
216 for(
int i(0); i < nprocs; ++i)
229 XASSERT((buffer !=
nullptr) || (size == std::size_t(0)));
233 MPI_File file = MPI_FILE_NULL;
234 MPI_File_open(comm.mpi_comm(), filename.c_str(), MPI_MODE_RDONLY, MPI_INFO_NULL, &file);
235 XASSERTM(file != MPI_FILE_NULL,
"failed to open file via MPI_File_open");
238 MPI_File_read_ordered(file, buffer,
int(size), MPI_BYTE, &status);
241 MPI_File_close(&file);
244 void DistFileIO::write_ordered(
const void* buffer,
const std::size_t size,
const String& filename,
const Dist::Comm& comm,
bool truncate)
246 XASSERT((buffer !=
nullptr) || (size == std::size_t(0)));
249 const int amode = MPI_MODE_WRONLY | MPI_MODE_CREATE;
253 MPI_File file = MPI_FILE_NULL;
254 MPI_File_open(comm.mpi_comm(), filename.c_str(), amode, MPI_INFO_NULL, &file);
255 XASSERTM(file != MPI_FILE_NULL,
"failed to open file via MPI_File_open");
260 MPI_File_set_size(file, MPI_Offset(0));
264 MPI_File_write_ordered(file, buffer,
int(size), MPI_BYTE, &status);
267 MPI_File_close(&file);
271 const String& filename,
const Dist::Comm& comm,
int root_rank,
bool bcast_common)
273 typedef std::uint64_t
u64;
275 XASSERTM((root_rank >= 0) && (root_rank < comm.size()),
"invalid root rank");
277 std::vector<char> header(std::size_t(32), 0);
278 u64* head_u64 =
reinterpret_cast<u64*
>(header.data());
282 MPI_File file = MPI_FILE_NULL;
283 MPI_File_open(comm.mpi_comm(), filename.c_str(), MPI_MODE_RDONLY, MPI_INFO_NULL, &file);
284 XASSERTM(file != MPI_FILE_NULL,
"failed to open file via MPI_File_open");
287 if(comm.rank() == root_rank)
290 MPI_File_read_shared(file, header.data(),
int(header.size()), MPI_BYTE, &status);
293 String msg1 = String(
"input file is not a valid FEAT3 file: ") + filename;
297 String msg2 = String(
"invalid number of processes: comm.size() is ") +
stringify(comm.size()) +
298 " but file contains data for " +
stringify(head_u64[2]) +
" processes";
299 XASSERTM(head_u64[2] ==
u64(comm.size()), msg2.c_str());
303 comm.bcast(header.data(), header.size(), root_rank);
306 const u64 common_size = head_u64[3];
310 MPI_File_read_ordered(file, &buffer_size, 8, MPI_BYTE, &status);
313 if(common_size >
u64(0))
316 if(bcast_common || (comm.rank() == root_rank))
317 common.resize(common_size);
320 if(comm.rank() == root_rank)
323 MPI_File_read_shared(file, common.data(),
int(common_size), MPI_BYTE, &status);
328 comm.bcast(common.data(), common.size(), root_rank);
332 buffer.resize(buffer_size, 0);
335 MPI_File_read_ordered(file, buffer.data(),
int(buffer_size), MPI_BYTE, &status);
338 MPI_File_close(&file);
342 const String& filename,
const Dist::Comm& comm,
int root_rank)
344 typedef std::uint64_t
u64;
346 XASSERTM((root_rank >= 0) && (root_rank < comm.size()),
"invalid root rank");
349 std::vector<char> header(std::size_t(32), 0);
350 u64* head_u64 =
reinterpret_cast<u64*
>(header.data());
353 const u64 common_size = common.size();
354 const u64 buffer_size = buffer.size();
361 comm.allreduce(&buffer_size, &file_size, std::size_t(1),
Dist::op_sum);
362 file_size += common_size;
363 file_size +=
u64(comm.size())*
u64(8);
364 file_size +=
u64(header.size());
365 head_u64[1] = file_size;
368 head_u64[2] =
u64(comm.size());
371 head_u64[3] = common_size;
374 const int amode = MPI_MODE_WRONLY | MPI_MODE_CREATE;
378 MPI_File file = MPI_FILE_NULL;
379 MPI_File_open(comm.mpi_comm(), filename.c_str(), amode, MPI_INFO_NULL, &file);
380 XASSERTM(file != MPI_FILE_NULL,
"failed to open file via MPI_File_open");
383 comm.bcast(&file_size, std::size_t(1), root_rank);
386 MPI_File_set_size(file, MPI_Offset(file_size));
389 if(comm.rank() == root_rank)
392 MPI_File_write_shared(file, header.data(),
int(header.size()), MPI_BYTE, &status);
396 MPI_File_write_ordered(file, &buffer_size, 8, MPI_BYTE, &status);
399 if(!common.empty() && (comm.rank() == root_rank))
402 MPI_File_write_shared(file, common.data(),
int(common_size), MPI_BYTE, &status);
406 MPI_File_write_ordered(file, buffer.data(),
int(buffer_size), MPI_BYTE, &status);
409 MPI_File_close(&file);
446 XASSERT((buffer !=
nullptr) || (size == std::size_t(0)));
449 std::ios_base::openmode mode = std::ios_base::in|std::ios_base::binary;
452 std::ifstream ifs(filename, mode);
453 if(!ifs.is_open() || !ifs.good())
457 if(size > std::size_t(0))
459 ifs.read(
reinterpret_cast<char*
>(buffer), std::streamsize(size));
468 XASSERT((buffer !=
nullptr) || (size == std::size_t(0)));
471 std::ios_base::openmode mode = std::ios_base::out|std::ios_base::binary;
473 mode |= std::ios_base::trunc;
476 std::ofstream ofs(filename, mode);
477 if(!ofs.is_open() || !ofs.good())
481 if(size > std::size_t(0))
483 ofs.write(
reinterpret_cast<const char*
>(buffer), std::streamsize(size));
493 typedef std::uint64_t u64;
495 std::vector<char> header(std::size_t(40), 0);
496 u64* head_u64 =
reinterpret_cast<u64*
>(header.data());
499 std::ios_base::openmode mode = std::ios_base::in|std::ios_base::binary;
502 std::ifstream ifs(filename, mode);
503 if(!ifs.is_open() || !ifs.good())
507 ifs.read(header.data(), std::streamsize(header.size()));
510 String msg1 =
String(
"input file is not a valid FEAT3 file: ") + filename;
514 String msg2 =
String(
"invalid number of processes: serial program but file contains data for ") +
516 XASSERTM(head_u64[2] == u64(1), msg2.c_str());
519 const u64 shared_size = head_u64[3];
520 const u64 buffer_size = head_u64[4];
523 if(shared_size > u64(0))
526 shared.resize(shared_size);
527 ifs.read(shared.data(), std::streamsize(shared.size()));
531 if(buffer_size > u64(0))
533 buffer.resize(buffer_size);
534 ifs.read(buffer.data(), std::streamsize(buffer.size()));
544 typedef std::uint64_t u64;
547 std::ios_base::openmode mode = std::ios_base::out|std::ios_base::binary|std::ios_base::trunc;
550 std::ofstream ofs(filename, mode);
551 if(!ofs.is_open() || !ofs.good())
555 std::vector<char> header(std::size_t(40), 0);
556 u64* head_u64 =
reinterpret_cast<u64*
>(header.data());
559 const u64 shared_size = shared.size();
560 const u64 buffer_size = buffer.size();
566 head_u64[1] = u64(40) + buffer_size + shared_size;
569 head_u64[2] = u64(1);
572 head_u64[3] = shared_size;
575 head_u64[4] = buffer_size;
578 ofs.write(header.data(), std::streamsize(header.size()));
582 ofs.write(shared.data(), std::streamsize(shared.size()));
586 ofs.write(buffer.data(), std::streamsize(buffer.size()));
#define XASSERT(expr)
Assertion macro definition.
#define XASSERTM(expr, msg)
Assertion macro definition with custom message.
void read_stream(std::istream &is)
Reads the content of a binary input stream.
void write_stream(std::ostream &os) const
Writes the content to a binary output stream.
int rank() const
Returns the rank of this process in this communicator.
void bcast_stringstream(std::stringstream &stream, int root=0) const
Blocking broadcast of a std::stringstream.
static void write_sequence(std::stringstream &stream, const String &pattern, const Dist::Comm &comm, bool truncate=true)
Writes a rank-indexed text file sequence.
static constexpr std::uint64_t magic_combined
Magic number for combined distributed files as used by read_combined() and write_combined()
static void _write_file(std::stringstream &stream, const String &filename, bool truncate)
auxiliary function: write a string stream to a file
static void write_ordered(const void *buffer, const std::size_t size, const String &filename, const Dist::Comm &comm, bool truncate=true)
Writes a buffer into a common binary file in rank order.
static void read_common(std::stringstream &stream, const String &filename, const Dist::Comm &comm, int root_rank=0)
Reads a common text file for all ranks.
static void read_ordered(void *buffer, const std::size_t size, const String &filename, const Dist::Comm &comm)
Reads a buffer from a common binary file in rank order.
static void write_combined(const std::vector< char > &common, const std::vector< char > &buffer, const String &filename, const Dist::Comm &comm, int root_rank=0)
Writes a combined shared/ordered binary file.
static void read_combined(std::vector< char > &common, std::vector< char > &buffer, const String &filename, const Dist::Comm &comm, int root_rank=0, bool bcast_common=true)
Reads a combined shared/ordered binary file as written by the write_combined function.
static String _rankname(const String &pattern, int rank)
auxiliary function: build a rank filename from a pattern
static void _read_file(std::stringstream &stream, const String &filename)
auxiliary function: read a file into a string stream
static void read_sequence(std::stringstream &stream, const String &pattern, const Dist::Comm &comm)
Reads a rank-indexed text file sequence.
File-Not-Created exception.
File-Not-Found exception.
String class implementation.
String pad_front(size_type len, char c=' ') const
Pads the front of the string up to a desired length.
const Operation op_sum(MPI_SUM)
Operation wrapper for MPI_SUM.
std::uint64_t u64
unsigned 64-bit integer type
String stringify(const T_ &item)
Converts an item into a String.
std::uint64_t Index
Index data type.