RPC - Thrift

背景:公司提供给第三方的数据传输接口一直是以Hessian的协议进行发布的,但是由于交通车辆通行数据量较大,导致第三方反应出现数据延迟的情况或者连接超时的情况,所以需要更换Hessian,换成性能更高的Thrift协议

区别:

  Hessian  Thrift
优点

1、简单易用,面向接口,通过接口暴露服务,jar包只有200、300k,不需要配置防火墙
2、效率高,复杂对象序列化速度仅次于RMI,简单对象序列化优于RMI,二进制传输
3、多语言支持:wiki、Java、Flash/Flex、Python、C++、.NET C#、PHP、Ruby、Objective-C
4、可与spring集成,配置简单,使用HessianServiceExporter提供bean服务
5、在数据量不大的情况下是一个比较好的选择
6、可以保持java代码的一致,编写简单方便

1、支持非常多的语言绑定
2、thrift文件生成目标代码,简单易用
3、消息定义文件支持注释
4、数据结构与传输表现的分离,支持多种消息格式
5、包含完整的客户端/服务端堆栈,可快速实现RPC
6、支持同步和异步通信
7、在大数据量下,性能较强
缺点 大数据量情况下,性能不是很高,会出现一些意料之外的错误 编写代码步骤较繁琐,需要使用thrift.exe工具来进行代码编译,与idea结合的不是很好
     

 

Thrift Demo

1、首先需要编写一个Thrift文件  StudyService.thrift

//定义Thrift服务
service StudyService {
 
    string thriftFun(1:required string name) ;

}

2、用thrift.exe对StudyService.thrift进行java代码生成(https://thrift.apache.org/ 下载thrift编译工具),执行命令(windows)

thrift-0.13.0.exe -gen java StudyThrift.thrift

  RPC - Thrift

 

 

3、把生成的代码拷贝到项目当中:

RPC - Thrift

 

 4、开始编写实现类:

@Slf4j
@Service
public class StudyServiceImpl implements StudyService.Iface {
    @Override
    public String thriftFun(String name) throws  TException {
        log.info("getStudentByName");

        return name;
    }


}

 5、编写Thrift的conf文件,即启动类

@Component
public class ThriftServerConf {
    /**
     * 监听的端口
     */
    @Value("${server.thrift.port}")
    private Integer port;

    /**
     * 线程池最小线程数
     */
    @Value("${server.thrift.min-thread-pool}")
    private Integer minThreadPool;

    /**
     * 线程池最大线程数
     */
    @Value("${server.thrift.max-thread-pool}")
    private Integer maxThreadPool;

    /**
     * 业务服务对象
     */
    @Autowired
    StudyServiceImpl myServerService;

    public void start() {
        try {
            //thrift支持的scoker有很多种
            //非阻塞的socker
            TNonblockingServerSocket socket = new TNonblockingServerSocket(port);
            //THsHaServer 一个高可用的server
            //minWorkerThreads 最小的工作线程2
            //maxWorkerThreads 最大的工作线程4
            //如果这里Args不使用executorService指定线程池的话,创建THsHaServer会创建一个默认的LinkedBlockingQueue
            THsHaServer.Args arg = new THsHaServer.Args(socket).minWorkerThreads(minThreadPool).maxWorkerThreads(maxThreadPool);
            //可以自定义指定线程池
            //ExecutorService pool = Executors.newFixedThreadPool(minThreadPool);
            //arg.executorService(pool);

            //Processor处理区  用于处理业务逻辑
            //泛型就是实现的业务
            StudyService.Processor<StudyServiceImpl> processor = new StudyService.Processor<>(myServerService);

            //---------------thrift传输协议------------------------------
            //1. TBinaryProtocol      二进制传输协议
            //2. TCompactProtocol     压缩协议 他是基于TBinaryProtocol二进制协议在进一步的压缩,使得体积更小
            //3. TJSONProtocol        Json格式传输协议
            //4. TSimpleJSONProtocol  简单JSON只写协议,生成的文件很容易通过脚本语言解析,实际开发中很少使用
            //5. TDebugProtocol       简单易懂的可读协议,调试的时候用于方便追踪传输过程中的数据
            //-----------------------------------------------------------
            //设置工厂
            //协议工厂 TCompactProtocol压缩工厂  二进制压缩协议
            arg.protocolFactory(new TCompactProtocol.Factory());
            //---------------thrift传输格式------------------------------


            //---------------thrift数据传输方式------------------------------
            //1. TSocker            阻塞式Scoker 相当于Java中的ServerSocket
            //2. TFrameTransport    以frame为单位进行数据传输,非阻塞式服务中使用
            //3. TFileTransport     以文件的形式进行传输
            //4. TMemoryTransport   将内存用于IO,Java实现的时候内部实际上是使用了简单的ByteArrayOutputStream
            //5. TZlibTransport     使用zlib进行压缩,与其他传世方式联合使用;java当前无实现所以无法使用
            //传输工厂 更加底层的概念
            arg.transportFactory(new TFramedTransport.Factory());
            //arg.transportFactory(new TTransportFactory());
            //---------------thrift数据传输方式------------------------------

            //设置处理器(Processor)工厂
            arg.processorFactory(new TProcessorFactory(processor));

            //---------------thrift支持的服务模型------------------------------
            //1.TSimpleServer  简单的单线程服务模型,用于测试
            //2.TThreadPoolServer 多线程服务模型,使用的标准的阻塞式IO;运用了线程池,当线程池不够时会创建新的线程,当线程池出现大量空闲线程,线程池会对线程进行回收
            //3.TNonBlockingServer 多线程服务模型,使用非阻塞式IO(需要使用TFramedTransport数据传输方式)
            //4.THsHaServer YHsHa引入了线程池去处理(需要使用TFramedTransport数据传输方式),其模型把读写任务放到线程池去处理;Half-sync/Half-async(半同步半异步)的处理模式;Half-sync是在处理IO时间上(sccept/read/writr io),Half-async用于handler对RPC的同步处理
            //----------------------------
            //根据参数实例化server
            //半同步半异步的server
            TServer server = new THsHaServer(arg);
            //---------------thrift支持的服务模型------------------------------

            System.out.println("shrift server started; port:" + port);
            //启动server
            // 异步非阻塞的死循环
            server.serve();
        } catch (TTransportException e) {
            e.printStackTrace();
        }
    }
}

  6、项目启动,将Thrift进行启动发布即可

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class,args);
    }

    /**
     * 向Spring注册一个Bean对象
     * initMethod = "start"  表示会执行名为start的方法
     * start方法执行之后,就会阻塞接受客户端的请求
     *
     * @return
     */
    @Bean(initMethod = "start")
    public ThriftServerConf init() {
        ThriftServerConf thriftServer = new ThriftServerConf();
        return thriftServer;
    }
}

  7、编写客户端进行测试

public class ThriftClientTest {
    public static void main(String[] args) throws TException {
        TTransport tTransport = new TFramedTransport(new TSocket("127.0.0.1", 9991), 600);
        tTransport.open();
        //协议对象 这里使用协议对象需要和服务器的一致
        TProtocol tProtocol = new TCompactProtocol(tTransport);
        StudyService.Client client = new StudyService.Client(tProtocol);

        String s = client.thriftFun("测试名称");
        System.out.println(JSONUtil.toJsonStr(s));

    }
}

  

项目源码git地址:https://github.com/1924605670/STUDY_DEMO.git

 

上一篇:NoSql Cassandra


下一篇:微服务基础知识