Home > Archives > 2008年12月

2008年12月

apple lines (あなごる)

130Bから縮まない.相変わらず数字をディクリメントする部分が長すぎる.入力に特化してしまえばもう少し縮まる気もするけれど… 負けだよなぁ.

それはさておきそろそろ帰る支度をするか.

istringstream + binary_iarchive ではまってみる

MPIで可変サイズのデータを通信するために boost::serialization を使おうとして,受信部に下のようなコードを書いた.繰り返しサイズ不定なデータを受信するので,バッファはvector<char>にしてサイズを楽に変えられるようにしている.そして,受信したデータの復元のためにバッファをistringstreamに包んでbinary_iarchiveに投げている.細かいことを気にしないと正しく動くように見える.コンパイルも通るし.

std::vector<char> buf;
for (int stage = 1; stage < procs; stage <<= 1) {
  // snip
  unsigned int s = 0;
  MPI_Recv(&s, 1, MPI_INT, target, TAG1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  if(s > buf.size()) buf.resize(s);
  MPI_Recv(&(*buf.begin()), s, MPI_BYTE, target, TAG2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  std::istringstream  is(&(*buf.begin()), std::ios_base::in | std::ios_base::binary);
  boost::archive::binary_iarchive bai(is);
  // snip
}

が,これを動かすとbinary_iarchiveのコンストラクタでbad_allocを食らって落ちる.なぜ?

しばらく頭も回らず原因が分からなかったけどよく考えるとistringstreamのeofが判定できないんじゃないかと気づく.そのせいでbinary_iarchiveがめちゃくちゃな量を読みに行くかなんかしてメモリ不足で落ちているのではないかと.

ということで,下のように書き換えたら動いた.でもstring作る分だけ無駄だよなぁ.どうすりゃいいんだろう?

  std::string str(&(*buf.begin()), s);
  std::istringstream  is(str, std::ios_base::in | std::ios_base::binary);
  boost::archive::binary_iarchive bai(is);

MPICH1とMPICH2とスレッド

とりあえず,スレッドサポートはMPI-2で初期化関数がMPI_Init_threadとなる.んで,こいつの第三引数に使いたいスレッドサポートレベルを投げる.その値は,MPI_THREAD_SINGLE(スレッドなし),MPI_THREAD_FUNNELED(メインのスレッドだけMPI使う),MPI_THREAD_SERIALIZED(同時には一つのスレッドだけがMPIを使用),MPI_THREAD_MULTIPLE(どうとでも使え)のいずれか.ということで,MPI_THREAD_MULTIPLEを投げておけば安全.でも実際にはサポートされていない場合もあるのが残念.

MPICHの1.2.7p1だと手元でコンパイルした限りではMPI_THREAD_FUNNELEDしかサポートしてないらしい(ch_p4, ch_shmem).よって,メインスレッド以外でMPI通信しつつ,メインスレッドが別スレッドを寝て待つとかすると挙動不審になる.ときどきメインスレッドが起きるせいか,待っていると取り合えず動いてくれるように見えるときもある.ま,基本MPICH1でスレッド使うなと.

MPICH2のほうだと,ch3:shmとch3:ssmがMPI_THREAD_SERIALIZEDを,ch3:sockとch3:nemesisがMPI_THREAD_MULTIPLEをサポートしているらしい(少なくともconfigureで指定できる).ch3:sockに関してはそれなりにスレッドでMPIをぐちゃぐちゃに使う下のプログラムでもちゃんと動いた.ch3:shmだと別スレッドがrecvで待っているときにbarrierとか叫んだときに動かなくなったりする.

ということで,MPICH2使えと.

#include <mpi.h>
#include <iostream>
#include <iomanip>
#include <unistd.h>
#include <pthread.h>
 
enum {
  TAG_1,
  TAG_4,
};
 
pthread_t thread;
int rank;
int procs;
 
void* thread_func(void* arg)
{
  std::cout << "[" << rank << ",t] entering thread_func" << std::endl;
  int d = 0;
  int target = (rank+procs-1) % procs;
  MPI_Status st;
  std::cout << "[" << rank << ",t] receiving form " << target << std::endl;
  MPI_Recv(&d, sizeof(int), MPI_BYTE, target, TAG_1, MPI_COMM_WORLD, &st);
  std::cout << "[" << rank << ",t] received form " << target << std::endl;
  d = d + 1;
  std::cout << "[" << rank << ",t] BARRIOR!!! " << std::endl;
  MPI_Barrier(MPI_COMM_WORLD);
  std::cout << "[" << rank << ",t] sending to " << target << std::endl;
  MPI_Send(&d, sizeof(int), MPI_BYTE, target, TAG_4, MPI_COMM_WORLD);
  std::cout << "[" << rank << ",t] sent to " << target << std::endl;
 
  std::cout << "[" << rank << ",t] exitting thread_func" << std::endl;
  return NULL;
}
 
void* main_func(void* arg)
{
  std::cout << "[" << rank << ",m] entering main_func" << std::endl;
  int d = rank;
  int target = (rank+1) % procs;
  MPI_Status st;
  std::cout << "[" << rank << ",m] sending to " << target << std::endl;
  MPI_Send(&d, sizeof(int), MPI_BYTE, target, TAG_1, MPI_COMM_WORLD);
  std::cout << "[" << rank << ",m] sent to " << target << std::endl;
  std::cout << "[" << rank << ",m] receiving form " << target << std::endl;
  MPI_Recv(&d, sizeof(int), MPI_BYTE, target, TAG_4, MPI_COMM_WORLD, &st);
  std::cout << "[" << rank << ",m] received form " << target << std::endl;
  void *ret;
  std::cout << "[" << rank << ",m] joining" << std::endl;
  pthread_join(thread, &ret);
  std::cout << "[" << rank << ",m] joined" << std::endl;
  std::cout << "[" << rank << ",m] exiting main_func" << std::endl;
  return NULL;
}
 
const char *thread_support_str(int p)
{
  switch(p){
  case MPI_THREAD_SINGLE:     return "MPI_THREAD_SINGLE";
  case MPI_THREAD_FUNNELED:   return "MPI_THREAD_FUNNELED"; 
  case MPI_THREAD_SERIALIZED: return "MPI_THREAD_SERIALIZED";
  case MPI_THREAD_MULTIPLE:   return "MPI_THREAD_MULTIPLE";
  default: return "What?";
  }
}
 
int main(int argc, char *argv[])
{
  int prob;
  MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prob);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &procs);
  if(rank==0) {
    std::cout << "# " << procs << " procs" << std::endl;
    std::cout << "# thread support = " << thread_support_str(prob)  << std::endl;
  }
  pthread_create(&thread, NULL, thread_func, NULL);
  main_func(NULL);
  MPI_Finalize();
  return 0;
}

Home > Archives > 2008年12月

Search
Feeds

Page Top