FEAT 3
Finite Element Analysis Toolbox
Loading...
Searching...
No Matches
dist_file_io.cpp
1// FEAT3: Finite Element Analysis Toolbox, Version 3
2// Copyright (C) 2010 by Stefan Turek & the FEAT group
3// FEAT3 is released under the GNU General Public License version 3,
4// see the file 'copyright.txt' in the top level directory for details.
5
7#include <kernel/util/dist_file_io.hpp>
8
9#include <iostream>
10#include <fstream>
11
12namespace FEAT
13{
14 String DistFileIO::_rankname(const String& pattern, int rank)
15 {
16 // find the first asterisk in the filename
17 const std::size_t p = pattern.find_first_of('*');
18 XASSERTM(p != pattern.npos, "sequence filename template is missing rank wildcat pattern");
19
20 // find the first character after the pattern
21 const std::size_t q = pattern.find_first_not_of('*', p);
22
23 // compute wildcat pattern length
24 const std::size_t n = (q != pattern.npos ? q - p : pattern.size() - p);
25
26 // build the filename of our rank
27 String filename(pattern.substr(std::size_t(0), p));
28 filename += stringify(rank).pad_front(n, '0');
29 if(q != pattern.npos)
30 filename += pattern.substr(q);
31
32 // that's it
33 return filename;
34 }
35
36 void DistFileIO::_read_file(std::stringstream& stream, const String& filename)
37 {
38 // open input file
39 std::ifstream ifs(filename, std::ios_base::in);
40 if(!ifs.is_open() || !ifs.good())
41 throw FileNotFound(filename);
42
43 // read into our stream buffer
44 stream << ifs.rdbuf();
45
46 // close file
47 ifs.close();
48
49 // sanity check
50 XASSERT(stream.good());
51 }
52
53 void DistFileIO::_read_file(BinaryStream& stream, const String& filename)
54 {
55 // open input file
56 std::ifstream ifs(filename, std::ios_base::in|std::ios_base::binary);
57 if(!ifs.is_open() || !ifs.good())
58 throw FileNotFound(filename);
59
60 // read our input stream
61 stream.read_stream(ifs);
62
63 // close file
64 ifs.close();
65
66 // sanity check
67 XASSERT(stream.good());
68 }
69
70 void DistFileIO::_write_file(std::stringstream& stream, const String& filename, bool truncate)
71 {
72 // determine output mode
73 std::ios_base::openmode mode = std::ios_base::out;
74 if(truncate)
75 mode |= std::ios_base::trunc;
76
77 // open output file
78 std::ofstream ofs(filename, mode);
79 if(!ofs.is_open() || !ofs.good())
80 throw FileNotCreated(filename);
81
82 // write stream
83 ofs << stream.rdbuf();
84
85 // close file
86 ofs.close();
87 }
88
89 void DistFileIO::_write_file(BinaryStream& stream, const String& filename, bool truncate)
90 {
91 // determine output mode
92 std::ios_base::openmode mode = std::ios_base::out|std::ios_base::binary;
93 if(truncate)
94 mode |= std::ios_base::trunc;
95
96 // open output file
97 std::ofstream ofs(filename, mode);
98 if(!ofs.is_open() || !ofs.good())
99 throw FileNotCreated(filename);
100
101 // write buffer
102 stream.write_stream(ofs);
103
104 // close file
105 ofs.close();
106 }
107
108#ifdef FEAT_HAVE_MPI
109
110 void DistFileIO::read_common(std::stringstream& stream, const String& filename, const Dist::Comm& comm, int root_rank)
111 {
112 // root rank reads the file
113 if(comm.rank() == root_rank)
114 {
115 _read_file(stream, filename);
116 }
117
118 // broadcast
119 comm.bcast_stringstream(stream, root_rank);
120
121 // sanity check
122 XASSERT(stream.good());
123 }
124
125 void DistFileIO::read_common(BinaryStream& stream, const String& filename, const Dist::Comm& comm, int root_rank)
126 {
127 // root rank reads the file
128 if(comm.rank() == root_rank)
129 {
130 _read_file(stream, filename);
131 }
132
133 // broadcast
134 comm.bcast_binarystream(stream, root_rank);
135
136 // seek to beginning
137 stream.seekg(std::streamoff(0), std::ios_base::beg);
138
139 // sanity check
140 XASSERT(stream.good());
141 }
142
143 void DistFileIO::read_sequence(std::stringstream& stream, const String& pattern, const Dist::Comm& comm)
144 {
145 // retrieve our rank and nprocs
146 int rank = comm.rank();
147 int nprocs = comm.size();
148
149 // build our filename
150 String filename = _rankname(pattern, rank);
151
152 // round-robin we go
153 for(int i(0); i < nprocs; ++i)
154 {
155 // is it our turn?
156 if(i == rank)
157 {
158 _read_file(stream, filename);
159 }
160 comm.barrier();
161 }
162 }
163
164 void DistFileIO::read_sequence(BinaryStream& stream, const String& pattern, const Dist::Comm& comm)
165 {
166 // retrieve our rank and nprocs
167 int rank = comm.rank();
168 int nprocs = comm.size();
169
170 // build our filename
171 String filename = _rankname(pattern, rank);
172
173 // round-robin we go
174 for(int i(0); i < nprocs; ++i)
175 {
176 // is it our turn?
177 if(i == rank)
178 {
179 _read_file(stream, filename);
180 }
181 comm.barrier();
182 }
183 }
184
185 void DistFileIO::write_sequence(std::stringstream& stream, const String& pattern, const Dist::Comm& comm, bool truncate)
186 {
187 // retrieve our rank and nprocs
188 int rank = comm.rank();
189 int nprocs = comm.size();
190
191 // build our filename
192 String filename = _rankname(pattern, rank);
193
194 // round-robin we go
195 for(int i(0); i < nprocs; ++i)
196 {
197 // is it our turn?
198 if(i == rank)
199 {
200 _write_file(stream, filename, truncate);
201 }
202 comm.barrier();
203 }
204 }
205
206 void DistFileIO::write_sequence(BinaryStream& stream, const String& pattern, const Dist::Comm& comm, bool truncate)
207 {
208 // retrieve our rank and nprocs
209 int rank = comm.rank();
210 int nprocs = comm.size();
211
212 // build our filename
213 String filename = _rankname(pattern, rank);
214
215 // round-robin we go
216 for(int i(0); i < nprocs; ++i)
217 {
218 // is it our turn?
219 if(i == rank)
220 {
221 _write_file(stream, filename, truncate);
222 }
223 comm.barrier();
224 }
225 }
226
227 void DistFileIO::read_ordered(void* buffer, const std::size_t size, const String& filename, const Dist::Comm& comm)
228 {
229 XASSERT((buffer != nullptr) || (size == std::size_t(0)));
230
231 // open file
232 MPI_Status status;
233 MPI_File file = MPI_FILE_NULL;
234 MPI_File_open(comm.mpi_comm(), filename.c_str(), MPI_MODE_RDONLY, MPI_INFO_NULL, &file);
235 XASSERTM(file != MPI_FILE_NULL, "failed to open file via MPI_File_open");
236
237 // read buffer via collective
238 MPI_File_read_ordered(file, buffer, int(size), MPI_BYTE, &status);
239
240 // close file
241 MPI_File_close(&file);
242 }
243
244 void DistFileIO::write_ordered(const void* buffer, const std::size_t size, const String& filename, const Dist::Comm& comm, bool truncate)
245 {
246 XASSERT((buffer != nullptr) || (size == std::size_t(0)));
247
248 // select file access mode
249 const int amode = MPI_MODE_WRONLY | MPI_MODE_CREATE;
250
251 // open file
252 MPI_Status status;
253 MPI_File file = MPI_FILE_NULL;
254 MPI_File_open(comm.mpi_comm(), filename.c_str(), amode, MPI_INFO_NULL, &file);
255 XASSERTM(file != MPI_FILE_NULL, "failed to open file via MPI_File_open");
256
257 // truncate file?
258 if(truncate)
259 {
260 MPI_File_set_size(file, MPI_Offset(0));
261 }
262
263 // write buffer via collective
264 MPI_File_write_ordered(file, buffer, int(size), MPI_BYTE, &status);
265
266 // close file
267 MPI_File_close(&file);
268 }
269
270 void DistFileIO::read_combined(std::vector<char>& common, std::vector<char>& buffer,
271 const String& filename, const Dist::Comm& comm, int root_rank, bool bcast_common)
272 {
273 typedef std::uint64_t u64;
274
275 XASSERTM((root_rank >= 0) && (root_rank < comm.size()), "invalid root rank");
276
277 std::vector<char> header(std::size_t(32), 0);//, padding(std::size_t(64), 0);
278 u64* head_u64 = reinterpret_cast<u64*>(header.data());
279
280 // open file
281 MPI_Status status;
282 MPI_File file = MPI_FILE_NULL;
283 MPI_File_open(comm.mpi_comm(), filename.c_str(), MPI_MODE_RDONLY, MPI_INFO_NULL, &file);
284 XASSERTM(file != MPI_FILE_NULL, "failed to open file via MPI_File_open");
285
286 // read header on root rank
287 if(comm.rank() == root_rank)
288 {
289 // read header
290 MPI_File_read_shared(file, header.data(), int(header.size()), MPI_BYTE, &status);
291
292 // check file magic
293 String msg1 = String("input file is not a valid FEAT3 file: ") + filename;
294 XASSERTM(head_u64[0] == magic_combined, msg1.c_str());
295
296 // check number of processes
297 String msg2 = String("invalid number of processes: comm.size() is ") + stringify(comm.size()) +
298 " but file contains data for " + stringify(head_u64[2]) + " processes";
299 XASSERTM(head_u64[2] == u64(comm.size()), msg2.c_str());
300 }
301
302 // broadcast header from root
303 comm.bcast(header.data(), header.size(), root_rank);
304
305 // compute shared buffer padding size
306 const u64 common_size = head_u64[3];
307
308 // read buffer sizes
309 u64 buffer_size(0u);
310 MPI_File_read_ordered(file, &buffer_size, 8, MPI_BYTE, &status);
311
312 // read shared data if given
313 if(common_size > u64(0))
314 {
315 // allocate shared buffer
316 if(bcast_common || (comm.rank() == root_rank))
317 common.resize(common_size);
318
319 // read shared data on root rank
320 if(comm.rank() == root_rank)
321 {
322 // write shared data
323 MPI_File_read_shared(file, common.data(), int(common_size), MPI_BYTE, &status);
324 }
325
326 // broadcast shared buffer if desired
327 if(bcast_common)
328 comm.bcast(common.data(), common.size(), root_rank);
329 }
330
331 // allocate output buffer
332 buffer.resize(buffer_size, 0);
333
334 // read buffers via collective
335 MPI_File_read_ordered(file, buffer.data(), int(buffer_size), MPI_BYTE, &status);
336
337 // close file
338 MPI_File_close(&file);
339 }
340
341 void DistFileIO::write_combined(const std::vector<char>& common, const std::vector<char>& buffer,
342 const String& filename, const Dist::Comm& comm, int root_rank)
343 {
344 typedef std::uint64_t u64;
345
346 XASSERTM((root_rank >= 0) && (root_rank < comm.size()), "invalid root rank");
347
348 // set up file header
349 std::vector<char> header(std::size_t(32), 0);//, padding(std::size_t(64), 0);
350 u64* head_u64 = reinterpret_cast<u64*>(header.data());
351
352 // get buffer sizes
353 const u64 common_size = common.size();
354 const u64 buffer_size = buffer.size();
355
356 // set magic
357 head_u64[0] = magic_combined;
358
359 // gather combined file size at rank 0
360 u64 file_size = 0u;
361 comm.allreduce(&buffer_size, &file_size, std::size_t(1), Dist::op_sum);
362 file_size += common_size; // shared buffer size
363 file_size += u64(comm.size())*u64(8); // buffer size for each rank
364 file_size += u64(header.size()); // header size
365 head_u64[1] = file_size;
366
367 // number of processes
368 head_u64[2] = u64(comm.size());
369
370 // common buffer size without padding
371 head_u64[3] = common_size;
372
373 // select file access mode
374 const int amode = MPI_MODE_WRONLY | MPI_MODE_CREATE;
375
376 // open file
377 MPI_Status status;
378 MPI_File file = MPI_FILE_NULL;
379 MPI_File_open(comm.mpi_comm(), filename.c_str(), amode, MPI_INFO_NULL, &file);
380 XASSERTM(file != MPI_FILE_NULL, "failed to open file via MPI_File_open");
381
382 // broadcast file size
383 comm.bcast(&file_size, std::size_t(1), root_rank);
384
385 // set final file size
386 MPI_File_set_size(file, MPI_Offset(file_size));
387
388 // write header on root rank
389 if(comm.rank() == root_rank)
390 {
391 // write header
392 MPI_File_write_shared(file, header.data(), int(header.size()), MPI_BYTE, &status);
393 }
394
395 // write buffer sizes via collective
396 MPI_File_write_ordered(file, &buffer_size, 8, MPI_BYTE, &status);
397
398 // write shared data if given
399 if(!common.empty() && (comm.rank() == root_rank))
400 {
401 // write shared data
402 MPI_File_write_shared(file, common.data(), int(common_size), MPI_BYTE, &status);
403 }
404
405 // write buffers via collective
406 MPI_File_write_ordered(file, buffer.data(), int(buffer_size), MPI_BYTE, &status);
407
408 // close file
409 MPI_File_close(&file);
410 }
411
412#else // non-MPI implementation
413
414 void DistFileIO::read_common(std::stringstream& stream, const String& filename, const Dist::Comm&, int)
415 {
416 _read_file(stream, filename);
417 }
418
419 void DistFileIO::read_common(BinaryStream& stream, const String& filename, const Dist::Comm&, int)
420 {
421 _read_file(stream, filename);
422 }
423
424 void DistFileIO::read_sequence(std::stringstream& stream, const String& pattern, const Dist::Comm&)
425 {
426 _read_file(stream, _rankname(pattern, Index(0)));
427 }
428
429 void DistFileIO::read_sequence(BinaryStream& stream, const String& pattern, const Dist::Comm&)
430 {
431 _read_file(stream, _rankname(pattern, Index(0)));
432 }
433
434 void DistFileIO::write_sequence(std::stringstream& stream, const String& pattern, const Dist::Comm&, bool truncate)
435 {
436 _write_file(stream, _rankname(pattern, Index(0)), truncate);
437 }
438
439 void DistFileIO::write_sequence(BinaryStream& stream, const String& pattern, const Dist::Comm&, bool truncate)
440 {
441 _write_file(stream, _rankname(pattern, Index(0)), truncate);
442 }
443
444 void DistFileIO::read_ordered(void* buffer, const std::size_t size, const String& filename, const Dist::Comm&)
445 {
446 XASSERT((buffer != nullptr) || (size == std::size_t(0)));
447
448 // determine output mode
449 std::ios_base::openmode mode = std::ios_base::in|std::ios_base::binary;
450
451 // open output file
452 std::ifstream ifs(filename, mode);
453 if(!ifs.is_open() || !ifs.good())
454 throw FileNotFound(filename);
455
456 // read buffer
457 if(size > std::size_t(0))
458 {
459 ifs.read(reinterpret_cast<char*>(buffer), std::streamsize(size));
460 }
461
462 // close stream
463 ifs.close();
464 }
465
466 void DistFileIO::write_ordered(const void* buffer, const std::size_t size, const String& filename, const Dist::Comm&, bool truncate)
467 {
468 XASSERT((buffer != nullptr) || (size == std::size_t(0)));
469
470 // determine output mode
471 std::ios_base::openmode mode = std::ios_base::out|std::ios_base::binary;
472 if(truncate)
473 mode |= std::ios_base::trunc;
474
475 // open output file
476 std::ofstream ofs(filename, mode);
477 if(!ofs.is_open() || !ofs.good())
478 throw FileNotCreated(filename);
479
480 // write buffer
481 if(size > std::size_t(0))
482 {
483 ofs.write(reinterpret_cast<const char*>(buffer), std::streamsize(size));
484 }
485
486 // close stream
487 ofs.close();
488 }
489
490 void DistFileIO::read_combined(std::vector<char>& shared, std::vector<char>& buffer,
491 const String& filename, const Dist::Comm&, int, bool)
492 {
493 typedef std::uint64_t u64;
494
495 std::vector<char> header(std::size_t(40), 0);
496 u64* head_u64 = reinterpret_cast<u64*>(header.data());
497
498 // determine output mode
499 std::ios_base::openmode mode = std::ios_base::in|std::ios_base::binary;
500
501 // open output file
502 std::ifstream ifs(filename, mode);
503 if(!ifs.is_open() || !ifs.good())
504 throw FileNotFound(filename);
505
506 // read header
507 ifs.read(header.data(), std::streamsize(header.size()));
508
509 // check file magic
510 String msg1 = String("input file is not a valid FEAT3 file: ") + filename;
511 XASSERTM(head_u64[0] == magic_combined, msg1.c_str());
512
513 // check number of processes
514 String msg2 = String("invalid number of processes: serial program but file contains data for ") +
515 stringify(head_u64[2]) + " processes";
516 XASSERTM(head_u64[2] == u64(1), msg2.c_str());
517
518 // get shared buffer size
519 const u64 shared_size = head_u64[3];
520 const u64 buffer_size = head_u64[4];
521
522 // read shared data if given
523 if(shared_size > u64(0))
524 {
525 // allocate and read shared buffer
526 shared.resize(shared_size);
527 ifs.read(shared.data(), std::streamsize(shared.size()));
528 }
529
530 // read buffer data
531 if(buffer_size > u64(0))
532 {
533 buffer.resize(buffer_size);
534 ifs.read(buffer.data(), std::streamsize(buffer.size()));
535 }
536
537 // close stream
538 ifs.close();
539 }
540
541 void DistFileIO::write_combined(const std::vector<char>& shared, const std::vector<char>& buffer,
542 const String& filename, const Dist::Comm&, int)
543 {
544 typedef std::uint64_t u64;
545
546 // determine output mode
547 std::ios_base::openmode mode = std::ios_base::out|std::ios_base::binary|std::ios_base::trunc;
548
549 // open output file
550 std::ofstream ofs(filename, mode);
551 if(!ofs.is_open() || !ofs.good())
552 throw FileNotCreated(filename);
553
554 // set up file header
555 std::vector<char> header(std::size_t(40), 0);
556 u64* head_u64 = reinterpret_cast<u64*>(header.data());
557
558 // get buffer sizes
559 const u64 shared_size = shared.size();
560 const u64 buffer_size = buffer.size();
561
562 // set magic
563 head_u64[0] = magic_combined;
564
565 // gather combined file size at rank 0
566 head_u64[1] = u64(40) + buffer_size + shared_size;
567
568 // number of processes
569 head_u64[2] = u64(1);
570
571 // shared buffer size without padding
572 head_u64[3] = shared_size;
573
574 // actually the buffer size for rank 0
575 head_u64[4] = buffer_size;
576
577 // write header
578 ofs.write(header.data(), std::streamsize(header.size()));
579
580 // write shared data
581 if(!shared.empty())
582 ofs.write(shared.data(), std::streamsize(shared.size()));
583
584 // write process data
585 if(!buffer.empty())
586 ofs.write(buffer.data(), std::streamsize(buffer.size()));
587
588 // close stream
589 ofs.close();
590 }
591#endif // FEAT_HAVE_MPI
592
593} // namespace FEAT
#define XASSERT(expr)
Assertion macro definition.
Definition: assertion.hpp:262
#define XASSERTM(expr, msg)
Assertion macro definition with custom message.
Definition: assertion.hpp:263
Binary Stream class.
void read_stream(std::istream &is)
Reads the content of a binary input stream.
void write_stream(std::ostream &os) const
Writes the content to a binary output stream.
Communicator class.
Definition: dist.hpp:1349
int rank() const
Returns the rank of this process in this communicator.
Definition: dist.hpp:1494
void bcast_stringstream(std::stringstream &stream, int root=0) const
Blocking broadcast of a std::stringstream.
Definition: dist.cpp:723
static void write_sequence(std::stringstream &stream, const String &pattern, const Dist::Comm &comm, bool truncate=true)
Writes a rank-indexed text file sequence.
static constexpr std::uint64_t magic_combined
Magic number for combined distributed files as used by read_combined() and write_combined()
static void _write_file(std::stringstream &stream, const String &filename, bool truncate)
auxiliary function: write a string stream to a file
static void write_ordered(const void *buffer, const std::size_t size, const String &filename, const Dist::Comm &comm, bool truncate=true)
Writes a buffer into a common binary file in rank order.
static void read_common(std::stringstream &stream, const String &filename, const Dist::Comm &comm, int root_rank=0)
Reads a common text file for all ranks.
static void read_ordered(void *buffer, const std::size_t size, const String &filename, const Dist::Comm &comm)
Reads a buffer from a common binary file in rank order.
static void write_combined(const std::vector< char > &common, const std::vector< char > &buffer, const String &filename, const Dist::Comm &comm, int root_rank=0)
Writes a combined shared/ordered binary file.
static void read_combined(std::vector< char > &common, std::vector< char > &buffer, const String &filename, const Dist::Comm &comm, int root_rank=0, bool bcast_common=true)
Reads a combined shared/ordered binary file as written by the write_combined function.
static String _rankname(const String &pattern, int rank)
auxiliary function: build a rank filename from a pattern
static void _read_file(std::stringstream &stream, const String &filename)
auxiliary function: read a file into a string stream
static void read_sequence(std::stringstream &stream, const String &pattern, const Dist::Comm &comm)
Reads a rank-indexed text file sequence.
File-Not-Created exception.
Definition: exception.hpp:223
File-Not-Found exception.
Definition: exception.hpp:198
String class implementation.
Definition: string.hpp:46
String pad_front(size_type len, char c=' ') const
Pads the front of the string up to a desired length.
Definition: string.hpp:392
const Operation op_sum(MPI_SUM)
Operation wrapper for MPI_SUM.
Definition: dist.hpp:271
std::uint64_t u64
unsigned 64-bit integer type
Definition: voxel_map.cpp:21
FEAT namespace.
Definition: adjactor.hpp:12
String stringify(const T_ &item)
Converts an item into a String.
Definition: string.hpp:944
std::uint64_t Index
Index data type.