Kinesis Client Libraryを利用してKinesis アプリケーションをつくったよ

はじめに

こんにちは。ホットペッパービューティーの開発をしています。長谷川です。

カレーが好きなアラサーエンジニアです。

今回は、実際のプロジェクトで、とあるログ連携の為に、Kinesis Client Library(以下KCL)を利用した、Kinesis アプリケーション(以下Kinesis App)を開発してみたので、開発のポイントなどを書いてみたいと思います。

個人的には、去年検証でちょっとしたサンプルをつくったことはあるんですが、実際のプロジェクトでは今回始めて使いました。

今回の構成

diag

  • Kinesis Streamのログデータを、とあるシステムに転送するために、EC2上でKinesis Appを動かしています。
  • この図だけだと、わざわざKinesis Appを作らなくても他に連携のやり方がありそうな感じですが、今回の要件では自分でログをfetchして、ファイルとして独自に出力するプログラムをつくる必要がありました。

開発のポイント

言語選択

  • KCLの各言語版の開発状況を見ると、Java版が一番AWSのAPIに対する追随が早そうだったので、Java版を使うことにしました。
  • JVM言語であれば、なんでも良かったのですが、メンテナンスを考えると、弊社で一番ポピュラーなJavaに落ち着きました。
  • 特にフレームワークは使っておらず、普通のJava8のアプリケーションとして実装しています。

KCLを使うことのメリット

kinesis-with-worker

  • KCL Worker(Streamと対応して、ログを受け取るプログラム)を複数立ち上げると、shardと連動して、自動でバランシングしてくれます。
    • 例えば、shardが2つのKinesis Streamから、1つのEC2インスタンス上でKinesis Appを起動状態にし、もう1つEC2インスタンスを立ち上げてKinesis Appを起動した場合、開発者が意識すること無くログ出力は2つのインスタンスに平均化されます。(DynamoDBに登録される、Kinesis Application Nameが同じ場合)。

KCL利用のポイントと、Worker起動の流れ

  • 下記は、KCLワーカーを起動するMainクラスと、ログをfetchするProcessorクラス、Processorを生成するFactoryクラスの実装イメージです。
  • classpathに、https://github.com/awslabs/amazon-kinesis-client を追加しています。
    • 今回はMavenを使って、Version 1.7.0を使用しました。

Main.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.internal.StaticCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
public class Main {
    // 独自定義の設定保持クラス
    private static Configuration configuration;
    public static void main(String... args) {
        // 設定読み込みます。
        configuration = ConfigurationsFactory.invoke();
        AWSCredentialsProvider provider = getAWSCredentialsProvider();
        // Workerクラスの生成。StreamNameTypeはEnumで複数のStreamが定義されています。
        List<Worker> workers = Stream.of(StreamNameType.values()).map(type -> {
            KinesisClientLibConfiguration kinesisConfig = getKinesisClientLibConfiguration(provider, type);
            AmazonDynamoDBClient dynamoDbClient = getAmazonDynamoDBClient(provider);
            IRecordProcessorFactory recordProcessorFactory = new Factory(type);
            Worker worker = new Worker.Builder()
                    .recordProcessorFactory(recordProcessorFactory)
                    .config(kinesisConfig)
                    .dynamoDBClient(dynamoDbClient)
                    .build();
            }));
            return worker;
        }).collect(Collectors.toList());
        // 今回はStreamが複数あるので、ここでListのWorkerごとにスレッド生成して、並列起動(Worker#run)させます。
        ExecutorService executor = Executors.newFixedThreadPool(workers.size());
        workers.stream().forEach(worker -> {
            executor.submit(new WorkerTask(worker));
        });
        executor.shutdown();
    }
    private static AmazonDynamoDBClient getAmazonDynamoDBClient(AWSCredentialsProvider provider) {
    // DynamoDB Clientの生成
    }
    private static KinesisClientLibConfiguration getKinesisClientLibConfiguration(AWSCredentialsProvider provider,
            StreamNameType type) {
    // KinesisClientLibConfigurationの生成
    }
    private static AWSCredentialsProvider getAWSCredentialsProvider() {
    // AWSCredentialsProviderの生成
    }
}
class WorkerTask extends Thread {
    private Worker worker;
    public WorkerTask(Worker worker) {
        this.worker = worker;
    }
    public void run() {
        worker.run();
    }
}

Factory.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
public class Factory implements IRecordProcessorFactory {
    // StreamNameTypeはEnumで複数のStreamが定義されています。
    private StreamNameType streamNameType;
    public Factory(StreamNameType streamNameType) {
        this.streamNameType = streamNameType;
    }
    @Override
    public IRecordProcessor createProcessor() {
        return new Processor(streamNameType);
    }
}

