一 sftp搭建
略
这里简单说一下为什么使用sftp。ftp和sftp各有优点,差别并不是太大。sftp安全性好,性能比ftp低。ftp对于java来说并不复杂,效率也高。之所以使用sftp主要是可以使用spring-boot+apache-camel。camel框架将文件传输分为filter,prcessor,和路由,定时器等组件,模块化开发,将可随意将这些组件进行组合,耦合性低,开发较为灵活。可以将更多的精力放到业务层面。
二使用apache-camel来定时从sftp服务器下载文件
2.1 pom依赖
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-spring-boot-starter</artifactId>
<version>2.18.0</version>
</dependency> <dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-ftp</artifactId>
<version>2.19.4</version>
</dependency>
2.2 applicatin.properties配置
ftp.server.uri=sftp://${ftp.url}\
?username=${ftp.username}\
&password=${ftp.password}\
&useUserKnownHostsFile=false\
&localWorkDirectory=${ftp.local.work.directory}\
&delay=5m\
&filter=#ftpDownloadFileFilter\
&stepwise=false\
&recursive=true
ftp.url=192.168.20.162:22/
ftp.username=test
ftp.password=123456 #文件服务器目录
ftp.local.work.directory=/
# 文件拉取到本地存储的文件
ftp.local.data.dir=E://test/
其中
readLock=rename\ 是否重命名,防止读取文件服务器正在写入的文件
recursive=true 是否递归读取
#有些地方说这里需要显式指定后台运行
camel.springboot.main-run-controller=true
2.3 过滤器
自定义规则判断哪些文件需要下载,哪些文件不需要下载
package com.test.comm; import org.apache.camel.component.file.GenericFile;
import org.apache.camel.component.file.GenericFileFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import com.test.util.RedisTemplateUtil; @Component
public class FtpDownloadFileFilter implements GenericFileFilter<Object> { private static Logger logger = LoggerFactory.getLogger(FtpDownloadFileFilter.class); @Value("${ftp.local.data.dir}")
private String localDir; @Autowired
private RedisTemplateUtil redisTemplateUtil; /**
* 过滤下载文件
*
* @author sunk
*/
@Override
public boolean accept(GenericFile<Object> file) {
try {
return isDownloaded(file);
} catch (Exception e) {
logger.error("ftp download file filter error !", e);
return false;
}
} /**
* 根据时间戳来判断是否下载过
*
* @param fileName
*
* @return
*/
public boolean isDownloaded(GenericFile<Object> file) {
String fileName = file.getFileName();
if (file.isDirectory()) {
return true;
}
boolean bool = false;
if (fileName.contains("_")) {
long time = Long.parseLong(fileName.split("_")[3]);
// 从redis中获取上次的时间,当前文件时间大于当前时间则获取,否则不获取
Object preTime = redisTemplateUtil.get(0, Constants.reids.YP_PICTRUE_TIME);
if (preTime == null) {
bool = true;
} else {
if (Long.parseLong(preTime.toString()) < time) {
bool = true;
}
}
}
return bool;
} }
2.4 路由
自定义路由规则,一般是告诉程序,从哪里读文件,并搬运到哪里去
package com.test.comm; import java.net.InetAddress; import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; @Component
public class FtpDownloadRoute extends RouteBuilder { private static Logger logger = LoggerFactory.getLogger(FtpDownloadRoute.class); @Value("${ftp.server.uri}")
private String ftpUri; @Value("${ftp.local.data.dir}")
private String localDir; @Autowired
LocationFileProcessor locationFileProcessor; @Override
public void configure() throws Exception {
logger.debug("开始连接 " + ftpUri);
from(ftpUri).to("file:" + localDir).process(locationFileProcessor).log(LoggingLevel.INFO, logger,
"download file ${file:name} complete.");
logger.debug("连接成功");
} }
2.5 其它自定义进程
除了文件搬运之外,允许自定义对文件的其它操作,比如入库等等
,自定义的类,可添加在路由中
package com.test.comm; import java.io.RandomAccessFile;
import java.util.HashMap; import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.file.GenericFileMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import com.google.gson.Gson;
import com.test.config.ApplicationStartup;
import com.test.model.Device;
import com.test.model.Pictrue;
import com.test.util.DateUtil;
import com.test.util.ESRepository;
import com.test.util.FileUtil;
import com.test.util.RedisTemplateUtil; /**
* camel 业务类
*
* <p>
* Title:LocationFileProcessor
* </p>
* <p>
* Description:TODO
* </p>
* <p>
* Copyright:Copyright(c)2005
* </p>
* <p>
* Company:stest
* </p>
*
* @author
* @date 2018年11月15日 上午9:02:29
*/
@Component
public class LocationFileProcessor implements Processor { private static Logger logger = LoggerFactory.getLogger(LocationFileProcessor.class); @Autowired
private RedisTemplateUtil redisTemplateUtil; @Autowired
private FastDFSClient fastDFSClient; @Value("${ftp.local.data.dir}")
private String localDir; @Autowired
private ESRepository eSRepository; @Value("${elasticsearch.index}")
private String esIndex; @Value("${elasticsearch.type}")
private String esType; @Autowired
private ApplicationStartup applicationStartup; @Override
public void process(Exchange exchange) throws Exception {
@SuppressWarnings("unchecked")
GenericFileMessage<RandomAccessFile> inFileMessage = (GenericFileMessage<RandomAccessFile>) exchange.getIn();
String fileName = inFileMessage.getGenericFile().getFileName();// 文件名
logger.info(fileName);// 文件的绝对路径
String subfileName = fileName.substring(fileName.lastIndexOf("/") + 1);
long time = Long.parseLong(fileName.split("_")[3]);
// 上传到fastdfs
String path = upload(fileName);
// 将图片地址等信息保存到es
saveEs(subfileName, path);
// 获取当前redis里面保存的时间,如果为空直接存入,如果不为空且当前文件时间大于redis时间,那覆盖
saveRedis(time);
} /**
* 将最后获取图片的时间标记保存至redis
*
* @param time
*/
private void saveRedis(long time) {
Object redisKey = redisTemplateUtil.get(0, Constants.reids.YP_PICTRUE_TIME);
if (redisKey == null || (redisKey != null && Long.parseLong(redisKey.toString()) < time)) {
redisTemplateUtil.set(Constants.reids.YP_PICTRUE_TIME, time, 0);
}
} /**
* 保存es
*
* @param subfileName
* @param path
*/
private void saveEs(String subfileName, String path) {
String[] fileNames = subfileName.split("_");
String deviceId = fileNames[0];
String plate = fileNames[2].substring(1);
String captrue = fileNames[3];
String type = fileNames[4].split("\\.")[0];
String times = DateUtil.transForDate1(Integer.parseInt(captrue));
captrue = captrue + "000";
// 根据deviceId获取经纬度
HashMap<Integer, Device> devices = applicationStartup.getDevices();
Device device = devices.get(Integer.parseInt(deviceId));
double latitude = 0;
double longitude = 0;
if (device != null) {
latitude = device.getLat();
longitude = device.getLon();
}
String deviceName = device.getDeviceName();
String address = device.getDeviceAddress();
Pictrue pictrue = new Pictrue(deviceId, plate, captrue, type, path, times, latitude, longitude, deviceName,
address, "视频数据");
Gson gson = new Gson();
eSRepository.addTargetDataALL(gson.toJson(pictrue), esIndex, esType, null);
} /**
* 上传fastdfs
*
* @param fileName
* @return
* @throws Exception
*/
private String upload(String fileName) throws Exception {
String path = fastDFSClient.uploadFile(FileUtil.getBytes(localDir + fileName), fileName);
return path;
}
}