实战
演示读取https://spring.io/blog.atom 的新闻聚合文件,atom是一种xml文件,且格式是固定的:
<?xml version="1.0" encoding="UTF-8"?>
<feed xmlns="http://www.w3.org/2005/Atom">
<title>Spring</title>
<link rel="alternate" href="https://spring.io/blog" />
<link rel="self" href="https://spring.io/blog.atom" />
<id>http://spring.io/blog.atom</id>
<icon>https://spring.io/favicon.ico</icon>
<updated>2018-07-19T17:28:45Z</updated>
<entry>
<title>Spring REST Docs 2.0.2.RELEASE</title>
<link rel="alternate" href="https://spring.io/blog/2018/07/19/spring-rest-docs-2-0-2-release" />
<category term="releases" label="Releases" />
<author>
<name>Andy Wilkinson</name>
</author>
<id>tag:spring.io,2018-07-19:3334</id>
<updated>2018-07-19T17:28:45Z</updated>
<content type="html"></content>
</entry>
...
</feed>
我们将读取到的消息通过分类(Category),将消息转到不同的消息通道,将分类为releases和engineering的消息写入磁盘。
1:新建Spring Boot项目,添加依赖Integration;我们还需要添加Spring Integration对atom的支持:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-feed</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-java-dsl</artifactId>
<version>1.1.0.M1</version>
</dependency>
读取流程:
@Value("https://spring.io/blog.atom") // @Value注解自动获得https://spring.io/blog.atom的资源
Resource resource;
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() { // 使用Fluent API和Pollers配置默认的轮询方式
return Pollers.fixedRate(500).get();
}
@Bean
public FeedEntryMessageSource feedMessageSource() throws IOException { //构造feed的入站通道适配器作为数据输入
FeedEntryMessageSource messageSource = new FeedEntryMessageSource(resource.getURL(), "news");
return messageSource;
}
@Bean
public IntegrationFlow myFlow() throws IOException {
return IntegrationFlows.from(feedMessageSource()) //流程从from方法开始
//通过路由方法route选择路由,消息体(payload)的类型为SyndEntry,作为判断条件的类型为String,判断的值是通过payload获得的分类Categroy
.<SyndEntry, String> route(payload -> payload.getCategories().get(0).getName(),
mapping -> mapping.channelMapping("releases", "releasesChannel")
.channelMapping("engineering", "engineeringChannel")//通过不同分类的值转向不同的消息通道
)
.get(); // 获得IntegrationFlow实体
}
3:releases流程
@Bean
public IntegrationFlow releasesFlow() {
return IntegrationFlows.from(MessageChannels.queue("releasesChannel", 10)) //从消息通道releasesChannel获取数据
.<SyndEntry, String> transform(
payload -> "《" + payload.getTitle() + "》 " + payload.getLink() + getProperty("line.separator")) //1
.handle(Files.outboundAdapter(new File("e:/springblog")) //2
.fileExistsMode(FileExistsMode.APPEND) //添加到文件末尾
.charset("UTF-8") //设置字符
.fileNameGenerator(message -> "releases.txt") //将消息转换为txtx文件
.get())
.get();
}
1:使用transform方法进行数据转换。payload类型为SyndEntry,将其转换为字符串类型,并自定义数据的格式;
2:处理file的出站适配器,Files类由 Spring Integration Java DSL提供的;
4:engineering流程
@Bean
public IntegrationFlow engineeringFlow() {
return IntegrationFlows.from(MessageChannels.queue("engineeringChannel", 10))
.<SyndEntry, String> transform(
payload -> "《" + payload.getTitle() + "》 " + payload.getLink() + getProperty("line.separator"))
.handle(Files.outboundAdapter(new File("e:/springblog"))
.fileExistsMode(FileExistsMode.APPEND)
.charset("UTF-8")
.fileNameGenerator(message -> "engineering.txt")
.get())
.get();
}
运行测试:
查看e:/springblog目录,多了两个文件
releases.txt
engineering.txt