基于commons-pool2实现FastDFS连接池+心跳检测

yml 配置文件

#fastdfs相关配置
fastdfs:
  connectTimeoutInSeconds: 30
  networkTimeoutInSeconds: 60
  charset: UTF-8
  httpAntiStealToken: no
  httpSecretKey: FastDFS1234567890
  httpTrackerHttpPort: 8888
  trackerServers: 82.157.0.217:22122
  maxStorageConnection: 10 #最大连接数
  defaultPoolSize: 5 #默认连接数
  connectionRetrySize: 5 #连接重试次数
TrackerServer客户端工厂
  1 package cn.com.fileparse.client.fdfs;  5 import lombok.extern.slf4j.Slf4j;
  6 import org.apache.commons.lang.exception.ExceptionUtils;
  7 import org.apache.commons.pool2.BasePooledObjectFactory;
  8 import org.apache.commons.pool2.PooledObject;
  9 import org.apache.commons.pool2.impl.DefaultPooledObject;
 10 import org.csource.fastdfs.ClientGlobal;
 11 import org.csource.fastdfs.ProtoCommon;
 12 import org.csource.fastdfs.TrackerClient;
 13 import org.csource.fastdfs.TrackerServer;
 14 
 15 import java.io.IOException;
 16 import java.util.Properties;
 17 
 18 /**
 19 *@program
 20 *@description TrackerServer客户端工厂
 21 *@author jiaqiangx
 22 *@create 2021-08-13 11:20
 23 */
 24 @Slf4j
 25 public class TrackerServerFactory extends BasePooledObjectFactory {
 26     private FastDfsProperties fastDfsProperties;
 27 
 28     public TrackerServerFactory(FastDfsProperties fastDfsProperties) {
 29         this.fastDfsProperties = fastDfsProperties;
 30     }
 31     private static Properties properties = new Properties();
 32 
 33     @Override
 34     public TrackerServer create() throws FileparseException {
 35         try {
 36             initClientGlobal();
 37         } catch (Exception e) {
 38             log.warn("FastDFS initClientGlobal failed  [{}]", ExceptionUtils.getFullStackTrace(e));
 39             return null;
 40         }
 41         // TrackerClient
 42         TrackerClient trackerClient = new TrackerClient();
 43         // TrackerServer
 44         TrackerServer trackerServer = null;
 45         int flag =0;
 46         try {
 47             trackerServer = trackerClient.getConnection();
 48         } catch (IOException e) {
 49             log.warn("FastDFS connect failed ");
 50             return null;
 51         }
 52         try {
 53             while (trackerServer == null && flag < fastDfsProperties.getConnectionRetrySize()) {
 54                 log.info("[创建TrackerServer(createTrackerServer)][第{}次重建]", flag);
 55                 flag++;
 56                 initClientGlobal();
 57                 trackerServer = trackerClient.getConnection();
 58             }
 59             if(ProtoCommon.activeTest(trackerServer.getSocket())){
 60                 return trackerServer;
 61             }
 62 
 63         } catch (Exception e) {
 64             log.error("[创建TrackerServer(createTrackerServer)][异常:{}]", ExceptionUtils.getFullStackTrace(e));
 65 
 66         } finally {
 67             if (trackerServer != null) {
 68                 try {
 69                     trackerServer.close();
 70                 } catch (Exception e) {
 71                     log.error("[创建TrackerServer(createTrackerServer)--关闭trackerServer异常][异常:{}]", ExceptionUtils.getFullStackTrace(e));
 72                 }
 73             }
 74         }
 75         return trackerServer;
 76 
 77     }
 78 
 79     @Override
 80     public PooledObject wrap(TrackerServer obj) {
 81         return new DefaultPooledObject<>(obj);
 82     }
 83 
 84     /**
 85      * 销毁TrackerServer对象
 86      */
 87     @Override
 88     public void destroyObject(PooledObject ftpPooled) {
 89 
 90         if (ftpPooled != null) {
 91             TrackerServer trackerServer = ftpPooled.getObject();
 92             try {
 93                 trackerServer.close();
 94             } catch (IOException e) {
 95                 log.error("FastDFS client logout failed...{}", ExceptionUtils.getFullStackTrace(e));
 96             }
 97         }
 98 
 99     }
100 
101     /**
102      * 验证TrackerServer对象是否还可用
103      */
104     @Override
105     public boolean validateObject(PooledObject pooledObject) {
106         try {
107             return  ProtoCommon.activeTest(pooledObject.getObject().getSocket());
108         } catch (IOException e) {
109             log.error("Failed to validate client: {}", ExceptionUtils.getFullStackTrace(e));
110         }
111         return false;
112     }
113 
114     private void initClientGlobal() throws Exception {
115         properties.put(ClientGlobal.PROP_KEY_CONNECT_TIMEOUT_IN_SECONDS, fastDfsProperties.getConnectTimeoutInSeconds());
116         properties.put(ClientGlobal.PROP_KEY_NETWORK_TIMEOUT_IN_SECONDS, fastDfsProperties.getNetworkTimeoutInSeconds());
117         properties.put(ClientGlobal.PROP_KEY_CHARSET, fastDfsProperties.getCharset());
118         properties.put(ClientGlobal.PROP_KEY_HTTP_TRACKER_HTTP_PORT, fastDfsProperties.getHttpTrackerHttpPort());
119         properties.put(ClientGlobal.PROP_KEY_HTTP_ANTI_STEAL_TOKEN, fastDfsProperties.getHttpAntiStealToken());
120         properties.put(ClientGlobal.PROP_KEY_HTTP_SECRET_KEY, fastDfsProperties.getHttpSecretKey());
121         properties.put(ClientGlobal.PROP_KEY_TRACKER_SERVERS, fastDfsProperties.getTrackerServers());
122         ClientGlobal.initByProperties(properties);
123     }
124 
125 }

