Using the Java Kinesis Client Library (KCL) with localstack

Profile picturePosted by Joery Vreijsen on 31-1-2022 at blog.vreijsenj.nl

Wait wait, hear me out: Consuming a multi-sharded kinesis stream by implementing the Java Kinesis Client Library, and to top it off, use localstack to run our code locally. Interested? Great, let’s go!

Only interested in the result? Here is the Github Repository.

Kinesis Data Streams

Kinesis Data Streams is one of the many, many, managed services provided by Amazon Web Services (AWS). Kinesis streams can be sharded based on a partition key, allowing multiple consumers to each consume a specific shard in parallel.

Sharding is a way of scaling the consumers on a stream, while still maintaining the order of the messages based on the partition key. sharding-principle

With this example, you can see that message A-1 can be handled in parallel of message B-1, but messages A-1 and A-2 are always processed in order by the same consumer.

Configuring localstack

Usually we would setup the Kinesis Stream using CloudFormation, Terraform, or the AWS Console itself, but since we want to be able to test our code locally we will use localstack instead.

Localstack is “a fully functional local cloud stack”, which means it gives us a local implementation of almost every AWS service. Luckily we don’t need all the services provided by Amazon, but only Kinesis (obviously), DynamoDB and Cloudwatch.

We need DynamoDB as the Kinesis Client Library (KCL) uses a DynamoDB table to store the checkpoints of all shards. Checkpoints can be considered the state of the client, containing which application has the lease on which shard, and at which sequencenumber the consumer is currently processing the data.

We need CloudWatch as the Kinesis Client Library (KCL) provides metrics to monitor our application.

Setup

1
2
$ pip install localstack
$ pip install awscli-local

Running

1
2
3
4
5
6
7
8
9
10
11
12
$ SERVICES=kinesis,dynambodb,cloudwatch localstack start -d 

     __                     _______ __             __
    / /   ____  _________ _/ / ___// /_____ ______/ /__
   / /   / __ \/ ___/ __ `/ /\__ \/ __/ __ `/ ___/ //_/
  / /___/ /_/ / /__/ /_/ / /___/ / /_/ /_/ / /__/ ,<
 /_____/\____/\___/\__,_/_//____/\__/\__,_/\___/_/|_|

 💻 LocalStack CLI 0.13.3.3

[20:22:20] starting LocalStack in Docker mode 🐳
[20:22:21] detaching

Awesome, we can now interact with our local AWS services by using the awslocal cli and create our Kinesis stream.

1
$ awslocal kinesis create-stream --stream-name some-data-stream --shard-count 2

And that’s all we need, we have our localstack environment and we have our Kinesis data stream with two shards, time to get our hands dirty and build some actual code!

Setting up the Kinesis Client Library

Dependencies

Let’s start with the build.gradle file which lists all the dependencies we will need.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
plugins {
	id 'org.springframework.boot' version '2.6.3'
	id 'io.spring.dependency-management' version '1.0.11.RELEASE'
	id 'java'
}

group = 'com.vreijsen'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '17'

configurations {
	compileOnly {
		extendsFrom annotationProcessor
	}
}

repositories {
	mavenCentral()
}

tasks.withType(JavaCompile) {
	options.compilerArgs += "--enable-preview"
}

