あらすじ
前回は、ReactiveXとは?RxJSの入門と導入手順について話しました。今回は、その中で紹介したObservable・Streamがどういうものなのか解説します。
Observable・Streamは、ReactiveXを使ってコードを書く上でも最も重要な要素の1つになります。これらがないとReactiveXのコードを書くことはできないと言っても過言ではないとも言えます。
それぞれの動作と特徴
Observable
Streamに流れてきたデータをフィルタリングしたり、加工したりして必要なデータに変換をした後、定義された処理を実行するものです。Observableは、Next・Completd・Errorの3つの処理を持っています。Observableなものを接続して、変換されたデータを扱ったり・処理を実行したりすることもできます。
Stream
各Observerに渡すためのデータを流すところです。あらゆるデータ・イベントはここへ流されて、必要なデータをObserverが監視して処理していきます。
今回作るもの
今回は、前回例に挙げた水道をモチーフにしたプログラムを書いていってみます。実際にObservableな要素をどう定義していくと、どう処理をすることが可能なのか説明します。
目指すところ
- 各Functionを宣言的に書くことができる
- 非同期処理をもっとシンプルに扱いやすくする
具体的な実装
水道に水(データ、イベント)を流し、各蛇口(Observableな要素)で必要な処理をして、水やお湯・ろ過処理などを実装してみます。
貯水
Streamにデータを流すところです。ろ過装置で不必要なデータを処理できるか検証するため、あえてノイズを含めたデータを流します。この貯水には、ごみ(DUST)・ 汚れた水(DARTY_WATER)・ 綺麗な水(WATER)の3種類のデータが存在します。これらを配列で保持し、Streamへ流します。
const DUST = 0; const DARTY_WATER = 1; const WATER = 2; // Water stored var store = [DUST, WATER, DARTY_WATER, DUST, WATER, WATER, DARTY_WATER, WATER, DUST];
今回は、DUSTが3つ、DARTY_WATERが2つ、WATERが4つ用意します。
水出力
変換されたデータをアウトプットとして出力するところです。コントローラー、もしくはお湯変換装置からデータを受け取ってデータを外部にアウトプットします。
まず、Streamを通って流れてきたデータを出力するロジックを書いていきます。
const rx = require('rx'); const DUST = 0; const DARTY_WATER = 1; const WATER = 2; // Water stored var store = [DUST, WATER, DARTY_WATER, DUST, WATER, WATER, DARTY_WATER, WATER, DUST]; // Send stored data into stream rx.Observable.from(store) // Receive data from stream and output data to console .subscribe(function(data) { var output = "None"; switch (data) { case DUST: output = "DUST"; break; case DARTY_WATER: output = "DARTY_WATER"; break; case WATER: output = "WATER"; break; default: break; } console.log(output); });
rx.Observable.from(store)
from関数は、Steamにデータを1つずつ流します。
subscribe(function(data) {...
subscribe関数を使うことで、Streamに流れてきたデータに対してObserverが1つずつ出力処理を行っていきます。すると出力が以下の通りになります。
まだろ過を行わずに、そのまま処理を行っているのでDUSTとDARTY_WATERがそのまま表示されています。
このままでは、飲める水にはならないのでろ過装置を実装していきます。
ろ過装置
データに混ざっているノイズを除去するところです。貯水から流れてきたデータを見て、不要なデータをフィルタリングしたあとお湯変換装置と水出力へ流します。
ここでは2つのフィルタリングを使って綺麗な水に変換していきます。
- filter関数を使ってDUSTを除きます。流れてきたデータがDUSTだった場合にStreamから除外します。
- map関数を使って、DARTY_WATERをWATERに変換します。DARTY_WATERが流れてきた場合、WATERに変換してStreamに流します。
まず1の実装です。
// Send stored data into stream rx.Observable.from(store) // Filter dust of stream .filter(function(data) { return data == WATER || data == DARTY_WATER; }) // Receive data from stream and output data to console .subscribe(function(data) {...
先ほどのsubscribe関数の前に追加をします。そうすると出力する処理の前に、DUSTをフィルタリングをすることができるようになります。
出力は以下の通りです。
DUSTがデータから除外されてデータが表示されています。
次に2の処理を実装します。
// Filter dust of stream .filter(function(data) { return data == WATER || data == DARTY_WATER; }) // Translate DARTY_WATER to WATER .map(function(data) { if (data == DARTY_WATER) data = WATER; return data; }) // Receive data from stream and output data to console .subscribe(function(data) {
filter関数とsubscribe関数の間にmap関数を追加します。このmap関数は、filter関数で処理されたフィルタリングされたデータを受け取り、DARTY_WATERをWATERに変換して出力へ渡しています。
実行結果は以下の通りです。
これで流れてくる水が全て飲める水に変換されましたね。
これを実装したコードをGitHubに上げているので、気になる人はぜひ見てみてください。
>https://github.com/hiro-nagami/rx-sample/tree/master/water_supply
結論
このようにReactiveXでは、StreamとObservableを使ってデータをフィルタリングしたり変換したりすることをシンプルに実装することができます。データ=>フィルタ=>データ=>変換=>データ=>出力と構造もわかりやすくなります。
上手く使うことによって構造をよりシンプルに書くことができ、よりバグなどが少ないコードを書くことができるようになるのです。
次回は、このコードをSubjectというものを使って実装置き換えていきます。