Rosetta
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
mpistream.hh
Go to the documentation of this file.
1 // -*- mode:c++;tab-width:2;indent-tabs-mode:t;show-trailing-whitespace:t;rm-trailing-spaces:t -*-
2 // vi: set ts=2 noet:
3 //
4 // (c) Copyright Rosetta Commons Member Institutions.
5 // (c) This file is part of the Rosetta software suite and is made available under license.
6 // (c) The Rosetta software is developed by the contributing members of the Rosetta Commons.
7 // (c) For more information, see http://www.rosettacommons.org. Questions about this can be
8 // (c) addressed to University of Washington UW TechTransfer, email: license@u.washington.edu.
9 
10 /// @file utility/io/mpistream.hh
11 /// @details This is a stream class that communicates with MpiFileBuffer, which should be in its .run() loop on the node given by master_rank_
12 /// all file-opening, file-closing and writing is rerouted via MPI to the dedicated FileBuf process
13 /// this source is copy-pasted from zipstream.hpp...
14 
15 
16 #ifndef INCLUDED_utility_io_mpistream_hh
17 #define INCLUDED_utility_io_mpistream_hh
18 
19 #ifdef USEMPI
20 #include <mpi.h>
21 #endif
22 
23 // C++ headers
24 #include <algorithm>
25 #include <iostream>
26 #include <vector>
27 #include <sstream>
28 #include <ios>
29 
30 namespace utility {
31 namespace io {
32 namespace mpi_stream {
33 
34 
35 /// @brief Default MPI stream buffer size; change this to suit your needs
36 const std::size_t default_buffer_size = 921600; // Was 102400; Was 4096;
37 const int MPI_STREAM_TAG = 42; //should be a unique number...
38 
39 /// messages to send to MpiFileBuffer
47 };
48 
49 ///reported file status after opening
51  MPI_SUCCESS_NEW = 1, //file didn't exist and was opened
52  MPI_SUCCESS_APPEND,//file existed and will be appended
53  MPI_FAIL //file didn't exist
54 };
55 /// @brief A stream decorator that takes raw input and zips it to a ostream.
56 /// @note The class wraps up the inflate method of the zlib library 1.1.4 http://www.gzip.org/zlib/
57 template<
58 typename Elem,
59 typename Tr = std::char_traits< Elem >,
60 typename ElemA = std::allocator< Elem >,
61 typename ByteT = unsigned char,
62 typename ByteAT = std::allocator< ByteT >
63 >
65  public std::basic_streambuf< Elem, Tr >
66 {
67 
68 public:
69 
70  typedef std::basic_streambuf< Elem, Tr > basic_streambuf_type;
71  typedef std::basic_ostream< Elem, Tr > & ostream_reference;
72  typedef Elem char_type;
73  typedef ElemA char_allocator_type;
74  typedef ByteT byte_type;
75  typedef ByteAT byte_allocator_type;
77  typedef std::vector< byte_type, byte_allocator_type > byte_vector_type;
78  typedef std::vector< char_type, char_allocator_type > char_vector_type;
79  typedef Tr traits_type;
80  typedef typename Tr::int_type int_type;
81 
82  using basic_streambuf_type::epptr;
83  using basic_streambuf_type::pbase;
84  using basic_streambuf_type::pptr;
85 
86  /// @brief Construct a mpi stream
87  /// @note More info on the following parameters can be found in the zlib documentation
89  std::string filename,
90  std::size_t buffer_size_,
91  int master_rank_,
92  bool append
93  );
94 
95  virtual ~basic_mpi_streambuf();
96 
97  int sync();
99 
100  /// @brief flushes the mpi buffer and output buffer
101  virtual std::streamsize flush() {
102  return flush( false );
103  }
104 
105 
106  virtual std::streamsize flush_final() {
107  return flush( true );
108  }
109 
110  /// @brief resets the mpi stream and zeros the crc
111  /// @details This method should be called after flush_final()
112  /// @details to allow future writes
113  void reset_state();
114  int file_status() const { return file_status_; }
115  //void release_file();
116  void print_header( std::string const& );
117 private:
118  virtual std::streamsize flush( bool final );
119  bool send_to_master( char_type*, std::streamsize );
120  std::size_t fill_input_buffer();
121 
123 
125  int my_rank_;
128  //std::string filename_;
129 }; // basic_mpi_streambuf
130 
131 
132 /// @brief Base class for mpi ostreams
133 /// @note Contains a basic_mpi_streambuf
134 template<
135 typename Elem,
136 typename Tr = std::char_traits< Elem >,
137 typename ElemA = std::allocator< Elem >,
138 typename ByteT = unsigned char,
139 typename ByteAT = std::allocator< ByteT >
140 >
142  virtual public std::basic_ios< Elem, Tr >
143 {
144 
145 public:
146 
147  typedef std::basic_ostream<Elem, Tr> & ostream_reference;
148  typedef basic_mpi_streambuf<
149  Elem,
150  Tr,
151  ElemA,
152  ByteT,
153  ByteAT
155 
156  /// @brief Construct a mpi stream
157  /// @note More info on the following parameters can be found in the zlib documentation.
159  std::string filename,
160  size_t buffer_size_,
161  int master_rank,
162  bool append
163  ) :
164  m_buf( filename, buffer_size_, master_rank, append )
165  {
166  this->init( &m_buf );
167  }
168 
169  /// @brief returns the underlying mpi ostream object
170  mpi_streambuf_type * rdbuf() { return &m_buf; }
171  int file_status() const { return m_buf.file_status(); };
172  void release_file() {
173  m_buf.release_file();
174  }
175  void print_header( std::string const& header ) {
176  m_buf.print_header( header );
177  }
178 private:
179 
181 
182 }; // basic_mpi_ostreambase
183 
184 // basic_mpi_istreambase
185 
186 
187 /// @brief A mpiper ostream
188 ///
189 /// @remarks
190 ///
191 /// This class is an ostream decorator that behaves 'almost' like any other ostream.
192 ///
193 /// At construction, it takes any ostream that shall be used to output the compressed data.
194 ///
195 /// When finished, you need to call the special method zflush or call the destructor
196 /// to flush all the intermediate streams.
197 ///
198 /// Example:
199 /// \code
200 /// // creating the target mpi string, could be a fstream
201 /// ostringstream ostringstream_;
202 /// // creating the mpi layer
203 /// mpi_ostream mpiper(ostringstream_);
204 ///
205 ///
206 /// // writing data
207 /// mpiper<<f<<" "<<d<<" "<<ui<<" "<<ul<<" "<<us<<" "<<c<<" "<<dum;
208 /// // mpi ostream needs special flushing...
209 /// mpiper.zflush();
210 /// \endcode
211 template<
212 typename Elem,
213 typename Tr = std::char_traits< Elem >,
214 typename ElemA = std::allocator< Elem >,
215 typename ByteT = unsigned char,
216 typename ByteAT = std::allocator< ByteT >
217 >
219  public basic_mpi_ostreambase< Elem, Tr, ElemA, ByteT, ByteAT >,
220  public std::basic_ostream< Elem, Tr >
221 {
222 
223 public:
224 
226  typedef std::basic_ostream< Elem, Tr > ostream_type;
227  typedef std::basic_ostream< Elem, Tr > & ostream_reference;
228  typedef Elem char_type;
229 
230  using ostream_type::flush;
232 
233  /// @brief Constructs a mpiper ostream decorator
234  ///
235  /// @param ostream_ ostream where the compressed output is written
236  /// @param is_gmpi_ true if gmpi header and footer have to be added
237  /// @param level_ level of compression 0, bad and fast, 9, good and slower,
238  /// @param strategy_ compression strategy
239  /// @param window_size_ see zlib doc
240  /// @param memory_level_ see zlib doc
241  /// @param buffer_size_ the buffer size used to mpi data
242  ///
243  /// @note When is_gmpi_ is true, a gmpi header and footer is automatically added
245  std::string filename, // int open_mode = std::ios::out,
246  int master_rank,
247  std::stringstream& header,
248  bool append = false,
249  std::size_t buffer_size_ = default_buffer_size
250  ) :
252  filename,
253  buffer_size_,
254  master_rank,
255  append
256  ),
257  ostream_type( rdbuf() ),
258  m_mpi_stream_finalized( false )
259  {
261  //print_header();
262  // (*this) << header.str();
263  // mpi_ostreambase_type::release_file();
264  mpi_ostreambase_type::print_header( header.str() );
265  } else if ( mpi_ostreambase_type::file_status() == MPI_FAIL ) {
266  // Set failbit so failure can be detected
267  ostream_type::setstate( std::ios_base::failbit );
268  }
269  };
270 
271  void close() {
272  };
273 
275  {
276  // adding a footer is not necessary here, as it will be
277  // taken care of during the last zflush_finalize()
278  // called by the higher level close() routines
279  }
280 
281  /// @brief stream output
282  /// @details if mpi stream has been finalized, will reset
283  /// @details the stream and add header if necessary
284  template< typename T >
285  inline
287  operator <<( T const & t )
288  {
289  static_cast< std::ostream & >( *this ) << t;
290  return *this;
291  }
292 
293  /// @brief write char
294  /// @details if mpi stream has been finalized, will reset
295  /// @details the stream and add header if necessary
296  inline
298  put( char const c )
299  {
300  static_cast< std::ostream & >( *this ).put( c );
301  return *this;
302  }
303 
304  /// @brief write a string
305  /// @details if mpi stream has been finalized, will reset
306  /// @details the stream and add header if necessary
307  inline
309  write( char const * str, std::streamsize const count )
310  {
311  static_cast< std::ostream & >( *this ).write( str, count );
312  return *this;
313  }
314 
315  inline
317  flush() {
318  ostream_type::flush();
319  // std::basic_ostream::flush() only calls rdbuf()->sync() not rdbuf()->flush()
320  // therefore, include explicit call below to trigger MpiFileBuffer::flush_channel()
321  rdbuf()->flush();
322  return *this;
323  }
324 
325 private:
326 
327 
328  static void put_long_as_uint32( ostream_reference out_, unsigned long x_ );
329 
330  /// @brief tracks to see if mpi stream was finalized
331  /// @details set to true during zflush_finalize()
332  /// @details set to false during reset_state()
334 
335 }; // basic_mpi_ostream
336 
337 
338 // Types
341 
342 
343 } // mpi_stream
344 } //io
345 } //utility
346 
347 // Implementation [adding extra space before #include so PyRosetta skips it...]
348 #include <utility/io/mpistream.ipp>
349 
350 
351 #endif // INCLUDED_utility_io_mpistream_hh
352 
std::vector< char_type, char_allocator_type > char_vector_type
Definition: mpistream.hh:78
basic_mpi_ostream & operator<<(T const &t)
stream output
Definition: mpistream.hh:287
const int MPI_STREAM_TAG
Definition: mpistream.hh:37
basic_mpi_ostreambase< Elem, Tr, ElemA, ByteT, ByteAT > mpi_ostreambase_type
Definition: mpistream.hh:225
int_type overflow(int_type c)
OVERFLOW.
Definition: mpistream.ipp:131
std::basic_ostream< Elem, Tr > ostream_type
Definition: mpistream.hh:226
MPI_STREAM_MSG
messages to send to MpiFileBuffer
Definition: mpistream.hh:40
std::basic_streambuf< Elem, Tr > basic_streambuf_type
Definition: mpistream.hh:70
basic_mpi_ostream & write(char const *str, std::streamsize const count)
write a string
Definition: mpistream.hh:309
bool send_to_master(char_type *, std::streamsize)
+++ SEND_TO_MASTER ++++
Definition: mpistream.ipp:158
void print_header(std::string const &)
Definition: mpistream.ipp:75
std::vector< byte_type, byte_allocator_type > byte_vector_type
Definition: mpistream.hh:77
static void put_long_as_uint32(ostream_reference out_, unsigned long x_)
Definition: mpistream.ipp:273
std::basic_ostream< Elem, Tr > & ostream_reference
Definition: mpistream.hh:227
basic_mpi_ostream< char > mpi_ostream
Definition: mpistream.hh:339
void reset_state()
resets the mpi stream and zeros the crc
Definition: mpistream.ipp:259
basic_mpi_ostream(std::string filename, int master_rank, std::stringstream &header, bool append=false, std::size_t buffer_size_=default_buffer_size)
Constructs a mpiper ostream decorator.
Definition: mpistream.hh:244
std::string filename(const std::string &path)
Definition: string_util.cc:373
basic_mpi_streambuf< Elem, Tr, ElemA, ByteT, ByteAT > mpi_streambuf_type
Definition: mpistream.hh:154
bool m_mpi_stream_finalized
tracks to see if mpi stream was finalized
Definition: mpistream.hh:333
virtual std::streamsize flush()
flushes the mpi buffer and output buffer
Definition: mpistream.hh:101
basic_mpi_ostream & put(char const c)
write char
Definition: mpistream.hh:298
basic_mpi_ostreambase(std::string filename, size_t buffer_size_, int master_rank, bool append)
Construct a mpi stream.
Definition: mpistream.hh:158
void print_header(std::string const &header)
Definition: mpistream.hh:175
const std::size_t default_buffer_size
Default MPI stream buffer size; change this to suit your needs.
Definition: mpistream.hh:36
std::basic_ostream< Elem, Tr > & ostream_reference
Definition: mpistream.hh:71
basic_mpi_streambuf(std::string filename, std::size_t buffer_size_, int master_rank_, bool append)
Construct a mpi stream.
Definition: mpistream.ipp:32
MPI_FILE_STATUS
reported file status after opening
Definition: mpistream.hh:50
basic_mpi_ostream< wchar_t > mpi_wostream
Definition: mpistream.hh:340
void init()
set global 'init_was_called' to true
Definition: init.cc:26
A stream decorator that takes raw input and zips it to a ostream.
Definition: mpistream.hh:64
std::basic_ostream< Elem, Tr > & ostream_reference
Definition: mpistream.hh:147
virtual std::streamsize flush_final()
Definition: mpistream.hh:106
mpi_streambuf_type * rdbuf()
returns the underlying mpi ostream object
Definition: mpistream.hh:170