dependencies {
	annotationProcessor("org.springframework.boot:spring-boot-configuration-processor")

	implementation("org.springframework.boot:spring-boot-starter-webflux")
	implementation('org.springframework:spring-context')

	implementation("javax.servlet:javax.servlet-api:4.0.1")

	// Kinesis Client Library
	implementation('software.amazon.kinesis:amazon-kinesis-client:2.3.10')

	compileOnly 'org.projectlombok:lombok:1.18.20'
	annotationProcessor 'org.projectlombok:lombok:1.18.20'
	testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

tasks.named('test') {
	useJUnitPlatform()
}

Configuration

We’ll have to instruct our application which Kinesis Data Stream to consume, and in which region this stream lives. We can default the region to US_EAST_1 since that is the default region used by localstack.

1
2
3
4
5
6
7
8
9
10
package com.vreijsen.consumer.configuration;

@Data
@Configuration
@ConfigurationProperties(prefix = "kinesis")
public class KinesisConfigurationProperties {

    String stream;
    Region region = Region.US_EAST_1;
}

Our application.yaml can then contain the following.

1
2
kinesis:
  stream: "some-data-stream"

Since we’ll be using three AWS services, we have to define a client bean for each one of them.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Profile("localstack")
@Configuration
@RequiredArgsConstructor
public class LocalStackClientConfiguration {

    private static final URI LOCALSTACK_URI = URI.create("http://localhost:4566");

    private final KinesisConfigurationProperties properties;

    @Bean
    public KinesisAsyncClient kinesisAsyncClient() {
        return KinesisClientUtil.createKinesisAsyncClient(
                KinesisAsyncClient.builder()
                        .endpointOverride(LOCALSTACK_URI)
                        .region(properties.getRegion())
        );
    }

    @Bean
    public DynamoDbAsyncClient dynamoAsyncClient() {
        return DynamoDbAsyncClient.builder()
                .endpointOverride(LOCALSTACK_URI)
                .region(properties.getRegion())
                .build();
    }

    @Bean
    public CloudWatchAsyncClient cloudWatchAsyncClient() {
        return CloudWatchAsyncClient.builder()
                .endpointOverride(LOCALSTACK_URI)
                .region(properties.getRegion())
                .build();
    }
}

You can see we annotated the class with @Profile("localstack") as it overrides the endpoints on the clients to point to our localstack environment running on our own machine.

We can define the same class but without the endpoint overrides and annotate it with @Profile("!localstack") to connect to the actual AWS cloud when not running with the localstack profile.

Kinesis Scheduler

The KinesisScheduler is the heart of the library and will do all our Kinesis magic. We can provide the clients that we just setup, but most importantly provide a ShardRecordProcessor which will be created for every shard the application holds the lease of.

When implementing the ShardRecordProcessor we have a few methods that we can override to adjust the behaviour to our specific needs.

The most important method to override is the processRecords(ProcessRecordsInput input) as it is the entry point for all data coming in from that shard. For our demo, we’ll just log every record that comes in.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Slf4j
public class KinesisRecordProcessor implements ShardRecordProcessor {

    @Override
    /* Called when initializing the record processor; can be used to set some MDC properties before receiving data. */
    public void initialize(InitializationInput input) { }

    @Override
    public void processRecords(ProcessRecordsInput input) {
        input.records().forEach(record -> log.info("Received Kinesis message: {}.", record));
    }

    @Override
    /* Called when the lease has been lost to another consumer; can be used to remove some MDC properties. */
    public void leaseLost(LeaseLostInput input) { }

    @Override
    /* Called when the last message of the shard has been processed, and we need to persist our checkpoint. */
    public void shardEnded(ShardEndedInput input) {
        try {
            input.checkpointer().checkpoint();
        } catch (InvalidStateException | ShutdownException e) {
            log.error("Exception while checkpointing at shard end. Giving up.", e);
        }
    }

    @Override
    /* Called when the app is shutting down, so we need to persist our current checkpoint. */
    public void shutdownRequested(ShutdownRequestedInput input) {
        try {
            input.checkpointer().checkpoint();
        } catch (InvalidStateException | ShutdownException e) {
            log.error("Exception while checkpointing at shutdown. Giving up.", e);
        }
    }
}

Alright now that we have a ShardRecordProcessor we can initialize the Kinesis Scheduler.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Configuration
@RequiredArgsConstructor
public class KinesisConfiguration {

    private final KinesisConfigurationProperties properties;

    @Bean
    public Scheduler scheduler(KinesisAsyncClient kinesis, DynamoDbAsyncClient dynamodb, CloudWatchAsyncClient cloudwatch) {
        ConfigsBuilder configs = new ConfigsBuilder(properties.getStream(), properties.getStream(), kinesis, dynamodb, cloudwatch,
                UUID.randomUUID().toString(),
                KinesisRecordProcessor::new
        );

        return new Scheduler(
                configs.checkpointConfig(),
                configs.coordinatorConfig(),
                configs.leaseManagementConfig(),
                configs.lifecycleConfig(),
                configs.metricsConfig(),
                configs.processorConfig(),
                configs.retrievalConfig().retrievalSpecificConfig(
                        new PollingConfig(properties.getStream(), kinesis)
                )
        );
    }
}

With this scheduler we can now create our deamon thread and start consuming!

1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
@RequiredArgsConstructor
public class KinesisRunner {

    private final Scheduler scheduler;

    @PostConstruct
    public void run() {
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    }
}

Ofcourse to be able to consume data we’ll first need to send some data to the Kinesis stream. Keep in mind the Data property should be Base64 encoded.

1
awslocal kinesis put-records --stream-name some-data-stream --records Data=SGVsbG8gV29ybGQ=,PartitionKey=partition-1

When we run our application using the localstack profile we should see the following logs.

1
INFO : Received Kinesis message: KinesisClientRecord(sequenceNumber=49626058596308151669577377721366592164732241681909284866, approximateArrivalTimestamp=2022-01-23T12:24:27.560Z, data=java.nio.HeapByteBufferR[pos=0 lim=11 cap=11], partitionKey=partition-1, encryptionType=NONE, subSequenceNumber=0, explicitHashKey=null, aggregated=false, schema=null).

Hooray! We’ve successfully implemented the Kinesis Client Library, and made it run locally using our own localstack environment 🎉

Want to see the full application? Here is the Github Repository.

Profile picture

Joery Vreijsen

Software developer • Java & AWS Guru • Github: VR4J • Twitter: @JoeryVreijsen

Bij Kabisa staat privacy hoog in het vaandel. Wij vinden het belangrijk dat er zorgvuldig wordt omgegaan met de data die onze bezoekers achterlaten. Zo zult u op onze website geen tracking-cookies vinden van third-parties zoals Facebook, Hotjar of Hubspot. Er worden alleen cookies geplaatst van Google en Vimeo. Deze worden gebruikt voor analyses, om zo de gebruikerservaring van onze websitebezoekers te kunnen verbeteren. Tevens zorgen deze cookies ervoor dat er relevante advertenties worden getoond. Lees meer over het gebruik van cookies in ons privacy statement.