FastDfsProperties 读取yml配置

 1 package cn.com.fileparse.properties;
 2 
 3 import lombok.Data;
 4 import org.springframework.boot.context.properties.ConfigurationProperties;
 5 
 6 /**
 7  * @author jiaqiangx
 8  * @program ebs-file-service
 9  * @description
10  * @create 2021-07-09 09:22
11  */
12 @Data
13 @ConfigurationProperties(prefix = FastDfsProperties.PREFIX, ignoreUnknownFields = false)
14 public class FastDfsProperties {
15     public static final String PREFIX = "fastdfs";
16 
17     private String connectTimeoutInSeconds;
18     private String networkTimeoutInSeconds;
19     private String charset;
20     private String httpAntiStealToken;
21     private String httpSecretKey;
22     private String httpTrackerHttpPort;
23     private String trackerServers;
24     private Integer maxStorageConnection;
25     private Integer connectionRetrySize;
26     private Integer defaultPoolSize;
27 
28 }
FastDFS 连接池
  1 package cn.com.fileparse.client.fdfs;
  2 
  3 import cn.com.fileparse.constant.ResponseCode;
  4 import cn.com.fileparse.exception.FileparseException;
  5 import cn.com.fileparse.util.YmlUtils;
  6 import lombok.extern.slf4j.Slf4j;
  7 import org.apache.commons.lang.exception.ExceptionUtils;
  8 import org.apache.commons.pool2.BaseObjectPool;
  9 import org.csource.fastdfs.TrackerServer;
 10 import org.springframework.util.ObjectUtils;
 11 
 12 import java.util.concurrent.ArrayBlockingQueue;
 13 import java.util.concurrent.BlockingQueue;
 14 import java.util.concurrent.ExecutorService;
 15 import java.util.concurrent.Executors;
 16 import java.util.concurrent.TimeUnit;
 17 
 18 /**
 19 *@program  
 20 *@description FastDFS client 池
 21 *@author jiaqiangx
 22 *@create 2021-08-13 11:20
 23 */
 24 @Slf4j
 25 public class TrackerServerPool extends BaseObjectPool {
 26     private final BlockingQueue dfsBlockingQueue;
 27     private final TrackerServerFactory trackerServerFactory;
 28     private static Integer poolSize =  Integer.valueOf(YmlUtils.getValue("fastdfs.defaultPoolSize"));
 29     private static Integer poolMaxSize =  Integer.valueOf(YmlUtils.getValue("fastdfs.maxStorageConnection"));
 30     /**
 31      * 初始化连接池,需要注入一个工厂来提供TrackerServer实例
 32      *
 33      * @param trackerServerFactory trackerServe工厂
 34      * @throws Exception
 35      */
 36     public TrackerServerPool(TrackerServerFactory trackerServerFactory) throws Exception {
 37         this.trackerServerFactory = trackerServerFactory;
 38         dfsBlockingQueue = new ArrayBlockingQueue<>(poolSize);
 39         initPool(poolSize);
 40     }
 41 
 42 
 43     /**
 44      * 初始化连接池,需要注入一个工厂来提供TrackerServer实例
 45      *
 46      * @param maxPoolSize 最大连接数
 47      * @throws Exception
 48      */
 49 
 50     private void initPool(int maxPoolSize) throws Exception {
 51         ExecutorService executorService = Executors.newFixedThreadPool(1);
 52         executorService.submit(() -> {
 53             log.info("-----Init TrackerServer Connect START------");
 54             for (int i = 0; i < maxPoolSize; i++) {
 55                 // 往池中添加对象
 56                 try {
 57                     addObject();
 58                 }catch (Exception e){
 59                     log.warn("初始化连接池失败");
 60                 }
 61             }
 62         });
 63     }
 64 
 65     /**
 66      * 获取连接
 67      *
 68      * @return
 69      * @throws Exception
 70      */
 71     @Override
 72     public TrackerServer borrowObject() throws FileparseException {
 73         TrackerServer client = null;
 74         try {
 75             //client = dfsBlockingQueue.take();
 76             //从队列中获取值 默认30S超时
 77             client = dfsBlockingQueue.poll(30,TimeUnit.SECONDS);
 78         } catch (InterruptedException e) {
 79             log.info("queue获取任务异常,异常原因:{}", ExceptionUtils.getFullStackTrace(e));
 80         }
 81 
 82         if (ObjectUtils.isEmpty(client)) {
 83             if (dfsBlockingQueue.size() < poolMaxSize) {
 84                 client = trackerServerFactory.create();
 85                 // 放入连接池
 86                 returnObject(client);
 87             }else {
 88                 //连接数已满
 89                 throw new FileparseException(ResponseCode.CONNECT_OVERFLOW);
 90             }
 91             // 验证对象是否有效  这里通过实践验证 如果长时间不校验是否存活,则这里会报通道已断开等错误
 92         } else if (!trackerServerFactory.validateObject(trackerServerFactory.wrap(client))) {
 93             // 对无效的对象进行处理
 94             invalidateObject(client);
 95             // 创建新的对象
 96             client = trackerServerFactory.create();
 97             // 将新的对象放入连接池
 98             returnObject(client);
 99         }
100         return client;
101     }
102 
103     @Override
104     public void returnObject(TrackerServer client) throws FileparseException{
105         try {
106            if (client != null && !dfsBlockingQueue.offer(client, 10, TimeUnit.SECONDS)) {
107                 trackerServerFactory.destroyObject(trackerServerFactory.wrap(client));
108                //连接池已满 抛出指定异常
109                throw new FileparseException(ResponseCode.CONNECT_OVERFLOW);
110             }
111         } catch (InterruptedException e) {
112             log.error("return TrackerServer client interrupted ...{}", ExceptionUtils.getFullStackTrace(e));
113         }
114     }
115 
116     @Override
117     public void invalidateObject(TrackerServer client) {
118         trackerServerFactory.destroyObject(trackerServerFactory.wrap(client));
119         dfsBlockingQueue.remove(client);
120     }
121 
122     /**
123      * 增加一个新的链接,超时失效
124      */
125     @Override
126     public void addObject() throws Exception {
127         // 插入对象到队列
128         dfsBlockingQueue.offer(trackerServerFactory.create(), 10, TimeUnit.SECONDS);
129     }
130 
131     /**
132      * @param trackerServer 需释放的连接对象
133      * @Description: 释放繁忙连接 1.如果空闲池的连接小于最小连接值,就把当前连接放入idleConnectionPool;
134      * 2.如果空闲池的连接等于或大于最小连接值,就把当前释放连接丢弃;
135      */
136     public void checkin(TrackerServer trackerServer) {
137         //log.info("[释放当前连接(checkin)][prams:{}}] ", trackerServer);
138         if (trackerServer != null) {
139             if (dfsBlockingQueue.size() < poolSize) {
140                 dfsBlockingQueue.add(trackerServer);
141             } else {
142                 trackerServerFactory.destroyObject(trackerServerFactory.wrap(trackerServer));
143             }
144         }
145 
146     }
147 
148 
149     /**
150      * 关闭连接池
151      */
152     @Override
153     public void close() {
154         try {
155             while (dfsBlockingQueue.iterator().hasNext()) {
156                 TrackerServer client = dfsBlockingQueue.take();
157                 trackerServerFactory.destroyObject(trackerServerFactory.wrap(client));
158             }
159         } catch (Exception e) {
160             log.error("close TrackerServer client dfsBlockingQueue failed...{}", ExceptionUtils.getFullStackTrace(e));
161         }
162     }
163 
164     public BlockingQueue getDfsBlockingQueue() {
165         return dfsBlockingQueue;
166     }
167 
168 }

