java – Camel 2.11批量聚合如何与单独的路由一起工作?

首先是一个类似的未回答的问题Joining routes into single aggregator

我们有一些消费者路线(ftp,file,smb)从远程系统读取文件.
简化为使用直接路由进行测试,但与批量使用者的行为类似:

from("direct:"+routeId).id(routeId)
 .setProperty(AGGREGATION_PROPERTY, constant(routeId))
 .log(String.format("Sending (${body}) to %s", "direct:start1"))
 .to("direct:aggregate");

转换后,一次轮询的所有结果将在一个单独的路径中按批次聚合:

from("direct:aggregate")
  .aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregatingStrategy())
  .completionFromBatchConsumer()
  .to("log:result", "mock:result");

如果每个消费者分开运行,一切正常.但如果多个消费者并行运行,聚合将拆分民意调查.例如,如果file-consumer轮询500条消息,而第二条路线开始从ftp读取6个文件,那么我们得到2个聚合1,其中500条来自文件,1条来自ftp的6条消息.

测试用例:

public void testAggregateByProperty() throws Exception {
    MockEndpoint result =  getMockEndpoint("mock:result");

    result.expectedBodiesReceived("A+A+A", "B+B", "A", "Z");

    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 1);
    template.sendBodyAndProperty("direct:Z", "Z", Exchange.BATCH_SIZE, 7);

    assertMockEndpointsSatisfied();
}

结果是:“A A”,“B”,“A”,“B”,“A”而不是预期的“A A A”,“B B”,“A”,“Z”.
问题:

>我们关于聚合的假设是错误的吗?
>我们如何实现预期的行为?
>如果我们设置了completionTimeout,那么它会在第一次交换时发生超时 – 如果还有新的交换则独立吗?

解决方法:

你几乎让它工作了.这是你需要的改变(在我解释之后).

from("direct:aggregate").id("aggregate")
    .aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregationStrategy())
    .completionSize(property(Exchange.BATCH_SIZE))
    .to("log:result", "mock:result")

结果将是:

Exchange received, body: A+A+A
Exchange received, body: B+B
Exchange received, body: A

注意:由于批次大小为7,您将不会收到“Z”的结果.

要解释 – 正如您所读到的,聚合器是一个多功能的骆驼组件,正确定义的关键是:

>聚合表达式
>完成规则

现在,在您的情况下,您将聚合在属性AGGREGATION_PROPERTY上,该属性将为A,B或Z.此外,您还要指定批量大小.

但是,您没有在路线中表达completionSize().相反,你使用了completionFromBatchConsumer – 它做了一些不同的事情(代码声明它寻找Exchange#BATCH_COMPLETE属性),因此产生了奇怪的结果.

无论如何,.completionSize(Exchange.BATCH_SIZE)将使您的测试按需运行.

祝你好运.

上一篇:java Mongondb聚合函数&去重


下一篇:mysql – Django聚合会产生过多的GROUP BY子句