エンジニア

Amazon SQSを使った非同期処理の実装

投稿日:2014年5月7日 更新日:

こんにちは。サーバ担当の中村です。

本日はAmazon Web Services(以下AWS)のサービスの1つであるSimple Queue Service(以下SQS)を使った非同期処理の実装について書いてみようと思います。


Q.SQSとは?

A.分散型キューサービスで高い信頼性とスケーラビリティを兼ね備えたサービス。
しかもとても安い!

Q.何ができるの?

A.キューにメッセージを「送信する」、「溜めておく」、「取得する」、「削除する」ことができる。
この機能を使いリアルタイム性を求められない処理をキューに溜めておき、非同期に処理することが可能。
失敗した時にはメッセージを消さないようにしておけば最低1回は実行されることが保証される。

Q.具体的にはどのように使うの?

A.例えばログの出力をSQSに任せることでアプリケーションの処理とログ出力を分離することができる。
例)

【SQSを使わない場合】
ユーザーが課金してくれた時に、処理部分とログ出力部分を同じ処理内に記述することが多い。
ログ処理が必須だとした場合、何かしらの理由でログ出力に失敗すると処理全体が失敗とみなされ、ユーザーにも失敗が通知されてしまう。
【SQSを使う場合】
ユーザーが課金してくれた時に、ログ出力部分をSQSへのメッセージ送信だけにして実際のログ出力は後回し。
これにより処理全体が軽くなり、ログ出力の成否が処理の成否には関係がなくなる。
キューに溜まったメッセージは別サーバからバッチ処理で定期的に取得し、ログ出力を実行する。

※このように「リアルタイム性が求められる処理」と「リアルタイム性が求められない処理」を分離し、レスポンスや安定性の向上を図ることができます。


次はAWSのコンソールでSQSを使う準備をしてみます。
①AWSコンソールのSQSの画面を開きます。
AWS_SQS1

②[Create New Queue]を押すとこのような画面が出てくるのでキュー名を入力して[Create Queue]
AWS_SQS2

③新しいキューが生成されて下部のプロパティに[URL]が表示されます。
※番号の部分(赤枠)をプログラムで使用します
AWS_SQS3

キューを使う準備はこれだけでOKです。


では具体的にどのようにプログラムを記述するのかFuelPHPとSDKを使って実際にやってみます。

<?php
require APPPATH . 'vendor/autoload.php';

use \Aws\Common\Aws;
use \Aws\Common\Enum\Region;
use Aws\DynamoDb\DynamoDbClient;
use Aws\Sqs\SqsClient;

class Model_Aws extends \Model
{
    // SQSクライアントオブジェクトを生成  
    private static function get_sqs_client()
    {
        $aws_config = \Config::get('aws_config.credential');
        $aws_config['region'] = Region::TOKYO;
        return SqsClient::factory($aws_config);
    }

    // SQSにメッセージを送信  
    // 先程作成した[logs]というキュー名を$sqs_nameに入れて呼び出す  
    public static function send_sqs_message($sqs_name, $data)
    {
        // SQSクライアントを取得  
        $client = self::get_sqs_client();
        // キューIDを指定してメッセージを送信する  
        $client->sendMessageBatch(array('QueueUrl' => 'https://sqs.ap-northeast-1.amazonaws.com/[キューID]/'.$sqs_name,
                                        'Entries' => array(array('Id' => '1', 'MessageBody' => json_encode($data)))
        ));
    }

    // SQSからメッセージを受信  
    public static function receive_sqs_message()
    {
        // SQSクライアントを取得  
        $client = self::get_sqs_client();

        // キューのURLを取得する  
        $listqueues = $client->listQueues();
        $queueUrls  = $listqueues['QueueUrls'];

        // キューからメッセージを取り出す  
        foreach ($queueUrls as $queueUrl) {
            $result = $client->receiveMessage(array(
                'QueueUrl' => $queueUrl,
            ));
            foreach ($result->getPath('Messages/*/Body') as $messageBody) {
                // json_decodeして配列化する  
                $data = json_decode($messageBody);
                $target_table = $data['target_table'];
                // 想定されるデータ形式 (string)log_purchase  
                $values       = $data['values'];
                // 想定されるデータ形式 array('user_id' => '1', 'purchase_date' => '20140501', 'price' => '9800');  

                // ログINSERT実行  
                Model_Logs::insert_log($target_table, $values);
            }
        }
    }
}

このようにSDKを使えば簡単な記述でキューへのメッセージ送信、メッセージ取出しができます。
バッチ処理でメッセージを取り出し、JSON形式のBODY部を解析してログのINSERTをするような実装になると思います。


非同期で良い処理は非同期でやるようにするというのはわかっていても、どうやっていいのかわからないという方はまずはSQSを使ってみるのもいいと思います。

ここまで読んでいただきましてありがとうございました。

採用情報

ワンダープラネットでは、一緒に働く仲間を幅広い職種で募集しております。

-エンジニア
-

© WonderPlanet Inc.