心跳检测

 1 package cn.com.fileparse.client.fdfs;
 2 
 3 import lombok.extern.slf4j.Slf4j;
 4 import org.apache.commons.lang.exception.ExceptionUtils;
 5 import org.csource.fastdfs.ProtoCommon;
 6 import org.csource.fastdfs.TrackerServer;
 7 import org.springframework.beans.factory.annotation.Autowired;
 8 
 9 import javax.annotation.PostConstruct;
10 import java.util.Iterator;
11 import java.util.concurrent.BlockingQueue;
12 
13 /**
14 *@program 
15 *@description 检测TrackerServer客户端是否在活着
16 *@author jiaqiangx
17 *@create 2021-06-29 22:42
18 */
19 @Slf4j
20 public class TrackerServerKeepAlive {
21 
22     private KeepAliveThread keepAliveThread;
23 
24     @Autowired
25     private TrackerServerPool trackerServerPool;
26 
27     private final String THREAD_NAME = "tracker-client-alive-thread";
28 
29     /**
30      * 等待时间
31      */
32     public static int waitTimes = 1200;
33 
34 
35     @PostConstruct
36     public void init() {
37         // 启动心跳检测线程
38         if (keepAliveThread == null) {
39             keepAliveThread = new KeepAliveThread();
40             Thread thread = new Thread(keepAliveThread, THREAD_NAME);
41             thread.start();
42         }
43     }
44 
45     class KeepAliveThread implements Runnable {
46         @Override
47         public void run() {
48             TrackerServer ts = null;
49             while (true) {
50                 try {
51                     BlockingQueue pool = trackerServerPool.getDfsBlockingQueue();
52                     if (pool != null && pool.size() > 0) {
53                         Iterator it = pool.iterator();
54                         while (it.hasNext()) {
55                             ts = it.next();
56                             if (ts != null) {
57                                 boolean result = ProtoCommon.activeTest(ts.getSocket());
58                                 log.info("trackerServer心跳检测结果:{} ", result);
59                                 if(!result){
60                                     //清除失效连接
61                                     trackerServerPool.invalidateObject(ts);
62                                 }else {
63                                     //回收空闲连接
64                                     trackerServerPool.checkin(ts);
65                                 }
66                             } else {
67                                 /** 代表已经没有空闲长连接 */
68                                 break;
69                             }
70                         }
71                     }
72                 } catch (Exception e) {
73                     log.error("trackerServer心跳检测异常{}", ExceptionUtils.getFullStackTrace(e));
74                     trackerServerPool.invalidateObject(ts);
75                 }
76                 // 每30s发送一次心跳
77                 try {
78                     Thread.sleep(1000 * 30);
79                 } catch (InterruptedException e) {
80                     log.error("trackerServer休眠异常{}", ExceptionUtils.getFullStackTrace(e));
81                 }
82             }
83 
84         }
85     }
86 }

