在使用Storm的trident做流计算开发时,遇到一个诡异的问题:
我继承IPartitionedTridentSpout或者IOpaquePartitionedTridentSpout接口做事务型实时计算的开发,类型T通常是用来每个批次序列化到ZK中的偏移量。我遇到的问题是:只要实时应用启动后不终止,每个批次发送的消息的偏移量都是接着上一个批次消息的偏移量继续向后移动的。但是只要应用终止后重新启动,发送的消息就会从消息队列的起始位置重新开始,好像ZK中保存的偏移量根本没起作用。
之前在没有使用trident的时候,我继承的是IPartitionedTransactionalSpout或者IOpaquePartitionedTransactionalSpout接口。这两个接口在处理每个批次时,都会将下一批次消息的偏移量序列化到ZK中,当需要发送下一批次的消息时,是从ZK中去读取下一批次消息的偏移量。按照这个处理逻辑,应该不会出现一直运行就正常,一重启就重头发送消息这种问题。
研究了trident的源码后发现,原来trident对这一块逻辑做了改进,为了减轻ZK的压力,trident在内存中维护了一个TreeMap类型的对象,里面保存了批次ID与偏移量的对应关系。Trident在处理每个批次消息的时候都会既向TreeMap中保存一份偏移量,又向ZK中序列化一份偏移量。在需要处理下一批次消息时,trident只是从内存的TreeMap中读取偏移量,而不需要从ZK中读取偏移量,只有当应用重启时,trident才从ZK中读取偏移量。
这就很好的解释了为什么我的应用一直运行时没有问题,一旦重启,处理的消息就会从头开始。同时,也可以推理出,其实trident序列化到ZK中的偏移量应该是有问题的,这才导致应用重启时没有读出来。
到ZK中去查了一下序列化进去的偏移量,发现果然序列化出现了问题。于是,继续研究trident做序列化的代码。发现原来trident修改了序列化对象的方式。从backtype.storm.transactional.state.TransactionalState里面的setData和getData方法可以看到,非trident的storm使用的是Kryo序列化框架;而从storm.trident.topology.state.TransactionalState里面的setData和getData方法可以看到,trident使用的是JSON-Simple的序列化方式,再具体点是用
String org.json.simple.JSONValue.toJSONString(Object arg0) 方法进行序列化。原生的JSONValue.toJSONString()方法是不能序列化自定义类的。而我之前使用的T类型就是一个自定义类型,这也是导致偏移量没有成功序列化到ZK中的原因。最后我将T类型修改为JSONObject来保存偏移量,解决了序列化到ZK错误的问题。
我不大清楚trident修改序列化方式的目的是什么。它导致之前使用非trident封装Spout的代码无法重用,希望对序列化有深入研究的同学能够指教。