Using Zendesk Maxwell with Kinesis (producer and consumer included)

I previously wrote a tutorial on hooking Maxwell up to Kafka. Kinesis, AWS's managed MQ/streaming middleware, plays a very important role at companies abroad and at companies built on the AWS stack.

Compared with running Kafka yourself, using the AWS-provided Kinesis not only cuts costs substantially, it also makes it very easy to balance processing capacity against cost by adjusting the number of shards. (A shard is somewhat like a Kafka partition, but shards can be removed as well as added, so a shard is better thought of as Kinesis's unit of throughput.)
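To give a feel for how a stream can be scaled in both directions, here is a minimal boto3 sketch using the UpdateShardCount API; the region, stream name, and target shard count below are made-up placeholders, not values from this setup.

import boto3

# Assumed region and stream name -- replace with your own values.
kinesis = boto3.client("kinesis", region_name="us-east-1")

# Resize the stream to 4 shards. UNIFORM_SCALING asks Kinesis to
# split/merge shards evenly; each call can only roughly double or
# halve the current shard count.
kinesis.update_shard_count(
    StreamName="maxwell-demo-stream",
    TargetShardCount=4,
    ScalingType="UNIFORM_SCALING",
)

# Confirm how many open shards the stream has now.
summary = kinesis.describe_stream_summary(StreamName="maxwell-demo-stream")
print(summary["StreamDescriptionSummary"]["OpenShardCount"])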

 

Producer:

Unlike Kafka or MSK (AWS-managed Kafka), Kinesis identifies what you connect to by an access key, a region, and the name of the data stream. No IP address shows up anywhere, which can feel pretty confusing at first.
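To make that addressing model concrete, here is a hypothetical producer call with boto3: credentials come from the environment or ~/.aws/credentials, and the destination is identified purely by region plus stream name (both placeholders below), with no broker IPs anywhere.

import json
import boto3

# Credentials are picked up from env vars / ~/.aws/credentials;
# only a region and a stream name identify the destination.
kinesis = boto3.client("kinesis", region_name="us-east-1")

kinesis.put_record(
    StreamName="maxwell-demo-stream",  # assumed stream name
    Data=json.dumps({"hello": "kinesis"}).encode("utf-8"),
    PartitionKey="demo-key",  # hashed to decide which shard receives the record
)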

When configuring Maxwell, besides editing config.properties to switch the backend to kinesis (the default is kafka), we also need an extra configuration file called kinesis-producer-library.properties. Once config.properties selects kinesis as the backend, Maxwell will also read kinesis-producer-library.properties (the target stream itself is named via the kinesis_stream option).

Below I paste the two configuration files that are running successfully for me.

 

config.properties 

# tl;dr config
#log_level=info

#producer=kafka
#kafka.bootstrap.servers=localhost:9092

# mysql login info
#host=localhost
#user=maxwell
#password=maxwell


#     *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kinesis

# set the log level.  note that you can configure things further in log4j2.xml
#log_level=INFO # [DEBUG, INFO, WARN, ERROR]
log_level=INFO

# if set, maxwell will look up the scoped environment variables, strip off the prefix and inject the configs
#env_config_prefix=MAXWELL_

#     *** mysql ***

# mysql host to connect to
host=db-prod.social.conviva.com

# mysql port to connect to
port=3306

# mysql user to connect as.  This user must have REPLICATION SLAVE permissions,
# as well as full access to the `maxwell` (or schema_database) database
user=maxwell

# mysql password
password=xxx

# options to pass into the jdbc connection, given as opt=val&opt2=val2
#jdbc_options=opt1=100&opt2=hello

# name of the mysql database where maxwell keeps its own state
# … (middle portion of the 366-line config omitted) …
# When metrics_type includes http or diagnostic is enabled, this is the http path prefix, default /.
#http_path_prefix=/some/path/

# ** The following are Datadog specific. **
# When metrics_type includes datadog this is the way metrics will be reported.
# Options: [udp, http]
# Supplying multiple is not allowed.
#metrics_datadog_type=udp

# datadog tags that should be supplied
#metrics_datadog_tags=tag1:value1,tag2:value2

# The frequency metrics are pushed to datadog, in seconds
#metrics_datadog_interval=60

# required if metrics_datadog_type = http
#metrics_datadog_apikey=API_KEY

# required if metrics_datadog_type = udp
#metrics_datadog_host=localhost # default localhost
#metrics_datadog_port=8125 # default 8125

# Maxwell exposes http diagnostic endpoint to check below in parallel:
# 1. binlog replication lag
# 2. producer (currently kafka) lag

# To enable Maxwell diagnostic
#http_diagnostic=true # default false

# Diagnostic check timeout in milliseconds, required if diagnostic = true
#http_diagnostic_timeout=10000 # default 10000

#    *** misc ***

# maxwell's bootstrapping functionality has a couple of modes.
#
# In "async" mode, maxwell will output the replication stream while it
# simultaneously outputs the database to the topic.  Note that it won't
# output replication data for any tables it is currently bootstrapping -- this
# data will be buffered and output after the bootstrap is complete.
#
# In "sync" mode, maxwell stops the replication stream while it
# outputs bootstrap data.
#
# async mode keeps ops live while bootstrapping, but carries the possibility of
# data loss (due to buffering transactions).  sync mode is safer but you
# have to stop replication.
#bootstrapper=async [sync, async, none]

# output filename when using the "file" producer
#output_file=/path/to/file

 

kinesis-producer-library.properties

# copied from https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties

# You can load a properties file with
# KinesisProducerConfiguration.fromPropertiesFile(String)
#
# Any fields not found in the properties file will take on default values.
#
# The values loaded are checked against any constraints that each respective
# field may have. If there are invalid values an IllegalArgumentException will
# be thrown.


# Enable aggregation. With aggregation, multiple user records are packed into
# a single KinesisRecord. If disabled, each user record is sent in its own
# KinesisRecord.
#
# If your records are small, enabling aggregation will allow you to put many
# more records than you would otherwise be able to for a shard before getting
# throttled.
#
# Default: true
AggregationEnabled = true

# Maximum number of items to pack into an aggregated record.
#
# There should be normally no need to adjust this. If you want to limit the
# time records spend buffering, look into record_max_buffered_time instead.
#
# Default: 4294967295
# Minimum: 1
# Maximum (inclusive): 9223372036854775807
AggregationMaxCount = 4294967295

# Maximum number of bytes to pack into an aggregated Kinesis record.
#
# There should be normally no need to adjust this. If you want to limit the
# time records spend buffering, look into record_max_buffered_time instead.
#
# If a record has more data by itself than this limit, it will bypass the
# aggregator. Note the backend enforces a limit of 50KB on record size. If
# you set this beyond 50KB, oversize records will be rejected at the backend.
#
# Default: 51200
# Minimum: 64
# Maximum (inclusive): 1048576
AggregationMaxSize = 51200

# Maximum number of items to pack into a PutRecords request.
#
# There should be normally no need to adjust this. If you want to limit the
# time records spend buffering, look into record_max_buffered_time instead.
#
# Default: 500
# Minimum: 1
# Maximum (inclusive): 500
CollectionMaxCount = 500

# Maximum amount of data to send with a PutRecords request.
#
# There should be normally no need to adjust this. If you want to limit the
# time records spend buffering, look into record_max_buffered_time instead.
#
# Records larger than the limit will still be sent, but will not be grouped
# with others.
#
# Default: 5242880
# Minimum: 52224
# Maximum (inclusive): 9223372036854775807
CollectionMaxSize = 5242880

# Timeout (milliseconds) for establishing TLS connections.
#
# Default: 6000
# Minimum: 100
# Maximum (inclusive): 300000
ConnectTimeout = 6000

# Use a custom Kinesis and CloudWatch endpoint.
#
# Mostly for testing use. Note this does not accept protocols or paths, only
# host names or ip addresses. There is no way to disable TLS. The KPL always
# connects with TLS.
#
# Expected pattern: ^([A-Za-z0-9-\\.]+)?$
# CustomEndpoint =

# If true, throttled puts are not retried. The records that got throttled
# will be failed immediately upon receiving the throttling error. This is
# useful if you want to react immediately to any throttling without waiting
# for the KPL to retry. For example, you can use a different hash key to send
# the throttled record to a backup shard.
#
# If false, the KPL will automatically retry throttled puts. The KPL performs
# backoff for shards that it has received throttling errors from, and will
# avoid flooding them with retries. Note that records may fail from
# expiration (see record_ttl) if they get delayed for too long because of
# throttling.
#
# Default: false
FailIfThrottled = false

# Minimum level of logs. Messages below the specified level will not be
# logged. Logs for the native KPL daemon show up on stderr.
#
# Default: info
# Expected pattern: info|warning|error
LogLevel = info

# Maximum number of connections to open to the backend. HTTP requests are
# sent in parallel over multiple connections.
#
# Setting this too high may impact latency and consume additional resources
# without increasing throughput.
#
# Default: 24
# Minimum: 1
# Maximum (inclusive): 256
MaxConnections = 24

# Controls the granularity of metrics that are uploaded to CloudWatch.
# Greater granularity produces more metrics.
#
# When "shard" is selected, metrics are emitted with the stream name and
# shard id as dimensions. On top of this, the same metric is also emitted
# with only the stream name dimension, and lastly, without the stream name.
# This means for a particular metric, 2 streams with 2 shards (each) will
# produce 7 CloudWatch metrics, one for each shard, one for each stream, and
# one overall, all describing the same statistics, but at different levels of
# granularity.
#
# When "stream" is selected, per shard metrics are not uploaded; when
# "global" is selected, only the total aggregate for all streams and all
# shards are uploaded.
#
# Consider reducing the granularity if you're not interested in shard-level
# metrics, or if you have a large number of shards.
#
# If you only have 1 stream, select "global"; the global data will be
# equivalent to that for the stream.
#
# Refer to the metrics documentation for details about each metric.
#
# Default: shard
# Expected pattern: global|stream|shard
MetricsGranularity = shard

# Controls the number of metrics that are uploaded to CloudWatch.
#
# "none" disables all metrics.
#
# "summary" enables the following metrics: UserRecordsPut, KinesisRecordsPut,
# ErrorsByCode, AllErrors, BufferingTime.
# "detailed" enables all remaining metrics.
#
# Refer to the metrics documentation for details about each metric.
#
# Default: detailed
# Expected pattern: none|summary|detailed
MetricsLevel = detailed

# The namespace to upload metrics under.
#
# If you have multiple applications running the KPL under the same AWS
# account, you should use a different namespace for each application.
#
# If you are also using the KCL, you may wish to use the application name you
# have configured for the KCL as the namespace here. This way both your
# KPL and KCL metrics show up under the same namespace.
#
# Default: KinesisProducerLibrary
# Expected pattern: (?!AWS/).{1,255}
MetricsNamespace = KinesisProducerLibrary

# Delay (in milliseconds) between each metrics upload.
#
# For testing only. There is no benefit in setting this lower or higher in
# production.
#
# Default: 60000
# Minimum: 1
# Maximum (inclusive): 60000
MetricsUploadDelay = 60000

# Minimum number of connections to keep open to the backend.
#
# There should be no need to increase this in general.
#
# Default: 1
# Minimum: 1
# Maximum (inclusive): 16
MinConnections = 1

# Server port to connect to. Only useful with custom_endpoint.
#
# Default: 443
# Minimum: 1
# Maximum (inclusive): 65535
Port = 443

# Limits the maximum allowed put rate for a shard, as a percentage of the
# backend limits.
#
# The rate limit prevents the producer from sending data too fast to a shard.
# Such a limit is useful for reducing bandwidth and CPU cycle wastage from
# sending requests that we know are going to fail from throttling.
#
# Kinesis enforces limits on both the number of records and number of bytes
# per second. This setting applies to both.
#
# The default value of 150% is chosen to allow a single producer instance to
# completely saturate the allowance for a shard. This is an aggressive
# setting. If you prefer to reduce throttling errors rather than completely
# saturate the shard, consider reducing this setting.
#
# Default: 150
# Minimum: 1
# Maximum (inclusive): 9223372036854775807
RateLimit = 150

# Maximum amount of time (milliseconds) a record may spend being buffered
# before it gets sent. Records may be sent sooner than this depending on the
# other buffering limits.
#
# This setting provides coarse ordering among records - any two records will
# be reordered by no more than twice this amount (assuming no failures and
# retries and equal network latency).
#
# The library makes a best effort to enforce this time, but cannot guarantee
# that it will be precisely met. In general, if the CPU is not overloaded,
# the library will meet this deadline to within 10ms.
#
# Failures and retries can additionally increase the amount of time records
# spend in the KPL. If your application cannot tolerate late records, use the
# record_ttl setting to drop records that do not get transmitted in time.
#
# Setting this too low can negatively impact throughput.
#
# Default: 100
# Maximum (inclusive): 9223372036854775807
RecordMaxBufferedTime = 100

# Set a time-to-live on records (milliseconds). Records that do not get
# successfully put within the limit are failed.
#
# This setting is useful if your application cannot or does not wish to
# tolerate late records. Records will still incur network latency after they
# leave the KPL, so take that into consideration when choosing a value for
# this setting.
#
# If you do not wish to lose records and prefer to retry indefinitely, set
# record_ttl to a large value like INT_MAX. This has the potential to cause
# head-of-line blocking if network issues or throttling occur. You can
# respond to such situations by using the metrics reporting functions of the
# KPL. You may also set fail_if_throttled to true to prevent automatic
# retries in case of throttling.
#
# Default: 30000
# Minimum: 100
# Maximum (inclusive): 9223372036854775807
RecordTtl = 60000

# Which region to send records to.
#
# If you do not specify the region and are running in EC2, the library will
# use the region the instance is in.
#
# The region is also used to sign requests.
#
# Expected pattern: ^([a-z]+-[a-z]+-[0-9])?$
Region = us-east-1

# The maximum total time (milliseconds) elapsed between when we begin a HTTP
# request and receiving all of the response. If it goes over, the request
# will be timed-out.
#
# Note that a timed-out request may actually succeed at the backend. Retrying
# then leads to duplicates. Setting the timeout too low will therefore
# increase the probability of duplicates.
#
# Default: 6000
# Minimum: 100
# Maximum (inclusive): 600000
RequestTimeout = 6000

# Verify the endpoint's certificate. Do not disable unless using
# custom_endpoint for testing. Never disable this in production.
#
# Default: true
VerifyCertificate = true

We also need IAM permissions for the producer-side Kinesis components (a hedged sketch of attaching such a policy follows the list below):

"kinesis:PutRecord"
"kinesis:PutRecords"
"kinesis:DescribeStream"

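As a rough sketch of wiring those permissions up (the user name, policy name, account id, and stream ARN below are all placeholder assumptions), an inline policy could be attached to the IAM user Maxwell runs as like this:

import json
import boto3

iam = boto3.client("iam")

# Hypothetical stream ARN -- substitute your own account id, region, and stream.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:DescribeStream",
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/maxwell-demo-stream",
        }
    ],
}

# Attach the policy inline to the IAM user whose keys Maxwell uses.
iam.put_user_policy(
    UserName="maxwell-producer",
    PolicyName="maxwell-kinesis-producer",
    PolicyDocument=json.dumps(policy),
)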
Once all of that is in place, go to Maxwell's root directory and run

bin/maxwell

and that is it. The command first reads config.properties, and once it sees that the kinesis producer is selected it goes on to read the Kinesis-specific configuration file by itself.
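To sanity-check that row events are actually landing in the stream, a quick read-back with boto3 can help. This is only a rough sketch and the stream name is again a placeholder; also note that with AggregationEnabled=true the raw record data is in the KPL aggregate format rather than one plain JSON document per row.

import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")
stream = "maxwell-demo-stream"  # assumed stream name

# Take the first shard and start reading from its tip.
shard_id = kinesis.describe_stream(StreamName=stream)["StreamDescription"]["Shards"][0]["ShardId"]
iterator = kinesis.get_shard_iterator(
    StreamName=stream,
    ShardId=shard_id,
    ShardIteratorType="LATEST",
)["ShardIterator"]

# Poll once; in practice you would loop on NextShardIterator
# while Maxwell keeps writing changes.
for record in kinesis.get_records(ShardIterator=iterator, Limit=10)["Records"]:
    print(record["PartitionKey"], record["Data"][:80])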

 

Consumer:

I use the Python KCL client for the consumer. It is not a pure-Python implementation, though: it is ultimately launched through Java (the MultiLangDaemon), so a JDK (7 or newer) needs to be installed on the machine you deploy to.

https://docs.aws.amazon.com/streams/latest/dev/kcl2-standard-consumer-python-example.html

provides an example that includes a record-processing pipeline as well as a set of default values for the properties file, and we can initialize both files locally from it.
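For reference, a stripped-down record processor might look roughly like the following. This follows the structure of the amazon_kclpy v3 sample app, so the exact module paths and method signatures should be double-checked against the version you install.

from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor


class RecordProcessor(processor.RecordProcessorBase):
    """Handles the records of one shard; Maxwell's JSON row events arrive here."""

    def initialize(self, initialize_input):
        pass  # called once when this processor is assigned a shard

    def process_records(self, process_records_input):
        for record in process_records_input.records:
            print(record.binary_data)  # the change event produced by Maxwell
        # Checkpoint so the KCL remembers how far this shard has been consumed.
        process_records_input.checkpointer.checkpoint()

    def lease_lost(self, lease_lost_input):
        pass  # another worker took over the shard; nothing to checkpoint

    def shard_ended(self, shard_ended_input):
        # The shard was closed (e.g. by resharding); checkpoint before moving on.
        shard_ended_input.checkpointer.checkpoint()

    def shutdown_requested(self, shutdown_requested_input):
        shutdown_requested_input.checkpointer.checkpoint()


if __name__ == "__main__":
    kcl.KCLProcess(RecordProcessor()).run()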

The full flow for starting a consumer can also be found at https://github.com/awslabs/amazon-kinesis-client-python.

Use

`amazon_kclpy_helper.py --print_command \
    --java <path-to-java> --properties samples/sample.properties`

to print the command that launches the MultiLangDaemon, and run that command to start our Python consumer. Authentication works the same way as on the producer side: put an access key and secret that carry the required AWS permissions into the ~/.aws/credentials file.
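Before launching the daemon, it can be worth confirming that those credentials resolve; a quick check with boto3 and STS (which walks roughly the same default credential chain) looks like this:

import boto3

# Reads the default credential chain: env vars, ~/.aws/credentials, etc.
identity = boto3.client("sts").get_caller_identity()
print(identity["Account"], identity["Arn"])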

 

 

Reference:

https://maxwells-daemon.io/producers/#kinesis

https://docs.aws.amazon.com/streams/latest/dev/amazon-kinesis-streams.html

http://maxwells-daemon.io/quickstart

https://github.com/awslabs/amazon-kinesis-client-python

 