启动初始化服务

 1 package cn.com.fileparse.config;
 2 
 3 import cn.com.fileparse.client.fdfs.TrackerServerKeepAlive;
 4 import cn.com.fileparse.client.fdfs.TrackerServerFactory;
 5 import cn.com.fileparse.client.fdfs.TrackerServerPool;
 6 import cn.com.fileparse.properties.FastDfsProperties;
 7 import lombok.extern.slf4j.Slf4j;
 8 import org.springframework.beans.factory.annotation.Autowired;
 9 import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
10 import org.springframework.boot.context.properties.EnableConfigurationProperties;
11 import org.springframework.context.annotation.Bean;
12 
13 /**
14 *@program 
15 *@description
16 *@author jiaqiangx
17 *@create 2021-06-30 21:52
18 */
19 @Slf4j
20 @EnableConfigurationProperties(FastDfsProperties.class)
21 public class FastDFSConfig {
22     @Autowired
23     FastDfsProperties fastDfsProperties;
24     /**
25      * 客户端工厂
26      *
27      * @return
28      */
29     @Bean
30     public TrackerServerFactory trackerServerFactory() {
31         return new TrackerServerFactory(fastDfsProperties);
32     }
33 
34     /**
35      * 连接池
36      *
37      * @param trackerServerFactory
38      * @return
39      * @throws Exception
40      */
41     @Bean
42     public TrackerServerPool TrackerServerClientPool(TrackerServerFactory trackerServerFactory) throws Exception {
43         return new TrackerServerPool(trackerServerFactory);
44     }
45 
46     /**
47      * TrackerServer心跳检测
48      */
49     @Bean
50     @ConditionalOnBean(TrackerServerPool.class)
51     public TrackerServerKeepAlive trackerServerKeepAlive() {
52         return new TrackerServerKeepAlive();
53     }
54 
55 }

