11 #ifndef INCLUDED_utility_io_mpistream_IPP
12 #define INCLUDED_utility_io_mpistream_IPP
23 namespace mpi_stream {
37 ) : m_buffer( buffer_size_, 0 ), master_rank_( master_rank )
43 MPI_Comm_rank (MPI_COMM_WORLD, &
my_rank_);
57 MPI_Recv(&buf, 2, MPI_INT, master_rank_, MPI_STREAM_TAG, MPI_COMM_WORLD, &stat );
61 std::cerr <<
"ERROR when opening mpistream to write to " <<
filename << std::endl;
79 buf[ 1 ] = header.size();
81 buf[ 3 ] = channel_id_;
85 MPI_Send(buf, 4, MPI_INT, master_rank,
MPI_STREAM_TAG, MPI_COMM_WORLD );
86 MPI_Send(const_cast<char*> (header.data()), header.size(), MPI_CHAR, master_rank,
MPI_STREAM_TAG, MPI_COMM_WORLD );
114 if ( pptr() && pptr() > pbase() ) {
115 if ( traits_type::eq_int_type( overflow( traits_type::eof() ), traits_type::eof() ) )
return -1;
135 bool const test_eof = traits_type::eq_int_type( c, traits_type::eof() );
136 int w =
static_cast< int >( pptr() - pbase() );
141 if ( send_to_master( pbase(), w ) ) {
142 this->setp( pbase(), epptr() - 1 );
143 return traits_type::not_eof( c );
145 return traits_type::eof();
174 uInt avail_out =
static_cast< uInt
>( buffer_size_ *
sizeof(
char_type) );
180 buf[ 1 ] = avail_out;
182 buf[ 3 ] = channel_id_;
186 MPI_Send(buf, 4, MPI_INT, master_rank,
MPI_STREAM_TAG, MPI_COMM_WORLD );
187 MPI_Send(next_out, avail_out, MPI_CHAR, master_rank,
MPI_STREAM_TAG, MPI_COMM_WORLD );
211 int const buffer_size =
static_cast< int >( pptr() - pbase() );
212 send_to_master( pbase(), buffer_size );
213 std::streamsize total_written_byte_size = buffer_size;
215 MPI_Comm_rank (MPI_COMM_WORLD, &my_rank_);
222 buf[ 3 ] = channel_id_;
224 MPI_Send(buf, 4, MPI_INT, master_rank_,
MPI_STREAM_TAG, MPI_COMM_WORLD );
228 return total_written_byte_size;
261 std::cout <<
"call to reset_state" << std::endl;
284 b2 = 0xFF & ( x_ >> 8 );
285 b3 = 0xFF & ( x_ >> 16 );
286 b4 = 0xFF & ( x_ >> 24 );
288 out_.
write( &b1, 1 );
289 out_.
write( &b2, 1 );
290 out_.
write( &b3, 1 );
291 out_.
write( &b4, 1 );
298 #endif // INCLUDED_utility_io_mpistream_IPP
ocstream cerr(std::cerr)
Wrapper around std::cerr.
Altered zipstream library header.
int_type overflow(int_type c)
OVERFLOW.
cmplx w(cmplx z, double relerr)
char_vector_type m_buffer
basic_mpi_ostream & write(char const *str, std::streamsize const count)
write a string
bool send_to_master(char_type *, std::streamsize)
+++ SEND_TO_MASTER ++++
void print_header(std::string const &)
static void put_long_as_uint32(ostream_reference out_, unsigned long x_)
std::basic_ostream< Elem, Tr > & ostream_reference
void reset_state()
resets the mpi stream and zeros the crc
std::string filename(const std::string &path)
virtual std::streamsize flush()
flushes the mpi buffer and output buffer
ocstream cout(std::cout)
Wrapper around std::cout.
virtual ~basic_mpi_streambuf()
basic_mpi_streambuf(std::string filename, std::size_t buffer_size_, int master_rank_, bool append)
Construct a mpi stream.
byte_type * byte_buffer_type