一.thrift可以做什么
thrift是一个跨语言通信的工具,支持的语言多,而且还提供服务器端的众多网络模型,使服务端的开发可以只专于服务本身的逻辑。
二.thrift重要概念
1.processor
实际服务器端业务逻辑处理的实现类。
2.transport
TTransport主要处理服务器端和客户端的网络读写。主要的接口
- open
- close
- read
- write
- flush
例如常用的客户端是TSocket就是TTransport的一个实现
在服务器端当有请求来的时候,通过TServerTransport可以创建TTransport对象,然后通过TTransport发送数据给客户端,主要的接口
- open
- listen
- accept
- close
例如TNonblockingTransport,TSocket等。ServerTransport可以理解为主要实现了accept以后要做的事情,到底是采用何种方式(同步异步,阻塞非阻塞)把请求交给processor处理。
3.protocol
传输协议,thrift客户端和服务器端远程调用时候传输数据时候采取的数据格式,例如
TBinaryProtocol 二进制格式
TCompactProtocol 高效和压缩的二进制格式
TDenseProtocoal 与TCompactProtocol相比,meta信息略有不同
TJSONProtocoal JSON
TDebugProtocoal text 格式 方便调试
4.server
服务器的作用就是把前面的processor,transport,protocol组合到一起,协同他们的工作,例如:
TSimplerServer 接受一个连接,处理连接请求,直到客户端关闭了连接,它才回去接受一个新的连接。
TNonblockingServer 使用非阻塞的I/O解决了TSimpleServer一个客户端阻塞其他所有客户端的问题。
THsHaServer(半同步/半异步的server)它使用一个单独的线程来处理网络I/O,一个独立的worker线程池来处理消息。这样,只要有空闲的worker线程,消息就会被立即处理,因此多条消息能被并行处理。
TThreadPoolServer 有一个专用的线程用来接受连接。一旦接受了一个连接,它就会被放入ThreadPoolExecutor中的一个worker线程里处理。
注意:使用何种server和需要使用某种server transport,transport可能是有要求的。
例如:TNonblockingServer.Args的构造方法中
public Args(org.apache.thrift.transport.TNonblockingServerTransport transport)
可以看出使用TNonblockingServer需要使用TNonblockingServerTransport才行。
例如:使用THsHaServer时候server和client都需要设置transport为TFramedTransport
三.demo 编写一个ID生成器
下面编写的全局ID生成器是https://github.com/twitter/snowflake的简单版本,删减了通过zookeeper保证workerid的不重复。基本原理相似,22位时间戳+10位workerid+12位毫秒级的计数器。
还一些其他的ID生成方案,例如UUID,通过MYSQL的REPLACE INTO等,可用于分库以后做主键。
step 1.编写thrift interface definition language文件
idserver.thrift
namespace java demo
service IDService {
i64 getId()
}
step 2.通过thrift工具生成server端和client端的代码
thrift -r --gen java idserver.thrift
step 3.编写业务逻辑实现类
idworker.java
import org.apache.thrift.TException; public class IDWorker implements IDService.Iface { private long lastTimestamp; private long timeabs = 1288834974657L;
//12 bits
private long sequence = 0l;
//10 bits
private long workerId; public IDWorker(long workerId) {
this.workerId = workerId;
} @Override
public long getId() throws TException {
long timestamp = System.currentTimeMillis();
if (timestamp < this.lastTimestamp) {
return -1;
} else if (timestamp == this.lastTimestamp) {
if (this.sequence < 2048) {
this.sequence = ++this.sequence;
System.out.println(this.sequence);
} else {
while (timestamp <= this.lastTimestamp) {
timestamp = System.currentTimeMillis();
}
}
} else {
this.sequence = 0;
}
this.lastTimestamp = timestamp;
return ((timestamp-timeabs) << 22) | this.workerId << 12 | this.sequence;
}
}
step 3.编写thrift服务器端主程序IDServer.java
import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TTransportException;
public class IDServer { /**
* 半同步/半异步的server。它使用一个单独的线程来处理网络I/O,一个独立的worker线程池来处理消息。
*/
public void startTHsHaServer() {
try {
TProcessor processor = new IDService.Processor<IDService.Iface>(new IDWorker(1l));
TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(8080);
THsHaServer.Args args = new THsHaServer.Args(serverTransport);
args.processorFactory(new TProcessorFactory(processor));
args.protocolFactory(new TBinaryProtocol.Factory());
args.transportFactory(new TFramedTransport.Factory());
args.workerThreads(2);
TServer server = new THsHaServer(args);
server.serve();
} catch (TTransportException e) {
e.printStackTrace();
}
} public static void main(String args[]) {
new IDServer().startTHsHaServer();
}
}
step 4.编写thrift客户端调用代码
ClientDemo.java
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport; public class ClientDemo extends Thread { @Override
public void run() {
super.run();
this.startClient();
} public void startClient() {
TTransport transport = null;
try {
transport = new TFramedTransport(new TSocket("localhost", 8080, 30000));
TProtocol protocol = new TBinaryProtocol(transport);
IDService.Client client = new IDService.Client(protocol);
transport.open();
System.out.println("service result:"+client.getId());
} catch (Exception e) {
e.printStackTrace();
} finally {
if (transport != null) {
transport.close();
}
}
} public static void main(String args[]) {
for (int i=0; i<1000; i++) {
new ClientDemo().start();
}
} }
参考资料