No Such Blog or Diary

«Prev || 1 | 2 | 3 |...| 5 | 6 | 7 |...| 57 | 58 | 59 || Next»

文字コードとの戦い,というか,行儀の悪い html との戦い

Python で web のクローラを書いてるのだけど,文字コード周りで行儀の悪いページとの戦いが…… こちらの武器は chardet と encutils とで. 

行儀のよいファイルだけなら encutils で判定して終了なのだけど,世の中 ISO-2022-JP と宣言して EUC-JP を使うようなページも平気であるので困る.ここらは実際に unicode へ変換してみてエラーが出たら chardet の解析結果を使うとかで対処.

んで,一見行儀の良さそうな sjis ファイルなのだけど実は丸の中に数字の入った文字を含む cp932 だったりとかもある.大量に.行儀悪い.更に悪いことに,chardet が自信満々 (confidence = 1) で cp932 を sjis 判定してくれるので困る.面倒なので sjis は全部 cp932 として扱うと幸せかも知れない.

あとは x-euc-jp とか x-sjis とかも python は認識してくれないので手動で置き換えを……

とりあえず chardet と encutils の併用と sjis を cp932 に置き換えるのとでだいぶ安定してきた.

というか,ページ内でコードが違うとか何考えてるんだろう?

Hadoop で実行時に作ったオブジェクトを Mapper とかで使いたい

のだけど,良いやり方が思いつかない.

ByteArrayOutputStream を ObjectOutputStream に食わせてオブジェクトを書きだして,そいつを Apache Commons Codec の Base64 に食わして String にして,それを Configuration にセットして Mapper の setup メソッド内でオブジェクトを復元して…… とかやってみたのだけど,これって頭悪い気がする.

でもまあ,とりあえずこれで実行時に組み立てた計算を汎用的な Mapper にやらせるのが素直にできるようになったから良しとしようか.DistributedCache を使ってファイルにバイナリで書きだして共有するってのもアリな気もするけど面倒なのでやめた.

さて,次は InputFormat に手を入れて連続する複数のKey/Valueペアを一気に mapper に食わせるようにするか.commutative でない演算子での集約をするのに combiner は使えないし reducer に全部やらせるのもアホだし mapper で適度に順番通りに潰せるのが望ましい.

cluster SSHというもんを教えてもらった

昨日,複数のターミナルにコマンドをインタラクティブに実行する方法を適当にでっち上げたのだけど,それと似たことが出来る cluster SSH というものを教えてもらった.せっかくなのでちょいと試しに使ってみた.sudo apt-get install clusterssh で用意完了.

cluster SSH は、複数の ssh 接続を作って,それぞれにターミナルを用意して,もうひとつのウインドウにあるテキストフィールドへの入力を選択したターミナルにインタラクティブに送ってコマンドを実行できる.入力を送り出すターミナルの集合はGUIから選択できる.また,あとから ssh の接続を増やしたり減らしたり出来るみたい.

んで,cluster SSH だと,上矢印キーとかCrtl-Rとかのコントロールコードも即座に送ってくれるため,過去のコマンドを呼び出すのが一斉に簡単に出来る.これは良い機能.……それぞれのターミナルでヒストリが違う時には非常に困るけど.

昨日の方法だとコントロールコードを送るとか面倒だし1文字ずつ送るのも工夫がいる.

一方で,cluster SSH だと,for 文を回して各ターミナルに微妙に異なるコマンドを送り付けるという技が出来ない気がする.あと,コマンドを送り付けるターミナルの選択をスクリプトで自動的にやるとか出来ない気もする.そもそも,コマンドを入力するテキストフィールドに機械的にコマンドを投げ込む方法が分からない.

ということで,クラスタに対して均一な操作を加えるときには cluster SSH は非常に便利そうだなぁという印象.ソフトウェアのインストールとか.でも,変態的な使い方が出来そうに見えないのでつまらない気もする.

複数のシェルにコマンドをお届け

今回の論文を書くための実験で,複数のHadoopのクラスタセットひとつひとつに対応するターミナルを開き,それぞれで似たようなコマンドを実行して実験結果を取るという作業をした.コマンドをカットアンドペーストで全部のターミナルに張り付けて実行するとかバカすぎる.なんとかしたい.

という背景で,複数のターミナル(シェル)に対して一箇所から機械的にコマンドを打ち込みたい要求に駆られた.pssh とかだとインタラクティブに出来ないし,そもそも出力のためのターミナルは複数のウインドウにわかれていて欲しい.やりたいのは入力の一本化だけ.

単純に考えると,fifo を使って入力ターミナルからのコマンドを出力ターミナルの標準入力に送ればいい気がする.なので,とりあえず,出力用ターミナルで

cat pipe | bash

としておいて,入力用ターミナルでコマンドを

echo 'cd hoge' > pipe
echo 'ls' > pipe

とかやって送り付けようとした.

