[转载] Thrift-client与spring集成

转载自http://shift-alt-ctrl.iteye.com/blog/1990030?utm_source=tuicool&utm_medium=referral

Thrift-client作为服务消费端,由于thrift使用socket通讯,因此它需要面对几个问题:

1) client端需要知道server端的IP + port,如果是分布式部署,还需要知道所有server的IP + port列表.

2) client为了提升性能,不可能只使用一个socket来处理并发请求,当然也不能每个请求都创建一个socket;我们需要使用连接池方案.

3) 对于java开发工程师而言,基于spring配置thrift服务,可以提供很多的便利.

4) 基于zookeeper配置管理,那么client端就不需要"硬编码"的配置server的ip + port,可以使用zookeeper来推送每个service的服务地址.

5) 因为thrift-client端不使用连接池的话,将不能有效的提高并发能力,本文重点描述看如何使用thrift-client连接池。

1. pom.xml

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework</groupId>
  4. <artifactId>spring-context</artifactId>
  5. <version>3.0.7.RELEASE</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.zookeeper</groupId>
  9. <artifactId>zookeeper</artifactId>
  10. <version>3.4.5</version>
  11. <!--<exclusions>-->
  12. <!--<exclusion>-->
  13. <!--<groupId>log4j</groupId>-->
  14. <!--<artifactId>log4j</artifactId>-->
  15. <!--</exclusion>-->
  16. <!--</exclusions>-->
  17. </dependency>
  18. <!--
  19. <dependency>
  20. <groupId>com.101tec</groupId>
  21. <artifactId>zkclient</artifactId>
  22. <version>0.4</version>
  23. </dependency>
  24. -->
  25. <dependency>
  26. <groupId>org.apache.thrift</groupId>
  27. <artifactId>libthrift</artifactId>
  28. <version>0.9.1</version>
  29. </dependency>
  30. <dependency>
  31. <groupId>org.apache.curator</groupId>
  32. <artifactId>curator-recipes</artifactId>
  33. <version>2.3.0</version>
  34. </dependency>
  35. <dependency>
  36. <groupId>commons-pool</groupId>
  37. <artifactId>commons-pool</artifactId>
  38. <version>1.6</version>
  39. </dependency>
  40. </dependencies>

2. spring-thrift-client.xml

    其中zookeeper作为可选项,开发者也可以通过制定serverAddress的方式指定server的地址.

  1. <!-- fixedAddress -->
  2. <!--
  3. <bean id="userService" class="com.demo.thrift.ThriftServiceClientProxyFactory">
  4. <property name="service" value="com.demo.service.UserService"></property>
  5. <property name="serverAddress" value="127.0.0.1:9090:2"></property>
  6. <property name="maxActive" value="5"></property>
  7. <property name="idleTime" value="10000"></property>
  8. </bean>
  9. -->
  10. <!-- zookeeper -->
  11. <bean id="thriftZookeeper" class="com.demo.thrift.zookeeper.ZookeeperFactory" destroy-method="close">
  12. <property name="connectString" value="127.0.0.1:2181"></property>
  13. <property name="namespace" value="demo/thrift-service"></property>
  14. </bean>
  15. <bean id="userService" class="com.demo.thrift.ThriftServiceClientProxyFactory" destroy-method="close">
  16. <property name="service" value="com.demo.service.UserService"></property>
  17. <property name="maxActive" value="5"></property>
  18. <property name="idleTime" value="1800000"></property>
  19. <property name="addressProvider">
  20. <bean class="com.demo.thrift.support.impl.DynamicAddressProvider">
  21. <property name="configPath" value="UserServiceImpl"></property>
  22. <property name="zookeeper" ref="thriftZookeeper"></property>
  23. </bean>
  24. </property>
  25. </bean>

3. ThriftServiceClientProxyFactory.java

