背筋を伸ばしてスタートアップするブログ

株式会社ペライチで開発責任者をやっています。

サーバーレスアプリケーション開発ガイドでLambdaについて学んだ

積読してましたがようやく読みました。AWSLambdaを利用したサーバーレスアーキテクチャの構築方法が解説してあります。

Amazon Web Servicesを使ったサーバーレスアプリケーション開発ガイド

Amazon Web Servicesを使ったサーバーレスアプリケーション開発ガイド

サーバーレスアプリケーションとは何か?サーバーレスのメリットとは?という内容から始まり、AWSを用いてサーバーレスアプリケーションを構築する方法について詳しく書かれています。

サンプルアプリケーションとしては、以下の3つの例がソースコード付きで解説してあり、とても実践的な内容となっていっます。

  • CloudWatchのアラームをトリガーに自動処理する
  • Amazon KinesisとLambdaを使ってTwitterデータをDynamoDBへ保存する
  • LambdaとVue.jsを用いて写真投稿サイトのSPAを作成する

またLambda以外にもサンプルで利用するすべてのAWSサービスについて解説があるため、AWSを使ったことがない人にとっては手を動かしながら入門できるという意味で良さそうと感じました。

今回の記事では、試しに写経してみたTwitterのリアルタイム解析アプリケーションについて記載してみます。

AmazonKinesisでTwitterのデータを受け取りDynamoDBに保存する

Pythonスクリプトを実行しTwitterのタイムラインを取得しKinesisに保存します。それをトリガーにしてLambda関数が呼び出されDynamoDBへとデータが保存されます。

f:id:katsuki1207:20180930003534p:plain

このアーキテクチャではTwitterのタイムラインを処理しますが、大量に流れるデータを処理するという点では様々なユースケースに応用可能です。

以下、AWSの設定や主要なスクリプトのみ記載します。

タイムラインからKinesisにデータを送る

from TwitterAPI import TwitterAPI
import credentials
import boto3
import json

consumer_key = credentials.customer_key
consumer_secret = credentials.customer_secret
access_token_key = credentials.access_token_key
access_token_secret = credentials.access_token_secret

twitter = TwitterAPI(consumer_key, consumer_secret, access_token_key, access_token_secret)
kinesis = boto3.client("kinesis")

res = twitter.request("statuses/filter", {"locations":"122.87,24.84,153.01,46.80"})

for item in res:
    print(item['text'])
    kinesis.put_record(StreamName="twitter-stream", Data=json.dumps(item), PartitionKey=item['timestamp_ms'])

Lamda関数を用意する

ストリームに保存されたツイートのデータから値を取り出してDynamoDBのテーブルに保存します。

import boto3
import json
import base64
import logging
import os

logger = logging.getLogger()
logger.setLevel(logging.INFO)

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.getenv('TABLE_NAME'))

def lambda_handler(event, context):

    try:
        batch_item_list = []
        for record in event['Records']:
            #Kinesisのレコードはbase64エンコードされているためデコード
            payload = base64.b64decode(record['kinesis']['data'])

            data = json.loads(payload)
            item = {
                    'id' : data['id'], 
                    'timestamp' : int(data['timestamp_ms']), 
                    'text' : data['text']
                }

            batch_item_list.append(item)

        with table.batch_writer() as batch:
            for item in batch_item_list: 
                batch.put_item(
                    Item=item
                )
            return
            
    except Exception as e: 
        logging.error("Something went wrong...")
        logging.error(e.message)
        raise

なお、いくつか誤植があり書籍にあるサンプルコード通りに写経してつくっても動きませんでした。今回は理解を深めるために自分でデバッグしましたが、こちらのページから修正版のソースコードをダウンロードできるようです。

book.mynavi.jp

まとめ

サーバーレスについて概念的な話から具体的な実装までひととおり学べる良い本でした。

Amazon Web Servicesを使ったサーバーレスアプリケーション開発ガイド

Amazon Web Servicesを使ったサーバーレスアプリケーション開発ガイド

以前参加したサーバーレスミートアップ然りLambdaはいろんな活用方法があるので事例をもっと知りたいと思いました。

katsuki.hatenablog.com