Spring Boot企业级开发(四)Integration

实战

演示读取https://spring.io/blog.atom 的新闻聚合文件,atom是一种xml文件,且格式是固定的:

<?xml version="1.0" encoding="UTF-8"?>
<feed xmlns="http://www.w3.org/2005/Atom">
  <title>Spring</title>
  <link rel="alternate" href="https://spring.io/blog" />
  <link rel="self" href="https://spring.io/blog.atom" />
  <id>http://spring.io/blog.atom</id>
  <icon>https://spring.io/favicon.ico</icon>
  <updated>2018-07-19T17:28:45Z</updated>
  <entry>
    <title>Spring REST Docs 2.0.2.RELEASE</title>
    <link rel="alternate" href="https://spring.io/blog/2018/07/19/spring-rest-docs-2-0-2-release" />
    <category term="releases" label="Releases" />
    <author>
      <name>Andy Wilkinson</name>
    </author>
    <id>tag:spring.io,2018-07-19:3334</id>
    <updated>2018-07-19T17:28:45Z</updated>
    <content type="html"></content>
  </entry>
  ...
  </feed>

我们将读取到的消息通过分类(Category),将消息转到不同的消息通道,将分类为releases和engineering的消息写入磁盘。
1:新建Spring Boot项目,添加依赖Integration;我们还需要添加Spring Integration对atom的支持:

		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-feed</artifactId>
		</dependency>
						
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-java-dsl</artifactId>
			<version>1.1.0.M1</version>
		</dependency>

读取流程:

@Value("https://spring.io/blog.atom") // @Value注解自动获得https://spring.io/blog.atom的资源
	Resource resource;

	@Bean(name = PollerMetadata.DEFAULT_POLLER)
	public PollerMetadata poller() { // 使用Fluent API和Pollers配置默认的轮询方式
		return Pollers.fixedRate(500).get();
	}

	@Bean
	public FeedEntryMessageSource feedMessageSource() throws IOException { //构造feed的入站通道适配器作为数据输入
		FeedEntryMessageSource messageSource = new FeedEntryMessageSource(resource.getURL(), "news");
		return messageSource;
	}

	@Bean
	public IntegrationFlow myFlow() throws IOException {
		return IntegrationFlows.from(feedMessageSource()) //流程从from方法开始
		         //通过路由方法route选择路由,消息体(payload)的类型为SyndEntry,作为判断条件的类型为String,判断的值是通过payload获得的分类Categroy
				.<SyndEntry, String> route(payload -> payload.getCategories().get(0).getName(),
						mapping -> mapping.channelMapping("releases", "releasesChannel") 
								.channelMapping("engineering", "engineeringChannel")//通过不同分类的值转向不同的消息通道
								)

		.get(); // 获得IntegrationFlow实体
	}

3:releases流程

@Bean
	public IntegrationFlow releasesFlow() {
		return IntegrationFlows.from(MessageChannels.queue("releasesChannel", 10)) //从消息通道releasesChannel获取数据
				.<SyndEntry, String> transform(
						payload -> "《" + payload.getTitle() + "》 " + payload.getLink() + getProperty("line.separator")) //1
				.handle(Files.outboundAdapter(new File("e:/springblog")) //2
						.fileExistsMode(FileExistsMode.APPEND) //添加到文件末尾
						.charset("UTF-8") //设置字符
						.fileNameGenerator(message -> "releases.txt") //将消息转换为txtx文件
						.get())
				.get();
	}

1:使用transform方法进行数据转换。payload类型为SyndEntry,将其转换为字符串类型,并自定义数据的格式;
2:处理file的出站适配器,Files类由 Spring Integration Java DSL提供的;

4:engineering流程

@Bean
	public IntegrationFlow engineeringFlow() {
		return IntegrationFlows.from(MessageChannels.queue("engineeringChannel", 10))
				.<SyndEntry, String> transform(
						payload -> "《" + payload.getTitle() + "》 " + payload.getLink() + getProperty("line.separator"))
				.handle(Files.outboundAdapter(new File("e:/springblog"))
						.fileExistsMode(FileExistsMode.APPEND)
						.charset("UTF-8")
						.fileNameGenerator(message -> "engineering.txt")
						.get())
				.get();
	}

运行测试:
查看e:/springblog目录,多了两个文件
Spring Boot企业级开发(四)Integration

releases.txt
Spring Boot企业级开发(四)Integration

engineering.txt
Spring Boot企业级开发(四)Integration

上一篇:atom配置和插件的网络同步


下一篇:VSCode 插件