注解配置

 1 package cn.com.fileparse.annotation;
 2 import cn.com.fileparse.config.FastDFSConfig;
 3 import org.springframework.context.annotation.Import;
 4 
 5 import java.lang.annotation.ElementType;
 6 import java.lang.annotation.Retention;
 7 import java.lang.annotation.RetentionPolicy;
 8 import java.lang.annotation.Target;
 9 
10 /**
11 *@program 
12 *@description 启用FastDFS自动配置
13 *@author jiaqiangx
14 *@create 2021-08-13 14:17
15 */
16 @Target(ElementType.TYPE)
17 @Retention(RetentionPolicy.RUNTIME)
18 @Import(FastDFSConfig.class)
19 public @interface EnableFastDFS {
20 }
文件上传下载主类
  1 package cn.com.fileparse.client.fdfs;
  2 
  3 import cn.com.fileparse.annotation.EnableFastDFS;
  4 import cn.com.fileparse.client.ftp.FtpClientPool;
  5 import cn.com.fileparse.constant.ResponseCode;
  6 import cn.com.fileparse.exception.FileparseException;
  7 import cn.com.fileparse.util.common.SpringBeanUtil;
  8 import lombok.extern.slf4j.Slf4j;
  9 import org.apache.commons.io.FilenameUtils;
 10 import org.apache.commons.lang.exception.ExceptionUtils;
 11 import org.apache.commons.lang3.StringUtils;
 12 import org.csource.common.MyException;
 13 import org.csource.common.NameValuePair;
 14 import org.csource.fastdfs.DownloadStream;
 15 import org.csource.fastdfs.FileInfo;
 16 import org.csource.fastdfs.StorageClient;
 17 import org.csource.fastdfs.StorageClient1;
 18 import org.csource.fastdfs.TrackerServer;
 19 import org.csource.fastdfs.UploadCallback;
 20 import org.csource.fastdfs.UploadStream;
 21 import org.springframework.stereotype.Component;
 22 
 23 import java.io.File;
 24 import java.io.FileInputStream;
 25 import java.io.FileNotFoundException;
 26 import java.io.FileOutputStream;
 27 import java.io.IOException;
 28 import java.io.InputStream;
 29 import java.io.OutputStream;
 30 
 31 /**
 32  * FastDFS Java API. 文件上传下载主类.
 33  *

 

 34  */
 35 @Slf4j
 36 @EnableFastDFS
 37 @Component
 38 public class FastDFSClient {
 39 
 40     public static TrackerServer getStorageClient() throws FileparseException{
 41         TrackerServerPool trackerServerPool = SpringBeanUtil.getBean(TrackerServerPool.class);
 42         TrackerServer trackerServer = null;
 43         try {
 44             trackerServer = trackerServerPool.borrowObject();
 45         } catch (FileparseException e) {
 46             throw new FileparseException(ResponseCode.GET_DFS_CONNECT_ERROR);
 47         }
 48 
 49         return trackerServer;
 50     }
 51 
 52     public static String upload(File file) throws FileparseException {
 53       return upload(file, null);
 54     }
 55 
 56     public static String upload(File file, String groupName) throws FileparseException {
 57         String filePath;
 58         FileInputStream in = null;
 59         try {
 60             in = new FileInputStream(file);
 61             String fileExtName = FilenameUtils.getExtension(file.getName());
 62             // 设置文件元信息
 63             NameValuePair[] metaList = new NameValuePair[3];
 64             metaList[0] = new NameValuePair("fileExtName", fileExtName);
 65             metaList[1] = new NameValuePair("fileLength", String.valueOf(file.length()));
 66             metaList[2] = new NameValuePair("fileName", getFileNameString(file.getName()));
 67             filePath = upload(null, in.getChannel().size(), new UploadStream(in, file.length()), fileExtName, metaList);
 68         } catch (Exception e) {
 69             log.error("上传文件到fastdfs失败!文件名:{}", file.getName());
 70             throw new FileparseException(ResponseCode.UPLOD_FILE_ERROR);
 71         } finally {
 72             try {
 73                 if (in != null) {
 74                     in.close();
 75                 }
 76             } catch (IOException e) {
 77                 log.error("close FileInputStream failed...{}", ExceptionUtils.getFullStackTrace(e));
 78             }
 79         }
 80         return filePath;
 81 
 82     }
 83 
 84     public static String upload(InputStream in, String fileName) throws FileparseException {
 85         String filePath;
 86         try {
 87             String fileExtName = FilenameUtils.getExtension(fileName);
 88             // 设置文件元信息
 89             NameValuePair[] metaList = new NameValuePair[3];
 90             metaList[0] = new NameValuePair("fileExtName", fileExtName);
 91             metaList[1] = new NameValuePair("fileLength", String.valueOf(in.available()));
 92             metaList[2] = new NameValuePair("fileName", getFileNameString(fileName));
 93             filePath = upload(null, in.available(),  new UploadStream(in, in.available()), fileExtName, metaList);
 94         } catch (Exception e) {
 95             log.error("上传文件到fastdfs失败!文件名:{}", fileName);
 96             throw new FileparseException(ResponseCode.UPLOD_FILE_ERROR);
 97         } finally {
 98             try {
 99                 if (in != null) {
100                     in.close();
101                 }
102             } catch (IOException e) {
103                 log.error("close FileInputStream failed...{}", ExceptionUtils.getFullStackTrace(e));
104             }
105         }
106         return filePath;
107 
108     }
109 
110     /**
111      * 上传通用方法
112      */
113     public static String upload(String groupName, long file_size, UploadCallback callback, String file_ext_name, NameValuePair[] meta_list) throws FileparseException {
114         TrackerServerPool trackerServerPool = SpringBeanUtil.getBean(TrackerServerPool.class);
115         TrackerServer trackerServer = trackerServerPool.borrowObject();
116         StorageClient1 storageClient = new StorageClient1(trackerServer, null);
117         String path = null;
118         try {
119             // 上传
120             path = storageClient.upload_file1(groupName, file_size, callback, file_ext_name, meta_list);
121             if (StringUtils.isBlank(path)) {
122                 throw new FileparseException(ResponseCode.UPLOD_FILE_ERROR);
123             }
124             log.info("上传文件到fastdfs成功!文件地址:{}", path);
125         } catch (IOException e) {
126             log.error("上传文件到fastdfs失败!失败原因:{}", ExceptionUtils.getFullStackTrace(e));
127             throw new FileparseException(ResponseCode.UPLOD_FILE_ERROR);
128         } catch (MyException e) {
129             log.error("上传文件到fastdfs失败!失败原因:{}", ExceptionUtils.getFullStackTrace(e));
130             throw new FileparseException(ResponseCode.UPLOD_FILE_ERROR);
131         } finally {
132             // 返还对象
133             trackerServerPool.checkin(trackerServer);
134         }
135         return path;
136     }
137 
138 
139     public static void download(String filePathName, File file) throws FileNotFoundException, FileparseException {
140         if (!file.exists()) {
141             File parentFile = file.getParentFile();
142             if (!parentFile.exists()) {
143                 parentFile.mkdirs();
144             }
145         }
146         FileOutputStream in = null;
147         try {
148             in = new FileOutputStream(file);
149             download(filePathName, in);
150         } catch (Exception e) {
151             log.error("从fastdfs下载文件失败!失败原因:{}", ExceptionUtils.getFullStackTrace(e));
152             throw new FileparseException(ResponseCode.DOWN_FILE_ERROR_1);
153         }
154     }
155     public static void download(String filePathName, OutputStream output) throws FileparseException {
156         TrackerServerPool trackerServerPool = SpringBeanUtil.getBean(TrackerServerPool.class);
157         TrackerServer trackerServer = trackerServerPool.borrowObject();
158         StorageClient1 storageClient = new StorageClient1(trackerServer, null);
159         try {
160             storageClient.download_file1(filePathName, new DownloadStream(output));
161             log.info("从fastdfs下载文件成功!文件名称:{}", filePathName);
162         } catch (Exception e) {
163             log.error("从fastdfs下载文件失败,文件路径:{}", filePathName);
164             throw new FileparseException(ResponseCode.DOWN_FILE_ERROR.getDesc()+e.getMessage(), ResponseCode.DOWN_FILE_ERROR.getCode());
165         }finally {
166             // 返还对象
167             trackerServerPool.checkin(trackerServer);
168         }
169     }
170 
171     /**
172      * @Description: 根据文件路径删除存储服务器上文件
173      * @Param: [filePath]  SysDocFile.getFilePath()
174      * @return: boolean
175      * @Author: wanfy
176      * @Date: 2019/4/23 10:08 AM
177      */
178     public static boolean deleteFile(String filePath) throws FileparseException{
179         boolean success = false;
180         TrackerServerPool trackerServerPool = SpringBeanUtil.getBean(TrackerServerPool.class);
181         TrackerServer trackerServer = trackerServerPool.borrowObject();
182         StorageClient1 storageClient = new StorageClient1(trackerServer, null);
183         try {
184             String group_name = filePath.split("/")[0];
185             String remote_filename = filePath.substring(group_name.length() + 1);
186             int i = storageClient.delete_file(group_name, remote_filename);
187             if (i == 0) {
188                 log.info("已从fastdfs上删除文件");
189                 success = true;
190             }
191         } catch (Exception e) {
192             log.warn("从fastdfs上删除文件失败");
193             throw new FileparseException(ResponseCode.DEF_ERROR_CODE);
194         } finally {
195             // 返还对象
196             trackerServerPool.checkin(trackerServer);
197         }
198         return success;
199     }
200 
201     /**
202      * @Description: 获取文件大小
203      * @Param: [filePath]  SysDocFile.getFilePath()
204      * @return: boolean
205      * @Author: wanfy
206      * @Date: 2019/4/23 10:08 AM
207      */
208     public static long getFileSize(String filePath) throws FileparseException{
209         TrackerServerPool trackerServerPool = SpringBeanUtil.getBean(TrackerServerPool.class);
210         TrackerServer trackerServer = trackerServerPool.borrowObject();
211         StorageClient1 storageClient = new StorageClient1(trackerServer, null);
212         try {
213             String group_name = filePath.split("/")[0];
214             String remote_filename = filePath.substring(group_name.length() + 1);
215             FileInfo fileInfo = storageClient.get_file_info(group_name, remote_filename);
216             if (fileInfo == null) {
217                 log.warn("fastdfs查询文件为空");
218                 throw new FileparseException(ResponseCode.DOWN_FILE_IS_NULL);
219             }
220             return fileInfo.getFileSize();
221         } catch (Exception e) {
222             log.warn("从fastdfs上获取文件大小失败");
223             throw new FileparseException(ResponseCode.DOWN_FILE_ERROR.getDesc()+e.getMessage(), ResponseCode.DOWN_FILE_ERROR.getCode());
224         } finally {
225             // 返还对象
226             trackerServerPool.checkin(trackerServer);
227         }
228     }
229 
230     private static String getFileNameString(String fileName) {
231         String str1;
232         if (fileName.indexOf(".") != -1) {
233             str1 = fileName.substring(0, fileName.lastIndexOf("."));
234         } else {
235             str1 = fileName;
236         }
237         return str1;
238     }
239 }

