当你使用Samza来实现一个数据流处理逻辑时,你必须实现一个叫StreamTask的接口,如下所示:
public class MyTaskClass implements StreamTask {
public void process(IncomingMessageEnvelope envelope,
MessageCollector collector,
TaskCoordinator coordinator) {
// process message
}
}
当你运行你的job时,Samza将为你的class创建一些实例(可能在多台机器上)。这些任务实例会处理输入流里的消息。
在你的job的配置中你能告诉Samza你想消费哪条数据流。举一个较为完整的例子(大家也可以参看http://samza.incubator.apache.org/learn/documentation/0.7.0/jobs/configuration.html
):
# This is the class above, which Samza will instantiate when the job is run
task.class=com.example.samza.MyTaskClass
# Define a system called "kafka" (you can give it any name, and you can define
# multiple systems if you want to process messages from different sources)
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
# The job consumes a topic called "PageViewEvent" from the "kafka" system
task.inputs=kafka.PageViewEvent
# Define a serializer/deserializer called "json" which parses JSON messages
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
# Use the "json" serializer for messages in the "PageViewEvent" topic
systems.kafka.streams.PageViewEvent.samza.msg.serde=json
对于Samza从任务的输入流利接收的每一条消息,处理逻辑都会被调用。它主要包含三个重要的信息:消息、关键词key以及消息来自的数据流:
/** Every message that is delivered to a StreamTask is wrapped
* in an IncomingMessageEnvelope, which contains metadata about
* the origin of the message. */
public class IncomingMessageEnvelope {
/** A deserialized message. */
Object getMessage() { ... }
/** A deserialized key. */
Object getKey() { ... }
/** The stream and partition that this message came from. */
SystemStreamPartition getSystemStreamPartition() { ... }
}
注意键和值都要被声明为对象,并且需要转化为正确的类型。如果你不配置一个serializer/deserializer,它们就会成为典型的java字节数组。一个deserializer能够转化这些字节到其他任意类型,举个例子来说j一个son deserializer能够将字节数组转化为Map、List以及字符串对象。
SystemStreamPartition()这个方法会返回一个SystemStreamPartition对象,它会告诉你消息是从哪里来的。它由以下三部分组成:
1. The system:系统的名字来源于消息,就在你job的配置里定义。你可以有多个用于输入和输出的不同名字的系统;
2. The stream name: 在原系统里数据流(话题、队列)的名字。同样也是在job的配置里定义;
3. The partition: 一条数据流通常会被划分到多个分区,并且每一个分区会被Samza安排一个StreamTask实例;
API看起来像是这样的:
/** A triple of system name, stream name and partition. */
public class SystemStreamPartition extends SystemStream {
/** The name of the system which provides this stream. It is
defined in the Samza job‘s configuration. */
public String getSystem() { ... }
/** The name of the stream/topic/queue within the system. */
public String getStream() { ... }
/** The partition within the stream. */
public Partition getPartition() { ... }
}
在上面这个job的配置例子里可以看到,这个系统名字叫“Kafka”,数据流的名字叫“PageViewEvent”。(kafka这个名字不是特定的——你能给你的系统取任何你想要的名字)。如果你有一些输入流向导入你的StreamTask,你能够使用SystemStreamPartition去决定你接受到哪一类消息。
如何发送消息呢?如果你看一下StreamTask里的process()方法,你将看到你有一个MessageCollector接口。
/** When a task wishes to send a message, it uses this interface. */
public interface MessageCollector {
void send(OutgoingMessageEnvelope envelope);
}
为了发送一个消息, 你会创建一个OutgoingMessageEnvelop对象并且把它传递给消息收集器。它至少会确定你想要发送的消息、系统以及数据流名字再发送出去。你也可以确定分区的key和另一些参数。具体可以参考javadoc(http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html)。
注意事项:
请只在process()方法里使用MessageCollector对象。如果你保持住一个MessageCollector实例并且之后再次使用它,你的消息可能会错误地发送出去。举一个例子,这儿有一个简单的任务,它把每个输入的消息拆成单词,并且发送每一个单词作为一个消息:
public class SplitStringIntoWords implements StreamTask {
// Send outgoing messages to a stream called "words"
// in the "kafka" system.
private final SystemStream OUTPUT_STREAM =
new SystemStream("kafka", "words");
public void process(IncomingMessageEnvelope envelope,
MessageCollector collector,
TaskCoordinator coordinator) {
String message = (String) envelope.getMessage();
for (String word : message.split(" ")) {
// Use the word as the key, and 1 as the value.
// A second task can add the 1‘s to get the word count.
collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, word, 1));
}
}
}
Samza的API的概要介绍就到这里吧,很多细节的API可以参看javadoc文档,这也是官网下一节的内容,由于篇幅有限,大家可以自己针对性的去深入了解了解就可以了。下一篇会讲一下之前在架构篇里多次提到的SamzaContainer。