Processor.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
public class Processor implements IRecordProcessor {
  // StreamNameTypeをフィールドに保持しています。
  private StreamNameType streamNameType;
  public Processor(StreamNameType streamNameType) {
    // StreamNameTypeはEnumで複数のStreamが定義されています。
    this.streamNameType = streamNameType;
  }
    @Override
    public void initialize(InitializationInput initializationInput) {
    // 初期化処理
    }
    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
    // processRecordsInput.getRecords() で、Streamのログデータを取り出します。
    // 要件に合わせてStreamNameType別のログ転送処理処理を書きます。
    // processRecordsInput.getCheckpointer()で、IRecordProcessorCheckpointerを取り出し、checkpoint()処理もここで行います。
    }
    @Override
    public void shutdown(ShutdownInput shutdownInput) {
    // シャットダウン処理
    }
}
  • Worker#runすると、FactoryからIRecordProcessorを実装した、Processorが生成されます。
  • Processorで、Streamからログをfetchすることができ、そのdataを利用して、要件に合わせてログの転送や出力を行います。

スケールアウト戦略

ログの流入が増えた場合は、まず、shardの分割を検討します。AWSの公式ドキュメントによると下記の上限があるので、超えそうになる前に監視ツールなどからアラートを飛ばすとよいでしょう。

シャードとは、Amazon Kinesis ストリームの基本的なスループットの単位です。1 シャードは、1 MB/秒のデータ入力と 2 MB/秒のデータ出力の能力を提供します。1 つのシャードは 1 秒当たり最大 1,000 件の PUT レコードをサポートできます。ストリームを作成するときに、必要なシャードの数を指定します。たとえば、2 シャードのストリームを作成できます。このストリームは、データ入力のスループットが 2 MB/秒、データ出力のスループットが 4 MB/秒で、1 秒間に最大 2,000 件の PUT レコードに対応できます。

shardの分割は、aws-cliを使用してコマンドラインで以下のように行うことができます。(この例は1つのshardを2つに分割しています。)

1
2
3
4
5
6
7
8
9
10
11
12
# stream名を変数化
$ STREAM_NAME=foobar-stream
# 分割するshardIdを確認し定義します(今回はshardId-000000000000を分割します)
$ aws kinesis describe-stream --stream-name=$STREAM_NAME
$ SPLIT_SHARD_ID=shardId-000000000000
# stream分割に必要なhash_keyの計算を行います
$ HASH_KEY=`echo "2^128 / 2" | bc` && echo ${HASH_KEY}
# split-shardコマンドで対象のshardを2分割します
$ aws kinesis split-shard --stream-name ${STREAM_NAME} --shard-to-split ${SPLIT_SHARD_ID} --new-starting-hash-key ${HASH_KEY}

shard分割を行うと、今度はボトルネックがWorkerに移動すると思います。そうなった場合は、Workerのプロセスを増やしたり、動かすEC2インスタンスの数を増やすことになると思います。

2016.12.27 追記

この記事を公開後、親切なお兄さんに教えてもらったのですが、2016年11月にKineisのUpdateがありました。

新たにUpdateShardCount APIが追加され、shardの分割がより簡単になったようです。AWSのリリースや、AWS CLIのリファレンスによりますと、上記では、hash_keyの計算をしていますが、下記のようにshard数の指定だけで、分割が行えます。

1
2
# shardが1つのものから、2つに分割するオペレーションのサンプルです
$ aws kinesis update-shard-count --stream-name $STREAM_NAME --target-shard-count 2 --scaling-type UNIFORM_SCALING

このAPI自体の制約はいくつかありますが、以前より監視ツール等と連携したスケールアウトが容易になったと思います。

おわりに

  • 実装にあたっては、KCLの情報が少なくて、いくつか実装上悩んだところはあったものの、リリースできるプログラムになりました。
    • ネット上の実装サンプルはAPI v1のものが多かったり、KCL自体のドキュメントもあまりないため、使い方がわからないところは結局KCLのソースを見ながら実装する羽目に。。。
  • 途中にも書きましたが、Kinesis Streamからfetchして更に連携するのであれば、要件によっては、fluentdのin_kinesis pluginやAWS Lambdaなども検討したほうが良いと思います。今回は、要件にマッチしなかったので、自分でKinesis Appを書くことにしました。
  • 弊社では一緒に、AWSを使って、システムをガンガンつくってくれる仲間を募集しています。ご興味ありましたら是非!