简单写几个工具类

  1 /**
  2  *
  3  */
  4 package cn.com.fileparse.util.old;
  5 
  6 import cn.com.fileparse.client.fdfs.FastDFSClient;
  7 import cn.com.fileparse.exception.FileparseException;
  8 import lombok.extern.slf4j.Slf4j;
  9 import org.apache.commons.io.FilenameUtils;
 10 import org.csource.common.NameValuePair;
 11 import org.csource.fastdfs.StorageClient1;
 12 import org.csource.fastdfs.StorageServer;
 13 import org.csource.fastdfs.UploadStream;
 14 import org.springframework.stereotype.Component;
 15 
 16 import java.io.File;
 17 import java.io.FileInputStream;
 18 import java.io.FileNotFoundException;
 19 import java.io.FileOutputStream;
 20 import java.io.IOException;
 21 import java.io.InputStream;
 22 import java.io.OutputStream;
 23 
 24 /**
 25  * @author jiaqiangx
 26  */
 27 @Component
 28 @Slf4j
 29 public class FastDFSUtil {
 30 
 31     public static String upload(File file) throws FileparseException{
 32         return FastDFSClient.upload(file, null);
 33     }
 34 
 35     public static String upload(InputStream in, String fileName) throws FileparseException{
 36         return FastDFSClient.upload(in, fileName);
 37     }
 38 
 39     /**
 40     * @Description:上传多个文件到本机的storage服务器 (供CA服务专用)
 41     * @Param: [file]
 42     * @return: java.lang.String
 43     * @Author: wanfy
 44     * @Date: 2019/5/21 3:44 PM
 45     */
 46     public static String uploadFileToLocalhostStorage(File file) {
 47         String filePath;
 48         StorageServer storageServer = null;
 49         try (FileInputStream in = new FileInputStream(file)) {
 50             String fileExtName = FilenameUtils.getExtension(file.getName());
 51             storageServer = new StorageServer("localhost", 23000, 0);
 52             StorageClient1 client = new StorageClient1(null, storageServer);
 53             // 设置文件元信息
 54             NameValuePair[] metaList = new NameValuePair[3];
 55             metaList[0] = new NameValuePair("fileExtName", fileExtName);
 56             metaList[1] = new NameValuePair("fileLength", String.valueOf(file.length()));
 57             metaList[2] = new NameValuePair("fileName", getFileNameString(file.getName()));
 58             filePath = client.upload_file1(null, in.getChannel().size(), new UploadStream(in, file.length()), fileExtName, metaList);
 59             log.info("上传文件到fastdfs成功!");
 60         } catch (Exception e) {
 61             log.error("上传文件到fastdfs失败!文件名:{}", file.getName());
 62             throw new RuntimeException(e);
 63         } finally {
 64             try {
 65                 if (storageServer != null) {
 66                     storageServer.close();
 67                 }
 68             } catch (IOException e) {
 69                 log.warn("storageServer关闭失败");
 70             }
 71         }
 72         return filePath;
 73     }
 74 
 75     public static void download(String filePathName, File file) throws FileNotFoundException,FileparseException {
 76         if (!file.exists()) {
 77             File parentFile = file.getParentFile();
 78             if (!parentFile.exists()) {
 79                 parentFile.mkdirs();
 80             }
 81         }
 82         download(filePathName, new FileOutputStream(file));
 83     }
 84 
 85     public static void download(String filePathName, OutputStream output) throws FileparseException {
 86             FastDFSClient.download(filePathName,output);
 87     }
 88 
 89     /**
 90      * @Description: 根据文件路径删除存储服务器上文件
 91      * @Param: [filePath]  SysDocFile.getFilePath()
 92      * @return: boolean
 93      * @Author: wanfy
 94      * @Date: 2019/4/23 10:08 AM
 95      */
 96     public static boolean deleteFile(String filePath) throws FileparseException {
 97         return FastDFSClient.deleteFile(filePath);
 98     }
 99 
100     /**
101      * @Description: 获取文件大小
102      * @Param: [filePath]  SysDocFile.getFilePath()
103      * @return: boolean
104      * @Author: wanfy
105      * @Date: 2019/4/23 10:08 AM
106      */
107     public static long getFileSize(String filePath) throws FileparseException{
108        return FastDFSClient.getFileSize(filePath);
109     }
110 
111     private static String getFileNameString(String fileName) {
112         String str1;
113         if (fileName.indexOf(".") != -1) {
114             str1 = fileName.substring(0, fileName.lastIndexOf("."));
115         } else {
116             str1 = fileName;
117         }
118         return str1;
119     }
120 
121 }

 

