简介
ssh 默认的连接数量有限,当大量请求连接 ssh 时会概率性连接失败甚至直接失败,因此需要对连接池化,当然如果不要求实时的话可以用生产者消费者。
了解 commons-pool2
依赖
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.8.1</version>
</dependency>
核心概念
- PooledObjectState:池化对象的状态
- IDLE:在队列中,未使用。
- ALLOCATED:正在使用。
- EVICTION:在队列中,正在判断是否满足被驱逐的条件。
- EVICTION_RETURN_TO_HEAD:不在队列中,目前正在测试是否可能被驱逐。 测试时尝试借用对象,将其从队列中删除。 一旦
测试完成,它应该返回到队列的头部。 - VALIDATION:在队列中,当前正在验证。
- VALIDATION_PREALLOCATED:预分配状态,不在队列中,还在测试,测试成功后分配。
- VALIDATION_RETURN_TO_HEAD:不在队列中,目前正在验证。 测试时尝试借用对象,将其从队列中删除。 一旦
测试完成,它应该返回到队列的头部。 - INVALID:无效的,将被销毁或已经被销毁。
- ABANDONED:废弃的
- RETURNING:使用完毕,正在归还到池中。
- PooledObject:池化对象的包装器,包含被包装的对象、状态以及使用信息,默认实现是
DefaultPooledObject
。 - PooledObjectFactory:对象工厂,管理池对象的生命周期(借出、验证和销毁等等),常用的实现是
BasePooledObjectFactory
。 - BaseObjectPoolConfig:对象池配置,实现有
GenericObjectPoolConfig
等等,常用参数如下。- minIdle:对象池最小空闲对象数,默认为
0
。 - maxIdle:对象池最大空闲对象数,默认为
8
。 - maxTotal:对象池最大可用对象数,默认为
8
。 - maxWaitMillis:获取对象是最大等待时间,默认为
-1
(负值时为无限等待)。 - testOnBorrow:从对象池中借出对象时是否进行测试,默认为
false
。 - testOnReturn:返回给对象池时是否测试对象可用,默认为
false
。 - testWhileIdle:空闲对象驱逐验证开关,打开后定时验证对象池中空闲对象是否可用,不可用就销毁该对象,默认为
false
。 - timeBetweenEvictionRunsMillis:上次空闲对象驱逐验证完成后多久开始下次验证,与 testWhileIdle 配合使用,默认为
-1
(负值时为无限等待)。
- minIdle:对象池最小空闲对象数,默认为
- ObjectPool:对象池。常用的实现是
GenericObjectPool
,它可以配置PooledObjectFactory
(对象工厂)、GenericObjectPoolConfig
(对象池的配置)以及AbandonedConfig
(废弃对象检测,可以不配置)。
简单应用
一个最简单的使用步骤为:
- 创建用于包装对象的
PooledObject
。这里使用默认实现DefaultPooledObject
。 - 创建管理对象的工厂
PooledObjectFactory
。新建CustomPooledObjectFactory
,继承BasePooledObjectFactory<Object>
,并实现create()
和wrap(Object obj)
方法,其中create()
方法返回的是我们想要放入和借出的对象;wrap(Object obj)
方法返回的是通过PooledObject
包装的 Object,这里可以使用默认实现new DefaultPooledObject<Object>(obj)
。 - 创建对象池配置。使用常用的
GenericObjectPoolConfig<Object>
,参数通过对应的set
方法传入。 - 创建连接池。使用常用的
GenericObjectPool
,构造函数为public GenericObjectPool(final PooledObjectFactory<T> factory, final GenericObjectPoolConfig<T> config)
,第一个参数传入第二步创建的CustomPooledObjectFactory
,第二个参数传入第三步创建的GenericObjectPoolConfig
。
代码如下,其中 main
方法为创建对象池的 demo 和使用对象池的官方 demo:
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
public class CustomPooledObjectFactory extends BasePooledObjectFactory<Object> {
@Override
public Object create() throws Exception {
return new Object();
}
@Override
public PooledObject<Object> wrap(Object obj) {
return new DefaultPooledObject<>(obj);
}
public static void main(String[] args) {
final GenericObjectPool<Object> objectPool = new GenericObjectPool<>(new CustomPooledObjectFactory(), new GenericObjectPoolConfig<>());
Object obj = null;
try {
obj = objectPool.borrowObject();
try {
// 使用对象
} catch(Exception e) {
// 使对象无效
objectPool.invalidateObject(obj);
// 不要将对象返回到池中两次
obj = null;
} finally {
// 确保对象返回到池中
if(null != obj) {
objectPool.returnObject(obj);
}
}
} catch(Exception e) {
// 从池中借出对象失败
}
}
}
Sftp
sftp 是通过 ssh 完成安全的传输的,而 ssh 的连接数是有限的,具体的 ssh 连接参数见 /etc/ssh/sshd_config
的 MaxStartups
参数,默认为 10
或 10:30:100
,含义如下:
- 10:单独只有一个参数表示最大可用连接数,当
10:30:100
这种格式时表示 100% 可以连接成功的连接数。 - 30:表示 已经有 10 个连接时再连接有 30% 的概率连接失败。
- 100:表示最大连接数。
因此为了尽可能少的创建新连接就需要使用连接池。
依赖
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>0.1.55</version>
</dependency>
创建Sftp连接池
首先创建被池化的对象 SftpClient
,其中 originalDir
属性用于保存登录时的原始目录,validateConnect
方法用于验证连接,disconnect
方法用于销毁连接。
package com.haibara.toys.sftp.core;
import com.jcraft.jsch.*;
import lombok.Data;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author haibara
*/
@Data
public class SftpClient {
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final AtomicLong CLIENT_NUMBER = new AtomicLong(1L);
private ChannelSftp channelSftp;
private Session session;
/**
*
*/
private String clientInfo = "sftpclient";
/**
* ssh 根目录。
* 用于判断是否成功返回连接到连接池的条件之一
*/
private String originalDir;
public SftpClient(SftpProperties sftpProperties) throws SftpException, JSchException {
try {
JSch jsch = new JSch();
session = jsch.getSession(sftpProperties.getUsername(), sftpProperties.getHost(), sftpProperties.getPort());
session.setPassword(sftpProperties.getPassword());
Properties config = new Properties();
if (sftpProperties.getSession() != null) {
sftpProperties.getSession().forEach(config::put);
}
session.setConfig(config);
session.connect();
channelSftp = (ChannelSftp) session.openChannel("sftp");
channelSftp.connect();
clientInfo += CLIENT_NUMBER.getAndIncrement() + ",createTime:" + DATE_TIME_FORMATTER.format(LocalDateTime.now());
originalDir = channelSftp.pwd();
} catch (Exception e) {
disconnect();
throw e;
}
}
public void disconnect() {
if (channelSftp != null) {
try {
channelSftp.disconnect();
} catch (Exception ignored) {
}
}
if (session != null) {
try {
session.disconnect();
} catch (Exception ignored) {
}
}
}
public boolean validateConnect() {
try {
return session.isConnected() && channelSftp.isConnected() && originalDir.equals(channelSftp.pwd());
} catch (Exception e) {
return false;
}
}
}
接下来和前面一样分 4 步创建。
-
创建被包装的对象,同样使用默认的
DefaultPooledObject
。 -
创建对象工厂
SftpFactory
,重写validateObject
和destroyObject
方法为SftpClient
的验证和销毁方法,SftpFactory
代码见第 4 步SftpPool
的静态内部类SftpFactory
。 -
创建连接池配置(以及 sftp 的配置)
SftpProperties
类和GenericObjectPoolConfig
(连接池配置),GenericObjectPoolConfig
创建代码见第 4 步SftpPool
的getPoolConfig
方法 。
package com.haibara.toys.sftp.core;
import lombok.Data;
import java.util.Map;
/**
* @author haibara
*/
@Data
public class SftpProperties {
/**
* 地址
*/
private String host = "localhost";
/**
* 端口号
*/
private int port = 22;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* Session 参数配置
*/
private Map<String, String> session;
/**
* 连接池配置
*/
private Pool pool;
/**
* 连接池配置类
*/
@Data
public static class Pool {
/**
* 池中最小的连接数,只有当 timeBetweenEvictionRuns 为正时才有效
*/
private int minIdle = 0;
/**
* 池中最大的空闲连接数,为负值时表示无限
*/
private int maxIdle = 8;
/**
* 池可以产生的最大对象数,为负值时表示无限
*/
private int maxActive = 16;
/**
* 当池耗尽时,阻塞的最长时间,为负值时无限等待
*/
private long maxWait = -1;
/**
* 从池中取出对象是是否检测可用
*/
private boolean testOnBorrow = true;
/**
* 将对象返还给池时检测是否可用
*/
private boolean testOnReturn = false;
/**
* 检查连接池对象是否可用
*/
private boolean testWhileIdle = true;
/**
* 距离上次空闲线程检测完成多久后再次执行
*/
private long timeBetweenEvictionRuns = 300000L;
}
}
yml 中对应的配置如下:
sftp:
host: 127.0.0.1
port: 22
username: root
password: 123456
session:
StrictHostKeyChecking: no
kex: diffie-hellman-group1-sha1,diffie-hellman-group-exchange-sha1,diffie-hellman-group-exchange-sha256
pool:
max-idle: 8
min-idle: 1
max-active: 16
max-wait: 150000
test-on-borrow: true
test-on-return: false
test-while-idle: true
time-between-eviction-runs: 120000
- 创建连接池
SftpPool
。需要注意的一点是返还给连接池的连接要和新连接的状态相同,因此重写GenericObjectPool
的returnObject
方法,在原来的returnObject
前恢复(还原当前目录为初始目录等等),同时在SftpClient
的validateConnect
方法中添加判断是否恢复的条件(判断当前目录是否是初始目录等等)。
package com.haibara.toys.sftp.core;
import com.jcraft.jsch.SftpException;
import lombok.SneakyThrows;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import java.util.NoSuchElementException;
/**
* @author haibara
*/
public class SftpPool implements ObjectPool<SftpClient> {
private final GenericObjectPool<SftpClient> internalPool;
public SftpPool(SftpProperties sftpProperties) {
this.internalPool = new GenericObjectPool<SftpClient>(new SftpFactory(sftpProperties), getPoolConfig(sftpProperties.getPool())){
@Override
public void returnObject(SftpClient sftpClient) {
try {
sftpClient.getChannelSftp().cd(sftpClient.getOriginalDir());
} catch (Exception ignored) {
}
super.returnObject(sftpClient);
}
};
}
@Override
public void addObject() throws Exception, IllegalStateException, UnsupportedOperationException {
internalPool.addObject();
}
@Override
public SftpClient borrowObject() throws Exception, NoSuchElementException, IllegalStateException {
return internalPool.borrowObject();
}
@Override
public void clear() throws Exception, UnsupportedOperationException {
internalPool.clear();
}
@Override
public void close() {
internalPool.close();
}
@Override
public int getNumActive() {
return internalPool.getNumActive();
}
@Override
public int getNumIdle() {
return internalPool.getNumIdle();
}
@Override
public void invalidateObject(SftpClient obj) throws Exception {
internalPool.invalidateObject(obj);
}
@Override
public void returnObject(SftpClient obj) {
internalPool.returnObject(obj);
}
private static class SftpFactory extends BasePooledObjectFactory<SftpClient> {
private final SftpProperties sftpProperties;
public SftpFactory(SftpProperties sftpProperties) {
this.sftpProperties = sftpProperties;
}
@Override
public SftpClient create() throws Exception {
return new SftpClient(sftpProperties);
}
@Override
public PooledObject<SftpClient> wrap(SftpClient sftpClient) {
return new DefaultPooledObject<>(sftpClient);
}
@Override
public boolean validateObject(PooledObject<SftpClient> p) {
return p.getObject().validateConnect();
}
@Override
public void destroyObject(PooledObject<SftpClient> p) {
p.getObject().disconnect();
}
}
private GenericObjectPoolConfig<SftpClient> getPoolConfig(SftpProperties.Pool properties) {
if (properties == null) {
properties = new SftpProperties.Pool();
}
GenericObjectPoolConfig<SftpClient> config = new GenericObjectPoolConfig<>();
config.setMinIdle(properties.getMinIdle());
config.setMaxIdle(properties.getMaxIdle());
config.setMaxTotal(properties.getMaxActive());
config.setMaxWaitMillis(properties.getMaxWait());
config.setTestOnBorrow(properties.isTestOnBorrow());
config.setTestOnReturn(properties.isTestOnReturn());
config.setTestWhileIdle(properties.isTestWhileIdle());
config.setTimeBetweenEvictionRunsMillis(properties.getTimeBetweenEvictionRuns());
return config;
}
}
最后只需要创建一个 SftpPool
对象就好了,使用代码如下(SftpTemplate
是对 SftpClient
封装的工具类):
@Test
void testSftp() {
String localFile = "";
String remotePath = "";
String fileName = "";
SftpClient sftpClient = null;
try (FileInputStream fileInputStream = new FileInputStream(localFile)) {
try {
sftpClient = sftpPool.borrowObject();
SftpTemplate sftpTemplate = new SftpTemplate(sftpClient);
sftpTemplate.cd(remotePath);
sftpTemplate.upload(fileInputStream, fileName);
} catch (Exception e) {
sftpPool.invalidateObject(sftpClient);
sftpClient = null;
} finally {
if (null != sftpClient) {
sftpPool.returnObject(sftpClient);
}
}
} catch (Exception e) {
// 从池中借出对象失败
}
}
具体代码见 hligaty/Toys 。
参考
apache common pool2原理与实战 - 海向 - 博客园 (cnblogs.com)