因为我们要在client端使用连接池方案,那么就需要对client的方法调用过程,进行代理,这个类,就是维护了一个"Client"代理类,并在方法调用时,从"对象池"中取出一个"Client"对象,并在方法实际调用结束后归还给"对象池".

  1. @SuppressWarnings("rawtypes")
  2. public class ThriftServiceClientProxyFactory implements FactoryBean,InitializingBean {
  3. private String service;
  4. private String serverAddress;
  5. private Integer maxActive = 32;//最大活跃连接数
  6. ////ms,default 3 min,链接空闲时间
  7. //-1,关闭空闲检测
  8. private Integer idleTime = 180000;
  9. private ThriftServerAddressProvider addressProvider;
  10. private Object proxyClient;
  11. public void setMaxActive(Integer maxActive) {
  12. this.maxActive = maxActive;
  13. }
  14. public void setIdleTime(Integer idleTime) {
  15. this.idleTime = idleTime;
  16. }
  17. public void setService(String service) {
  18. this.service = service;
  19. }
  20. public void setServerAddress(String serverAddress) {
  21. this.serverAddress = serverAddress;
  22. }
  23. public void setAddressProvider(ThriftServerAddressProvider addressProvider) {
  24. this.addressProvider = addressProvider;
  25. }
  26. private Class objectClass;
  27. private GenericObjectPool<TServiceClient> pool;
  28. private PoolOperationCallBack callback = new PoolOperationCallBack() {
  29. @Override
  30. public void make(TServiceClient client) {
  31. System.out.println("create");
  32. }
  33. @Override
  34. public void destroy(TServiceClient client) {
  35. System.out.println("destroy");
  36. }
  37. };
  38. @Override
  39. public void afterPropertiesSet() throws Exception {
  40. if(serverAddress != null){
  41. addressProvider = new FixedAddressProvider(serverAddress);
  42. }
  43. ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
  44. //加载Iface接口
  45. objectClass = classLoader.loadClass(service + "$Iface");
  46. //加载Client.Factory类
  47. Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>)classLoader.loadClass(service + "$Client$Factory");
  48. TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();
  49. ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(addressProvider, clientFactory,callback);
  50. GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
  51. poolConfig.maxActive = maxActive;
  52. poolConfig.minIdle = 0;
  53. poolConfig.minEvictableIdleTimeMillis = idleTime;
  54. poolConfig.timeBetweenEvictionRunsMillis = idleTime/2L;
  55. pool = new GenericObjectPool<TServiceClient>(clientPool,poolConfig);
  56. proxyClient = Proxy.newProxyInstance(classLoader,new Class[]{objectClass},new InvocationHandler() {
  57. @Override
  58. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  59. //
  60. TServiceClient client = pool.borrowObject();
  61. try{
  62. return method.invoke(client, args);
  63. }catch(Exception e){
  64. throw e;
  65. }finally{
  66. pool.returnObject(client);
  67. }
  68. }
  69. });
  70. }
  71. @Override
  72. public Object getObject() throws Exception {
  73. return proxyClient;
  74. }
  75. @Override
  76. public Class<?> getObjectType() {
  77. return objectClass;
  78. }
  79. @Override
  80. public boolean isSingleton() {
  81. return true;  //To change body of implemented methods use File | Settings | File Templates.
  82. }
  83. public void close(){
  84. if(addressProvider != null){
  85. addressProvider.close();
  86. }
  87. }
  88. }

4. ThriftClientPoolFactory.java

"Client"对象池,对象池中是已经实例化的Client对象,Client对象负责与Thrift server通信.

  1. /**
  2. * 连接池,thrift-client for spring
  3. */
  4. public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient>{
  5. private final ThriftServerAddressProvider addressProvider;
  6. private final TServiceClientFactory<TServiceClient> clientFactory;
  7. private PoolOperationCallBack callback;
  8. protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider,TServiceClientFactory<TServiceClient> clientFactory) throws Exception {
  9. this.addressProvider = addressProvider;
  10. this.clientFactory = clientFactory;
  11. }
  12. protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider,TServiceClientFactory<TServiceClient> clientFactory,PoolOperationCallBack callback) throws Exception {
  13. this.addressProvider = addressProvider;
  14. this.clientFactory = clientFactory;
  15. this.callback = callback;
  16. }
  17. @Override
  18. public TServiceClient makeObject() throws Exception {
  19. InetSocketAddress address = addressProvider.selector();
  20. TSocket tsocket = new TSocket(address.getHostName(),address.getPort());
  21. TProtocol protocol = new TBinaryProtocol(tsocket);
  22. TServiceClient client = this.clientFactory.getClient(protocol);
  23. tsocket.open();
  24. if(callback != null){
  25. try{
  26. callback.make(client);
  27. }catch(Exception e){
  28. //
  29. }
  30. }
  31. return client;
  32. }
  33. public void destroyObject(TServiceClient client) throws Exception {
  34. if(callback != null){
  35. try{
  36. callback.destroy(client);
  37. }catch(Exception e){
  38. //
  39. }
  40. }
  41. TTransport pin = client.getInputProtocol().getTransport();
  42. pin.close();
  43. }
  44. public boolean validateObject(TServiceClient client) {
  45. TTransport pin = client.getInputProtocol().getTransport();
  46. return pin.isOpen();
  47. }
  48. static interface PoolOperationCallBack {
  49. //销毁client之前执行
  50. void destroy(TServiceClient client);
  51. //创建成功是执行
  52. void make(TServiceClient client);
  53. }
  54. }

5. DynamicAddressProvider.java