测试类

 1 package cn.com.fileparse.utils;
 2 
 3 import cn.com.fileparse.client.fdfs.FastDFSClient;
 4 import cn.com.fileparse.util.old.FastDFSUtil;
 5 import org.csource.fastdfs.TrackerServer;
 6 import org.junit.Test;
 7 import org.junit.runner.RunWith;
 8 import org.springframework.boot.test.context.SpringBootTest;
 9 import org.springframework.test.context.junit4.SpringRunner;
10 
11 import java.io.File;
12 
13 /**
14  * @author jiaqiangx
15  * @program 
16  * @description
17  * @create 2021-07-12 09:29
18  */
19 @SpringBootTest
20 @RunWith(SpringRunner.class)
21 public class DFSUtilsTest {
22     @Test
23     public void upload(){
24         try {
25             File pdf = new File("D:\\工作\\s.png");
26            String asd = FastDFSUtil.upload(pdf);
27             System.out.println(asd);
28         } catch (Exception e) {
29             e.printStackTrace();
30         }
31     }
32 
33     @Test
34     public void getStorageClient(){
35         try {
36             TrackerServer trackerServer = FastDFSClient.getStorageClient();
37             trackerServer.getSocket();
38         } catch (Exception e) {
39             e.printStackTrace();
40         }
41     }
42 
43 }

 

基于commons-pool2实现FastDFS连接池+心跳检测

上一篇:docker容器dockerfile详解


下一篇:odoo 接口跨域请求报错