2.1 Producer API
We encourage all new development to use the new Java producer. This client is production tested and generally both faster and more fully featured than the previous Scala client.
You can use this client by adding a dependency on the client jar using the following example maven co-ordinates (you can change the version numbers with new releases):
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
Examples showing how to use the producer are given in the javadocs.
The recommendation is to use the new Java client in place of the old Scala client.
A Kafka client that publishes records to the Kafka cluster.
The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.
Here is a simple example of using the producer to send records with strings containing sequential numbers as the key/value pairs.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all"); // ack mode; "all" waits for the full commit and is the slowest option
props.put("retries", 0); // whether to retry on failure; enabling retries can produce duplicates
props.put("batch.size", 16384); // batch buffer size for each partition
props.put("linger.ms", 1); // how long to wait when a batch is not yet full; 1 adds up to 1 ms of latency
props.put("buffer.memory", 33554432); // total memory the producer may use for buffering
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
About buffer.memory:
The buffer.memory setting controls the total amount of memory available to the producer for buffering. If records are sent faster than they can be transmitted to the server then this buffer space will be exhausted.
When the buffer space is exhausted additional send calls will block. The threshold for time to block is determined by max.block.ms, after which it throws a TimeoutException.
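This is the amount of data the producer can buffer. If data is produced faster than it can be sent, the buffer runs out: send is asynchronous and first places the record in the buffer, but once the buffer is full, send blocks, and a TimeoutException is thrown when max.block.ms is reached.
Note the difference from batch.size, which is the size of a single batch.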
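The blocking behaviour described above can be modelled with a small stdlib-only sketch. This is not the producer's real implementation: BoundedBufferSketch, reserve and release are hypothetical names, and java.util.concurrent.TimeoutException stands in for Kafka's own TimeoutException. It only illustrates how a fixed memory budget plus a maximum block time leads to a timeout.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of a fixed memory budget with a bounded wait;
// NOT the real KafkaProducer internals.
class BoundedBufferSketch {
    private final Semaphore freeBytes; // plays the role of buffer.memory
    private final long maxBlockMs;     // plays the role of max.block.ms

    BoundedBufferSketch(int bufferMemory, long maxBlockMs) {
        this.freeBytes = new Semaphore(bufferMemory);
        this.maxBlockMs = maxBlockMs;
    }

    // Reserve space for one record; block for up to maxBlockMs if the buffer is full.
    void reserve(int recordBytes) throws InterruptedException, TimeoutException {
        if (!freeBytes.tryAcquire(recordBytes, maxBlockMs, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("buffer full for more than " + maxBlockMs + " ms");
        }
    }

    // Called once a record has been transmitted, which frees its space.
    void release(int recordBytes) {
        freeBytes.release(recordBytes);
    }

    // Bytes currently free in the buffer.
    int available() {
        return freeBytes.availablePermits();
    }
}
```

In this model a second reserve that does not fit waits until either release frees enough space or the block time elapses, which mirrors the send behaviour described in the notes.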
The send method also accepts a callback:
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
producer.send(record,
    new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null) {
                // the send failed, so there is no usable metadata to read
                e.printStackTrace();
            } else {
                System.out.println("The offset of the record we just sent is: " + metadata.offset());
            }
        }
    });
2.2 Consumer API
As of the 0.9.0 release we have added a new Java consumer to replace our existing high-level ZooKeeper-based consumer and low-level consumer APIs.
Although 0.9 already ships the new consumer, the old interfaces can still be used for now.
2.2.3 New Consumer API
This new unified consumer API removes the distinction between the 0.8 high-level and low-level consumer APIs. You can use this client by adding a dependency on the client jar using the following example maven co-ordinates (you can change the version numbers with new releases):
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
Examples showing how to use the consumer are given in the javadocs.
Detecting Consumer Failures
The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer will stay in the group and continue to receive messages from the partitions it was assigned.
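The new consumer uses poll to signal liveness: as long as the consumer keeps calling poll, it is considered healthy.
This has a catch: if processing takes a long time, or too many records are fetched at once, the interval between polls grows and the consumer times out. The following settings help: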
- session.timeout.ms: By increasing the session timeout, you can give the consumer more time to handle a batch of records returned from poll(long). The only drawback is that it will take longer for the server to detect hard consumer failures, which can cause a delay before a rebalance can be completed. However, clean shutdown with close() is not impacted since the consumer will send an explicit message to the server to leave the group and cause an immediate rebalance.
- max.poll.records: Processing time in the poll loop is typically proportional to the number of records processed, so it's natural to want to set a limit on the number of records handled at once. This setting provides that. By default, there is essentially no limit.
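You can raise session.timeout.ms to allow a longer timeout.
You can also use max.poll.records to cap the number of records returned by a single poll.
Another option is to hand the real processing to other threads and get back to poll as quickly as possible; note, though, that you still need to commit once processing has finished.
To guarantee that no data is lost, you usually do not rely on auto commit but commit manually after processing completes. If processing takes so long that the consumer has already timed out, the commit fails with a CommitFailedException.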
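Since processing time is roughly proportional to the number of records, a back-of-the-envelope calculation can suggest a value for max.poll.records. The sketch below is only an illustration: safeMaxPollRecords, the per-record estimate and the safety factor are assumptions of this example, not anything Kafka provides.

```java
// Hypothetical sizing helper; the names and the safety factor are illustrative assumptions.
class PollBudgetSketch {
    // Largest batch whose estimated processing time stays within a fraction
    // (safetyFactor, for example 0.5) of the session timeout. Never returns less than 1.
    static int safeMaxPollRecords(long sessionTimeoutMs, double perRecordMs, double safetyFactor) {
        if (perRecordMs <= 0 || safetyFactor <= 0 || safetyFactor > 1) {
            throw new IllegalArgumentException("perRecordMs must be > 0 and safetyFactor in (0, 1]");
        }
        long budgetMs = (long) (sessionTimeoutMs * safetyFactor);
        long records = (long) (budgetMs / perRecordMs);
        return (int) Math.max(1L, Math.min((long) Integer.MAX_VALUE, records));
    }

    public static void main(String[] args) {
        // 30 s session timeout, about 20 ms per record, spend at most half the timeout.
        System.out.println(safeMaxPollRecords(30000, 20.0, 0.5));
    }
}
```

The result would then be passed to the consumer as the max.poll.records property, leaving the rest of the timeout as headroom for slow records.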
Automatic Offset Committing
An example of automatic offset commit:
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true"); // commit offsets automatically
props.put("auto.commit.interval.ms", "1000"); // interval between automatic commits
props.put("session.timeout.ms", "30000"); // consumer liveness timeout
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar")); // subscribe to two topics, foo and bar
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100); // 100 is the maximum time to wait, in ms
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
Manual Offset Control
Manual offset commit:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false"); // turn off automatic commit
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);
        consumer.commitSync(); // commit offsets manually once the batch has been written
        buffer.clear();
    }
}
The approach above commits offsets in batches.
The above example uses commitSync to mark all received messages as committed. In some cases you may wish to have even finer control over which messages have been committed by specifying an offset explicitly. In the example below we commit the offset after we finish handling the messages in each partition.
A finer-grained commitSync:
try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (TopicPartition partition : records.partitions()) { // handle one partition at a time
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); // records for this partition
            for (ConsumerRecord<String, String> record : partitionRecords) { // process each record
                System.out.println(record.offset() + ": " + record.value());
            }
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); // last processed offset
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); // commit each partition's offset separately
        }
    }
} finally {
    consumer.close();
}
Why add 1 to lastOffset? Because what you commit is the offset of the next record you will read, which is always the current offset plus 1.
Note: The committed offset should always be the offset of the next message that your application will read. Thus, when calling commitSync(offsets) you should add one to the offset of the last message processed.
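The "last processed offset plus one" rule can be checked in isolation with a stdlib-only sketch. Rec is a hypothetical stand-in for ConsumerRecord and the string keys stand in for TopicPartition; neither is part of the Kafka API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stand-alone illustration; Rec is a hypothetical stand-in for ConsumerRecord.
class CommitOffsetSketch {
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // For each partition, the offset to commit is the highest processed offset + 1,
    // that is, the offset of the next record the application will read.
    static Map<String, Long> offsetsToCommit(List<Rec> processed) {
        Map<String, Long> next = new LinkedHashMap<>();
        for (Rec r : processed) {
            String key = r.topic + "-" + r.partition;
            next.merge(key, r.offset + 1, Long::max);
        }
        return next;
    }
}
```

With the real client, each entry would become a TopicPartition mapped to an OffsetAndMetadata and be passed to commitSync, as in the example above.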
Manual Partition Assignment
In the previous examples, we subscribed to the topics we were interested in and let Kafka dynamically assign a fair share of the partitions for those topics based on the active consumers in the group.
However, in some cases you may need finer control over the specific partitions that are assigned.
For example:
- If the process is maintaining some kind of local state associated with that partition (like a local on-disk key-value store), then it should only get records for the partition it is maintaining on disk.
- If the process itself is highly available and will be restarted if it fails (perhaps using a cluster management framework like YARN, Mesos, or AWS facilities, or as part of a stream processing framework). In this case there is no need for Kafka to detect the failure and reassign the partition since the consuming process will be restarted on another machine.
For finer control, you can read specific partitions of a topic directly, which removes the dependency on Kafka's dynamic assignment.
To use this mode, instead of subscribing to the topic using subscribe, you just call assign(Collection) with the full list of partitions that you want to consume.
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));
Calling consumer.assign tells this consumer to read partitions 0 and 1 of topic "foo".
Once assigned, you can call poll in a loop, just as in the preceding examples, to consume records.
The group that the consumer specifies is still used for committing offsets, but now the set of partitions will only change with another call to assign.
In other words, you still read records with poll and still commit offsets under the configured consumer group, but the partition assignment stays fixed unless assign is called again.
Controlling The Consumer's Position
In most use cases the consumer will simply consume records from beginning to end, periodically committing its position (either automatically or manually). However Kafka allows the consumer to manually control its position, moving forward or backwards in a partition at will. This means a consumer can re-consume older records, or skip to the most recent records without actually consuming the intermediate records.
Resetting the offset:
Kafka allows specifying the new position using seek(TopicPartition, long); this resets the offset of a single partition.
Special methods for seeking to the earliest and latest offset the server maintains are also available (seekToBeginning(Collection) and seekToEnd(Collection) respectively).
Consumption Flow Control
If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time, effectively giving these partitions the same priority for consumption. However, in some cases consumers may want to first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions when these partitions have little or no data to consume.
One such case is stream processing, where a processor fetches from two topics and performs a join on these two streams. When one of the topics lags far behind the other, the processor would like to pause fetching from the topic that is ahead so that the lagging stream can catch up. Another example is bootstrapping on consumer startup, when there is a lot of historical data to catch up on: applications usually want to get the latest data on some of the topics before considering fetching other topics.
Kafka supports dynamic control of consumption flows by using pause(Collection) and resume(Collection) to pause consumption on the specified assigned partitions and resume consumption on the specified paused partitions, respectively, in future poll(long) calls.
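If a consumer is assigned multiple partitions, they are read concurrently with the same priority.
For various reasons you may want to read some of those partitions first, which is what the pause and resume methods are for.
Note that pause only means poll returns no records for the paused partitions; the consumer still polls, so no rebalance is triggered.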
Managing Consumer Groups
With the ConsumerGroupCommand tool, we can list, delete, or describe consumer groups. For example, to list all consumer groups across all topics:
> bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
test-consumer-group
To view offsets as in the previous example with the ConsumerOffsetChecker, we "describe" the consumer group like this:
> bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group test-consumer-group
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
test-consumer-group test-foo 0 1 3 2 test-consumer-group_postamac.local-1456198719410-29ccd54f-0
When you're using the new consumer API where the broker handles coordination of partition handling and rebalance, you can manage the groups with the "--new-consumer" flags:
> bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server broker1:9092 --list
2.3 Streams API
As of the 0.10.0 release we have added a new client library named Kafka Streams to let users implement their stream processing applications with data stored in Kafka topics.
This is an alpha release.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.10.0.0</version>
</dependency>
Kafka Streams allows for performing continuous computation on input coming from one or more input topics and sends output to zero or more output topics.
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 0.10.0.0 configures default serdes rather than separate serializer and deserializer classes
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
StreamsConfig config = new StreamsConfig(props);
KStreamBuilder builder = new KStreamBuilder();
// map each value to its length (as a string) and write the result to the output topic
builder.<String, String>stream("my-input-topic").mapValues(value -> Integer.toString(value.length())).to("my-output-topic");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
My understanding is that this essentially integrates Samza into Kafka, since standalone Samza saw little adoption.
4.6 Message Delivery Semantics
So effectively Kafka guarantees at-least-once delivery by default and allows the user to implement at most once delivery by disabling retries on the producer and committing its offset prior to processing a batch of messages.
Exactly-once delivery requires co-operation with the destination storage system but Kafka provides the offset which makes implementing this straight-forward.
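Kafka provides at-least-once semantics by default.
To get at-most-once semantics, disable retries on the producer and commit the offset as soon as a message is received, before processing it.
In theory Kafka alone cannot provide exactly-once; it needs the cooperation of another component, such as the storage system, to deduplicate.
Offsets alone cannot fix duplication caused by the producer, because in that case the data stored in Kafka is itself duplicated.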
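One way to read the exactly-once remark: if the destination stores the consumed offset atomically together with the output, a record replayed after a crash can be recognised and skipped. The sketch below assumes exactly that; OffsetTrackingSink is a hypothetical in-memory stand-in for a transactional store, not a Kafka class. It only removes consumer-side replays. Duplicates written by a retrying producer carry distinct offsets and are not caught, as noted above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for a store that keeps data and offset together.
class OffsetTrackingSink {
    private final List<String> rows = new ArrayList<>();
    private long nextOffset = 0; // offset of the next record this sink expects

    // Apply a record and advance the stored offset in one step (synchronized here;
    // a real store would use a transaction). Returns false if the record is a replay.
    synchronized boolean write(long offset, String value) {
        if (offset < nextOffset) {
            return false; // already applied before the crash or restart
        }
        rows.add(value);
        nextOffset = offset + 1;
        return true;
    }

    synchronized long nextOffset() {
        return nextOffset;
    }

    synchronized int size() {
        return rows.size();
    }
}
```

On restart, a consumer using such a store would seek to the stored next offset for the partition instead of relying on the offset committed to Kafka.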
Availability and Durability Guarantees
When writing to Kafka, producers can choose whether they wait for the message to be acknowledged by 0,1 or all (-1) replicas.
Note that "acknowledgement by all replicas" does not guarantee that the full set of assigned replicas have received the message. By default, when acks=all, acknowledgement happens as soon as all the current in-sync replicas have received the message. For example, if a topic is configured with only two replicas and one fails (i.e., only one in sync replica remains), then writes that specify acks=all will succeed. However, these writes could be lost if the remaining replica also fails.
Although this ensures maximum availability of the partition, this behavior may be undesirable to some users who prefer durability over availability.
Therefore, we provide two topic-level configurations that can be used to prefer message durability over availability:
- Disable unclean leader election - if all replicas become unavailable, then the partition will remain unavailable until the most recent leader becomes available again. This effectively prefers unavailability over the risk of message loss. See the previous section on Unclean Leader Election for clarification.
- Specify a minimum ISR size - the partition will only accept writes if the size of the ISR is above a certain minimum, in order to prevent the loss of messages that were written to just a single replica, which subsequently becomes unavailable. This setting only takes effect if the producer uses acks=all and guarantees that the message will be acknowledged by at least this many in-sync replicas. This setting offers a trade-off between consistency and availability. A higher setting for minimum ISR size guarantees better consistency since the message is guaranteed to be written to more replicas which reduces the probability that it will be lost. However, it reduces availability since the partition will be unavailable for writes if the number of in-sync replicas drops below the minimum threshold.
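When the producer asks for acknowledgement by all replicas, "all replicas" means the ISR, not the full set of assigned replicas (AR). So even with 3 replicas, if only one replica is alive, an ack from that single replica counts as acknowledged by all.
If you prefer durability over availability, there are two settings:
1. Disable unclean leader election, so that the leader must be a replica in the ISR. If no such replica is available the partition goes offline, which lowers the chance of losing data at the cost of availability.
2. Specify a minimum ISR size. If the ISR shrinks below this value, the partition stops accepting writes.
This setting only matters when acks=all: once at least the minimum number of in-sync replicas have acknowledged, the write counts as acknowledged by all.
Clearly this also reduces availability considerably.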
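The interaction between acks and the minimum ISR size can be written down as a small decision function. This is a simplified model of the rule described above, not broker code; acceptsWrite is a hypothetical name, and minInSyncReplicas corresponds to the min.insync.replicas setting.

```java
// Simplified model of the rule described above; not actual broker logic.
class MinIsrSketch {
    // acks is 0, 1 or -1 (all). Returns true if the partition would accept the write.
    static boolean acceptsWrite(int acks, int isrSize, int minInSyncReplicas) {
        if (isrSize < 1) {
            return false; // no in-sync replica left, so there is no leader to write to
        }
        if (acks == -1) {
            // the minimum ISR size is only enforced for acks=all
            return isrSize >= minInSyncReplicas;
        }
        return true;
    }
}
```

For the two-replica example in the text, a minimum of 1 keeps the partition writable after one replica fails, while a minimum of 2 rejects acks=all writes until the second replica rejoins the ISR.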
4.8 Log Compaction
Log compaction ensures that Kafka will always retain at least the last known value for each message key within the log of data for a single topic partition. It addresses use cases and scenarios such as restoring state after application crashes or system failure, or reloading caches after application restarts during operational maintenance.
Consumer offset management presumably uses this mechanism today.
I will look at the details when I need them.
4.9 Quotas
Starting in 0.9, the Kafka cluster has the ability to enforce quotas on produce and fetch requests. Quotas are basically byte-rate thresholds defined per client-id. A client-id logically identifies an application making a request. Hence a single client-id can span multiple producer and consumer instances and the quota will apply for all of them as a single entity i.e. if client-id="test-client" has a produce quota of 10MB/sec, this is shared across all instances with that same id.
By default, each unique client-id receives a fixed quota in bytes/sec as configured by the cluster (quota.producer.default, quota.consumer.default). This quota is defined on a per-broker basis. Each client can publish/fetch a maximum of X bytes/sec per broker before it gets throttled. We decided that defining these quotas per broker is much better than having a fixed cluster wide bandwidth per client because that would require a mechanism to share client quota usage among all the brokers. This can be harder to get right than the quota implementation itself!
Rate limiting was added in 0.9.
quota.producer.default and quota.consumer.default set a fixed quota in bytes/sec for each client-id. Note that this limit is per broker, not per cluster.
How does a broker react when it detects a quota violation? In our solution, the broker does not return an error; rather, it attempts to slow down a client exceeding its quota. It computes the amount of delay needed to bring a guilty client under its quota and delays the response for that time. This approach keeps the quota violation transparent to clients (outside of client-side metrics). This also keeps them from having to implement any special backoff and retry behavior, which can get tricky. In fact, bad client behavior (retry without backoff) can exacerbate the very problem quotas are trying to solve.
Client byte rate is measured over multiple small windows (e.g. 30 windows of 1 second each) in order to detect and correct quota violations quickly.
Typically, having large measurement windows (e.g. 10 windows of 30 seconds each) leads to large bursts of traffic followed by long delays, which is not great in terms of user experience.
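When a broker detects a quota violation it does not reject the request; it delays the response instead, which has less impact on the client.
Traffic is measured over multiple small time windows, which gives a more accurate picture.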
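The "delay needed to bring a client under its quota" can be derived with simple arithmetic: if a client sent B bytes over a window of W ms against a quota of Q bytes/sec, stretching the window by D ms so that B / (W + D) equals Q gives D = B / Q - W. The sketch below implements that derivation; it is an illustration of the idea and not necessarily the exact formula the broker uses. throttleDelayMs is a hypothetical name.

```java
// Illustrative arithmetic only; not necessarily the broker's exact formula.
class QuotaDelaySketch {
    // Delay in ms that brings the average rate over (window + delay) down to the quota.
    // Returns 0 when the client is already at or under its quota.
    static long throttleDelayMs(long bytesInWindow, long windowMs, long quotaBytesPerSec) {
        if (windowMs <= 0 || quotaBytesPerSec <= 0) {
            throw new IllegalArgumentException("window and quota must be positive");
        }
        // time (ms) the observed bytes should have taken at exactly the quota rate
        double neededMs = bytesInWindow * 1000.0 / quotaBytesPerSec;
        return Math.max(0L, (long) Math.ceil(neededMs - windowMs));
    }
}
```

For example, 20 MB sent in a 1 second window against a 10 MB/sec quota yields a 1 second delay, while 5 MB in the same window yields no delay.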
It is possible to override the default quota for client-ids that need a higher (or even lower) quota. The mechanism is similar to the per-topic log config overrides. Client-id overrides are written to ZooKeeper under /config/clients. These overrides are read by all brokers and are effective immediately. This lets us change quotas without having to do a rolling restart of the entire cluster.
In other words, you can change a specific client's quota through dynamic configuration.
The following sets the default quota per producer and consumer client-id to 10MB/sec.
quota.producer.default=10485760
quota.consumer.default=10485760
It is also possible to set custom quotas for each client.
> bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-name clientA --entity-type clients
Updated config for clientId: "clientA".
Here's how to describe the quota for a given client.
> ./kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name clientA --entity-type clients
Configs for clients:clientA are producer_byte_rate=1024,consumer_byte_rate=2048
7. Security
7.1 Security Overview
In release 0.9.0.0, the Kafka community added a number of features that, used either separately or together, increase security in a Kafka cluster.
These features are considered to be of beta quality. The following security measures are currently supported:
- Authentication of connections to brokers from clients (producers and consumers), other brokers and tools, using either SSL or SASL (Kerberos). SASL/PLAIN can also be used from release 0.10.0.0 onwards.
- Authentication of connections from brokers to ZooKeeper
- Encryption of data transferred between brokers and clients, between brokers, or between brokers and tools using SSL (Note that there is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation.)
- Authorization of read / write operations by clients
- Authorization is pluggable and integration with external authorization services is supported
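1. Connections between brokers and clients, other brokers, and tools can be authenticated using SSL or SASL.
How digital certificates work, explained in detail
SSL latency: http://www.ruanyifeng.com/blog/2014/09/ssl-latency.html
2. Connections between brokers and ZooKeeper can be authenticated.
3. Data in transit can be encrypted with SSL, at some cost in performance.
4. Client read and write operations can be authorized.
5. Authorization is pluggable and can use external authorization services.
In practice the setup is fairly involved; I will look at it when I need it.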
8. Kafka Connect
8.1 Overview
Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. Kafka Connect can ingest entire databases or collect metrics from all your application servers into Kafka topics, making the data available for stream processing with low latency. An export job can deliver data from Kafka topics into secondary storage and query systems or into batch systems for offline analysis.
It provides import and export tooling for Kafka.
Kafka Connect currently supports two modes of execution: standalone (single process) and distributed.
In standalone mode all work is performed in a single process. You can start a standalone process with the following command:
> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
In distributed mode:
> bin/connect-distributed.sh config/connect-distributed.properties
In particular, the following configuration parameters are critical to set before starting your cluster:
- group.id (default connect-cluster) - unique name for the cluster, used in forming the Connect cluster group; note that this must not conflict with consumer group IDs
- config.storage.topic (default connect-configs) - topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic. You may need to manually create the topic to ensure single partition for the config topic as auto created topics may have multiple partitions.
- offset.storage.topic (default connect-offsets) - topic to use for storing offsets; this topic should have many partitions and be replicated
- status.storage.topic (default connect-status) - topic to use for storing statuses; this topic can have multiple partitions and should be replicated
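The four settings above are specified in connect-*.properties.
In standalone mode, the connector configuration must also be given on the command line.
In distributed mode, use the REST API described below to create, modify, and destroy connectors.
The connector configuration itself has a few common settings: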
- name - Unique name for the connector. Attempting to register again with the same name will fail.
- connector.class - The Java class for the connector
- tasks.max - The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot achieve this level of parallelism.
- topics - A list of topics to use as input for this connector
REST API
Since Kafka Connect is intended to be run as a service, it also provides a REST API for managing connectors. By default this service runs on port 8083. The following are the currently supported endpoints:
- GET /connectors - return a list of active connectors
- POST /connectors - create a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters
- GET /connectors/{name} - get information about a specific connector
- ...
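The POST /connectors body described above, a JSON object with a string name field and an object config field, can be assembled as follows. This is a minimal sketch that builds the string by hand with stdlib classes only; ConnectorRequestSketch is a hypothetical helper, and the connector class and topic in main are placeholders rather than real components.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that builds the request body for POST /connectors.
class ConnectorRequestSketch {
    // Minimal JSON string quoting: escapes backslashes and double quotes only.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Produces {"name":"...","config":{"key":"value",...}} in insertion order.
    static String body(String name, Map<String, String> config) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"name\":").append(quote(name)).append(",\"config\":{");
        boolean first = true;
        for (Map.Entry<String, String> e : config.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append(quote(e.getKey())).append(':').append(quote(e.getValue()));
            first = false;
        }
        return sb.append("}}").toString();
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("connector.class", "org.example.MySinkConnector"); // placeholder class name
        config.put("tasks.max", "1");
        config.put("topics", "my-topic"); // placeholder topic
        System.out.println(body("my-connector", config));
    }
}
```

The resulting string would be sent with Content-Type application/json to the Connect REST port (8083 by default, as noted above) using any HTTP client.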
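Summary:
The most valuable change is the new Producer and Consumer clients; the Consumer in particular greatly reduces the complexity that came with the old low-level consumer.
Security components were added, with SSL and SASL support for authentication.
As for the Streams API and Connect, they seem to be of marginal use.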