序言: 因为ssdb(一种nosql数据库) java客户端没有实现连接池,需要模仿jdbc里面的连接池的思想,实现一个简单的连接池,以下是自己的总结:
思想: 每new 一个SSDB实例 ,用完之后close,其实底层就是开了一个socket连接,又关闭了,这其实是相当的消耗资源的,所以连接池的作用就是重用。不是直接关闭而是放到一个容器中去,下次再用的时候,直接从这个容器中拿。那么要点以下几点:
- 容器的选择:是LinkList,,它不同于ArrayList,LinkList是一种线性的结构保持FIFO,相当于一个队列,优点是可以实现快速的插入和移除。
- 初始化连接数:可以在第一次初始化的时候就先创建了一定数量的连接,这其实是比较大的开销,但是为后面的快速调用是非常值得的。
- 最大连接数:当并发很大的时候,就需要大量的链接,当从池子里面取不到连接时,就会创建新的链接,任何机器都是有它的极限值,为了不至于把资源耗完,我们就只能限制这个极限值(最大连接数),一旦大于这个极限值的时候,我就拒绝提供链接,直接抛出异常(当然这里可以有很多种处理方式,比如阻塞进程,让它等待,直到超时)。
- 超时,最小空闲数:当一个链接在队列中空闲着,超过一定的时间,就会把它销毁掉,但是还是会保留一个最小的空闲数量(我这里没有实现)。
package ssdb.datasource; import com.udpwork.ssdb.SSDB; import java.util.LinkedList; /** * Desc: * User: weiguili(li5220008@gmail.com) * Date: 14-1-15 * Time: 上午9:18 */ public class DataSource { private String host = "192.168.98.250"; private int port = 9966; private int timeoutMs = 5000; private int minIdle = 10;//最小空闲数 private int iniCount = 5;//初始化连接 private int maxCount = 10;//最大连接数 int currentCount = 0;//当前连接数 LinkedList<SSDB> ssdbConnectionsPool = new LinkedList<SSDB>(); public DataSource() { try { for(int i=0;i<iniCount;i++){ currentCount++; ssdbConnectionsPool.addLast(createSSDB(null)); } } catch (Exception e) { throw new ExceptionInInitializerError(e); } } public SSDB createSSDB(ThreadLocal<SSDB> cacheSSDB) throws Exception { return new MySSDB(host,port,timeoutMs,this,cacheSSDB); } public SSDB getSSDB(ThreadLocal<SSDB> cacheSSDB) throws Exception{ synchronized (ssdbConnectionsPool){ if(ssdbConnectionsPool.size()>0){ return ssdbConnectionsPool.removeFirst(); } if(currentCount<maxCount){ currentCount++; return createSSDB(cacheSSDB); } throw new Exception("没有连接了"); } } }
- 覆盖close方法:这里为了不改变用户的使用习惯,我需要覆盖原来的SSDB中的close方法,我这里使用的是继承(在这种情况下,我的灵活性要求不大,JDBC里面用的是代理,最好是动态代理,实际上体现的是组合优于继承思想)。
package ssdb.datasource; import com.udpwork.ssdb.SSDB; /** * Desc: * User: weiguili(li5220008@gmail.com) * Date: 14-1-15 * Time: 上午10:02 */ public class MySSDB extends SSDB { private DataSource dataSource; private int reusedCount = 20;//可重用次数 private int currentReusedCount = 0;//当前重用次数 private ThreadLocal<SSDB> cacheSSDB = null; MySSDB(String host,int port,int timeoutMs, DataSource dataSource,ThreadLocal<SSDB> cacheSSDB) throws Exception{ super(host,port,timeoutMs); this.dataSource = dataSource; this.cacheSSDB = cacheSSDB; } @Override public void close() { currentReusedCount++; if(currentReusedCount<reusedCount){ this.dataSource.ssdbConnectionsPool.addLast(this); }else { this.dataSource.currentCount--; if(cacheSSDB !=null && cacheSSDB.get() !=null) cacheSSDB.remove();//当连接关闭时,把缓存也干掉 super.close(); } } }
- SSDBUtil,ThreadLocal:最后是需要提供一个工具类SSDBUtil,目的是把对连接池的使用,都放到一个静态域里面去,实际上就是让它实现单例。在这里有一点非常关键,ThreadLocal是个线程的本地变量,在这里把每个SSDB包一下放进去可以起到线程之间隔离开来,各自处理自己的连接,避免了资源的抢占,是解决并发的一种解决方案。
package ssdb; import com.udpwork.ssdb.SSDB; import ssdb.datasource.DataSource; /** * Desc: * User: weiguili(li5220008@gmail.com) * Date: 14-1-15 * Time: 上午10:42 */ public class SSDBUtil { private static DataSource dataSource = new DataSource(); static ThreadLocal<SSDB> cacheSSDB = new ThreadLocal<SSDB>();//包一下,线程之间隔离 public static SSDB getSSDB() throws Exception{ if(cacheSSDB.get() != null){ return cacheSSDB.get(); } SSDB ssdb = dataSource.getSSDB(cacheSSDB); cacheSSDB.set(ssdb); return ssdb; } /*public static SSDB getSSDB() throws Exception{ return dataSource.getSSDB(cacheSSDB); }*/ }
- 最后附上一张连接池的原理图:
- 示例代码:
package ssdb; import com.udpwork.ssdb.*; import java.util.concurrent.Executor; import java.util.concurrent.Executors; /** * SSDB Java client SDK demo. */ public class Demo { public static void main(String[] args) throws Exception { SSDB ssdb = null; Response resp; byte[] b; ssdb = SSDBUtil.getSSDB(); /* kv */ System.out.println("---- kv -----"); //ssdb.set("a", "123"); ssdb.set("a", "122"); b = ssdb.get("a"); System.out.println(new String(b)); ssdb.del("a"); b = ssdb.get("a"); ssdb.set("a","90"); System.out.println(b); long incr = ssdb.incr("a", 10); System.out.println("-------increment the number by--------"); System.out.println(incr); resp = ssdb.scan("", "", 10); resp.print(); resp = ssdb.rscan("", "10000", 10); resp.print(); System.out.println(""); /* hashmap */ System.out.println("---- hashmap -----"); ssdb.hset("n", "a", "123"); b = ssdb.hget("n", "a"); System.out.println(new String(b)); ssdb.hdel("n", "a"); b = ssdb.hget("n", "a"); System.out.println(b); ssdb.hincr("n", "a", 10); ssdb.hset("n", "d", "124"); ssdb.hset("n", "c", "124"); ssdb.hset("n", "b", "124"); resp = ssdb.hscan("n", "a", "z", 10); //resp = ssdb.hrscan("n", "", "", 10); resp.print(); System.out.println(""); /* zset */ System.out.println("---- zset -----"); double d; ssdb.zset("hackers", "Alan Kay", 1940); ssdb.zset("hackers", "Richard Stallman", 1953); ssdb.zset("hackers", "Yukihiro Matsumoto", 1965); ssdb.zset("hackers", "Claude Shannon", 1916); ssdb.zset("hackers", "Linus Torvalds", 1999); ssdb.zset("hackers", "Alan Turing", 1912); ssdb.zset("n", "a", 1); d = ssdb.zget("n", "a"); System.out.println(d); ssdb.zdel("n", "a"); d = ssdb.zget("n", "a"); System.out.println(d); ssdb.zincr("n", "b", 10); //resp = ssdb.zscan("hackers", "", Double.valueOf(1912), Double.valueOf(1999), 10); resp = ssdb.zscan("test", "", null, Double.MAX_VALUE, 10); resp.print(); System.out.println(resp.items); System.out.println(resp.keys); System.out.println(resp.raw); System.out.println(""); /* multi */ ssdb.multi_set("a", "1b", "b", "2b"); resp = ssdb.multi_get("a", "b"); resp.print(); System.out.println(""); // ssdb.close(); concurrentTest(); } public static void concurrentTest(){ Executor pool = Executors.newFixedThreadPool(50); for(int i=0;i<1000;i++){ Runnable task = new Runnable() { @Override public void run() { synchronized (this){ System.out.println(Thread.currentThread().getName()); SSDB ssdb = null; try { ssdb = SSDBUtil.getSSDB(); System.out.println(ssdb); } catch (Exception e) { e.printStackTrace(); } finally { ssdb.close(); //ssdb.close(); } } } }; pool.execute(task); } } }