Thrift入门及Java实例演示【转】

概述

Thrift是一个软件框架,用来进行可扩展且跨语言的服务的开发。它结合了功能强大的软件堆栈和代码生成引擎,以构建在 C++、Java、Python、PHP、Ruby、Erlang、Perl、Haskell、C#、Cocoa、JavaScript、Node.js、Smalltalk、and OCaml 等等编程语言间无缝结合的、高效的服务。

Thrift最初由facebook开发,07年四月开放源码,08年5月进入Apache孵化器。Thrift允许你定义一个简单的定义文件中的数据类型和服务接口。以作为输入文件,编译器生成代码用来方便地生成RPC客户端和服务器通信的无缝跨编程语言。

官网地址:thrift.apache.org

下载配置

Maven构建项目,在pom.xml 中添加如下内容:

<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.11.0</version>
</dependency>
  • 基本概念

    数据类型

    • 基本类型:
      bool:布尔值,true 或 false,对应 Java 的 boolean
      byte:8 位有符号整数,对应 Java 的 byte
      i16:16 位有符号整数,对应 Java 的 short
      i32:32 位有符号整数,对应 Java 的 int
      i64:64 位有符号整数,对应 Java 的 long
      double:64 位浮点数,对应 Java 的 double
      string:utf-8编码的字符串,对应 Java 的 String

    • 结构体类型:
      struct:定义公共的对象,类似于 C 语言中的结构体定义,在 Java 中是一个 JavaBean

    • 容器类型:
      list:对应 Java 的 ArrayList
      set:对应 Java 的 HashSet
      map:对应 Java 的 HashMap

    • 异常类型:
      exception:对应 Java 的 Exception

    • 服务类型:
      service:对应服务的类

    服务端编码基本步骤

    1. 实现服务处理接口impl
    2. 创建TProcessor
    3. 创建TServerTransport
    4. 创建TProtocol
    5. 创建TServer
    6. 启动Server

    客户端编码基本步骤

    1. 创建Transport
    2. 创建TProtocol
    3. 基于TTransport和TProtocol创建Client
    4. 调用Client的相应方法

    数据传输协议

    1. TBinaryProtocol 二进制格式
    2. TCompactProtocol 压缩格式
    3. TJSONProtocol JSON格式
    4. TSimpleJSONProtocol 提供JSON只写协议,生成的文件很容易通过脚本语言解析

    提示:客户端和服务端的协议要一致

    实例演示

    Thrift生成代码

    创建Thrift文件,比如D:\Tonny\Doc\Project\prj-oxygen\common\src\main\java\org\tonny\thrift\demo\config\news\NewsModel.thrift ,内容如下:

  • namespace java org.tonny.thrift.demo

    service HelloWorldService {
      string sayHello(1:string username)
    }

  • include类型的数据,文件引用,所以要用“文件名+类型”,即文件名.模块名
  • 使用从官网提供下载的thrift-0.11.0.exe,运用这个工具生成相关代码:

    thrift-0.9..exe -r -gen java ./HelloWorld.thrift

    将生成的HelloWorldService.java 文件复制到自己测试的工程中,我的工程是用Maven构建的,故在pom.xml中增加如下内容:

    <dependency>
    <groupId>org.apache.thrift</groupId>
    <artifactId>libthrift</artifactId>
    <version>0.9.3</version>
    </dependency>
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.5</version>
    </dependency>
  • 实现接口Iface

    Java代码:HelloWorldImpl.java

    package com.thrift.demo;
    
    import org.apache.thrift.TException;
    
    public class HelloWorldImpl implements HelloWorldService.Iface {
    
     public HelloWorldImpl() {
    } @Override
    public String sayHello(String username) throws TException {
    return "Hi," + username + " welcome to thrift demo world";
    } }

    TSimpleServer服务端

    简单的单线程服务模型,一般用于测试。
    编写服务端server代码:ThriftServer.java

    package com.thrift.demo.server;
    
    import org.apache.thrift.TProcessor;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TBinaryProtocol.Factory;
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.server.TSimpleServer;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TServerSocket;
    import org.apache.thrift.transport.TTransportException;
    import org.apache.thrift.transport.TTransportFactory; import com.thrift.demo.service.HelloWorldService;
    import com.thrift.demo.service.impl.HelloWorldServiceImpl; /**
    ************************************************************
    * @类名 ThriftServer
    *
    * @AUTHOR Neo
    ************************************************************
    */
    public class ThriftServerDemo { public void startServer() {
    try {
    System.out.println("Starting Thrift Server......"); TProcessor processor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl()); TServerSocket serverTransport = new TServerSocket(8191); TTransportFactory transportFactory = new TFramedTransport.Factory(); Factory factory = new TBinaryProtocol.Factory(); TServer.Args tArgs = new TServer.Args(serverTransport);
    tArgs.protocolFactory(factory);
    tArgs.transportFactory(transportFactory);
    tArgs.processor(processor); // 简单的单线程服务模型,一般用于测试
    TServer server = new TSimpleServer(tArgs); server.serve(); } catch (TTransportException e) {
    System.out.println("Starting Thrift Server......Error!!!");
    e.printStackTrace();
    } } public static void main(String[] args) {
    ThriftServerDemo server = new ThriftServerDemo();
    server.startServer();
    } }

    编写客户端Client代码:ThriftClientDemo.java

    package com.thrift.demo.client;
    
    import org.apache.thrift.TException;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TProtocol;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;
    import org.apache.thrift.transport.TTransportException; import com.thrift.demo.service.HelloWorldService;
    import com.thrift.demo.service.HelloWorldService.Client; /**
    ************************************************************
    * @类名 ThriftClient
    *
    * @AUTHOR Neo
    ************************************************************
    */
    public class ThriftClientDemo { public static void main(String[] args) {
    try {
    TTransport transport = new TFramedTransport(new TSocket("127.0.0.1", 8191, 5000));
    // 协议要和服务端一致
    TProtocol protocol = new TBinaryProtocol(transport); Client client = new HelloWorldService.Client(protocol); transport.open(); String string = client.sayHello("Neo"); System.out.println(string); transport.close(); } catch (TTransportException e) {
    e.printStackTrace();
    } catch (TException e) {
    e.printStackTrace();
    }
    } }

    先运行服务端程序,日志如下:

    Starting Thrift Server......

    再运行客户端调用程序,日志如下:

    Hello World,Hello Thrift!!! Hi:Neo

    测试成功,和预期的返回信息一致。

    TThreadPoolServer 服务模型

    线程池服务模型,使用标准的阻塞式IO,预先创建一组线程处理请求。
    编写服务端代码:HelloServerDemo.java

    package com.thrift.demo.server;
    
    import org.apache.thrift.TProcessor;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.server.TThreadPoolServer;
    import org.apache.thrift.transport.TServerSocket; import com.thrift.demo.service.HelloWorldService;
    import com.thrift.demo.service.impl.HelloWorldServiceImpl; /**
    ************************************************************
    * @类名 HelloServerDemo
    *
    * @AUTHOR Neo
    ************************************************************
    */
    public class HelloServerDemo {
    public static final int SERVER_PORT = 8191; public void startServer() {
    try {
    System.out.println("HelloWorld TThreadPoolServer start ...."); TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl()); TServerSocket serverTransport = new TServerSocket(SERVER_PORT);
    TThreadPoolServer.Args ttpsArgs = new TThreadPoolServer.Args(serverTransport);
    ttpsArgs.processor(tprocessor);
    ttpsArgs.protocolFactory(new TBinaryProtocol.Factory()); // 线程池服务模型,使用标准的阻塞式IO,预先创建一组线程处理请求。
    TServer server = new TThreadPoolServer(ttpsArgs);
    server.serve(); } catch (Exception e) {
    System.out.println("Server start error!!!");
    e.printStackTrace();
    }
    } public static void main(String[] args) {
    HelloServerDemo server = new HelloServerDemo();
    server.startServer();
    } }

    客户端Client代码和之前的一样,只要数据传输的协议一致即可,客户端测试成功,结果如下:

    Hello World,Hello Thrift!!! Hi:Neo

    TNonblockingServer 服务模型

    使用非阻塞式IO,服务端和客户端需要指定 TFramedTransport 数据传输的方式。
    编写服务端代码:HelloServerDemo.java

    package com.thrift.demo.server;
    
    import org.apache.thrift.TProcessor;
    import org.apache.thrift.protocol.TCompactProtocol;
    import org.apache.thrift.server.TNonblockingServer;
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TNonblockingServerSocket; import com.thrift.demo.service.HelloWorldService;
    import com.thrift.demo.service.impl.HelloWorldServiceImpl; /**
    ************************************************************
    * @类名 HelloServerDemo
    *
    * @AUTHOR Neo
    ************************************************************
    */
    public class HelloServerDemo {
    public static final int SERVER_PORT = 8191; public void startServer() {
    try {
    System.out.println("HelloWorld TNonblockingServer start ...."); TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl()); TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(SERVER_PORT);
    TNonblockingServer.Args tnbArgs = new TNonblockingServer.Args(tnbSocketTransport);
    tnbArgs.processor(tprocessor);
    tnbArgs.transportFactory(new TFramedTransport.Factory());
    tnbArgs.protocolFactory(new TCompactProtocol.Factory()); // 使用非阻塞式IO,服务端和客户端需要指定TFramedTransport数据传输的方式
    TServer server = new TNonblockingServer(tnbArgs);
    server.serve(); } catch (Exception e) {
    System.out.println("Server start error!!!");
    e.printStackTrace();
    }
    } public static void main(String[] args) {
    HelloServerDemo server = new HelloServerDemo();
    server.startServer();
    }
    }

    编写客户端代码:HelloClientDemo.java

    package com.thrift.demo.client;
    
    import org.apache.thrift.TException;
    import org.apache.thrift.protocol.TCompactProtocol;
    import org.apache.thrift.protocol.TProtocol;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;
    import org.apache.thrift.transport.TTransportException; import com.thrift.demo.service.HelloWorldService; /**
    ************************************************************
    * @类名 HelloClientDemo
    *
    * @AUTHOR Neo
    ************************************************************
    */
    public class HelloClientDemo { public static final String SERVER_IP = "127.0.0.1"; public static final int SERVER_PORT = 8191; public static final int TIMEOUT = 30000; public void startClient(String userName) {
    TTransport transport = null;
    try {
    transport = new TFramedTransport(new TSocket(SERVER_IP, SERVER_PORT, TIMEOUT));
    // 协议要和服务端一致
    TProtocol protocol = new TCompactProtocol(transport);
    HelloWorldService.Client client = new HelloWorldService.Client(protocol);
    transport.open();
    String result = client.sayHello(userName);
    System.out.println("Thrify client result =: " + result);
    } catch (TTransportException e) {
    e.printStackTrace();
    } catch (TException e) {
    e.printStackTrace();
    } finally {
    if (null != transport) {
    transport.close();
    }
    }
    } public static void main(String[] args) {
    HelloClientDemo client = new HelloClientDemo();
    client.startClient("Neo"); }
    }

    客户端的测试成功,结果如下:

    Thrify client result =: Hello World,Hello Thrift!!! Hi:Neo

    THsHaServer服务模型

    半同步半异步的服务端模型,需要指定为: TFramedTransport 数据传输的方式。
    编写服务端代码:HelloServerDemo.java

    package com.thrift.demo.server;
    
    import org.apache.thrift.TProcessor;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.server.THsHaServer;
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TNonblockingServerSocket; import com.thrift.demo.service.HelloWorldService;
    import com.thrift.demo.service.impl.HelloWorldServiceImpl; /**
    ************************************************************
    * @类名 HelloServerDemo
    *
    * @AUTHOR Neo
    ************************************************************
    */
    public class HelloServerDemo { public static final int SERVER_PORT = 8191; public void startServer() {
    try {
    System.out.println("HelloWorld THsHaServer start ...."); TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl()); TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(SERVER_PORT);
    THsHaServer.Args thhsArgs = new THsHaServer.Args(tnbSocketTransport);
    thhsArgs.processor(tprocessor);
    thhsArgs.transportFactory(new TFramedTransport.Factory());
    thhsArgs.protocolFactory(new TBinaryProtocol.Factory()); // 半同步半异步的服务模型
    TServer server = new THsHaServer(thhsArgs);
    server.serve(); } catch (Exception e) {
    System.out.println("Server start error!!!");
    e.printStackTrace();
    }
    } public static void main(String[] args) {
    HelloServerDemo server = new HelloServerDemo();
    server.startServer();
    }
    }

    客户端代码和上一个服务模型的Client中的类似,只要注意传输协议一致以及指定传输方式为TFramedTransport。

    异步客户端

    编写服务端代码:HelloServerDemo.java

    package com.thrift.demo.client;
    
    import org.apache.thrift.TProcessor;
    import org.apache.thrift.protocol.TCompactProtocol;
    import org.apache.thrift.server.TNonblockingServer;
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TNonblockingServerSocket; import com.thrift.demo.service.HelloWorldService;
    import com.thrift.demo.service.impl.HelloWorldServiceImpl; /**
    ************************************************************
    * @类名 HelloServerDemo
    *
    * @AUTHOR Neo
    ************************************************************
    */
    public class HelloServerDemo { public static final int SERVER_PORT = 8191; public void startServer() {
    try {
    System.out.println("HelloWorld TNonblockingServer start ...."); TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl()); TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(SERVER_PORT);
    TNonblockingServer.Args tnbArgs = new TNonblockingServer.Args(tnbSocketTransport);
    tnbArgs.processor(tprocessor);
    tnbArgs.transportFactory(new TFramedTransport.Factory());
    tnbArgs.protocolFactory(new TCompactProtocol.Factory()); // 使用非阻塞式IO,服务端和客户端需要指定TFramedTransport数据传输的方式
    TServer server = new TNonblockingServer(tnbArgs);
    server.serve(); } catch (Exception e) {
    System.out.println("Server start error!!!");
    e.printStackTrace();
    }
    } public static void main(String[] args) {
    HelloServerDemo server = new HelloServerDemo();
    server.startServer();
    }
    }

    编写客户端Client代码:HelloAsynClientDemo.java

    package com.thrift.demo.client;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit; import org.apache.thrift.TException;
    import org.apache.thrift.async.AsyncMethodCallback;
    import org.apache.thrift.async.TAsyncClientManager;
    import org.apache.thrift.protocol.TCompactProtocol;
    import org.apache.thrift.protocol.TProtocolFactory;
    import org.apache.thrift.transport.TNonblockingSocket;
    import org.apache.thrift.transport.TNonblockingTransport; import com.thrift.demo.service.HelloWorldService;
    import com.thrift.demo.service.HelloWorldService.AsyncClient.sayHello_call; /**
    ************************************************************
    * @类名 HelloAsynClientDemo
    *
    * @AUTHOR Neo
    ************************************************************
    */
    public class HelloClientDemo { public static final String SERVER_IP = "127.0.0.1"; public static final int SERVER_PORT = 8191; public static final int TIMEOUT = 30000; public void startClient(String userName) {
    try {
    TAsyncClientManager clientManager = new TAsyncClientManager();
    TNonblockingTransport transport = new TNonblockingSocket(SERVER_IP, SERVER_PORT, TIMEOUT); TProtocolFactory tprotocol = new TCompactProtocol.Factory();
    HelloWorldService.AsyncClient asyncClient = new HelloWorldService.AsyncClient(tprotocol, clientManager, transport);
    System.out.println("Client start ....."); CountDownLatch latch = new CountDownLatch(1);
    AsynCallback callBack = new AsynCallback(latch);
    System.out.println("call method sayHello start ...");
    asyncClient.sayHello(userName, callBack);
    System.out.println("call method sayHello .... end");
    boolean wait = latch.await(30, TimeUnit.SECONDS);
    System.out.println("latch.await =:" + wait);
    } catch (Exception e) {
    e.printStackTrace();
    }
    System.out.println("startClient end.");
    } public class AsynCallback implements AsyncMethodCallback<sayHello_call> { private CountDownLatch latch; public AsynCallback(CountDownLatch latch) {
    this.latch = latch;
    } @Override
    public void onComplete(sayHello_call response) {
    System.out.println("onComplete");
    try {
    // Thread.sleep(1000L * 1);
    System.out.println("AsynCall result =:" + response.getResult().toString());
    } catch (TException e) {
    e.printStackTrace();
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    latch.countDown();
    }
    } @Override
    public void onError(Exception exception) {
    System.out.println("onError :" + exception.getMessage());
    latch.countDown();
    }
    } public static void main(String[] args) {
    HelloAsynClientDemo client = new HelloAsynClientDemo();
    client.startClient("Neo");
    } }

    先运行服务程序,再运行客户端程序,测试结果如下:

    Client start .....
    call method sayHello start ...
    call method sayHello .... end
    onComplete
    AsynCall result =:Hello World,Hello Thrift!!! Hi:Neo
    latch.await =:true
    startClient end.

    设计思路

    • Thrift的Server类型有TSimpleServer、TNonblockingServer、THsHaServer、TThreadedSelectorServer、TThreadPoolServer
    • TSimpleServer是单线程阻塞IO的方式,仅用于demo
    • TNonblockingServer是单线程非阻塞IO的方式,通过java.nio.channels.Selector的select()接收连接请求,但是处理消息仍然是单线程,吞吐量有限不可用于生产
    • THsHaServer使用一个单独的线程处理IO,一个独立的worker线程池处理消息, 可以并行处理所有请求
    • TThreadPoolServer使用一个专用连接接收connection,一旦接收到请求就会放入ThreadPoolExecutor中的一个worker里处理,当请求处理完毕该worker释放并回到线程池中,可以配置线程最大值,当达到线程最大值时请求会被阻塞。TThreadPoolServer性能表现优异,代价是并发高时会创建大量线程
    • TThreadedSelectorServer是thrift 0.8引入的实现,处理IO也使用了线程池,比THsHaServer有更高的吞吐量和更低的时延,与TThreadPoolServer比性能相近且能应对网络IO较多的情况
    • 对于客户端较少的情况,TThreadPoolServer也有优异的性能表现,但是考虑到未来SOA可能的高并发,决定采用TThreadedSelectorServer
上一篇:Thrift入门及Java实例演示


下一篇:使用C#和Thrift来访问Hbase实例