No Such Blog or Diary

«Prev || 1 | 2 | 3 |...| 11 | 12 | 13 |...| 57 | 58 | 59 || Next»

istringstream + binary_iarchive ではまってみる

MPIで可変サイズのデータを通信するために boost::serialization を使おうとして,受信部に下のようなコードを書いた.繰り返しサイズ不定なデータを受信するので,バッファはvector<char>にしてサイズを楽に変えられるようにしている.そして,受信したデータの復元のためにバッファをistringstreamに包んでbinary_iarchiveに投げている.細かいことを気にしないと正しく動くように見える.コンパイルも通るし.

std::vector<char> buf;
for (int stage = 1; stage < procs; stage <<= 1) {
  // snip
  unsigned int s = 0;
  MPI_Recv(&s, 1, MPI_INT, target, TAG1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  if(s > buf.size()) buf.resize(s);
  MPI_Recv(&(*buf.begin()), s, MPI_BYTE, target, TAG2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  std::istringstream  is(&(*buf.begin()), std::ios_base::in | std::ios_base::binary);
  boost::archive::binary_iarchive bai(is);
  // snip
}

が,これを動かすとbinary_iarchiveのコンストラクタでbad_allocを食らって落ちる.なぜ?

しばらく頭も回らず原因が分からなかったけどよく考えるとistringstreamのeofが判定できないんじゃないかと気づく.そのせいでbinary_iarchiveがめちゃくちゃな量を読みに行くかなんかしてメモリ不足で落ちているのではないかと.

ということで,下のように書き換えたら動いた.でもstring作る分だけ無駄だよなぁ.どうすりゃいいんだろう?

  std::string str(&(*buf.begin()), s);
  std::istringstream  is(str, std::ios_base::in | std::ios_base::binary);
  boost::archive::binary_iarchive bai(is);

MPICH1とMPICH2とスレッド

とりあえず,スレッドサポートはMPI-2で初期化関数がMPI_Init_threadとなる.んで,こいつの第三引数に使いたいスレッドサポートレベルを投げる.その値は,MPI_THREAD_SINGLE(スレッドなし),MPI_THREAD_FUNNELED(メインのスレッドだけMPI使う),MPI_THREAD_SERIALIZED(同時には一つのスレッドだけがMPIを使用),MPI_THREAD_MULTIPLE(どうとでも使え)のいずれか.ということで,MPI_THREAD_MULTIPLEを投げておけば安全.でも実際にはサポートされていない場合もあるのが残念.

MPICHの1.2.7p1だと手元でコンパイルした限りではMPI_THREAD_FUNNELEDしかサポートしてないらしい(ch_p4, ch_shmem).よって,メインスレッド以外でMPI通信しつつ,メインスレッドが別スレッドを寝て待つとかすると挙動不審になる.ときどきメインスレッドが起きるせいか,待っていると取り合えず動いてくれるように見えるときもある.ま,基本MPICH1でスレッド使うなと.

MPICH2のほうだと,ch3:shmとch3:ssmがMPI_THREAD_SERIALIZEDを,ch3:sockとch3:nemesisがMPI_THREAD_MULTIPLEをサポートしているらしい(少なくともconfigureで指定できる).ch3:sockに関してはそれなりにスレッドでMPIをぐちゃぐちゃに使う下のプログラムでもちゃんと動いた.ch3:shmだと別スレッドがrecvで待っているときにbarrierとか叫んだときに動かなくなったりする.

ということで,MPICH2使えと.

#include <mpi.h>
#include <iostream>
#include <iomanip>
#include <unistd.h>
#include <pthread.h>
 
enum {
  TAG_1,
  TAG_4,
};
 
pthread_t thread;
int rank;
int procs;
 
void* thread_func(void* arg)
{
  std::cout << "[" << rank << ",t] entering thread_func" << std::endl;
  int d = 0;
  int target = (rank+procs-1) % procs;
  MPI_Status st;
  std::cout << "[" << rank << ",t] receiving form " << target << std::endl;
  MPI_Recv(&d, sizeof(int), MPI_BYTE, target, TAG_1, MPI_COMM_WORLD, &st);
  std::cout << "[" << rank << ",t] received form " << target << std::endl;
  d = d + 1;
  std::cout << "[" << rank << ",t] BARRIOR!!! " << std::endl;
  MPI_Barrier(MPI_COMM_WORLD);
  std::cout << "[" << rank << ",t] sending to " << target << std::endl;
  MPI_Send(&d, sizeof(int), MPI_BYTE, target, TAG_4, MPI_COMM_WORLD);
  std::cout << "[" << rank << ",t] sent to " << target << std::endl;
 
  std::cout << "[" << rank << ",t] exitting thread_func" << std::endl;
  return NULL;
}
 
void* main_func(void* arg)
{
  std::cout << "[" << rank << ",m] entering main_func" << std::endl;
  int d = rank;
  int target = (rank+1) % procs;
  MPI_Status st;
  std::cout << "[" << rank << ",m] sending to " << target << std::endl;
  MPI_Send(&d, sizeof(int), MPI_BYTE, target, TAG_1, MPI_COMM_WORLD);
  std::cout << "[" << rank << ",m] sent to " << target << std::endl;
  std::cout << "[" << rank << ",m] receiving form " << target << std::endl;
  MPI_Recv(&d, sizeof(int), MPI_BYTE, target, TAG_4, MPI_COMM_WORLD, &st);
  std::cout << "[" << rank << ",m] received form " << target << std::endl;
  void *ret;
  std::cout << "[" << rank << ",m] joining" << std::endl;
  pthread_join(thread, &ret);
  std::cout << "[" << rank << ",m] joined" << std::endl;
  std::cout << "[" << rank << ",m] exiting main_func" << std::endl;
  return NULL;
}
 
const char *thread_support_str(int p)
{
  switch(p){
  case MPI_THREAD_SINGLE:     return "MPI_THREAD_SINGLE";
  case MPI_THREAD_FUNNELED:   return "MPI_THREAD_FUNNELED"; 
  case MPI_THREAD_SERIALIZED: return "MPI_THREAD_SERIALIZED";
  case MPI_THREAD_MULTIPLE:   return "MPI_THREAD_MULTIPLE";
  default: return "What?";
  }
}
 
int main(int argc, char *argv[])
{
  int prob;
  MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prob);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &procs);
  if(rank==0) {
    std::cout << "# " << procs << " procs" << std::endl;
    std::cout << "# thread support = " << thread_support_str(prob)  << std::endl;
  }
  pthread_create(&thread, NULL, thread_func, NULL);
  main_func(NULL);
  MPI_Finalize();
  return 0;
}

