This package provides an interface to the Amazon Kinesis Client Library (KCL) MultiLangDaemon, which is part of the Amazon KCL for Java. Developers can use the Amazon KCL to build distributed applications that process streaming data reliably at scale. The Amazon KCL takes care of many of the complex tasks associated with distributed computing, such as load-balancing across multiple instances, responding to instance failures, checkpointing processed records, and reacting to changes in stream volume. This interface manages the interaction with the MultiLangDaemon so that developers can focus on implementing their record processor executable. A record processor executable typically looks something like:
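```python
#!/usr/bin/env python
import base64  # commonly used when handling record payloads
import json

from amazon_kclpy import kcl


class RecordProcessor(kcl.RecordProcessorBase):

    def initialize(self, initialization_input):
        # Called before any records are delivered to this processor.
        pass

    def process_records(self, process_records_input):
        # Called with each batch of records; process (and checkpoint) here.
        pass

    def lease_lost(self, lease_lost_input):
        # Invoked when the lease for this shard is lost.
        pass

    def shard_ended(self, shard_ended_input):
        # Invoked when all records from a split/merge have been processed.
        pass

    def shutdown_requested(self, shutdown_requested_input):
        # Invoked when the KCL is shutting down: a final chance to checkpoint.
        pass


if __name__ == "__main__":
    kclprocess = kcl.KCLProcess(RecordProcessor())
    kclprocess.run()
```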
Before running the samples, make sure that your environment is configured to allow the samples to use your AWS security credentials.
By default the samples use the DefaultCredentialsProvider, so you'll want to make your credentials available to one of the credentials providers in that provider chain. There are several ways to do this, such as providing a ~/.aws/credentials file or, if you're running on EC2, associating an IAM role that has appropriate access with your instance.
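If you go the credentials-file route, a quick local sanity check is to confirm that the file defines a profile with both keys. The sketch below assumes the standard INI layout of ~/.aws/credentials and a profile named default; has_profile_credentials is a hypothetical helper, not part of amazon_kclpy, and it only checks that the keys are present, not that they are valid.

```python
import configparser
import os


def has_profile_credentials(path="~/.aws/credentials", profile="default"):
    """Return True if `profile` in the INI file at `path` defines both keys.

    Hypothetical helper: checks presence only, not whether AWS accepts them.
    """
    parser = configparser.ConfigParser()
    # read() silently skips missing files and returns the files it parsed.
    if not parser.read(os.path.expanduser(path)):
        return False
    if not parser.has_section(profile):
        return False
    section = parser[profile]
    return bool(section.get("aws_access_key_id")) and bool(
        section.get("aws_secret_access_key")
    )


if __name__ == "__main__":
    print("default profile found:", has_profile_credentials())
```

An IAM role on EC2 will not show up in this file, so a False result there does not by itself mean the samples lack credentials.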
For questions regarding the Amazon Kinesis service and the client libraries, please visit the Amazon Kinesis Forums.
The amazon_kclpy package requires the MultiLangDaemon, which is provided by the Amazon KCL for Java. These jars will be downloaded automatically by the install command, but you can explicitly download them with the download_jars command.
From the root of this repo, run:
```shell
python setup.py download_jars
python setup.py install
```
Now amazon_kclpy, boto (used by the sample putter script), and the required jars should be installed in your environment. To start the sample putter, run:
```shell
sample_kinesis_wordputter.py --stream words -w cat -w dog -w bird -w lobster
```
This will create an Amazon Kinesis stream called words and put the words specified by the -w options into the stream once each. Use -p SECONDS to indicate a period over which to repeatedly put these words.
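For a rough idea of what the putter does, here is a minimal sketch; it is not the script's actual code. It assumes a boto3-style Kinesis client object with a put_record(StreamName=..., Data=..., PartitionKey=...) method (the sample script itself uses boto), assumes each word serves as its own partition key, and models repetition with hypothetical rounds and interval parameters as only one possible reading of -p. The stream must already exist.

```python
import time


def put_words(client, stream_name, words, rounds=1, interval=0.0, sleep=time.sleep):
    """Put each word into `stream_name` once per round; return records put.

    Hypothetical sketch: `client` is assumed to be a boto3-style Kinesis
    client, and `rounds`/`interval` are an assumed reading of -p.
    """
    count = 0
    for round_index in range(rounds):
        for word in words:
            # Using the word as the partition key is an assumption.
            client.put_record(
                StreamName=stream_name,
                Data=word.encode("utf-8"),
                PartitionKey=word,
            )
            count += 1
        # Pause between rounds, but not after the last one.
        if round_index < rounds - 1:
            sleep(interval)
    return count
```

With boto3 installed and credentials configured, put_words(boto3.client("kinesis"), "words", ["cat", "dog"]) would put two records. Passing sleep in as a parameter keeps the function easy to exercise without real delays.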
Next, we'd like to run an Amazon KCL for Python application that reads records from the stream we just created. First, though, take a look in the samples directory, where you'll find a file called sample.properties, and view its contents with `cat samples/sample.properties`.
You'll see several properties defined there:

- executableName indicates the executable for the MultiLangDaemon to run.
- streamName is the Kinesis stream to read from.
- appName is the Amazon KCL application name to use, which will also be the name of an Amazon DynamoDB table that gets created by the Amazon KCL.
- initialPositionInStream tells the Amazon KCL how to start reading from shards upon a fresh startup.

To run the sample application, you can use a helper script included in this package. Note that you must provide a path to Java (version 1.7 or greater) to run the Amazon KCL.
```shell
amazon_kclpy_helper.py --print_command \
    --java <path-to-java> --properties samples/sample.properties
```
This will print the command needed to run the sample, which you can copy and paste, or you can surround the command with backticks to run it directly:
```shell
`amazon_kclpy_helper.py --print_command \
    --java <path-to-java> --properties samples/sample.properties`
```
Alternatively, if you don't have the source on hand but want to run the sample app, you can use the --sample argument to indicate that you'd like to get the sample.properties file from the installation location.
```shell
amazon_kclpy_helper.py --print_command --java <path-to-java> --sample
```
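Whichever properties file you end up using, it can be worth confirming that the four properties described above are present before launching. The sketch below is a hypothetical helper, not part of amazon_kclpy: parse_properties handles only a simplified subset of the Java .properties format (`key = value` or `key: value` lines and `#`/`!` comments, with no escapes or line continuations), and missing_keys reports which of the four keys are absent or empty.

```python
REQUIRED_KEYS = ("executableName", "streamName", "appName", "initialPositionInStream")


def parse_properties(text):
    """Parse a simplified Java .properties document into a dict.

    Supports `key = value` / `key: value` lines and `#` or `!` comments;
    escapes and line continuations are deliberately not handled.
    """
    props = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line[0] in "#!":
            continue  # skip blank lines and comments
        # Split on whichever of '=' or ':' appears first.
        positions = [i for i in (line.find("="), line.find(":")) if i != -1]
        if not positions:
            props[line] = ""  # a bare key with no value
            continue
        sep = min(positions)
        props[line[:sep].strip()] = line[sep + 1:].strip()
    return props


def missing_keys(props):
    """Return the required keys that are absent or have empty values."""
    return [key for key in REQUIRED_KEYS if not props.get(key)]
```

For example, `with open("samples/sample.properties") as f: print(missing_keys(parse_properties(f.read())))` should print an empty list if all four properties are set.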
Running on EC2 is simple. Assuming you are already logged into an EC2 instance running Amazon Linux, the following steps will prepare your environment for running the sample app. Note that the version of Java that ships with Amazon Linux can be found at /usr/bin/java and should be 1.7 or greater.
```shell
sudo yum install python-pip
sudo pip install virtualenv
virtualenv /tmp/kclpy-sample-env
source /tmp/kclpy-sample-env/bin/activate
pip install amazon_kclpy
```
Amazon KCL for Python uses Amazon KCL for Java internally. We have implemented a Java-based daemon, called the MultiLangDaemon, that does all the heavy lifting. Our approach has the daemon spawn the user-defined record processor script/program as a sub-process. The MultiLangDaemon communicates with this sub-process over standard input/output using a simple protocol, and therefore the record processor script/program can be written in any language.
At runtime, there will always be a one-to-one correspondence between a record processor, a child process, and an Amazon Kinesis shard. The MultiLangDaemon will make sure of that, without any need for the developer to intervene.
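The sub-process model described above can be sketched in a few lines. This is a simplified illustration, not the MultiLangDaemon's actual wire protocol: it assumes one JSON object per line on standard input, an action field naming the event, and a {"action": "status", "responseFor": ...} acknowledgement written back on standard output. EchoProcessor and run_loop are hypothetical names; in practice amazon_kclpy's KCLProcess manages this exchange for you.

```python
import io
import json
import sys


class EchoProcessor:
    """Hypothetical stand-in for a record processor; remembers each action."""

    def __init__(self):
        self.seen = []

    def handle(self, action, message):
        # A real processor would route `message` to initialize,
        # process_records, etc.; this one only records the action name.
        self.seen.append(action)


def run_loop(processor, stdin=sys.stdin, stdout=sys.stdout):
    """Read one JSON message per line, dispatch it, and acknowledge it.

    Simplified illustration: the real message format belongs to the
    MultiLangDaemon and is implemented for you by amazon_kclpy.
    """
    for line in stdin:
        line = line.strip()
        if not line:
            continue  # ignore blank lines
        message = json.loads(line)
        action = message["action"]
        processor.handle(action, message)
        # Reply on stdout so the parent knows this message was handled.
        stdout.write(json.dumps({"action": "status", "responseFor": action}) + "\n")
        stdout.flush()


if __name__ == "__main__":
    # Demo with in-memory input instead of a real parent process.
    demo_input = io.StringIO('{"action": "initialize"}\n')
    run_loop(EchoProcessor(), demo_input, sys.stdout)
```

Because each child process serves exactly one shard, a loop like this never has to multiplex messages from several shards.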
In this release, we have abstracted these implementation details away and exposed an interface that lets you focus on writing record processing logic in Python. This approach enables the Amazon KCL to be language agnostic, while providing identical features and a similar parallel processing model across all languages.
Release notes highlights:

- The RegisterStreamConsumer Kinesis API, which provides dedicated throughput compared to the shared-throughput GetRecords API.
- The SubscribeToShard Kinesis API, which provides lower latencies than GetRecords.
- RegisterStreamConsumer and SubscribeToShard are new APIs, and may require updating any explicit IAM policies.
- A RecordProcessorBase which supports the new record processor methods:
  - The shutdown method from version 2 has been removed and replaced by the leaseLost and shardEnded methods.
  - A leaseLost method, which takes a LeaseLostInput object and is invoked when a lease is lost.
  - A shardEnded method, which takes a ShardEndedInput object and is invoked when all records from a split/merge have been processed.
- A --log-configuration option for command generation.
- Record processors can now be notified, and given a final opportunity to checkpoint, when the KCL is being shut down.
- A binary_data method that handles the base64 decoding of the record data.
- shutdown now receives a
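The binary_data convenience exists because record data is delivered base64-encoded. As a rough illustration of what such an accessor does, the hypothetical Record class below wraps a dict with a base64 data field and decodes it on demand; the class and field names are assumptions, not amazon_kclpy's actual Record implementation.

```python
import base64


class Record:
    """Hypothetical record wrapper around a dict with a base64 'data' field."""

    def __init__(self, json_dict):
        self._json = json_dict

    @property
    def data(self):
        # The raw payload, still base64-encoded, as a str.
        return self._json["data"]

    @property
    def binary_data(self):
        # Decode the base64 payload back into the original bytes.
        return base64.b64decode(self.data)
```

For text payloads such as the sample's words, calling record.binary_data.decode("utf-8") on an instance of this sketch would recover the original string.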
This library is licensed under the Apache 2.0 License.