星期四, 4月 21, 2011

用 AnyEvent、Coro 與 Net::Amazon::S3,同步抓取 S3 的資料...

這幾天用 AWS S3 用的不亦樂乎,為了加快程式執行速度,發現需要不少「撇步」解決,寫下來讓之後接手的人比較容易上手...

首先是 Net::Amazon::S3,用法沒什麼問題,寫成某個 module 的 function:(這邊不太處理發生錯誤的情況,請自己 eval{} 下去處理...)
sub get {
    my $self = shift;

    # S3 的 filename
    my $key = shift;

    my $bucket = $self->s3->new(
        aws_access_key_id => 'xxx',
        aws_secret_access_key => 'xxx',
        retry => 1,
        timeout => 10, # 預設 timeout 30 秒
    );

    my $r = $bucket->get_key($key);

    return $r->{value};
}
接下來用 Coroasync{} 開平行處理:
use Coro;
sub {
    foreach my $key (@allKeys) {
        async {
            my $data = $self->get($key);

            # 這邊是對 $data 處理的 code
        };
    };
}
但你有時候會需要等所有的 async{} 都跑完再處理剩下的事情,這時候就要拿 AnyEvent 的 condvar:
use AnyEvent;
use Coro;
sub {
    my $cv = AnyEvent->condvar;
    $cv->begin;
    foreach my $key (@allKeys) {
        $cv->begin;
        async {
            my $data = $self->get($key);

            # 這邊是對 $data 處理的 code

            $cv->end;
        };
    };
    $cv->end;
    $cv->recv;
}
然後,你會發現打 AWS S3 打太兇時會傳回 500 給你,所以要用 Coro::Semaphore 限制連線數量,並且用 Perl 5.10 之後才有的語法來協助程式管理變數:
use 5.010;
use AnyEvent;
use Coro;
use Coro::Semaphore;
sub {
    state $sema;
    if (!defined $sema) {
        # 如果還沒建立的話開 semaphore,數量定為 32
        $sema = Coro::Semaphore->new(32);
    }

    my $cv = AnyEvent->condvar;
    $cv->begin;
    foreach my $key (@allKeys) {
        $cv->begin;
        async {
            $sema->down;
            my $data = $self->get($key);
            $sema->up;

            # 這邊是對 $data 處理的 code

            $cv->end;
        };
    };
    $cv->end;
    $cv->recv;
}
其實就是現有的東西套一套而已,不過真的還蠻好寫的... 包一包就能正常運作 :o

沒有留言: