概述
Thrift是一个可互操作和可伸缩服务的框架,用来进行可扩展且跨语言的服务的开发。它结合了功能强大的软件堆栈和代码生成引擎,以构建在 C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml 等等编程语言间无缝结合的、高效的服务。
Thrift最初由facebook开发,07年四月开放源码,08年5月进入apache孵化器。thrift允许你定义一个简单的定义文件中的数据类型和服务接口(IDL)。以作为输入文件,编译器生成代码用来方便地生成RPC客户端和服务器通信的无缝跨编程语言。
其传输数据采用二进制格式,相对于XML和JSON等序列化方式体积更小,对于高并发、大数据量和多语言的环境更有优势。 Thrift它含有三个主要的组件:protocol,transport和server,其中,protocol定义了消息是怎样序列化的,transport定义了消息是怎样在客户端和服务器端之间通信的,server用于从transport接收序列化的消息,根据protocol反序列化之,调用用户定义的消息处理器,并序列化消息处理器的响应,然后再将它们写回transport。
官网地址:thrift.apache.org
基本概念
架构图
堆栈的顶部是从Thrift定义文件生成的代码。Thrift 服务生成的客户端和处理器代码。这些由图中的棕色框表示。红色框为发送的数据结构(内置类型除外)也会生成代码。协议和传输是Thrift运行时库的一部分。因此使用Thrift可以定义服务,并且可以*更改协议和传输,而无需重新生成代码。 Thrift还包括一个服务器基础结构,用于将协议和传输绑定在一起。有可用的阻塞,非阻塞,单线程和多线程服务器。 堆栈的“底层I / O”部分根据所开发语言而有所不同。对于Java和Python网络I / O,Thrift库利用内置库,而C ++实现使用自己的自定义实现。
数据类型:
基本类型:
-
bool:布尔值,true 或 false,对应 Java 的 boolean
-
byte:8 位有符号整数,对应 Java 的 byte
-
i16:16 位有符号整数,对应 Java 的 short
-
i32:32 位有符号整数,对应 Java 的 int
-
i64:64 位有符号整数,对应 Java 的 long
-
double:64 位浮点数,对应 Java 的 double
-
string:未知编码文本或二进制字符串,对应 Java 的 String
结构体类型:
-
struct:定义公共的对象,类似于 C 语言中的结构体定义,在 Java 中是一个 JavaBean
集合类型:
-
list:对应 Java 的 ArrayList
-
set:对应 Java 的 HashSet
-
map:对应 Java 的 HashMap
异常类型:
-
exception:对应 Java 的 Exception
服务类型:
-
service:对应服务的类
数据传输层Transport
-
TSocket —— 使用阻塞式 I/O 进行传输,是最常见的模式
-
TFramedTransport —— 使用非阻塞方式,按块的大小进行传输,类似于 Java 中的 NIO,若使用 TFramedTransport 传输层,其服务器必须修改为非阻塞的服务类型
-
TNonblockingTransport —— 使用非阻塞方式,用于构建异步客户端
数据传输协议Protocol
Thrift 可以让用户选择客户端与服务端之间传输通信协议的类别,在传输协议上总体划分为文本 (text) 和二进制 (binary) 传输协议,为节约带宽,提高传输效率,一般情况下使用二进制类型的传输协议为多数,有时还会使用基于文本类型的协议,这需要根据项目 / 产品中的实际需求。
常用协议有以下几种:
-
TBinaryProtocol : 二进制格式.
-
TCompactProtocol : 高效率的、密集的二进制压缩格式
-
TJSONProtocol : JSON格式
-
TSimpleJSONProtocol : 提供JSON只写协议, 生成的文件很容易通过脚本语言解析
注意:客户端和服务端的协议要一致。
服务器类型Server
-
TSimpleServer ——单线程服务器端使用标准的阻塞式 I/O,一般用于测试。
-
TThreadPoolServer —— 多线程服务器端使用标准的阻塞式 I/O,预先创建一组线程处理请求。
-
TNonblockingServer —— 多线程服务器端使用非阻塞式 I/O,服务端和客户端需要指定 TFramedTransport 数据传输的方式。
-
THsHaServer —— 半同步半异步的服务端模型,需要指定为: TFramedTransport 数据传输的方式。它使用一个单独的线程来处理网络I/O,一个独立的worker线程池来处理消息。这样,只要有空闲的worker线程,消息就会被立即处理,因此多条消息能被并行处理。
-
TThreadedSelectorServer —— TThreadedSelectorServer允许你用多个线程来处理网络I/O。它维护了两个线程池,一个用来处理网络I/O,另一个用来进行请求的处理。当网络I/O是瓶颈的时候,TThreadedSelectorServer比THsHaServer的表现要好。
实现逻辑
服务端
实现服务处理接口 impl
创建TProcessor 创建TServerTransport 创建TProtocol 创建TServer 启动Server
客户端
创建Transport 创建TProtocol 基于TTransport和TProtocol创建 Client 调用Client的相应方法
ThriftServerDemo实例
新建 Maven
项目,并且添加 thrift
依赖包
-
<dependencies>
-
<dependency>
-
<groupId>org.apache.thrift</groupId>
-
<artifactId>libthrift</artifactId>
-
<version>0.9.3</version>
-
</dependency>
-
<dependency>
-
<groupId>org.slf4j</groupId>
-
<artifactId>slf4j-log4j12</artifactId>
-
<version>1.7.12</version>
-
</dependency>
-
<dependency>
-
<groupId>org.apache.logging.log4j</groupId>
-
<artifactId>log4j-api</artifactId>
-
<version>2.7</version>
-
</dependency>
-
<dependency>
-
<groupId>org.apache.logging.log4j</groupId>
-
<artifactId>log4j-core</artifactId>
-
<version>2.7</version>
-
</dependency>
-
</dependencies>
-
<build>
-
<plugins>
-
<plugin>
-
<groupId>org.apache.maven.plugins</groupId>
-
<artifactId>maven-compiler-plugin</artifactId>
-
<version>3.3</version>
-
<configuration>
-
<source>1.8</source>
-
<target>1.8</target>
-
<encoding>utf-8</encoding>
-
</configuration>
-
</plugin>
-
</plugins>
-
</build>
编写 IDL
接口并生成接口文件
-
namespace java thrift.service
-
-
// 计算类型 - 仅限整数四则运算
-
enum ComputeType {
-
ADD = 0;
-
SUB = 1;
-
MUL = 2;
-
DIV = 3;
-
}
-
-
// 服务请求
-
struct ComputeRequest {
-
1:required i64 x;
-
2:required i64 y;
-
3:required ComputeType computeType;
-
}
-
-
// 服务响应
-
struct ComputeResponse {
-
1:required i32 errorNo;
-
2:optional string errorMsg;
-
3:required i64 computeRet;
-
}
-
-
service ComputeServer {
-
ComputeResponse getComputeResult(1:ComputeRequest request);
-
}
执行编译命令:
-
thrift-0.11.0.exe -r -gen java computeServer.thrift
拷贝生成的 Service
类文件到 IDEA
服务端接口实现
-
public class ThriftTestImpl implements ComputeServer.Iface {
-
private static final Logger logger = LogManager.getLogger(ThriftTestImpl.class);
-
public ComputeResponse getComputeResult(ComputeRequest request) {
-
ComputeType computeType = request.getComputeType();
-
long x = request.getX();
-
long y = request.getY();
-
logger.info("get compute result begin. [x:{}] [y:{}] [type:{}]", x, y, computeType.toString());
-
long begin = System.currentTimeMillis();
-
ComputeResponse response = new ComputeResponse();
-
response.setErrorNo(0);
-
try {
-
long ret;
-
if (computeType == ComputeType.ADD) {
-
ret = add(x, y);
-
response.setComputeRet(ret);
-
} else if (computeType == ComputeType.SUB) {
-
ret = sub(x, y);
-
response.setComputeRet(ret);
-
} else if (computeType == ComputeType.MUL) {
-
ret = mul(x, y);
-
response.setComputeRet(ret);
-
} else {
-
ret = div(x, y);
-
response.setComputeRet(ret);
-
}
-
} catch (Exception e) {
-
response.setErrorNo(1001);
-
response.setErrorMsg(e.getMessage());
-
logger.error("exception:", e);
-
}
-
long end = System.currentTimeMillis();
-
logger.info("get compute result end. [errno:{}] cost:[{}ms]", response.getErrorNo(), (end - begin));
-
return response;
-
}
-
private long add(long x, long y) {
-
return x + y;
-
}
-
private long sub(long x, long y) {
-
return x - y;
-
}
-
private long mul(long x, long y) {
-
return x * y;
-
}
-
private long div(long x, long y) {
-
return x / y;
-
}
-
}
服务端实现
-
public class ServerMain {
-
private static final Logger logger = LogManager.getLogger(ServerMain.class);
-
-
public static void main(String[] args) {
-
try {
-
//实现服务处理接口impl
-
ThriftTestImpl workImpl = new ThriftTestImpl();
-
//创建TProcessor
-
TProcessor tProcessor = new ComputeServer.Processor<ComputeServer.Iface>(workImpl);
-
//创建TServerTransport,非阻塞式 I/O,服务端和客户端需要指定 TFramedTransport 数据传输的方式
-
final TNonblockingServerTransport transport = new TNonblockingServerSocket(9999);
-
//创建TProtocol
-
TThreadedSelectorServer.Args ttpsArgs = new TThreadedSelectorServer.Args(transport);
-
ttpsArgs.transportFactory(new TFramedTransport.Factory());
-
//二进制格式反序列化
-
ttpsArgs.protocolFactory(new TBinaryProtocol.Factory());
-
ttpsArgs.processor(tProcessor);
-
ttpsArgs.selectorThreads(16);
-
ttpsArgs.workerThreads(32);
-
logger.info("compute service server on port :" + 9999);
-
//创建TServer
-
TServer server = new TThreadedSelectorServer(ttpsArgs);
-
//启动Server
-
server.serve();
-
} catch (Exception e) {
-
logger.error(e);
-
}
-
}
-
}
服务端整体代码结构
log4j2.xml配置文件
-
<?xml version="1.0" encoding="UTF-8"?>
-
<!--日志级别以及优先级排序: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL -->
-
<!--Configuration后面的status,这个用于设置log4j2自身内部的信息输出,可以不设置,当设置成trace时,你会看到log4j2内部各种详细输出-->
-
<!--monitorInterval:Log4j能够自动检测修改配置 文件和重新配置本身,设置间隔秒数-->
-
<configuration status="INFO" monitorInterval="30">
-
<!--先定义所有的appender-->
-
<appenders>
-
<!--这个输出控制台的配置-->
-
<console name="Console" target="SYSTEM_OUT">
-
<!--输出日志的格式-->
-
<PatternLayout pattern="%highlight{[ %p ] [%-d{yyyy-MM-dd HH:mm:ss}] [%l] %m%n}"/>
-
</console>
-
-
<RollingFile name="RollingFileInfo" fileName="log/log.log" filePattern="log/log.log.%d{yyyy-MM-dd}">
-
<!-- 只接受level=INFO以上的日志 -->
-
<ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/>
-
<PatternLayout pattern="[ %p ] [%-d{yyyy-MM-dd HH:mm:ss}] [ LOGID:%X{logid} ] [%l] %m%n"/>
-
<Policies>
-
<TimeBasedTriggeringPolicy modulate="true" interval="1"/>
-
<SizeBasedTriggeringPolicy/>
-
</Policies>
-
</RollingFile>
-
-
<RollingFile name="RollingFileError" fileName="log/error.log" filePattern="log/error.log.%d{yyyy-MM-dd}">
-
<!-- 只接受level=WARN以上的日志 -->
-
<Filters>
-
<ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY" />
-
</Filters>
-
<PatternLayout pattern="[ %p ] %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] [%l] %m%n"/>
-
<Policies>
-
<TimeBasedTriggeringPolicy modulate="true" interval="1"/>
-
<SizeBasedTriggeringPolicy/>
-
</Policies>
-
</RollingFile>
-
-
</appenders>
-
-
<!--然后定义logger,只有定义了logger并引入的appender,appender才会生效-->
-
<loggers>
-
<!--过滤掉spring和mybatis的一些无用的DEBUG信息-->
-
<logger name="org.springframework" level="INFO"></logger>
-
<logger name="org.mybatis" level="INFO"></logger>
-
<root level="all">
-
<appender-ref ref="Console"/>
-
<appender-ref ref="RollingFileInfo"/>
-
<appender-ref ref="RollingFileError"/>
-
</root>
-
</loggers>
-
</configuration>
Jmeter测试类编写
利用JMeter调用Java测试类去调用对应的后台服务,并记住每次调用并获取反馈值的RT,ERR%,只需要按照单线程的方式去实现测试业务,也无需添加各种埋点收集数据
新建一个 JavaMaven
工程,添加 JMeter
及 thrift
依赖包
-
<dependencies>
-
<dependency>
-
<groupId>org.apache.jmeter</groupId>
-
<artifactId>ApacheJMeter_core</artifactId>
-
<version>4.0</version>
-
</dependency>
-
<dependency>
-
<groupId>org.apache.jmeter</groupId>
-
<artifactId>ApacheJMeter_java</artifactId>
-
<version>4.0</version>
-
</dependency>
-
-
<dependency>
-
<groupId>org.apache.thrift</groupId>
-
<artifactId>libthrift</artifactId>
-
<version>0.11.0</version>
-
</dependency>
-
<dependency>
-
<groupId>org.apache.logging.log4j</groupId>
-
<artifactId>log4j-api</artifactId>
-
<version>2.11.1</version>
-
</dependency>
-
<dependency>
-
<groupId>org.apache.logging.log4j</groupId>
-
<artifactId>log4j-core</artifactId>
-
<version>2.11.1</version>
-
</dependency>
-
<dependency>
-
<groupId>org.slf4j</groupId>
-
<artifactId>slf4j-log4j12</artifactId>
-
<version>1.7.25</version>
-
</dependency>
-
</dependencies>
-
-
<build>
-
<plugins>
-
<plugin>
-
<groupId>org.apache.maven.plugins</groupId>
-
<artifactId>maven-compiler-plugin</artifactId>
-
<version>3.7.0</version>
-
<configuration>
-
<source>1.8</source>
-
<target>1.8</target>
-
<encoding>utf-8</encoding>
-
</configuration>
-
</plugin>
-
</plugins>
-
</build>
ThriftClient测试类编写
-
public class ThriftClient {
-
private ComputeServer.Client client = null;
-
private TTransport tTransport = null;
-
-
public ThriftClient(String ip,int port){
-
try {
-
TTransport tTransport = new TFramedTransport(new TSocket(ip,port));
-
tTransport.open();
-
TProtocol tProtocol = new TBinaryProtocol(tTransport);
-
client = new ComputeServer.Client(tProtocol);
-
} catch (TTransportException e) {
-
e.printStackTrace();
-
}
-
}
-
-
public ComputeResponse getResponse(ComputeRequest request){
-
try {
-
ComputeResponse response = client.getComputeResult(request);
-
return response;
-
} catch (TException e) {
-
e.printStackTrace();
-
return null;
-
}
-
}
-
-
public void close(){
-
if (tTransport != null && tTransport.isOpen()){
-
tTransport.close();
-
}
-
}
-
}
注意:需要把编写 IDL
接口文件拷贝到工程里
新建一个 JavaClass
,如下例中的 TestThriftByJmeter
,并继承 AbstractJavaSamplerClient
。 AbstractJavaSamplerClient
中默认实现了四个可以覆盖的方法,分别是 getDefaultParameters()
, setupTest()
, runTest()
和 teardownTest()
方法。
-
getDefaultParameters
方法主要用于设置传入界面的参数; -
setupTest
方法为初始化方法,用于初始化性能测试时的每个线程; -
runTest
方法为性能测试时的线程运行体; -
teardownTest
方法为测试结束方法,用于结束性能测试中的每个线程。
编写TestThriftByJmeter测试类
-
public class TestThriftByJmeter extends AbstractJavaSamplerClient {
-
private ThriftClient client;
-
private ComputeRequest request;
-
private ComputeResponse response;
-
-
//设置传入界面的参数
-
@Override
-
public Arguments getDefaultParameters(){
-
Arguments arguments = new Arguments();
-
arguments.addArgument("ip","172.16.14.251");
-
arguments.addArgument("port","9999");
-
arguments.addArgument("X","0");
-
arguments.addArgument("Y","0");
-
arguments.addArgument("type","0");
-
return arguments;
-
}
-
-
//初始化方法
-
@Override
-
public void setupTest(JavaSamplerContext context){
-
//获取Jmeter中设置的参数
-
String ip = context.getParameter("ip");
-
int port = context.getIntParameter("port");
-
int x = context.getIntParameter("X");
-
int y = context.getIntParameter("Y");
-
ComputeType type = ComputeType.findByValue(context.getIntParameter("type"));
-
-
//创建客户端
-
client = new ThriftClient(ip,port);
-
//设置request请求
-
request = new ComputeRequest(x,y,type);
-
super.setupTest(context);
-
}
-
-
//性能测试线程运行体
-
@Override
-
public SampleResult runTest(JavaSamplerContext context) {
-
SampleResult result = new SampleResult();
-
//开始统计响应时间标记
-
result.sampleStart();
-
try {
-
long begin = System.currentTimeMillis();
-
response = client.getResponse(request);
-
long cost = (System.currentTimeMillis() - begin);
-
//打印时间戳差值。Java请求响应时间
-
System.out.println(response.toString()+" 总计花费:["+cost+"ms]");
-
-
if (response == null){
-
//设置测试结果为fasle
-
result.setSuccessful(false);
-
return result;
-
}
-
if (response.getErrorNo() == 0){
-
//设置测试结果为true
-
result.setSuccessful(true);
-
}else{
-
result.setSuccessful(false);
-
result.setResponseMessage("ERROR");
-
}
-
}catch (Exception e){
-
result.setSuccessful(false);
-
result.setResponseMessage("ERROR");
-
e.printStackTrace();
-
}finally {
-
//结束统计响应时间标记
-
result.sampleEnd();
-
}
-
return result;
-
}
-
-
//测试结束方法
-
public void tearDownTest(JavaSamplerContext context) {
-
if (client != null) {
-
client.close();
-
}
-
-
super.teardownTest(context);
-
}
-
-
}
特别说明:
-
result.setSamplerLabel("7D"); //设置java Sampler的标题
-
result.setResponseOK(); //设置响应成功
-
result.setResponseData(); //设置响应内容
编写测试Run Main方法
-
public class RunMain {
-
public static void main(String[] args) {
-
Arguments arguments = new Arguments();
-
arguments.addArgument("ip","172.16.14.251");
-
arguments.addArgument("port","9999");
-
arguments.addArgument("X","1");
-
arguments.addArgument("Y","3");
-
arguments.addArgument("type","0");
-
JavaSamplerContext context = new JavaSamplerContext(arguments);
-
TestThriftByJmeter jmeter = new TestThriftByJmeter();
-
-
jmeter.setupTest(context);
-
jmeter.runTest(context);
-
jmeter.tearDownTest(context);
-
-
}
-
}
测试结果通过
使用 mvn cleanpackage
打包测试代码
使用 mvn dependency:copy-dependencies-DoutputDirectory=lib
复制所依赖的jar包都会到项目下的lib目录下
复制测试代码 jar
包到 jmeter\lib\ext
目录下,复制依赖包到 jmeter\lib
目录下
这里有两点需要注意:
-
如果你的jar依赖了其他第三方jar,需要将其一起放到lib/ext下,否则会出现ClassNotFound错误
-
如果在将jar放入lib/ext后,你还是无法找到你编写的类,且此时你是开着JMeter的,则需要重启一下JMeter
打开 Jmeter
,在添加 Java
请求时,注意要选择 Jmeter
测试类,下面的列表中可以看到参数和默认值。
下面我们将进行性能压测,设置线程组,设置10个并发线程。
服务端日志: