FLINTERS Engineer's Blog

FLINTERSのエンジニアによる技術ブログ

Amazon SQSを使ってみた③

今回で3回目となりましたSQSネタです。
1回目ではSQSの概要を説明し、2回目では実際のコードを用いて説明しました。
今回はより実践的な利用方法について説明していきたいと思います。

f:id:no_sugiyama:20150722162517p:plain

キューイングを実装するとき、ジョブを割り込ませたいというケースってあると思います。
例えば、時間のかかるジョブがキューに入ってるけど、先にすぐ終わるジョブをやってしまいたいとか、
無料会員のアクションより、有料会員のアクションを先に実行したいとか、そんなケースが挙げられます。
しかしながら1回目の記事で紹介したとおりSQSは順序性を保証しないという特徴があります。

これを解決するのが優先度付きキューという概念です。
クラウドデザインパターンでもPriorityQueueパターンという、似たような手法が紹介されています。


優先度付きキューとは

f:id:no_sugiyama:20150722162539p:plain
優先度付きキューとは、その名の通りキューに優先度が付いたものになります。
まず、優先度の高いキューと優先度の低いキューを用意します。
client側ではジョブの優先度に応じて、どちらのキューに登録するか決めます。
worker側ではジョブを取得するときに、はじめに優先度の高いキューにジョブが登録されているか調べます。
登録されていなければ優先度の低いキューを調べます。
このようにすることで、優先度が高いキューにジョブを登録すれば、そのジョブは割り込みで優先して実行されることになります。

実際に動かしてみる

実際に動かしてみましょう。
事前準備として、前回と同様の事前準備が必要です。
そして、AWSの管理画面上で、priority_queueとsecondary_queueというふたつのキューを作成しておきましょう。

client側

まずはジョブを登録するclient側から見ていきましょう。
こちらは2秒毎に10個のジョブを登録するプログラムになります。
9個目のみ優先度の高いキューに登録します。
下記のコードを書いたファイルにsetqueue.phpという名前をつけて作業ディレクトリに保存します。

<?php
require_once('aws.phar');

use Aws\Sqs\SqsClient as SqsClient;

$config = array (
    'key' => YOUR_ACCESS_KEY,
    'secret' => YOUR_SECRET_KEY,
    'region' => 'ap-northeast-1'
);
date_default_timezone_set('Asia/Tokyo');

//SqsClientオブジェクトを作成
$client = SqsClient::factory($config);

//キューの登録、9回目のキューのみ優先度高で登録
setQueue($client, "secondary_queue", '優先度:低/登録時刻'.date('c'));
sleep(2);
setQueue($client, "secondary_queue", '優先度:低/登録時刻'.date('c'));
sleep(2);
setQueue($client, "secondary_queue", '優先度:低/登録時刻'.date('c'));
sleep(2);
setQueue($client, "secondary_queue", '優先度:低/登録時刻'.date('c'));
sleep(2);
setQueue($client, "secondary_queue", '優先度:低/登録時刻'.date('c'));
sleep(2);
setQueue($client, "secondary_queue", '優先度:低/登録時刻'.date('c'));
sleep(2);
setQueue($client, "secondary_queue", '優先度:低/登録時刻'.date('c'));
sleep(2);
setQueue($client, "secondary_queue", '優先度:低/登録時刻'.date('c'));
sleep(2);
setQueue($client, "priority_queue", '優先度:高/登録時刻'.date('c'));
sleep(2);
setQueue($client, "secondary_queue", '優先度:低/登録時刻'.date('c'));
sleep(2);

function setQueue($client, $queueName, $message){
    //Queueの名前からQueueUrlを取得
    $queueUrl = $client->getQueueUrl(array('QueueName' => $queueName));

    //メッセージを登録
    $client->sendMessageBatch(
        array(
            'QueueUrl' => $queueUrl['QueueUrl'],
            'Entries' => array(
                array(
                    'Id' => '1',
                    'MessageBody' => $message
                )
            )
        )
    );
}


worker側

次にジョブを取得するworker側を見ていきましょう。
こちらは5秒毎にキューからメッセージを取得するプログラムになります。
下記のコードを書いたファイルにgetqueue.phpという名前をつけて作業ディレクトリに保存します。

<?php
require_once('aws.phar');

use Aws\Sqs\SqsClient as SqsClient;

$config = array (
    'key' => YOUR_ACCESS_KEY,
    'secret' => YOUR_SECRET_KEY,
    'region' => 'ap-northeast-1'
);

//SqsClientオブジェクトを作成
$client = SqsClient::factory($config);

while(true){
    //優先度の高いキューを調べる
    $message = getMessage($client, 'priority_queue');

    //優先度の高いキューにジョブが無ければ優先度の低いキューを調べる
    if($message === false){
        $message = getMessage($client, 'secondary_queue');
    }

    if($message === false){
        echo 'キューが空です。'.PHP_EOL;
    }

    sleep(5);
}

function getMessage($client, $queueName){
    //Queueの名前からQueueUrlを取得
    $queueUrl = $client->getQueueUrl(array('QueueName' => $queueName));

    //Queueからメッセージを取得
    $result = $client->receiveMessage(array(
        'QueueUrl' => $queueUrl['QueueUrl']
    ));
    $messages = $result->getPath('Messages/*/Body');

    if(!empty($messages)){
        //メッセージを元に処理を行う
        work($messages[0]);
        //処理が終わったらジョブを削除
        $receiptHandle = $result->getPath('Messages/*/ReceiptHandle');
        $client->deleteMessage(array('QueueUrl' => $queueUrl['QueueUrl'], 'ReceiptHandle' => $receiptHandle[0]));
        return true;
    }else{
        return false;
    }
}

function work($message){
    echo $message.PHP_EOL;
}


実行結果

ターミナルをふたつ開いて(もしくはtmuxなど使って)、片方でgetqueue.phpを実行します。
もう片方でsetqueue.phpを実行します。

% php getqueue.php
キューが空です。
キューが空です。
優先度:低/登録時刻2013-09-16T18:59:59+09:00
優先度:低/登録時刻2013-09-16T19:00:05+09:00
優先度:低/登録時刻2013-09-16T19:00:07+09:00
優先度:高/登録時刻2013-09-16T19:00:20+09:00
優先度:低/登録時刻2013-09-16T19:00:02+09:00
優先度:低/登録時刻2013-09-16T19:00:12+09:00
優先度:低/登録時刻2013-09-16T19:00:10+09:00
優先度:低/登録時刻2013-09-16T19:00:17+09:00
優先度:低/登録時刻2013-09-16T19:00:15+09:00
優先度:低/登録時刻2013-09-16T19:00:22+09:00
キューが空です。
キューが空です。

このように優先度の高いキューに入ったメッセージが割り込みで実行されているのがわかります。
また、優先度の低いキューに入ったメッセージは実行される順序がメチャクチャで、
順序性が保証されていないのもわかりますね。

まとめ

今回は優先度付きキューというテーマについて説明していきました。
クラウドデザインパターン疎結合が鉄則となるので、バッチ処理の起点をSQSにできると捗ります。
しかもSQSは使いようによってはかなり柔軟な利用方法ができるので、試してみてください。
と言いつつも、私は最近AmazonSimpleWorkflowというサービスを知ってしまったので、そちらも調査中です。
どうやらSQSでできること+アルファのことができる様子。
かなり気になりますね。
ただ、リリースして1年以上たつのになかなか日本語の情報が無いです。
英語勉強しろってことですね。

それでは今回はこの辺りで。
皆さん良いAWSライフを〜