久々にあなごる

bubble sortぐらいならsedでも素で組める.ということで474B.比較とインクリメントの計算がそれなりに大きくなっているのでうまく縮まらない.さてどうするか.

GCCのOpenMPでboolの和をとると…

C++のOpenMPでは最大値が取れないと言ったら,「boolなら+で最大値取れるじゃん,嘘言うな.」と返ってきた.意味があるかどうかはさておき,確かに出来ることなので実際にやってみた.

ソースは次のとおり.bsにbool値を適当に入れて,それの和をbool変数rに得る計算をreduction(+:r)で並列化する.最後に結果の出力として r を吐き出す.コンパイルはg++に-fopenmpo付けてやった.

#include<iostream>
#include<vector>
 
int k = 2;
int main(int argc, char *argv[])
{
  int n = 10000000;
  std::vector<bool> bs(n);
  for(int i = 0; i < n; i++) {
    bs[i] = i % k;
  }
  bool r = false;
#pragma omp parallel for reduction(+:r)
  for(int i = 0; i < n; i++) {
    r += bs[i];
  }
  int res = r;
  std::cout << res << std::endl;
}

結果:逐次で動かすと1が返ってくるが,OpenMPで並列化したら8コア(スレッド)で8が返ってきた.

普通に考えればrはboolなのでintへの変換時に0か1にしかならないと思うのだけど…,なぜに8? そもそも逐次と結果違うってのもどうかと.なんとなくreductionの結果をとるときにrの型がboolでなくintとかにされている気がする.ま,詳しくはまた今度調べよう.

最大値を取りたいだけだけど

コンテナxに入ってる正整数(int)の最大値をとるのに

std::accumulate(x.begin(), x.end(), 0, (const int& (*)(const int&,const int&))std::max<int>)

とか書くのはバカなのかなぁ.素直にstd::max_element使えと言われそう.イテレータが返ってくることに注意する必要があるけど.

libstdc++ parallel mode のバグ

コンパイル時に -D_GLIBCXX_PARALLEL つけても,プログラム中で __gnu_parallel::transform とか明示的に呼んでも,ちっとも並列化してくれない.最後の引数に __gnu_parallel::parallel_balanced とか指定するとやっと並列で動いてくれる.こんな動作が意図したものなのかどうかを問い合わせたらやっぱりバグだそうで.

とりあえず問い合わせ時に書いておいた対処法でパッチ作ってもらえたのでそれを当てつつ様子を見る.

«Prev || 1 | 2 | 3 |...| 11 | 12 | 13 |...| 57 | 58 | 59 || Next»
Search
Feeds

Page Top