I've previously written a tutorial on integrating with Kafka. Kinesis, AWS's MQ/streaming middleware, plays a very important role at companies abroad and at any company built on the AWS stack.
Compared with running Kafka yourself, using AWS-provided Kinesis can cut costs substantially, and you can conveniently adjust the number of shards to balance processing capacity against cost. (A shard is somewhat like a Kafka partition, except that shards can be removed as well as added! So a shard is better thought of as Kinesis's unit of throughput.)
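To make resharding concrete, here is what scaling a stream up or down looks like with the AWS CLI. The stream name `maxwell-stream` is just a placeholder; note that a single `UNIFORM_SCALING` call can only roughly double or halve the current shard count.

```shell
# Resize a stream to 4 shards ("maxwell-stream" is a placeholder name).
aws kinesis update-shard-count \
    --stream-name maxwell-stream \
    --target-shard-count 4 \
    --scaling-type UNIFORM_SCALING

# Check the shard count and stream status once resharding completes.
aws kinesis describe-stream-summary --stream-name maxwell-stream
```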
Producer:
Unlike Kafka or MSK (AWS-managed Kafka), Kinesis identifies what you connect to by access key, region, and the data stream's name. No IP address appears anywhere in the setup, which can be quite disorienting at first.
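To illustrate that point (this is not something Maxwell itself requires), here is roughly what a raw put looks like with boto3; the stream name and partition key below are my own illustrative choices:

```python
import boto3

# No broker IPs anywhere: the stream is addressed purely by region + name.
# Credentials come from the usual AWS chain (environment variables,
# ~/.aws/credentials, or an instance role).
client = boto3.client("kinesis", region_name="us-east-1")

response = client.put_record(
    StreamName="maxwell-stream",   # hypothetical stream name
    Data=b'{"hello": "kinesis"}',  # payload bytes
    PartitionKey="some-key",       # determines which shard receives the record
)
print(response["ShardId"], response["SequenceNumber"])
```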
When configuring Maxwell, besides editing config.properties to switch the backend to kinesis (the default is kafka), we also need an extra configuration file called kinesis-producer-library.properties. When the backend configured in config.properties is kinesis, Maxwell will read the kinesis-producer-library.properties file as well.
Below I'll paste two configuration files that are known to run successfully.
config.properties
```properties
# tl;dr config
#log_level=info
#producer=kafka
#kafka.bootstrap.servers=localhost:9092

# mysql login info
#host=localhost
#user=maxwell
#password=maxwell

# *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kinesis

# set the log level.  note that you can configure things further in log4j2.xml
#log_level=INFO # [DEBUG, INFO, WARN, ERROR]
log_level=INFO

# if set, maxwell will look up the scoped environment variables, strip off the prefix and inject the configs
#env_config_prefix=MAXWELL_

# *** mysql ***

# mysql host to connect to
host=db-prod.social.conviva.com

# mysql port to connect to
port=3306

# mysql user to connect as.  This user must have REPLICATION SLAVE permissions,
# as well as full access to the `maxwell` (or schema_database) database
user=maxwell

# mysql password
password=xxx

# options to pass into the jdbc connection, given as opt=val&opt2=val2
#jdbc_options=opt1=100&opt2=hello

# name of the mysql database where maxwell keeps its own state

# ...

# When metrics_type includes http or diagnostic is enabled, this is the http path prefix, default /.
#http_path_prefix=/some/path/

# ** The following are Datadog specific. **
# When metrics_type includes datadog this is the way metrics will be reported.
# Options: [udp, http]
# Supplying multiple is not allowed.
#metrics_datadog_type=udp

# datadog tags that should be supplied
#metrics_datadog_tags=tag1:value1,tag2:value2

# The frequency metrics are pushed to datadog, in seconds
#metrics_datadog_interval=60

# required if metrics_datadog_type = http
#metrics_datadog_apikey=API_KEY

# required if metrics_datadog_type = udp
#metrics_datadog_host=localhost # default localhost
#metrics_datadog_port=8125 # default 8125

# Maxwell exposes http diagnostic endpoint to check below in parallel:
# 1. binlog replication lag
# 2. producer (currently kafka) lag

# To enable Maxwell diagnostic
#http_diagnostic=true # default false

# Diagnostic check timeout in milliseconds, required if diagnostic = true
#http_diagnostic_timeout=10000 # default 10000

# *** misc ***

# maxwell's bootstrapping functionality has a couple of modes.
#
# In "async" mode, maxwell will output the replication stream while it
# simultaneously outputs the database to the topic.  Note that it won't
# output replication data for any tables it is currently bootstrapping -- this
# data will be buffered and output after the bootstrap is complete.
#
# In "sync" mode, maxwell stops the replication stream while it
# outputs bootstrap data.
#
# async mode keeps ops live while bootstrapping, but carries the possibility of
# data loss (due to buffering transactions).  sync mode is safer but you
# have to stop replication.
#bootstrapper=async [sync, async, none]

# output filename when using the "file" producer
#output_file=/path/to/file
```
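One thing that is easy to miss in the excerpt above: per the Maxwell producer docs, the Kinesis producer also needs the name of the target data stream, set with the kinesis_stream key. A minimal addition, with an illustrative stream name:

```properties
# name of the Kinesis data stream to produce to ("maxwell" is illustrative)
kinesis_stream=maxwell
```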
kinesis-producer-library.properties
```properties
# copied from https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties

# You can load a properties file with
# KinesisProducerConfiguration.fromPropertiesFile(String)
#
# Any fields not found in the properties file will take on default values.
#
# The values loaded are checked against any constraints that each respective
# field may have. If there are invalid values an IllegalArgumentException will
# be thrown.

# Enable aggregation. With aggregation, multiple user records are packed into
# a single KinesisRecord. If disabled, each user record is sent in its own
# KinesisRecord.
#
# If your records are small, enabling aggregation will allow you to put many
# more records than you would otherwise be able to for a shard before getting
# throttled.
#
# Default: true
AggregationEnabled = true

# Maximum number of items to pack into an aggregated record.
#
# There should be normally no need to adjust this. If you want to limit the
# time records spend buffering, look into record_max_buffered_time instead.
#
# Default: 4294967295
# Minimum: 1
# Maximum (inclusive): 9223372036854775807
AggregationMaxCount = 4294967295

# Maximum number of bytes to pack into an aggregated Kinesis record.
#
# There should be normally no need to adjust this. If you want to limit the
# time records spend buffering, look into record_max_buffered_time instead.
#
# If a record has more data by itself than this limit, it will bypass the
# aggregator. Note the backend enforces a limit of 50KB on record size. If
# you set this beyond 50KB, oversize records will be rejected at the backend.
#
# Default: 51200
# Minimum: 64
# Maximum (inclusive): 1048576
AggregationMaxSize = 51200

# Maximum number of items to pack into a PutRecords request.
#
# There should be normally no need to adjust this. If you want to limit the
# time records spend buffering, look into record_max_buffered_time instead.
#
# Default: 500
# Minimum: 1
# Maximum (inclusive): 500
CollectionMaxCount = 500

# Maximum amount of data to send with a PutRecords request.
#
# There should be normally no need to adjust this. If you want to limit the
# time records spend buffering, look into record_max_buffered_time instead.
#
# Records larger than the limit will still be sent, but will not be grouped
# with others.
#
# Default: 5242880
# Minimum: 52224
# Maximum (inclusive): 9223372036854775807
CollectionMaxSize = 5242880

# Timeout (milliseconds) for establishing TLS connections.
#
# Default: 6000
# Minimum: 100
# Maximum (inclusive): 300000
ConnectTimeout = 6000

# Use a custom Kinesis and CloudWatch endpoint.
#
# Mostly for testing use. Note this does not accept protocols or paths, only
# host names or ip addresses. There is no way to disable TLS. The KPL always
# connects with TLS.
#
# Expected pattern: ^([A-Za-z0-9-\\.]+)?$
# CustomEndpoint =

# If true, throttled puts are not retried. The records that got throttled
# will be failed immediately upon receiving the throttling error. This is
# useful if you want to react immediately to any throttling without waiting
# for the KPL to retry. For example, you can use a different hash key to send
# the throttled record to a backup shard.
#
# If false, the KPL will automatically retry throttled puts. The KPL performs
# backoff for shards that it has received throttling errors from, and will
# avoid flooding them with retries. Note that records may fail from
# expiration (see record_ttl) if they get delayed for too long because of
# throttling.
#
# Default: false
FailIfThrottled = false

# Minimum level of logs. Messages below the specified level will not be
# logged. Logs for the native KPL daemon show up on stderr.
#
# Default: info
# Expected pattern: info|warning|error
LogLevel = info

# Maximum number of connections to open to the backend. HTTP requests are
# sent in parallel over multiple connections.
#
# Setting this too high may impact latency and consume additional resources
# without increasing throughput.
#
# Default: 24
# Minimum: 1
# Maximum (inclusive): 256
MaxConnections = 24

# Controls the granularity of metrics that are uploaded to CloudWatch.
# Greater granularity produces more metrics.
#
# When "shard" is selected, metrics are emitted with the stream name and
# shard id as dimensions. On top of this, the same metric is also emitted
# with only the stream name dimension, and lastly, without the stream name.
# This means for a particular metric, 2 streams with 2 shards (each) will
# produce 7 CloudWatch metrics, one for each shard, one for each stream, and
# one overall, all describing the same statistics, but at different levels of
# granularity.
#
# When "stream" is selected, per shard metrics are not uploaded; when
# "global" is selected, only the total aggregate for all streams and all
# shards are uploaded.
#
# Consider reducing the granularity if you're not interested in shard-level
# metrics, or if you have a large number of shards.
#
# If you only have 1 stream, select "global"; the global data will be
# equivalent to that for the stream.
#
# Refer to the metrics documentation for details about each metric.
#
# Default: shard
# Expected pattern: global|stream|shard
MetricsGranularity = shard

# Controls the number of metrics that are uploaded to CloudWatch.
#
# "none" disables all metrics.
#
# "summary" enables the following metrics: UserRecordsPut, KinesisRecordsPut,
# ErrorsByCode, AllErrors, BufferingTime.
#
# "detailed" enables all remaining metrics.
#
# Refer to the metrics documentation for details about each metric.
#
# Default: detailed
# Expected pattern: none|summary|detailed
MetricsLevel = detailed

# The namespace to upload metrics under.
#
# If you have multiple applications running the KPL under the same AWS
# account, you should use a different namespace for each application.
#
# If you are also using the KCL, you may wish to use the application name you
# have configured for the KCL as the namespace here. This way both your
# KPL and KCL metrics show up under the same namespace.
#
# Default: KinesisProducerLibrary
# Expected pattern: (?!AWS/).{1,255}
MetricsNamespace = KinesisProducerLibrary

# Delay (in milliseconds) between each metrics upload.
#
# For testing only. There is no benefit in setting this lower or higher in
# production.
#
# Default: 60000
# Minimum: 1
# Maximum (inclusive): 60000
MetricsUploadDelay = 60000

# Minimum number of connections to keep open to the backend.
#
# There should be no need to increase this in general.
#
# Default: 1
# Minimum: 1
# Maximum (inclusive): 16
MinConnections = 1

# Server port to connect to. Only useful with custom_endpoint.
#
# Default: 443
# Minimum: 1
# Maximum (inclusive): 65535
Port = 443

# Limits the maximum allowed put rate for a shard, as a percentage of the
# backend limits.
#
# The rate limit prevents the producer from sending data too fast to a shard.
# Such a limit is useful for reducing bandwidth and CPU cycle wastage from
# sending requests that we know are going to fail from throttling.
#
# Kinesis enforces limits on both the number of records and number of bytes
# per second. This setting applies to both.
#
# The default value of 150% is chosen to allow a single producer instance to
# completely saturate the allowance for a shard. This is an aggressive
# setting. If you prefer to reduce throttling errors rather than completely
# saturate the shard, consider reducing this setting.
#
# Default: 150
# Minimum: 1
# Maximum (inclusive): 9223372036854775807
RateLimit = 150

# Maximum amount of time (milliseconds) a record may spend being buffered
# before it gets sent. Records may be sent sooner than this depending on the
# other buffering limits.
#
# This setting provides coarse ordering among records - any two records will
# be reordered by no more than twice this amount (assuming no failures and
# retries and equal network latency).
#
# The library makes a best effort to enforce this time, but cannot guarantee
# that it will be precisely met. In general, if the CPU is not overloaded,
# the library will meet this deadline to within 10ms.
#
# Failures and retries can additionally increase the amount of time records
# spend in the KPL. If your application cannot tolerate late records, use the
# record_ttl setting to drop records that do not get transmitted in time.
#
# Setting this too low can negatively impact throughput.
#
# Default: 100
# Maximum (inclusive): 9223372036854775807
RecordMaxBufferedTime = 100

# Set a time-to-live on records (milliseconds). Records that do not get
# successfully put within the limit are failed.
#
# This setting is useful if your application cannot or does not wish to
# tolerate late records. Records will still incur network latency after they
# leave the KPL, so take that into consideration when choosing a value for
# this setting.
#
# If you do not wish to lose records and prefer to retry indefinitely, set
# record_ttl to a large value like INT_MAX. This has the potential to cause
# head-of-line blocking if network issues or throttling occur. You can
# respond to such situations by using the metrics reporting functions of the
# KPL. You may also set fail_if_throttled to true to prevent automatic
# retries in case of throttling.
#
# Default: 30000
# Minimum: 100
# Maximum (inclusive): 9223372036854775807
RecordTtl = 60000

# Which region to send records to.
#
# If you do not specify the region and are running in EC2, the library will
# use the region the instance is in.
#
# The region is also used to sign requests.
#
# Expected pattern: ^([a-z]+-[a-z]+-[0-9])?$
Region = us-east-1

# The maximum total time (milliseconds) elapsed between when we begin a HTTP
# request and receiving all of the response. If it goes over, the request
# will be timed-out.
#
# Note that a timed-out request may actually succeed at the backend. Retrying
# then leads to duplicates. Setting the timeout too low will therefore
# increase the probability of duplicates.
#
# Default: 6000
# Minimum: 100
# Maximum (inclusive): 600000
RequestTimeout = 6000

# Verify the endpoint's certificate. Do not disable unless using
# custom_endpoint for testing. Never disable this in production.
#
# Default: true
VerifyCertificate = true
```
In addition, the producer must run under an IAM identity that has the relevant Kinesis permissions:
"kinesis:PutRecord" "kinesis:PutRecords" "kinesis:DescribeStream"
Once all of this is configured, go to Maxwell's root directory and run:
```shell
bin/maxwell
```
That's it. This command first reads config.properties, and on finding that the backend is kinesis, it goes on to read the Kinesis-specific configuration file by itself.
Consumer:
I use the Python KCL client. Note that it is not a pure-Python implementation: it is ultimately launched through Java, so JDK 7 or newer must be installed on the machine you deploy to.
https://docs.aws.amazon.com/streams/latest/dev/kcl2-standard-consumer-python-example.html
provides an example, including a record-processing pipeline and a set of default configuration values; we can initialize these two files locally.
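For orientation, here is a trimmed sketch of what such a record processor looks like, modeled on the sample app in the amazon_kclpy repo. It uses the library's v2-style interface; method and attribute names may differ slightly across library versions, so verify against the version you install.

```python
#!/usr/bin/env python
# A minimal record-processor sketch modeled on amazon_kclpy's sample app.
from amazon_kclpy import kcl
from amazon_kclpy.v2 import processor


class MaxwellRowProcessor(processor.RecordProcessorBase):
    def initialize(self, initialize_input):
        # Called once per shard before any records are delivered.
        pass

    def process_records(self, process_records_input):
        for record in process_records_input.records:
            # binary_data is the decoded payload, i.e. the JSON row
            # event that Maxwell produced.
            print(record.binary_data)
        # Checkpoint so a restarted worker resumes after these records.
        process_records_input.checkpointer.checkpoint()

    def shutdown(self, shutdown_input):
        if shutdown_input.reason == 'TERMINATE':
            # The shard is being closed (e.g. after resharding);
            # checkpoint one last time.
            shutdown_input.checkpointer.checkpoint()


if __name__ == '__main__':
    # The KCL MultiLangDaemon (Java) drives this process over stdin/stdout.
    kcl.KCLProcess(MaxwellRowProcessor()).run()
```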
The complete flow for starting a consumer can also be found at https://github.com/awslabs/amazon-kinesis-client-python.
Use

```shell
amazon_kclpy_helper.py --print_command \
    --java <path-to-java> --properties samples/sample.properties
```

to print the full Java command that launches the consumer, then run that command to start our Python consumer. Authentication works the same way as on the producer side: put an access_key and access_secret that carry the relevant AWS permissions into the ~/.aws/credentials file.
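That file uses the standard AWS shared-credentials format; the values below are placeholders:

```ini
# ~/.aws/credentials -- placeholder values; use a key pair that
# has the Kinesis permissions described above.
[default]
aws_access_key_id = AKIAXXXXXXXXXXXXXXXX
aws_secret_access_key = xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
```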
Reference:
https://maxwells-daemon.io/producers/#kinesis
https://docs.aws.amazon.com/streams/latest/dev/amazon-kinesis-streams.html
https://maxwells-daemon.io/quickstart
https://github.com/awslabs/amazon-kinesis-client-python