首先是 Net::Amazon::S3,用法沒什麼問題,寫成某個 module 的 function:(這邊不太處理發生錯誤的情況,請自己
eval{} 下去處理...)sub get {
my $self = shift;
# S3 的 filename
my $key = shift;
my $bucket = $self->s3->new(
aws_access_key_id => 'xxx',
aws_secret_access_key => 'xxx',
retry => 1,
timeout => 10, # 預設 timeout 30 秒
);
my $r = $bucket->get_key($key);
return $r->{value};
}接下來用 Coro 的 async{} 開平行處理:use Coro;
sub {
foreach my $key (@allKeys) {
async {
my $data = $self->get($key);
# 這邊是對 $data 處理的 code
};
};
}但你有時候會需要等所有的 async{} 都跑完再處理剩下的事情,這時候就要拿 AnyEvent 的 condvar:use AnyEvent;
use Coro;
sub {
my $cv = AnyEvent->condvar;
$cv->begin;
foreach my $key (@allKeys) {
$cv->begin;
async {
my $data = $self->get($key);
# 這邊是對 $data 處理的 code
$cv->end;
};
};
$cv->end;
$cv->recv;
}然後,你會發現打 AWS S3 打太兇時會傳回 500 給你,所以要用 Coro::Semaphore 限制連線數量,並且用 Perl 5.10 之後才有的語法來協助程式管理變數:use 5.010;
use AnyEvent;
use Coro;
use Coro::Semaphore;
sub {
state $sema;
if (!defined $sema) {
# 如果還沒建立的話開 semaphore,數量定為 32
$sema = Coro::Semaphore->new(32);
}
my $cv = AnyEvent->condvar;
$cv->begin;
foreach my $key (@allKeys) {
$cv->begin;
async {
$sema->down;
my $data = $self->get($key);
$sema->up;
# 這邊是對 $data 處理的 code
$cv->end;
};
};
$cv->end;
$cv->recv;
}其實就是現有的東西套一套而已,不過真的還蠻好寫的... 包一包就能正常運作 :o
沒有留言:
張貼留言