yml 配置文件
#fastdfs相关配置 fastdfs: connectTimeoutInSeconds: 30 networkTimeoutInSeconds: 60 charset: UTF-8 httpAntiStealToken: no httpSecretKey: FastDFS1234567890 httpTrackerHttpPort: 8888 trackerServers: 82.157.0.217:22122 maxStorageConnection: 10 #最大连接数 defaultPoolSize: 5 #默认连接数 connectionRetrySize: 5 #连接重试次数
TrackerServer客户端工厂
1 package cn.com.fileparse.client.fdfs; 5 import lombok.extern.slf4j.Slf4j; 6 import org.apache.commons.lang.exception.ExceptionUtils; 7 import org.apache.commons.pool2.BasePooledObjectFactory; 8 import org.apache.commons.pool2.PooledObject; 9 import org.apache.commons.pool2.impl.DefaultPooledObject; 10 import org.csource.fastdfs.ClientGlobal; 11 import org.csource.fastdfs.ProtoCommon; 12 import org.csource.fastdfs.TrackerClient; 13 import org.csource.fastdfs.TrackerServer; 14 15 import java.io.IOException; 16 import java.util.Properties; 17 18 /** 19 *@program 20 *@description TrackerServer客户端工厂 21 *@author jiaqiangx 22 *@create 2021-08-13 11:20 23 */ 24 @Slf4j 25 public class TrackerServerFactory extends BasePooledObjectFactory { 26 private FastDfsProperties fastDfsProperties; 27 28 public TrackerServerFactory(FastDfsProperties fastDfsProperties) { 29 this.fastDfsProperties = fastDfsProperties; 30 } 31 private static Properties properties = new Properties(); 32 33 @Override 34 public TrackerServer create() throws FileparseException { 35 try { 36 initClientGlobal(); 37 } catch (Exception e) { 38 log.warn("FastDFS initClientGlobal failed [{}]", ExceptionUtils.getFullStackTrace(e)); 39 return null; 40 } 41 // TrackerClient 42 TrackerClient trackerClient = new TrackerClient(); 43 // TrackerServer 44 TrackerServer trackerServer = null; 45 int flag =0; 46 try { 47 trackerServer = trackerClient.getConnection(); 48 } catch (IOException e) { 49 log.warn("FastDFS connect failed "); 50 return null; 51 } 52 try { 53 while (trackerServer == null && flag < fastDfsProperties.getConnectionRetrySize()) { 54 log.info("[创建TrackerServer(createTrackerServer)][第{}次重建]", flag); 55 flag++; 56 initClientGlobal(); 57 trackerServer = trackerClient.getConnection(); 58 } 59 if(ProtoCommon.activeTest(trackerServer.getSocket())){ 60 return trackerServer; 61 } 62 63 } catch (Exception e) { 64 log.error("[创建TrackerServer(createTrackerServer)][异常:{}]", ExceptionUtils.getFullStackTrace(e)); 65 66 } finally { 67 if (trackerServer != null) { 68 try { 69 trackerServer.close(); 70 } catch (Exception e) { 71 log.error("[创建TrackerServer(createTrackerServer)--关闭trackerServer异常][异常:{}]", ExceptionUtils.getFullStackTrace(e)); 72 } 73 } 74 } 75 return trackerServer; 76 77 } 78 79 @Override 80 public PooledObject wrap(TrackerServer obj) { 81 return new DefaultPooledObject<>(obj); 82 } 83 84 /** 85 * 销毁TrackerServer对象 86 */ 87 @Override 88 public void destroyObject(PooledObject ftpPooled) { 89 90 if (ftpPooled != null) { 91 TrackerServer trackerServer = ftpPooled.getObject(); 92 try { 93 trackerServer.close(); 94 } catch (IOException e) { 95 log.error("FastDFS client logout failed...{}", ExceptionUtils.getFullStackTrace(e)); 96 } 97 } 98 99 } 100 101 /** 102 * 验证TrackerServer对象是否还可用 103 */ 104 @Override 105 public boolean validateObject(PooledObject pooledObject) { 106 try { 107 return ProtoCommon.activeTest(pooledObject.getObject().getSocket()); 108 } catch (IOException e) { 109 log.error("Failed to validate client: {}", ExceptionUtils.getFullStackTrace(e)); 110 } 111 return false; 112 } 113 114 private void initClientGlobal() throws Exception { 115 properties.put(ClientGlobal.PROP_KEY_CONNECT_TIMEOUT_IN_SECONDS, fastDfsProperties.getConnectTimeoutInSeconds()); 116 properties.put(ClientGlobal.PROP_KEY_NETWORK_TIMEOUT_IN_SECONDS, fastDfsProperties.getNetworkTimeoutInSeconds()); 117 properties.put(ClientGlobal.PROP_KEY_CHARSET, fastDfsProperties.getCharset()); 118 properties.put(ClientGlobal.PROP_KEY_HTTP_TRACKER_HTTP_PORT, fastDfsProperties.getHttpTrackerHttpPort()); 119 properties.put(ClientGlobal.PROP_KEY_HTTP_ANTI_STEAL_TOKEN, fastDfsProperties.getHttpAntiStealToken()); 120 properties.put(ClientGlobal.PROP_KEY_HTTP_SECRET_KEY, fastDfsProperties.getHttpSecretKey()); 121 properties.put(ClientGlobal.PROP_KEY_TRACKER_SERVERS, fastDfsProperties.getTrackerServers()); 122 ClientGlobal.initByProperties(properties); 123 } 124 125 }
FastDfsProperties 读取yml配置
1 package cn.com.fileparse.properties; 2 3 import lombok.Data; 4 import org.springframework.boot.context.properties.ConfigurationProperties; 5 6 /** 7 * @author jiaqiangx 8 * @program ebs-file-service 9 * @description 10 * @create 2021-07-09 09:22 11 */ 12 @Data 13 @ConfigurationProperties(prefix = FastDfsProperties.PREFIX, ignoreUnknownFields = false) 14 public class FastDfsProperties { 15 public static final String PREFIX = "fastdfs"; 16 17 private String connectTimeoutInSeconds; 18 private String networkTimeoutInSeconds; 19 private String charset; 20 private String httpAntiStealToken; 21 private String httpSecretKey; 22 private String httpTrackerHttpPort; 23 private String trackerServers; 24 private Integer maxStorageConnection; 25 private Integer connectionRetrySize; 26 private Integer defaultPoolSize; 27 28 }
FastDFS 连接池
1 package cn.com.fileparse.client.fdfs; 2 3 import cn.com.fileparse.constant.ResponseCode; 4 import cn.com.fileparse.exception.FileparseException; 5 import cn.com.fileparse.util.YmlUtils; 6 import lombok.extern.slf4j.Slf4j; 7 import org.apache.commons.lang.exception.ExceptionUtils; 8 import org.apache.commons.pool2.BaseObjectPool; 9 import org.csource.fastdfs.TrackerServer; 10 import org.springframework.util.ObjectUtils; 11 12 import java.util.concurrent.ArrayBlockingQueue; 13 import java.util.concurrent.BlockingQueue; 14 import java.util.concurrent.ExecutorService; 15 import java.util.concurrent.Executors; 16 import java.util.concurrent.TimeUnit; 17 18 /** 19 *@program 20 *@description FastDFS client 池 21 *@author jiaqiangx 22 *@create 2021-08-13 11:20 23 */ 24 @Slf4j 25 public class TrackerServerPool extends BaseObjectPool { 26 private final BlockingQueue dfsBlockingQueue; 27 private final TrackerServerFactory trackerServerFactory; 28 private static Integer poolSize = Integer.valueOf(YmlUtils.getValue("fastdfs.defaultPoolSize")); 29 private static Integer poolMaxSize = Integer.valueOf(YmlUtils.getValue("fastdfs.maxStorageConnection")); 30 /** 31 * 初始化连接池,需要注入一个工厂来提供TrackerServer实例 32 * 33 * @param trackerServerFactory trackerServe工厂 34 * @throws Exception 35 */ 36 public TrackerServerPool(TrackerServerFactory trackerServerFactory) throws Exception { 37 this.trackerServerFactory = trackerServerFactory; 38 dfsBlockingQueue = new ArrayBlockingQueue<>(poolSize); 39 initPool(poolSize); 40 } 41 42 43 /** 44 * 初始化连接池,需要注入一个工厂来提供TrackerServer实例 45 * 46 * @param maxPoolSize 最大连接数 47 * @throws Exception 48 */ 49 50 private void initPool(int maxPoolSize) throws Exception { 51 ExecutorService executorService = Executors.newFixedThreadPool(1); 52 executorService.submit(() -> { 53 log.info("-----Init TrackerServer Connect START------"); 54 for (int i = 0; i < maxPoolSize; i++) { 55 // 往池中添加对象 56 try { 57 addObject(); 58 }catch (Exception e){ 59 log.warn("初始化连接池失败"); 60 } 61 } 62 }); 63 } 64 65 /** 66 * 获取连接 67 * 68 * @return 69 * @throws Exception 70 */ 71 @Override 72 public TrackerServer borrowObject() throws FileparseException { 73 TrackerServer client = null; 74 try { 75 //client = dfsBlockingQueue.take(); 76 //从队列中获取值 默认30S超时 77 client = dfsBlockingQueue.poll(30,TimeUnit.SECONDS); 78 } catch (InterruptedException e) { 79 log.info("queue获取任务异常,异常原因:{}", ExceptionUtils.getFullStackTrace(e)); 80 } 81 82 if (ObjectUtils.isEmpty(client)) { 83 if (dfsBlockingQueue.size() < poolMaxSize) { 84 client = trackerServerFactory.create(); 85 // 放入连接池 86 returnObject(client); 87 }else { 88 //连接数已满 89 throw new FileparseException(ResponseCode.CONNECT_OVERFLOW); 90 } 91 // 验证对象是否有效 这里通过实践验证 如果长时间不校验是否存活,则这里会报通道已断开等错误 92 } else if (!trackerServerFactory.validateObject(trackerServerFactory.wrap(client))) { 93 // 对无效的对象进行处理 94 invalidateObject(client); 95 // 创建新的对象 96 client = trackerServerFactory.create(); 97 // 将新的对象放入连接池 98 returnObject(client); 99 } 100 return client; 101 } 102 103 @Override 104 public void returnObject(TrackerServer client) throws FileparseException{ 105 try { 106 if (client != null && !dfsBlockingQueue.offer(client, 10, TimeUnit.SECONDS)) { 107 trackerServerFactory.destroyObject(trackerServerFactory.wrap(client)); 108 //连接池已满 抛出指定异常 109 throw new FileparseException(ResponseCode.CONNECT_OVERFLOW); 110 } 111 } catch (InterruptedException e) { 112 log.error("return TrackerServer client interrupted ...{}", ExceptionUtils.getFullStackTrace(e)); 113 } 114 } 115 116 @Override 117 public void invalidateObject(TrackerServer client) { 118 trackerServerFactory.destroyObject(trackerServerFactory.wrap(client)); 119 dfsBlockingQueue.remove(client); 120 } 121 122 /** 123 * 增加一个新的链接,超时失效 124 */ 125 @Override 126 public void addObject() throws Exception { 127 // 插入对象到队列 128 dfsBlockingQueue.offer(trackerServerFactory.create(), 10, TimeUnit.SECONDS); 129 } 130 131 /** 132 * @param trackerServer 需释放的连接对象 133 * @Description: 释放繁忙连接 1.如果空闲池的连接小于最小连接值,就把当前连接放入idleConnectionPool; 134 * 2.如果空闲池的连接等于或大于最小连接值,就把当前释放连接丢弃; 135 */ 136 public void checkin(TrackerServer trackerServer) { 137 //log.info("[释放当前连接(checkin)][prams:{}}] ", trackerServer); 138 if (trackerServer != null) { 139 if (dfsBlockingQueue.size() < poolSize) { 140 dfsBlockingQueue.add(trackerServer); 141 } else { 142 trackerServerFactory.destroyObject(trackerServerFactory.wrap(trackerServer)); 143 } 144 } 145 146 } 147 148 149 /** 150 * 关闭连接池 151 */ 152 @Override 153 public void close() { 154 try { 155 while (dfsBlockingQueue.iterator().hasNext()) { 156 TrackerServer client = dfsBlockingQueue.take(); 157 trackerServerFactory.destroyObject(trackerServerFactory.wrap(client)); 158 } 159 } catch (Exception e) { 160 log.error("close TrackerServer client dfsBlockingQueue failed...{}", ExceptionUtils.getFullStackTrace(e)); 161 } 162 } 163 164 public BlockingQueue getDfsBlockingQueue() { 165 return dfsBlockingQueue; 166 } 167 168 }
心跳检测
1 package cn.com.fileparse.client.fdfs; 2 3 import lombok.extern.slf4j.Slf4j; 4 import org.apache.commons.lang.exception.ExceptionUtils; 5 import org.csource.fastdfs.ProtoCommon; 6 import org.csource.fastdfs.TrackerServer; 7 import org.springframework.beans.factory.annotation.Autowired; 8 9 import javax.annotation.PostConstruct; 10 import java.util.Iterator; 11 import java.util.concurrent.BlockingQueue; 12 13 /** 14 *@program 15 *@description 检测TrackerServer客户端是否在活着 16 *@author jiaqiangx 17 *@create 2021-06-29 22:42 18 */ 19 @Slf4j 20 public class TrackerServerKeepAlive { 21 22 private KeepAliveThread keepAliveThread; 23 24 @Autowired 25 private TrackerServerPool trackerServerPool; 26 27 private final String THREAD_NAME = "tracker-client-alive-thread"; 28 29 /** 30 * 等待时间 31 */ 32 public static int waitTimes = 1200; 33 34 35 @PostConstruct 36 public void init() { 37 // 启动心跳检测线程 38 if (keepAliveThread == null) { 39 keepAliveThread = new KeepAliveThread(); 40 Thread thread = new Thread(keepAliveThread, THREAD_NAME); 41 thread.start(); 42 } 43 } 44 45 class KeepAliveThread implements Runnable { 46 @Override 47 public void run() { 48 TrackerServer ts = null; 49 while (true) { 50 try { 51 BlockingQueue pool = trackerServerPool.getDfsBlockingQueue(); 52 if (pool != null && pool.size() > 0) { 53 Iterator it = pool.iterator(); 54 while (it.hasNext()) { 55 ts = it.next(); 56 if (ts != null) { 57 boolean result = ProtoCommon.activeTest(ts.getSocket()); 58 log.info("trackerServer心跳检测结果:{} ", result); 59 if(!result){ 60 //清除失效连接 61 trackerServerPool.invalidateObject(ts); 62 }else { 63 //回收空闲连接 64 trackerServerPool.checkin(ts); 65 } 66 } else { 67 /** 代表已经没有空闲长连接 */ 68 break; 69 } 70 } 71 } 72 } catch (Exception e) { 73 log.error("trackerServer心跳检测异常{}", ExceptionUtils.getFullStackTrace(e)); 74 trackerServerPool.invalidateObject(ts); 75 } 76 // 每30s发送一次心跳 77 try { 78 Thread.sleep(1000 * 30); 79 } catch (InterruptedException e) { 80 log.error("trackerServer休眠异常{}", ExceptionUtils.getFullStackTrace(e)); 81 } 82 } 83 84 } 85 } 86 }
启动初始化服务
1 package cn.com.fileparse.config; 2 3 import cn.com.fileparse.client.fdfs.TrackerServerKeepAlive; 4 import cn.com.fileparse.client.fdfs.TrackerServerFactory; 5 import cn.com.fileparse.client.fdfs.TrackerServerPool; 6 import cn.com.fileparse.properties.FastDfsProperties; 7 import lombok.extern.slf4j.Slf4j; 8 import org.springframework.beans.factory.annotation.Autowired; 9 import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; 10 import org.springframework.boot.context.properties.EnableConfigurationProperties; 11 import org.springframework.context.annotation.Bean; 12 13 /** 14 *@program 15 *@description 16 *@author jiaqiangx 17 *@create 2021-06-30 21:52 18 */ 19 @Slf4j 20 @EnableConfigurationProperties(FastDfsProperties.class) 21 public class FastDFSConfig { 22 @Autowired 23 FastDfsProperties fastDfsProperties; 24 /** 25 * 客户端工厂 26 * 27 * @return 28 */ 29 @Bean 30 public TrackerServerFactory trackerServerFactory() { 31 return new TrackerServerFactory(fastDfsProperties); 32 } 33 34 /** 35 * 连接池 36 * 37 * @param trackerServerFactory 38 * @return 39 * @throws Exception 40 */ 41 @Bean 42 public TrackerServerPool TrackerServerClientPool(TrackerServerFactory trackerServerFactory) throws Exception { 43 return new TrackerServerPool(trackerServerFactory); 44 } 45 46 /** 47 * TrackerServer心跳检测 48 */ 49 @Bean 50 @ConditionalOnBean(TrackerServerPool.class) 51 public TrackerServerKeepAlive trackerServerKeepAlive() { 52 return new TrackerServerKeepAlive(); 53 } 54 55 }
注解配置
1 package cn.com.fileparse.annotation; 2 import cn.com.fileparse.config.FastDFSConfig; 3 import org.springframework.context.annotation.Import; 4 5 import java.lang.annotation.ElementType; 6 import java.lang.annotation.Retention; 7 import java.lang.annotation.RetentionPolicy; 8 import java.lang.annotation.Target; 9 10 /** 11 *@program 12 *@description 启用FastDFS自动配置 13 *@author jiaqiangx 14 *@create 2021-08-13 14:17 15 */ 16 @Target(ElementType.TYPE) 17 @Retention(RetentionPolicy.RUNTIME) 18 @Import(FastDFSConfig.class) 19 public @interface EnableFastDFS { 20 }
文件上传下载主类
1 package cn.com.fileparse.client.fdfs; 2 3 import cn.com.fileparse.annotation.EnableFastDFS; 4 import cn.com.fileparse.client.ftp.FtpClientPool; 5 import cn.com.fileparse.constant.ResponseCode; 6 import cn.com.fileparse.exception.FileparseException; 7 import cn.com.fileparse.util.common.SpringBeanUtil; 8 import lombok.extern.slf4j.Slf4j; 9 import org.apache.commons.io.FilenameUtils; 10 import org.apache.commons.lang.exception.ExceptionUtils; 11 import org.apache.commons.lang3.StringUtils; 12 import org.csource.common.MyException; 13 import org.csource.common.NameValuePair; 14 import org.csource.fastdfs.DownloadStream; 15 import org.csource.fastdfs.FileInfo; 16 import org.csource.fastdfs.StorageClient; 17 import org.csource.fastdfs.StorageClient1; 18 import org.csource.fastdfs.TrackerServer; 19 import org.csource.fastdfs.UploadCallback; 20 import org.csource.fastdfs.UploadStream; 21 import org.springframework.stereotype.Component; 22 23 import java.io.File; 24 import java.io.FileInputStream; 25 import java.io.FileNotFoundException; 26 import java.io.FileOutputStream; 27 import java.io.IOException; 28 import java.io.InputStream; 29 import java.io.OutputStream; 30 31 /** 32 * FastDFS Java API. 文件上传下载主类. 33 *
34 */ 35 @Slf4j 36 @EnableFastDFS 37 @Component 38 public class FastDFSClient { 39 40 public static TrackerServer getStorageClient() throws FileparseException{ 41 TrackerServerPool trackerServerPool = SpringBeanUtil.getBean(TrackerServerPool.class); 42 TrackerServer trackerServer = null; 43 try { 44 trackerServer = trackerServerPool.borrowObject(); 45 } catch (FileparseException e) { 46 throw new FileparseException(ResponseCode.GET_DFS_CONNECT_ERROR); 47 } 48 49 return trackerServer; 50 } 51 52 public static String upload(File file) throws FileparseException { 53 return upload(file, null); 54 } 55 56 public static String upload(File file, String groupName) throws FileparseException { 57 String filePath; 58 FileInputStream in = null; 59 try { 60 in = new FileInputStream(file); 61 String fileExtName = FilenameUtils.getExtension(file.getName()); 62 // 设置文件元信息 63 NameValuePair[] metaList = new NameValuePair[3]; 64 metaList[0] = new NameValuePair("fileExtName", fileExtName); 65 metaList[1] = new NameValuePair("fileLength", String.valueOf(file.length())); 66 metaList[2] = new NameValuePair("fileName", getFileNameString(file.getName())); 67 filePath = upload(null, in.getChannel().size(), new UploadStream(in, file.length()), fileExtName, metaList); 68 } catch (Exception e) { 69 log.error("上传文件到fastdfs失败!文件名:{}", file.getName()); 70 throw new FileparseException(ResponseCode.UPLOD_FILE_ERROR); 71 } finally { 72 try { 73 if (in != null) { 74 in.close(); 75 } 76 } catch (IOException e) { 77 log.error("close FileInputStream failed...{}", ExceptionUtils.getFullStackTrace(e)); 78 } 79 } 80 return filePath; 81 82 } 83 84 public static String upload(InputStream in, String fileName) throws FileparseException { 85 String filePath; 86 try { 87 String fileExtName = FilenameUtils.getExtension(fileName); 88 // 设置文件元信息 89 NameValuePair[] metaList = new NameValuePair[3]; 90 metaList[0] = new NameValuePair("fileExtName", fileExtName); 91 metaList[1] = new NameValuePair("fileLength", String.valueOf(in.available())); 92 metaList[2] = new NameValuePair("fileName", getFileNameString(fileName)); 93 filePath = upload(null, in.available(), new UploadStream(in, in.available()), fileExtName, metaList); 94 } catch (Exception e) { 95 log.error("上传文件到fastdfs失败!文件名:{}", fileName); 96 throw new FileparseException(ResponseCode.UPLOD_FILE_ERROR); 97 } finally { 98 try { 99 if (in != null) { 100 in.close(); 101 } 102 } catch (IOException e) { 103 log.error("close FileInputStream failed...{}", ExceptionUtils.getFullStackTrace(e)); 104 } 105 } 106 return filePath; 107 108 } 109 110 /** 111 * 上传通用方法 112 */ 113 public static String upload(String groupName, long file_size, UploadCallback callback, String file_ext_name, NameValuePair[] meta_list) throws FileparseException { 114 TrackerServerPool trackerServerPool = SpringBeanUtil.getBean(TrackerServerPool.class); 115 TrackerServer trackerServer = trackerServerPool.borrowObject(); 116 StorageClient1 storageClient = new StorageClient1(trackerServer, null); 117 String path = null; 118 try { 119 // 上传 120 path = storageClient.upload_file1(groupName, file_size, callback, file_ext_name, meta_list); 121 if (StringUtils.isBlank(path)) { 122 throw new FileparseException(ResponseCode.UPLOD_FILE_ERROR); 123 } 124 log.info("上传文件到fastdfs成功!文件地址:{}", path); 125 } catch (IOException e) { 126 log.error("上传文件到fastdfs失败!失败原因:{}", ExceptionUtils.getFullStackTrace(e)); 127 throw new FileparseException(ResponseCode.UPLOD_FILE_ERROR); 128 } catch (MyException e) { 129 log.error("上传文件到fastdfs失败!失败原因:{}", ExceptionUtils.getFullStackTrace(e)); 130 throw new FileparseException(ResponseCode.UPLOD_FILE_ERROR); 131 } finally { 132 // 返还对象 133 trackerServerPool.checkin(trackerServer); 134 } 135 return path; 136 } 137 138 139 public static void download(String filePathName, File file) throws FileNotFoundException, FileparseException { 140 if (!file.exists()) { 141 File parentFile = file.getParentFile(); 142 if (!parentFile.exists()) { 143 parentFile.mkdirs(); 144 } 145 } 146 FileOutputStream in = null; 147 try { 148 in = new FileOutputStream(file); 149 download(filePathName, in); 150 } catch (Exception e) { 151 log.error("从fastdfs下载文件失败!失败原因:{}", ExceptionUtils.getFullStackTrace(e)); 152 throw new FileparseException(ResponseCode.DOWN_FILE_ERROR_1); 153 } 154 } 155 public static void download(String filePathName, OutputStream output) throws FileparseException { 156 TrackerServerPool trackerServerPool = SpringBeanUtil.getBean(TrackerServerPool.class); 157 TrackerServer trackerServer = trackerServerPool.borrowObject(); 158 StorageClient1 storageClient = new StorageClient1(trackerServer, null); 159 try { 160 storageClient.download_file1(filePathName, new DownloadStream(output)); 161 log.info("从fastdfs下载文件成功!文件名称:{}", filePathName); 162 } catch (Exception e) { 163 log.error("从fastdfs下载文件失败,文件路径:{}", filePathName); 164 throw new FileparseException(ResponseCode.DOWN_FILE_ERROR.getDesc()+e.getMessage(), ResponseCode.DOWN_FILE_ERROR.getCode()); 165 }finally { 166 // 返还对象 167 trackerServerPool.checkin(trackerServer); 168 } 169 } 170 171 /** 172 * @Description: 根据文件路径删除存储服务器上文件 173 * @Param: [filePath] SysDocFile.getFilePath() 174 * @return: boolean 175 * @Author: wanfy 176 * @Date: 2019/4/23 10:08 AM 177 */ 178 public static boolean deleteFile(String filePath) throws FileparseException{ 179 boolean success = false; 180 TrackerServerPool trackerServerPool = SpringBeanUtil.getBean(TrackerServerPool.class); 181 TrackerServer trackerServer = trackerServerPool.borrowObject(); 182 StorageClient1 storageClient = new StorageClient1(trackerServer, null); 183 try { 184 String group_name = filePath.split("/")[0]; 185 String remote_filename = filePath.substring(group_name.length() + 1); 186 int i = storageClient.delete_file(group_name, remote_filename); 187 if (i == 0) { 188 log.info("已从fastdfs上删除文件"); 189 success = true; 190 } 191 } catch (Exception e) { 192 log.warn("从fastdfs上删除文件失败"); 193 throw new FileparseException(ResponseCode.DEF_ERROR_CODE); 194 } finally { 195 // 返还对象 196 trackerServerPool.checkin(trackerServer); 197 } 198 return success; 199 } 200 201 /** 202 * @Description: 获取文件大小 203 * @Param: [filePath] SysDocFile.getFilePath() 204 * @return: boolean 205 * @Author: wanfy 206 * @Date: 2019/4/23 10:08 AM 207 */ 208 public static long getFileSize(String filePath) throws FileparseException{ 209 TrackerServerPool trackerServerPool = SpringBeanUtil.getBean(TrackerServerPool.class); 210 TrackerServer trackerServer = trackerServerPool.borrowObject(); 211 StorageClient1 storageClient = new StorageClient1(trackerServer, null); 212 try { 213 String group_name = filePath.split("/")[0]; 214 String remote_filename = filePath.substring(group_name.length() + 1); 215 FileInfo fileInfo = storageClient.get_file_info(group_name, remote_filename); 216 if (fileInfo == null) { 217 log.warn("fastdfs查询文件为空"); 218 throw new FileparseException(ResponseCode.DOWN_FILE_IS_NULL); 219 } 220 return fileInfo.getFileSize(); 221 } catch (Exception e) { 222 log.warn("从fastdfs上获取文件大小失败"); 223 throw new FileparseException(ResponseCode.DOWN_FILE_ERROR.getDesc()+e.getMessage(), ResponseCode.DOWN_FILE_ERROR.getCode()); 224 } finally { 225 // 返还对象 226 trackerServerPool.checkin(trackerServer); 227 } 228 } 229 230 private static String getFileNameString(String fileName) { 231 String str1; 232 if (fileName.indexOf(".") != -1) { 233 str1 = fileName.substring(0, fileName.lastIndexOf(".")); 234 } else { 235 str1 = fileName; 236 } 237 return str1; 238 } 239 }
简单写几个工具类
1 /** 2 * 3 */ 4 package cn.com.fileparse.util.old; 5 6 import cn.com.fileparse.client.fdfs.FastDFSClient; 7 import cn.com.fileparse.exception.FileparseException; 8 import lombok.extern.slf4j.Slf4j; 9 import org.apache.commons.io.FilenameUtils; 10 import org.csource.common.NameValuePair; 11 import org.csource.fastdfs.StorageClient1; 12 import org.csource.fastdfs.StorageServer; 13 import org.csource.fastdfs.UploadStream; 14 import org.springframework.stereotype.Component; 15 16 import java.io.File; 17 import java.io.FileInputStream; 18 import java.io.FileNotFoundException; 19 import java.io.FileOutputStream; 20 import java.io.IOException; 21 import java.io.InputStream; 22 import java.io.OutputStream; 23 24 /** 25 * @author jiaqiangx 26 */ 27 @Component 28 @Slf4j 29 public class FastDFSUtil { 30 31 public static String upload(File file) throws FileparseException{ 32 return FastDFSClient.upload(file, null); 33 } 34 35 public static String upload(InputStream in, String fileName) throws FileparseException{ 36 return FastDFSClient.upload(in, fileName); 37 } 38 39 /** 40 * @Description:上传多个文件到本机的storage服务器 (供CA服务专用) 41 * @Param: [file] 42 * @return: java.lang.String 43 * @Author: wanfy 44 * @Date: 2019/5/21 3:44 PM 45 */ 46 public static String uploadFileToLocalhostStorage(File file) { 47 String filePath; 48 StorageServer storageServer = null; 49 try (FileInputStream in = new FileInputStream(file)) { 50 String fileExtName = FilenameUtils.getExtension(file.getName()); 51 storageServer = new StorageServer("localhost", 23000, 0); 52 StorageClient1 client = new StorageClient1(null, storageServer); 53 // 设置文件元信息 54 NameValuePair[] metaList = new NameValuePair[3]; 55 metaList[0] = new NameValuePair("fileExtName", fileExtName); 56 metaList[1] = new NameValuePair("fileLength", String.valueOf(file.length())); 57 metaList[2] = new NameValuePair("fileName", getFileNameString(file.getName())); 58 filePath = client.upload_file1(null, in.getChannel().size(), new UploadStream(in, file.length()), fileExtName, metaList); 59 log.info("上传文件到fastdfs成功!"); 60 } catch (Exception e) { 61 log.error("上传文件到fastdfs失败!文件名:{}", file.getName()); 62 throw new RuntimeException(e); 63 } finally { 64 try { 65 if (storageServer != null) { 66 storageServer.close(); 67 } 68 } catch (IOException e) { 69 log.warn("storageServer关闭失败"); 70 } 71 } 72 return filePath; 73 } 74 75 public static void download(String filePathName, File file) throws FileNotFoundException,FileparseException { 76 if (!file.exists()) { 77 File parentFile = file.getParentFile(); 78 if (!parentFile.exists()) { 79 parentFile.mkdirs(); 80 } 81 } 82 download(filePathName, new FileOutputStream(file)); 83 } 84 85 public static void download(String filePathName, OutputStream output) throws FileparseException { 86 FastDFSClient.download(filePathName,output); 87 } 88 89 /** 90 * @Description: 根据文件路径删除存储服务器上文件 91 * @Param: [filePath] SysDocFile.getFilePath() 92 * @return: boolean 93 * @Author: wanfy 94 * @Date: 2019/4/23 10:08 AM 95 */ 96 public static boolean deleteFile(String filePath) throws FileparseException { 97 return FastDFSClient.deleteFile(filePath); 98 } 99 100 /** 101 * @Description: 获取文件大小 102 * @Param: [filePath] SysDocFile.getFilePath() 103 * @return: boolean 104 * @Author: wanfy 105 * @Date: 2019/4/23 10:08 AM 106 */ 107 public static long getFileSize(String filePath) throws FileparseException{ 108 return FastDFSClient.getFileSize(filePath); 109 } 110 111 private static String getFileNameString(String fileName) { 112 String str1; 113 if (fileName.indexOf(".") != -1) { 114 str1 = fileName.substring(0, fileName.lastIndexOf(".")); 115 } else { 116 str1 = fileName; 117 } 118 return str1; 119 } 120 121 }
测试类
1 package cn.com.fileparse.utils; 2 3 import cn.com.fileparse.client.fdfs.FastDFSClient; 4 import cn.com.fileparse.util.old.FastDFSUtil; 5 import org.csource.fastdfs.TrackerServer; 6 import org.junit.Test; 7 import org.junit.runner.RunWith; 8 import org.springframework.boot.test.context.SpringBootTest; 9 import org.springframework.test.context.junit4.SpringRunner; 10 11 import java.io.File; 12 13 /** 14 * @author jiaqiangx 15 * @program 16 * @description 17 * @create 2021-07-12 09:29 18 */ 19 @SpringBootTest 20 @RunWith(SpringRunner.class) 21 public class DFSUtilsTest { 22 @Test 23 public void upload(){ 24 try { 25 File pdf = new File("D:\\工作\\s.png"); 26 String asd = FastDFSUtil.upload(pdf); 27 System.out.println(asd); 28 } catch (Exception e) { 29 e.printStackTrace(); 30 } 31 } 32 33 @Test 34 public void getStorageClient(){ 35 try { 36 TrackerServer trackerServer = FastDFSClient.getStorageClient(); 37 trackerServer.getSocket(); 38 } catch (Exception e) { 39 e.printStackTrace(); 40 } 41 } 42 43 }