将zookeeper作为server地址的提供者,这样客户端就不需要再配置文件中指定一堆ip + port,而且当server服务有更新时,也不需要client端重新配置.

  1. /**
  2. * 可以动态获取address地址,方案设计参考
  3. * 1) 可以间歇性的调用一个web-service来获取地址
  4. * 2) 可以使用事件监听机制,被动的接收消息,来获取最新的地址(比如基于MQ,nio等)
  5. * 3) 可以基于zookeeper-watcher机制,获取最新地址
  6. * <p/>
  7. * 本实例,使用zookeeper作为"config"中心,使用apache-curator方法库来简化zookeeper开发
  8. * 如下实现,仅供参考
  9. */
  10. public class DynamicAddressProvider implements ThriftServerAddressProvider, InitializingBean {
  11. private String configPath;
  12. private PathChildrenCache cachedPath;
  13. private CuratorFramework zookeeper;
  14. //用来保存当前provider所接触过的地址记录
  15. //当zookeeper集群故障时,可以使用trace中地址,作为"备份"
  16. private Set<String> trace = new HashSet<String>();
  17. private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();
  18. private Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();
  19. private Object lock = new Object();
  20. private static final Integer DEFAULT_PRIORITY = 1;
  21. public void setConfigPath(String configPath) {
  22. this.configPath = configPath;
  23. }
  24. public void setZookeeper(CuratorFramework zookeeper) {
  25. this.zookeeper = zookeeper;
  26. }
  27. @Override
  28. public void afterPropertiesSet() throws Exception {
  29. //如果zk尚未启动,则启动
  30. if(zookeeper.getState() == CuratorFrameworkState.LATENT){
  31. zookeeper.start();
  32. }
  33. buildPathChildrenCache(zookeeper, configPath, true);
  34. cachedPath.start(StartMode.POST_INITIALIZED_EVENT);
  35. }
  36. private void buildPathChildrenCache(CuratorFramework client, String path, Boolean cacheData) throws Exception {
  37. cachedPath = new PathChildrenCache(client, path, cacheData);
  38. cachedPath.getListenable().addListener(new PathChildrenCacheListener() {
  39. @Override
  40. public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
  41. PathChildrenCacheEvent.Type eventType = event.getType();
  42. switch (eventType) {
  43. //                    case CONNECTION_RECONNECTED:
  44. //
  45. //                        break;
  46. case CONNECTION_SUSPENDED:
  47. case CONNECTION_LOST:
  48. System.out.println("Connection error,waiting...");
  49. return;
  50. default:
  51. //
  52. }
  53. //任何节点的时机数据变动,都会rebuild,此处为一个"简单的"做法.
  54. cachedPath.rebuild();
  55. rebuild();
  56. }
  57. protected void rebuild() throws Exception {
  58. List<ChildData> children = cachedPath.getCurrentData();
  59. if (children == null || children.isEmpty()) {
  60. //有可能所有的thrift server都与zookeeper断开了链接
  61. //但是,有可能,thrift client与thrift server之间的网络是良好的
  62. //因此此处是否需要清空container,是需要多方面考虑的.
  63. container.clear();
  64. System.out.println("thrift server-cluster error....");
  65. return;
  66. }
  67. List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();
  68. for (ChildData data : children) {
  69. String address = new String(data.getData(), "utf-8");
  70. current.addAll(transfer(address));
  71. trace.add(address);
  72. }
  73. Collections.shuffle(current);
  74. synchronized (lock) {
  75. container.clear();
  76. container.addAll(current);
  77. inner.clear();
  78. inner.addAll(current);
  79. }
  80. }
  81. });
  82. }
  83. private List<InetSocketAddress> transfer(String address){
  84. String[] hostname = address.split(":");
  85. Integer priority = DEFAULT_PRIORITY;
  86. if (hostname.length == 3) {
  87. priority = Integer.valueOf(hostname[2]);
  88. }
  89. String ip = hostname[0];
  90. Integer port = Integer.valueOf(hostname[1]);
  91. List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
  92. for (int i = 0; i < priority; i++) {
  93. result.add(new InetSocketAddress(ip, port));
  94. }
  95. return result;
  96. }
  97. @Override
  98. public List<InetSocketAddress> getAll() {
  99. return Collections.unmodifiableList(container);
  100. }
  101. @Override
  102. public synchronized InetSocketAddress selector() {
  103. if (inner.isEmpty()) {
  104. if(!container.isEmpty()){
  105. inner.addAll(container);
  106. }else if(!trace.isEmpty()){
  107. synchronized (lock) {
  108. for(String hostname : trace){
  109. container.addAll(transfer(hostname));
  110. }
  111. Collections.shuffle(container);
  112. inner.addAll(container);
  113. }
  114. }
  115. }
  116. return inner.poll();//null
  117. }
  118. @Override
  119. public void close() {
  120. try {
  121. cachedPath.close();
  122. zookeeper.close();
  123. } catch (Exception e) {
  124. //
  125. }
  126. }
  127. }

到此为止,我们的Thrift基本上就可以顺利运行起来了.更多代码,参见附件.

Thrift-server端开发与配置,参见[Thrift-server]

上一篇:如何让 linux unzip 命令 不输出结果


下一篇:linux zip命令