が,失敗.何故かコマンドをひとつ発行しただけで出力ターミナルの bash が終わってしまう.これでは複数のコマンドをインタラクティブに発行できず,psshと変わらない.これは望むものではないので原因を探って手法を改善せねばらない.

上の単純な仕組みが失敗した原因は次の通り:最初に送られたコマンドを出力側の bash が読み込んだとき,pipe が EOF に達してしまって "cat pipe | bash" のコマンドが終了してしまう.pipe には後から内容が追加されるけど,とりあえず最初のコマンドの後に一度はファイルの終端まで達するので,シェルのパイプが終了してしまう,と.

さてどうしたものか.

適当に検索してみたら同じようなことを考える人はいたようで,解決策が見つかった."cat pipe | bash" の代わりに次を使えばいい.

tail -f pipe | bash

この tail の -f オプションは,EOFに達しても tail を止めずEOF後に追加された内容も後ろに吐く,という指定.手前のコマンドの出力のプログレスを表示するとかの目的に使われるオプションらしい.これをやっておけばコマンドを間隔を置いて複数送り付けることが出来る.

ということで,前準備@入力ターミナル:

ps="1 2 4"
for p in $ps; do mkfifo pipe-$p; done

前準備@出力ターミナル達:

tail -f pipe-1 | bash
tail -f pipe-2 | bash
tail -f pipe-4 | bash

あとは,適宜全部のターミナルに対して for 文使って少しずつ違うコマンド送ったり,一斉にコマンド発行したり,特定のターミナルにだけ cat 使ってインタラクティブに仕事したり,tee と合わせて特定のサブセットにだけコマンドを送ったり,等など.それなりに使い勝手が良い気がする.

for p in $ps; do echo "procs=$p" > pipe-$p; done
for p in $ps; do echo 'this is pipe-$procs' > pipe-$p; done
for p in $ps; do echo "./runExp.sh" > pipe-$p; done
...
cat | while read line; do echo $line; done > pipe-4
...
cat | while read line; do echo $line; done | tee pipe-2 | tee pipe-4
...
for p in $ps; do echo "cat result-$p.txt" > pipe-$p; done

cat の while read line は1行単位でコマンドを送るため.ここらへんは関数を定義しておいたほうが頭よさげ.

あとは最初のセットアップのところを for 文でいけると楽.ということで,出力ターミナルを一斉に作ってパイプをつなぐコマンド:

for p in $ps; do (echo "(echo 'echo this is pipe-$p'; tail -f pipe-$p) | bash" > pipe-$p &) ;( gnome-terminal -x bash pipe-$p &); done

これで満足.

Hadoop で忘れがちな点

ひとつのファイルは HDFS 上に物理的にチャンクに区切られて置かれる.FileSplit はひとつのファイルを論理的に区切ったもの.ひとつの FileSplit に対して Map のジョブひとつが対応して実行される.細かくは,ひとつの FileSplit がひとつの RecordReader を生成し,その RecordReader がその Map ジョブへと Key/Valueペアのリストを提供する.このとき,その RecordReader はその FileSplit で指定された論理的な領域を越えて(HDFS上にあるだろう)元のファイルを読み込むことが出来る.

一般に,HDFSのチャンクはFileSplitではないし,FileSplitとMapジョブに提供される仕事とは完全に一致はしない.

実際,デフォルトのTextInputFormatが作るRecordReaderは,与えられたFileSplitの最終行を完成させるため,次のFileSplitの境域へと境界を越えて最後の行を読みに行く.例えば,改行なしの大きなファイルを入力とすれば,最初のFileSplitから作られたRecoardReaderは改行を求めてファイル全部を読んでしまう.結果として,先頭のMapジョブはファイル全体を入力として受け取る.

Hadoop に言うことを聞かせるまでのメモ

