java.nioに触ってみた(UDP通信編)
大容量のデータを高速通信をしようとして、UDPデータ送受信のプログラムを書いていたのだが、どうにもパフォーマンスが出なくて悩んでいた。所謂、以下にあるようなマルチスレッド型のサーバを書いていた。
... DatagramSocket socket = new DatagramSocket(4649); byte[] buffer = new byte[8192]; while(true) { DatagramPacket packet = new DatagramPacket(buffer, buffer.length); socket.receive(packet); byte[] receiveData = new byte[8192]; System.arraycopy(packet.getData(), 0, receiveData, 0, packet.getData().length); Worker worker = new Worker(receiveData, packet.getSocketAddress()); pool.execute(worker); } ...
詳しくは書いてないけど、Workerってのが受け取ったパケット一つを処理するワーキングスレッドで、パケットを受け取るたびにスレッドプールにワーカを渡して処理させている。こんな感じで書いていたのだが、パフォーマンスがちっとも出ない。数十MByteのデータを秒間3MByte/sくらいのスピードで送るとパケットの取りこぼしが大量に発生する。
冷静に考えると、スレッドが多すぎたのだ。8KByteのパケットで秒間1MByte/secを出そうとしたら、単純計算で128スレッドが1秒間に生成されることになる。スレッドプールで使い回しているとはいえ、これではパフォーマンスは出ない。
C言語だと、select()/poll()あたりを利用することで、データを受け取るまでブロックしないということが出来るので、わざわざマルチスレッドにする必要はない。で、Javaでこれを実現するためにはNew IOというパッケージを使う必要があるみたいだ。
というわけで、ざっくり調べつつ書いてみた。TCPでのNew IOの使い方の資料はいっぱいあるんだが、UDPだととたんに少なくなって結構苦労したが、やってみれば実はTCPと大差ない感じだった。
DatagramChannel datagramChannel; Selector selector; .... datagramChannel = DatagramChannel.open(); datagramChannel.socket().setReuseAddress(true); datagramChannel.socket().bind(new InetSocketAddress(port)); datagramChannel.configureBlocking(false); selector = Selector.open(); datagramChannel.register(selector, SelectionKey.OP_READ, new IOHandler());
まずは、上記のような感じでチャンネルとセレクタを初期化する。チャンネルっていうのは入出力をラップするものだと思えばよくって、DatagramChannelだとかServerSocketChannel, SocketChannel, FileChannelなどがある。ファイル記述子とその操作をラップしたオブジェクトだと思えばいいのかな。で、セレクタってのはselect()/poll()に対応するものだと思えばいい。
上記のコードではDatagramChannelを初期化して、セレクタと関連づける。このチャンネルが読み込み可能な状態になる(=パケットを受信する)と、IOHandlerに処理が委譲される。IOHandlerは自分で定義したクラスで、実際にパケットを受信したときの処理を行う。
while (selector.select() > 0) { Set<SelectionKey> keys = selector.selectedKeys(); for(Iterator<SelectionKey> it = keys.iterator(); it.hasNext(); ) { SelectionKey key = it.next(); it.remove(); Handler handler = (Handler)key.attachment(); handler.handle(key); } }
上記は実際のサーバプロセスのコード。セレクタのselect()メソッドでイベントを待つ。イベントを取得するとselectedKeys()メソッドで、イベントが発生したチャンネル一覧を取得する。その後のfor分でチャンネル毎に処理を行う感じ。attachment()関数でregister時にアタッチしたオブジェクトを取得可能なので、ここでIOHandlerオブジェクトが取得できる。複数のチャンネルで待っている場合は、ハンドラを別々に登録することで、チャンネル毎の処理を分けて記述することが出来る。ちなみに、UDP通信だと別にいらない気もする…。
interface Handler { public void handle(SelectionKey key) throws ClosedChannelException, IOException; } class IOHandler implements Handler { @Override public void handle(SelectionKey key) throws ClosedChannelException, IOException { DatagramChannel channel = (DatagramChannel)key.channel(); ByteBuffer buf = new ByteBuffer(8192); SocketAddress address = channel.receive(buf); buf.flip(); channel.send(buf, address); } }
で、実際に処理を行うハンドラクラスが上記のような感じ。別段、handle()メソッドの中身を直接サーバプロセス内のwhile文に書いちゃっても結果は一緒。DatagramChannelでは、receiveとsendを使ってクライアント側とやりとりを行う。receiveの返り値にクライアントのSocketAddressが返ってくるので、それを利用してデータを送り返す感じになる。上記のコード例だと、やってきたデータを熨斗はつけないまでも、そのまま送り返す感じになっている。
これでパケット受信毎にプロセスを作らない仕組みが出来た。select()/poll()っぽい感じのコードがNew IOのライブラリを使うことで書くことが出来る。別段New IOなんて新しいパッケージでもないんだけど、Javaのフィーチャーの中ではあまりメジャーじゃないし、自分でも使ったことが無かったから結構新鮮。
実はtomcatも5.5からNew IOになってるとか。やっぱり有名どころのオープンソースのコードはざっとでいいから目を通しておかないと、世の中の潮流から取り残されちゃうね。そのうちファイル転送におけるNew IOを使った場合と使わなかった場合の性能比較でもやってみようかなと思う。
ちなみに、New IOに関して言えばマイコミジャーナルのライトニングJavaあたりに結構詳しく書いてあるので、そっち見た方がいいかも。UDPで書いてる記事が見あたらなかったのでさっくり書いてみた。でもまぁ、大して変わらんよなw