Rosetta
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
mpistream.ipp
Go to the documentation of this file.
1 // -*- mode:c++;tab-width:2;indent-tabs-mode:t;show-trailing-whitespace:t;rm-trailing-spaces:t -*-
2 // vi: set ts=2 noet:
3 //
4 // (c) Copyright Rosetta Commons Member Institutions.
5 // (c) This file is part of the Rosetta software suite and is made available under license.
6 // (c) The Rosetta software is developed by the contributing members of the Rosetta Commons.
7 // (c) For more information, see http://www.rosettacommons.org. Questions about this can be
8 // (c) addressed to University of Washington UW TechTransfer, email: license@u.washington.edu.
9 
10 
11 #ifndef INCLUDED_utility_io_mpistream_IPP
12 #define INCLUDED_utility_io_mpistream_IPP
13 
14 // Unit headers
15 #include <utility/io/zipstream.hpp>
16 
17 #include <utility/assert.hh> //for MPI_ONLY macro
18 // C++ headers
19 #include <sstream>
20 
21 namespace utility {
22 namespace io {
23 namespace mpi_stream {
24 
25 template<
26 typename Elem,
27 typename Tr,
28 typename ElemA,
29 typename ByteT,
30 typename ByteAT
31 >
33  std::string MPI_ONLY( filename ),
34  size_t buffer_size_,
35  int master_rank,
36  bool MPI_ONLY( append )
37 ) : m_buffer( buffer_size_, 0 ), master_rank_( master_rank )
38 {
39  //find channel_id_;
40  this->setp( &(m_buffer[0]), &(m_buffer[m_buffer.size()-1]) );
41 
42 #ifdef USEMPI
43  MPI_Comm_rank (MPI_COMM_WORLD, &my_rank_);/* get current process id */
44  // std::cerr << "open " << filename << " from client " << my_rank_ << std::endl;
45  //establish connection with master
46  // akin to "open file"
47  int buf[ 4 ];
48  buf[ 0 ] = my_rank_;
49  buf[ 1 ] = filename.size();
50  buf[ 2 ] = append ? MPI_STREAM_OPEN_APPEND : MPI_STREAM_OPEN;
51  buf[ 3 ] = 0;
52 
53  MPI_Send(buf, 4, MPI_INT, master_rank_, MPI_STREAM_TAG, MPI_COMM_WORLD );
54  MPI_Send(const_cast<char*> (filename.data()), filename.size(), MPI_CHAR, master_rank_, MPI_STREAM_TAG, MPI_COMM_WORLD );
55  MPI_Status stat;
56 
57  MPI_Recv(&buf, 2, MPI_INT, master_rank_, MPI_STREAM_TAG, MPI_COMM_WORLD, &stat );
58  channel_id_ = buf[ 0 ];
59  file_status_ = buf[ 1 ];
60  if ( file_status_ == MPI_FAIL ) {
61  std::cerr << "ERROR when opening mpistream to write to " << filename << std::endl;
62  }
63 
64 #endif
65 }
66 
67 template<
68 typename Elem,
69 typename Tr,
70 typename ElemA,
71 typename ByteT,
72 typename ByteAT
73 >
74 void
76 #ifdef USEMPI
77  int buf[ 4 ];
78  buf[ 0 ] = my_rank_;
79  buf[ 1 ] = header.size();
80  buf[ 2 ] = MPI_STREAM_SEND;
81  buf[ 3 ] = channel_id_;
82  int master_rank = 0;
83 
84  // std::cerr << "sending from client " << my_rank_ << std::endl;
85  MPI_Send(buf, 4, MPI_INT, master_rank, MPI_STREAM_TAG, MPI_COMM_WORLD );
86  MPI_Send(const_cast<char*> (header.data()), header.size(), MPI_CHAR, master_rank, MPI_STREAM_TAG, MPI_COMM_WORLD ); //MPI_CHAR OR MPI_INT ?
87 #endif
88 }
89 
90 template<
91 typename Elem,
92 typename Tr,
93 typename ElemA,
94 typename ByteT,
95 typename ByteAT
96 >
98 {
99  // std::cerr << "destruct mpi_streambuf on node " << my_rank_ << std::endl;
100  flush_final();//communicate closing
101 }
102 
103 ////SYNC
104 template<
105 typename Elem,
106 typename Tr,
107 typename ElemA,
108 typename ByteT,
109 typename ByteAT
110 >
111 int
113 {
114  if ( pptr() && pptr() > pbase() ) {
115  if ( traits_type::eq_int_type( overflow( traits_type::eof() ), traits_type::eof() ) ) return -1;
116  }
117 
118  return 0;
119 }
120 
121 
122 ///OVERFLOW
123 template<
124 typename Elem,
125 typename Tr,
126 typename ElemA,
127 typename ByteT,
128 typename ByteAT
129 >
133 )
134 {
135  bool const test_eof = traits_type::eq_int_type( c, traits_type::eof() );
136  int w = static_cast< int >( pptr() - pbase() );
137  if ( !test_eof ) {
138  *pptr() = c;
139  ++w;
140  }
141  if ( send_to_master( pbase(), w ) ) {
142  this->setp( pbase(), epptr() - 1 );
143  return traits_type::not_eof( c );
144  } else {
145  return traits_type::eof();
146  }
147 }
148 
149 ///+++ SEND_TO_MASTER ++++
150 template<
151 typename Elem,
152 typename Tr,
153 typename ElemA,
154 typename ByteT,
155 typename ByteAT
156 >
157 bool
160 #ifdef USEMPI
161 buffer_
162 #endif
163  ,std::streamsize
164 #ifdef USEMPI
165  buffer_size_
166 #endif
167 )
168 {
169  // std::streamsize written_byte_size = 0, total_written_byte_size = 0;
170 
171 
172 #ifdef USEMPI
173  byte_buffer_type next_out = (byte_buffer_type)buffer_;
174  uInt avail_out = static_cast< uInt >( buffer_size_ * sizeof(char_type) );
175 
176  //size_t remainder = 0;
177 
178  int buf[ 4 ];
179  buf[ 0 ] = my_rank_;
180  buf[ 1 ] = avail_out;
181  buf[ 2 ] = MPI_STREAM_SEND;
182  buf[ 3 ] = channel_id_;
183  int master_rank = 0;
184 
185  // std::cerr << "sending from client " << my_rank_ << std::endl;
186  MPI_Send(buf, 4, MPI_INT, master_rank, MPI_STREAM_TAG, MPI_COMM_WORLD );
187  MPI_Send(next_out, avail_out, MPI_CHAR, master_rank, MPI_STREAM_TAG, MPI_COMM_WORLD ); //MPI_CHAR OR MPI_INT ?
188  return true; // success detection ?
189 
190 #else
191  return false;
192 #endif
193 }
194 
195 template<
196 typename Elem,
197 typename Tr,
198 typename ElemA,
199 typename ByteT,
200 typename ByteAT
201 >
203 #ifdef USEMPI
204  final
205 #endif
206 )
207 {
208  // std::streamsize written_byte_size = 0,
209 
210 
211  int const buffer_size = static_cast< int >( pptr() - pbase() ); // amount of data currently in buffer
212  send_to_master( pbase(), buffer_size );
213  std::streamsize total_written_byte_size = buffer_size;
214 #ifdef USEMPI
215  MPI_Comm_rank (MPI_COMM_WORLD, &my_rank_);/* get current process id */
216  //establish connection with master
217  // akin to "open file"
218  int buf[ 4 ];
219  buf[ 0 ] = my_rank_;
220  buf[ 1 ] = 0;
221  buf[ 2 ] = final ? MPI_STREAM_CLOSE : MPI_STREAM_FLUSH;
222  buf[ 3 ] = channel_id_;
223 
224  MPI_Send(buf, 4, MPI_INT, master_rank_, MPI_STREAM_TAG, MPI_COMM_WORLD );
225 
226 #endif
227 
228  return total_written_byte_size;
229 }
230 
231 
232 // template<
233 // typename Elem,
234 // typename Tr,
235 // typename ElemA,
236 // typename ByteT,
237 // typename ByteAT
238 // >
239 // void basic_mpi_streambuf< Elem, Tr, ElemA, ByteT, ByteAT >::release_file()
240 // {
241 // #ifdef USEMPI
242 // int buf[ 4 ];
243 // buf[ 0 ] = my_rank_;
244 // buf[ 1 ] = filename_.size();
245 // buf[ 2 ] = MPI_RELEASE_FILE;
246 // buf[ 3 ] = 0;
247 // MPI_Send(buf, 4, MPI_INT, master_rank_, MPI_STREAM_TAG, MPI_COMM_WORLD );
248 // #endif
249 // }
250 
251 
252 template<
253 typename Elem,
254 typename Tr,
255 typename ElemA,
256 typename ByteT,
257 typename ByteAT
258 >
260 {
261  std::cout << "call to reset_state" << std::endl;
262 }
263 
264 
265 template<
266 typename Elem,
267 typename Tr,
268 typename ElemA,
269 typename ByteT,
270 typename ByteAT
271 >
272 
275  unsigned long x_
276 )
277 {
278  // yab: 20090414, modified to conform to gmpi standard where
279  // trailer crc and length must both be 32-bit, otherwise there
280  // is breakage in systems where 'unsigned long' is not 32-bit
281  // and external archiving programs end up complaining.
282  char b1, b2, b3, b4; // assuming char is 8 bits
283  b1 = 0xFF & x_;
284  b2 = 0xFF & ( x_ >> 8 );
285  b3 = 0xFF & ( x_ >> 16 );
286  b4 = 0xFF & ( x_ >> 24 );
287 
288  out_.write( &b1, 1 );
289  out_.write( &b2, 1 );
290  out_.write( &b3, 1 );
291  out_.write( &b4, 1 );
292 }
293 
294 } // namespace mpi_stream
295 }
296 }
297 
298 #endif // INCLUDED_utility_io_mpistream_IPP
ocstream cerr(std::cerr)
Wrapper around std::cerr.
Definition: ocstream.hh:290
const int MPI_STREAM_TAG
Definition: mpistream.hh:37
Altered zipstream library header.
int_type overflow(int_type c)
OVERFLOW.
Definition: mpistream.ipp:131
cmplx w(cmplx z, double relerr)
Definition: functions.cc:470
basic_mpi_ostream & write(char const *str, std::streamsize const count)
write a string
Definition: mpistream.hh:309
bool send_to_master(char_type *, std::streamsize)
+++ SEND_TO_MASTER ++++
Definition: mpistream.ipp:158
void print_header(std::string const &)
Definition: mpistream.ipp:75
static void put_long_as_uint32(ostream_reference out_, unsigned long x_)
Definition: mpistream.ipp:273
std::basic_ostream< Elem, Tr > & ostream_reference
Definition: mpistream.hh:227
void reset_state()
resets the mpi stream and zeros the crc
Definition: mpistream.ipp:259
std::string filename(const std::string &path)
Definition: string_util.cc:373
virtual std::streamsize flush()
flushes the mpi buffer and output buffer
Definition: mpistream.hh:101
#define MPI_ONLY(x)
Definition: assert.hh:34
ocstream cout(std::cout)
Wrapper around std::cout.
Definition: ocstream.hh:287
basic_mpi_streambuf(std::string filename, std::size_t buffer_size_, int master_rank_, bool append)
Construct a mpi stream.
Definition: mpistream.ipp:32