Perl の Rx だから Px というものを夢想

Perl には Rx ってないのかな,って思ったらなさそうである.

reactivex.io

Rx

Rx は Reactive Extensions のこと.うまく紹介できないので紹介記事を:

blog.xin9le.net

すごい雑で個人的な解釈でいうと,Stream 的な流れてくる値に,任意の関数を適用して,Observer パターン で受け取って扱うもの.スレッド間のやりとりはスケジューラーによってよしなにされる.

余談として,自分のケースでは Android 開発で知ったので,スマホアプリ開発ではさかんに使われていて知名度が高そう.

Px

Perl で Rx の模倣をする Px があったらどうだろうと思った:

github.com

とりあえず最小限のものをつくって試すことにした.スケジューラーやオペレーター等,Rx に重要な実装は圧倒的にされていない.

たわむれにめちゃダサロゴを描いたらいい感じに使ってはいけない感がでた:

https://raw.githubusercontent.com/mangano-ito/Px/master/docs/assets/px.png

サンプル

use strict;
use warnings;
use feature qw(say);

use Px::Helpers qw(
    observable_from_sub
    px_subscribe
    px_map
    px_filter
    yield
);

sub main {
    my $observable = _generate();
    _output($observable);
}

sub _generate {
    return observable_from_sub {
        say '=== generate: begin ===';
        say 'yield => 1';
        $_->next(1);
        say 'yield => 2';
        $_->next(2);
        say 'yield => 3';
        yield 3;
        say '=== generate: done ===';
    };
}

sub _output {
    my ($observable) = @_;

    px_subscribe { say 'subscribe <= ' . $_ }
    px_map       { $_ + 1 }
    px_filter    { $_ > 10 }
    px_map       { $_ * 10 }
    $observable;
}

main;

Rx に親しんでいる人は既視感を覚えるような感じになっていると思う.適用の順番は Perl の流儀に則って,後→先にできるように ヘルパー関数 をつくった.

結果

この結果はこうなる:

=== generate: begin ===
yield => 1
yield => 2
subscribe <= 21
yield => 3
subscribe <= 31
=== generate: done ===
  1. 値が 1, 2, 3 と生産されて流れてくる
  2. 各値は 10 倍されて 10, 20, 30 となる
  3. が,途中 $_ > 10 のものだけがフィルタされ 20, 30 のみが流れるようになる
  4. その値が再度 + 1 され,21, 31 となり
  5. 最終的に say で値が出力される

observable_from_sub

observable_from_sub というヘルパーにより,Stream生産者が作成される.observable_from_sub のスコープの中では暗黙的に $_ は生産者のインスタンスが与えられる.

$_->next(value) によって値を emit することができる.これはよくある generator における yield value と同じような感じ.

なので,yieldemit できるようなヘルパー関数を作ったら,generator 的な処理と同一視できそうな感じになった.

subscribe

他方,この値を受け取る Stream購読者subscribe ヘルパーによって購読できる. subscribe のスコープの中では,流れてきた値を暗黙的に $_ として受け取ることができる.

オペレーター

そして,各 Streammapfilter の操作により,流れてきた値に関数を適用したり,フィルタしたりすることができる.これらも同様に px_mappx_filter ヘルパー関数により適用できる.

これは普通の Perlmapgrep と同一視できる.そして同様に $_ で暗黙的に流れてきた値を受け取ることができる.

observable_from_list

リストから生成するヘルパー関数もつくった:

use Px::Helpers qw(observable_from_list);

my $observable2 = observable_from_list 4, 5, 6, 1, 7, 8;
_output($observable2)

この $observable2 をさっきの例の _output に与えても同じインタフェースで扱えて,結果はこうなる:

subscribe <= 41
subscribe <= 51
subscribe <= 61
subscribe <= 71
subscribe <= 81

これは RxJava でいう,Observable.fromArrayObservable.fromIterable と同等のものとして扱える:

reactivex.io

かくして,値の流れがなんとなく Rx っぽい実装になる.

RxJava + Kotlin での同様の例

第一の observable_from_sub の 例は RxJava + Kotlin で書くと:

package myPackage

import io.reactivex.Observable

fun main(args: Array<String>) {
    val observable = Observable.create<Int> { emitter ->
        println("=== generate: begin ===")
        println("yield => 1")
        emitter.onNext(1)
        println("yield => 2")
        emitter.onNext(2)
        println("yield => 3")
        emitter.onNext(3)
        println("=== generate: done ===")
    }

    observable.map{ it * 10 }
        .filter{ it > 10 }
        .map{ it + 1 }
        .subscribe{ println("subscribe <= $it") }
}

これと同等になる.

observable_from_list の例はこう書き換える:

val observable = Observable.fromArray(4, 5, 6, 1, 7, 8)

なので,大体同じイメージで記述されている.

さて

Perl でそういうものが必要か,というと,非同期的な処理を扱うことがあまりなさそうで需要がなさそうだ.

Perlmap とかの操作もあるし,List::*Utils で一通りの操作もできる.具体的なリストに対しての操作として扱えるようになっている.

ところで,他の言語では array から SQL の実行結果ディレクトリツリー まで,列挙可能なものは Iterable (や,似た概念) として抽象化されていたりする (例: PHPDirectoryIterator)

その上で foreach のような操作はあるインタフェースに対する操作として扱えるようになっているし,ユーザーがそのインタフェースにのっとって実装することができる.(例: PHPTraversable や,C++range-based for)

なので,そういうインタフェースの一つとして使うことはできなくもないだろうと思った.

モチベーション

そもそも,データベースから SELECT するときにページネーションしようとして,LIMIT でオフセットを変えながら取得するのを実装していたときに,Perl で Rx 的な概念があったらどうなるだろうか,と考えたのがきっかけ.

そうすればこう書けそう (イメージ):

sub main {
    my $p = observable_from_sub { paginated_select($_) };
    subscribe { export($_) } $p;
}

sub paginated_select {
    my $emitter = shift;
    my $db = Some::DB->new();
    my $limit = 10;

    for (
        $offset = 0;
        $rows = select($db, $offset, $limit);
        $offset += $limit
     ) {
        $emitter->next($_) for $rows;
    }
}

sub select {
    my ($db, $offset, $limit) = @_;

    return $db->select
        ->from('users')
        ->order_by('id', 'ASC')
        ->offset($offset)
        ->limit($limit)
        ->fetch();
}

sub export {
    my ($row) = @_;

    say 'row = ' . join(', ', $row->@*);
}

ただ,この程度ならそのまま書いても特に辛いって感じでもないな,と思った.

結論

あまりいいユースケースを示せないので特に結論を出せずに終わった.

関係ないですが,コードが非常に見づらいのでデザインを調整する必要があると思った.