大体にしてインストールというかセッティングが面倒なので手順のメモ.

  1. ダウンロードして展開.クラスタの場合,本体の置き場所は NFS 上にしとくとインストールが楽(ログのディレクトリに注意).さもなければ全マシンの同じパスに置いておくべし(パスをマシンごとに変えていいのかどうか分からん).
    wget http://ftp.kddilabs.jp/infosystems/apache//hadoop/core/hadoop-0.21.0/hadoop-0.21.0.tar.gz
    tar xfvz hadoop-0.21.0.tar.gz
    cd hadoop-0.21.0
  2. 環境ファイル conf/hadoop-env.sh の編集:JAVA_HOME と HADOOP_HEAPSIZE と HADOOP_LOG_DIR.

    ログの出力先はデフォルトで hadoop の置いてあるディレクトリ(HADOOP_HOME)の下の logs ディレクトリなので,これが NFS 上だったりすると酷いことになる.なので,ローカル上に取るように指定を入れておく.

    export JAVA_HOME=/usr/lib/jvm/java-6-sun/
    export HADOOP_HEAPSIZE=2048
    export HADOOP_LOG_DIR=/tmp/user/node000/logs
    
  3. 設定ファイル conf/slaves:計算に使うノード名を列挙.

    node000
    node001
    node002
    node003
    ...
    
  4. 設定ファイル conf/core-site.xml の編集.

    namenode として使う計算機の名前(URI)を書いておく.マルチコア計算機1台なら localhost で十分.クラスタの場合には localhost ではまずいので,ちゃんとした計算機名前を書いておく.

    <configuration>
         <property>
             <name>fs.default.name</name>
             <value>hdfs://node000:9000/</value>
         </property>
    </configuration>
    
  5. 設定ファイル conf/hdfs-site.xml の編集.

    dfs.name.dir と dfs.data.dir に /tmp とかローカルなディスクのパスを書いておく(設定書かなくてもデフォルトで /tmp に適当なディレクトリを作って使ってくれるけど).NFS上のパスを指定するのは何考えているか分からない.あとは dfs.blocksize (HDFS上のチャンクのサイズ)を適当に変えてもいいかも知れない.Mapper への一仕事の最大値はこのサイズ.

    <configuration>
         <property>
             <name>dfs.name.dir</name>
             <value>/tmp/user/node000/name-dir/</value>
         </property>
         <property>
             <name>dfs.data.dir</name>
             <value>/tmp/user/node000/data-dir/</value>
         </property>
         <property>
             <name>dfs.blocksize</name>
             <value>4194304</value>
         </property>
    </configuration>
    
  6. 設定ファイル conf/mapred-site.xml の編集.

    とりあえず,mapred.job.tracker に JobTracker のいる計算機の名前を書いておく.マルチコア1台なら localhost で十分.クラスタなら計算機名をちゃんとかく.あと,計算が軽い時には JVM を使い回さないと遅くて困るので mapred.job.reuse.jvm.num.tasks に無限回の使い回しを意味する -1 を入れておく.ノード一つ当たりのジョブの数は mapred.tasktracker.map.tasks.maximum と mapred.tasktracker.reduce.tasks.maximum で指定しておく.マルチコアならコア数以上のジョブが同時にあって構わないので,コア数以上の数字を書いておく(2コアマシンしか無いのに200とか書くとプロセスが多すぎて死ぬ).また,Hadoop ではひとつのファイルを複数の FileSplit に分割し,その FileSplit 1つに対して Mapper が1つ呼ばれて動く(間に RecordReader が挟まって「FileSplit→KVペアの集合」という変換が入るけど).その FileSplit のサイズ指定が mpreduce.input.fileinputformat.split.maxsize でできる.実際には,これで指定したサイズと HDFS のチャンクサイズとの小さいほうが実際の FileSplit のサイズになる(全タスク数の指定を入れたときには,さらにそこから導かれるサイズとの小さいほうかね).

    <configuration>
         <property>
             <name>mapred.job.reuse.jvm.num.tasks</name>
             <value>-1</value>
         </property>
         <property>
             <name>mapred.job.tracker</name>
             <value>node000:9001</value>
         </property>
         <property>
             <name>mapred.tasktracker.map.tasks.maximum</name>
             <value>2</value>
         </property>
         <property>
             <name>mapred.tasktracker.reduce.tasks.maximum</name>
             <value>2</value>
         </property>
         <property>
             <name>mapreduce.input.fileinputformat.split.maxsize</name>
             <value>4194304</value>
         </property>
    </configuration>
    
  7. テスト.

    namenode を初期化して,他のノードを起動して,最初にディレクトリ作って,そこにファイルを転送して,サンプル動かして,出力確認して,邪魔な出力消して,そしてノード停止.

    bin/hadoop namenode -format
    bin/start-all.sh
    bin/hadoop fs -mkdir input
    bin/hadoop fs -put conf/* input
    bin/hadoop jar hadoop-mapred-examples-0.21.0.jar grep input output 'dfs[a-z.]+'
    bin/hadoop fs -cat output/*
    bin/hadoop fs -rmr output
    bin/stop-all.sh
    

これで言う事聞くようになった.

その他無茶な設定 in conf/mapred-site.xml:なるべくディスクつかなわいように無理をする.

     <property>
         <name>io.sort.record.percent</name>
         <value>1.0</value>
     </property>
     <property>
         <name>io.sort.spill.percent</name>
         <value>1.0</value>
     </property>
     <property>
         <name>io.sort.mb</name>
         <value>4</value>
     </property>
     <property>
         <name>mapred.inmem.merge.threshold</name>
         <value>2048</value>
     </property>
     <property>
         <name>mapred.job.reduce.input.buffer.percent</name>
         <value>1.0</value>
     </property>
     <property>
         <name>mapred.job.shuffle.input.buffer.percent</name>
         <value>1.0</value>
     </property>
     <property>
         <name>mapred.job.shuffle.merge.percent</name>
         <value>1.0</value>
     </property>
«Prev || 1 | 2 | 3 |...| 5 | 6 | 7 |...| 57 | 58 | 59 || Next»
Search
Feeds

Page Top