版权声明:本文为博主原创文章,未经博主允许不得转载。
最近对于流媒体技术比较感兴趣,虽然读书的时候学过相关方面的基础知识,但是大学上课,你懂得,一方面理论与实际脱节很严重,另一方面考试完全就是突击。学了和没学一样。好了,吐槽结束,书归正文。
研究流媒体技术的前提是先明白三个协议,RTSP,RTCP和RTP。关于这三种协议具体的定义百度上可以说是一抓一大把。总的来说, RTSP控制负责控制,包括创建,播放和暂停等操作,RTCP和RTP可以认为是一种协议,最大的区别 是RTCP中没有负载(payload,也就是媒体数据流),RTP则包含了负载。RTCP主要负责传输server和client的状态,如已经接收了多少数据,时间戳是什么,而RTP主要作用就是传输流媒体数据。
大部分对于RTSP都提到了这一个词:“RTSP是文本协议”,这句话是什么意思?通俗点说,如果你想告诉服务器你的名字,你首先构建一个类似于name="xxxxx"的字符串,然后把这个字符串转成byte[],经过SOCKET传给服务器,服务器就能够知道你的名字了。与之形成对比的是RTCP,RTCP规定了每个比特的每一位都代表什么,例如一个RTCP包的第一个比特的前两位代表版本,第三位用来填充,而第二个比特代表这次会话的序列号。坦率的说,实现RTCP协议可比RTSP烧脑多了。
回到RTSP这个话题,RTSP协议包含以下几种操作,option,describe,setup,play,pause和teardown。option是询问服务器你能提供什么方法,describe则是获取服务器的详细信息,setup是与服务器建立连接,服务器返回一个sessionid用来之后进行鉴权,play就是通知服务器可以发数据了,pause则是通知服务器暂停发数据,teardown,挥泪告别,さようなら。
如果你在百度上搜索过如下的关键字:RTSP Java。你会发现有人已经实现了RTSP协议,如果你真的使用了那份代码,恭喜你,你踩到坑啦。大部分转载的人并没有对转载的内容进行验证。我被网上的这份代码坑了号就,今天刚刚出坑,特此记录。
RTSPProtocal:RTSP协议类,主要负责创建RTSP文本
- public class RTSPProtocal {
- public static byte[] encodeOption(String address, String VERSION, int seq) {
- StringBuilder sb = new StringBuilder();
- sb.append("OPTIONS ");
- sb.append(address.substring(0, address.lastIndexOf("/")));
- sb.append(VERSION);
- sb.append("Cseq: ");
- sb.append(seq);
- sb.append("\r\n");
- sb.append("\r\n");
- System.out.println(sb.toString());
- //send(sb.toString().getBytes());
- return sb.toString().getBytes();
- }
- public static byte[] encodeDescribe(String address, String VERSION, int seq) {
- StringBuilder sb = new StringBuilder();
- sb.append("DESCRIBE ");
- sb.append(address);
- sb.append(VERSION);
- sb.append("Cseq: ");
- sb.append(seq);
- sb.append("\r\n");
- sb.append("\r\n");
- System.out.println(sb.toString());
- //send(sb.toString().getBytes());
- return sb.toString().getBytes();
- }
- public static byte[] encodeSetup(String address, String VERSION, String sessionid,
- int portOdd, int portEven, int seq, String trackInfo) {
- StringBuilder sb = new StringBuilder();
- sb.append("SETUP ");
- sb.append(address);
- sb.append("/");
- sb.append(trackInfo);
- sb.append(VERSION);
- sb.append("Cseq: ");
- sb.append(seq++);
- sb.append("\r\n");
- //"50002-50003"
- sb.append("Transport: RTP/AVP;UNICAST;client_port="+portEven+"-"+portOdd+";mode=play\r\n");
- sb.append("\r\n");
- System.out.println(sb.toString());
- System.out.println(sb.toString());
- //send(sb.toString().getBytes());
- return sb.toString().getBytes();
- }
- public static byte[] encodePlay(String address, String VERSION, String sessionid, int seq) {
- StringBuilder sb = new StringBuilder();
- sb.append("PLAY ");
- sb.append(address);
- sb.append(VERSION);
- sb.append("Session: ");
- sb.append(sessionid);
- sb.append("Cseq: ");
- sb.append(seq);
- sb.append("\r\n");
- sb.append("Range: npt=0.000-");
- sb.append("\r\n");
- sb.append("\r\n");
- System.out.println(sb.toString());
- //send(sb.toString().getBytes());
- return sb.toString().getBytes();
- }
- public static byte[] encodePause(String address, String VERSION, String sessionid, int seq) {
- StringBuilder sb = new StringBuilder();
- sb.append("PAUSE ");
- sb.append(address);
- sb.append("/");
- sb.append(VERSION);
- sb.append("Cseq: ");
- sb.append(seq);
- sb.append("\r\n");
- sb.append("Session: ");
- sb.append(sessionid);
- sb.append("\r\n");
- System.out.println(sb.toString());
- //send(sb.toString().getBytes());
- return sb.toString().getBytes();
- }
- public static byte[] encodeTeardown(String address, String VERSION, String sessionid, int seq) {
- StringBuilder sb = new StringBuilder();
- sb.append("TEARDOWN ");
- sb.append(address);
- sb.append("/");
- sb.append(VERSION);
- sb.append("Cseq: ");
- sb.append(seq);
- sb.append("\r\n");
- sb.append("User-Agent: LibVLC/2.2.1 (LIVE555 Streaming Media v2014.07.25)\r\n");
- sb.append("Session: ");
- sb.append(sessionid);
- sb.append("\r\n");
- System.out.println(sb.toString());
- return sb.toString().getBytes();
- //send(sb.toString().getBytes());
- //
- }
- }
RTSPClient:使用RTSPProtocal中的静态方法获取字符创,拥有发送和接收数据的功能
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.net.Socket;
- import java.nio.ByteBuffer;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.SocketChannel;
- import java.util.HashMap;
- import java.util.Map;
- public class RTSPClient {
- private static final int BUFFER_SIZE = 8192;
- private String localIpAddress;
- private String remoteIpAddress;
- private int localPort;
- private int localPortOdd;
- private int localPortEven;
- private int remoteIPort;
- private int remotePortOdd;
- private int remotePortEven;
- private Map<Integer, ReceiveSocket> map = new HashMap<>();
- public int getRemotePortOdd() {
- return remotePortOdd;
- }
- public void setRemotePortOdd(int remotePortOdd) {
- this.remotePortOdd = remotePortOdd;
- }
- public int getRemotePortEven() {
- return remotePortEven;
- }
- public void setRemotePortEven(int remotePortEven) {
- this.remotePortEven = remotePortEven;
- }
- public void addSocket(Integer port, ReceiveSocket socket){
- map.put(port, socket);
- }
- private String rtspAddress;
- private Socket tcpSocket;
- private SocketChannel socketChannel;
- private Selector selector;
- public String getLocalIpAddress() {
- return localIpAddress;
- }
- public void setLocalIpAddress(String localIpAddress) {
- this.localIpAddress = localIpAddress;
- }
- public int getLocalPort() {
- return localPort;
- }
- public void setLocalPort(int localPort) {
- this.localPort = localPort;
- }
- public int getLocalPortOdd() {
- return localPortOdd;
- }
- public void setLocalPortOdd(int localPortOdd) {
- this.localPortOdd = localPortOdd;
- }
- public int getLocalPortEven() {
- return localPortEven;
- }
- public void setLocalPortEven(int localPortEven) {
- this.localPortEven = localPortEven;
- }
- public String getRtspAddress() {
- return rtspAddress;
- }
- public void setRtspAddress(String rtspAddress) {
- this.rtspAddress = rtspAddress;
- }
- public Socket getTcpSocket() {
- return tcpSocket;
- }
- public void setTcpSocket(Socket tcpSocket) {
- this.tcpSocket = tcpSocket;
- }
- public String getRemoteIpAddress() {
- return remoteIpAddress;
- }
- public void setRemoteIpAddress(String remoteIpAddress) {
- this.remoteIpAddress = remoteIpAddress;
- }
- public int getRemoteIPort() {
- return remoteIPort;
- }
- public void setRemoteIPort(int remoteIPort) {
- this.remoteIPort = remoteIPort;
- }
- public Selector getSelector() {
- return selector;
- }
- public void setSelector(Selector selector) {
- this.selector = selector;
- }
- //new InetSocketAddress(
- //remoteIp, 554),
- //new InetSocketAddress("192.168.31.106", 0),
- //"rtsp://218.204.223.237:554/live/1/66251FC11353191F/e7ooqwcfbqjoo80j.sdp"
- public void inital() throws IOException{
- socketChannel = SocketChannel.open();
- socketChannel.socket().setSoTimeout(30000);
- socketChannel.configureBlocking(false);
- InetSocketAddress localAddress = new InetSocketAddress(this.localIpAddress, localPort);
- InetSocketAddress remoteAddress=new InetSocketAddress(this.remoteIpAddress, 554);
- socketChannel.socket().bind(localAddress);
- if (socketChannel.connect(remoteAddress)) {
- System.out.println("开始建立连接:" + remoteAddress);
- }
- if (selector == null) {
- // 创建新的Selector
- try {
- selector = Selector.open();
- } catch (final IOException e) {
- e.printStackTrace();
- }
- }
- socketChannel.register(selector, SelectionKey.OP_CONNECT
- | SelectionKey.OP_READ | SelectionKey.OP_WRITE, this);
- System.out.println("端口打开成功");
- }
- public void write(byte[] out) throws IOException {
- if (out == null || out.length < 1) {
- return;
- }
- System.out.println(out.toString());
- ByteBuffer sendBuf = ByteBuffer.allocateDirect(BUFFER_SIZE);
- sendBuf.clear();
- sendBuf.put(out);
- sendBuf.flip();
- if (isConnected()) {
- try {
- socketChannel.write(sendBuf);
- } catch (final IOException e) {
- }
- } else {
- System.out.println("通道为空或者没有连接上");
- }
- }
- public boolean isConnected() {
- return socketChannel != null && socketChannel.isConnected();
- }
- public byte[] receive() {
- if (isConnected()) {
- try {
- int len = 0;
- int readBytes = 0;
- ByteBuffer receiveBuf = ByteBuffer.allocateDirect(BUFFER_SIZE);
- synchronized (receiveBuf) {
- receiveBuf.clear();
- try {
- while ((len = socketChannel.read(receiveBuf)) > 0) {
- readBytes += len;
- }
- } finally {
- receiveBuf.flip();
- }
- if (readBytes > 0) {
- final byte[] tmp = new byte[readBytes];
- receiveBuf.get(tmp);
- return tmp;
- } else {
- System.out.println("接收到数据为空,重新启动连接");
- return null;
- }
- }
- } catch (final IOException e) {
- System.out.println("接收消息错误:");
- }
- } else {
- System.out.println("端口没有连接");
- }
- return null;
- }
- /*
- * 非常重要
- * */
- public void sendBeforePlay(){
- ReceiveSocket socketEven = map.get(this.localPortEven);
- ReceiveSocket socketOdd = map.get(this.localPortOdd);
- if(socketEven == null){
- socketEven = new ReceiveSocket(this.localIpAddress,this.localPortEven);
- map.put(this.localPortEven, socketEven);
- }
- if(socketOdd == null){
- socketEven = new ReceiveSocket(this.localIpAddress, this.localPortOdd);
- map.put(this.localPortOdd, socketOdd);
- }
- byte[] bytes = new byte[1];
- bytes[0]=0;
- try {
- socketEven.send(bytes, this.remoteIpAddress, this.remotePortEven);
- socketOdd.send(bytes, this.remoteIpAddress, this.remotePortOdd);
- } catch (IOException e) {
- e.printStackTrace();
- }
- return;
- }
- public void reConnect(SelectionKey key) throws IOException {
- if (isConnected()) {
- return;
- }
- // 完成SocketChannel的连接
- socketChannel.finishConnect();
- while (!socketChannel.isConnected()) {
- try {
- Thread.sleep(300);
- } catch (final InterruptedException e) {
- e.printStackTrace();
- }
- socketChannel.finishConnect();
- }
- }
- }
ReceiveSocket:用来接收服务器发来的RTP和RTCP协议数据,只是简单地对UDP进行了包装而已
- import java.io.IOException;
- import java.net.DatagramPacket;
- import java.net.DatagramSocket;
- import java.net.InetAddress;
- import java.net.InetSocketAddress;
- import java.net.SocketException;
- import java.net.UnknownHostException;
- public class ReceiveSocket implements Runnable{
- private DatagramSocket ds;
- public ReceiveSocket(String localAddress, int port){
- try {
- InetSocketAddress addr = new InetSocketAddress("192.168.31.106", port);
- ds = new DatagramSocket(addr);//监听16264端口
- } catch (SocketException e) {
- e.printStackTrace();
- }
- }
- @Override
- public void run() {
- // TODO Auto-generated method stub
- while(true){
- byte[] buf = new byte[20];
- DatagramPacket dp = new DatagramPacket(buf,buf.length);
- try
- {
- ds.receive(dp);
- String ip = dp.getAddress().getHostAddress(); //数据提取
- String data = new String(dp.getData(),0,dp.getLength());
- int port = dp.getPort();
- System.out.println(data+"."+port+".."+ip);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
- public void send(byte[] buf, String ip, int rec_port) throws IOException {
- // TODO Auto-generated method stub
- DatagramPacket dp = new DatagramPacket(buf,buf.length,InetAddress.getByName(ip),rec_port);//10000为定义的端口
- ds.send(dp);
- //ds.close();
- }
- }
PlayerClient:播放类,通过不同状态之间的相互转化完成RTSP协议的交互工作。这里有一点非常关键:请注意setup这个状态,在和服务器建立连接之后,如果直接发送PLAY请求,服务器不会向指定的端口发送RTCP数据(这个问题困扰了我一晚上)。因此在发送PLAY请求之前,client接收RTCP和RTP的两个端口必须先向服务器的RTCP和RTP端口发送任意的数据,发送方式为UDP,服务器在setup操作时已经返回RTCP和RTP的端口信息。具体的实现参考sendBeforePlay()。我在网上没有找到这么操作的原因,这还是通过wireshark对VLC进行抓包才发现这个隐藏逻辑。
- import java.io.IOException;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.util.Iterator;
- public class PlayerClient {
- private RTSPClient rtspClient = new RTSPClient();
- private static final String VERSION = " RTSP/1.0\r\n";
- private static final String RTSP_OK = "RTSP/1.0 200 OK";
- private Selector selector;
- private enum Status {
- init, options, describe, setup, play, pause, teardown
- }
- private Status sysStatus = Status.init;
- private String rtspAddress = "rtsp://218.204.223.237:554/live/1/66251FC11353191F/e7ooqwcfbqjoo80j.sdp";
- private String localAddress = "192.168.31.106";
- private int localPort=0;
- private String remoteAddress = "218.204.223.237";
- private int count=0;
- private String sessionid;
- private String trackInfo;
- private boolean isSended=true;
- private int localPortOdd=50002;
- private int localPortEven=50003;
- private ReceiveSocket socket1 = new ReceiveSocket(localAddress,localPortOdd);
- private ReceiveSocket socket2 = new ReceiveSocket(localAddress,localPortEven);
- public void init(){
- rtspClient.setLocalIpAddress(localAddress);
- rtspClient.setLocalPort(localPort);
- rtspClient.setRemoteIpAddress(remoteAddress);
- rtspClient.setRemoteIPort(554);
- rtspClient.setRtspAddress(rtspAddress);
- rtspClient.setLocalPortEven(this.localPortEven);
- rtspClient.setLocalPortOdd(this.localPortOdd);
- rtspClient.addSocket(this.localPortOdd, socket1);
- rtspClient.addSocket(this.localPortEven, socket2);
- try
- {
- rtspClient.inital();
- } catch (IOException e) {
- e.printStackTrace();
- }
- this.selector = rtspClient.getSelector();
- new Thread(socket1).start();
- new Thread(socket2).start();
- }
- public void run() throws IOException{
- int seq=2;
- while(true){
- if(rtspClient.isConnected() && isSended){
- switch (sysStatus) {
- case init:
- byte[] message = RTSPProtocal.encodeOption(this.rtspAddress, this.VERSION, seq);
- this.rtspClient.write(message);
- break;
- case options:
- seq++;
- message = RTSPProtocal.encodeDescribe(this.rtspAddress, this.VERSION, seq);
- this.rtspClient.write(message);
- break;
- case describe:
- seq++;
- message = RTSPProtocal.encodeSetup(this.rtspAddress, VERSION, sessionid,
- localPortEven, localPortOdd,seq, trackInfo);
- this.rtspClient.write(message);
- break;
- case setup:
- if(sessionid==null&&sessionid.length()>0){
- System.out.println("setup还没有正常返回");
- }else{
- seq++;
- message = RTSPProtocal.encodePlay(this.rtspAddress, VERSION, sessionid, seq);
- this.rtspClient.write(message);
- }
- break;
- case play:
- count++;
- System.out.println("count: "+count);
- break;
- case pause:
- break;
- default:
- break;
- }
- isSended=false;
- }
- else{
- }
- select();
- }
- }
- private void handle(byte[] msg) {
- String tmp = new String(msg);
- System.out.println("返回内容:"+tmp);
- if (tmp.startsWith(RTSP_OK)) {
- switch (sysStatus) {
- case init:
- sysStatus = Status.options;
- System.out.println("option ok");
- isSended=true;
- break;
- case options:
- sysStatus = Status.describe;
- trackInfo=tmp.substring(tmp.indexOf("trackID"));
- System.out.println("describe ok");
- isSended=true;
- break;
- case describe:
- sessionid = tmp.substring(tmp.indexOf("Session: ") + 9, tmp
- .indexOf("Date:"));
- int index = tmp.indexOf("server_port=");
- String serverPort1 = tmp.substring(tmp.indexOf("server_port=") + 12, tmp
- .indexOf("-", index));
- String serverPort2 = tmp.substring(tmp.indexOf("-", index) + 1, tmp
- .indexOf("\r\n", index));
- this.rtspClient.setRemotePortEven(Integer.valueOf(serverPort1));
- this.rtspClient.setRemotePortOdd(Integer.valueOf(serverPort2));
- if(sessionid!=null&&sessionid.length()>0){
- sysStatus = Status.setup;
- System.out.println("setup ok");
- }
- isSended=true;
- break;
- case setup:
- sysStatus = Status.play;
- System.out.println("play ok");
- this.rtspClient.sendBeforePlay();
- this.rtspClient.sendBeforePlay();
- isSended=true;
- break;
- case play:
- //sysStatus = Status.pause;
- System.out.println("pause ok");
- isSended=true;
- break;
- case pause:
- sysStatus = Status.teardown;
- System.out.println("teardown ok");
- isSended=true;
- //shutdown.set(true);
- break;
- case teardown:
- sysStatus = Status.init;
- System.out.println("exit start");
- isSended=true;
- break;
- default:
- break;
- }
- } else {
- System.out.println("返回错误:" + tmp);
- }
- }
- private void select() {
- int n = 0;
- try
- {
- if (selector == null) {
- return;
- }
- n = selector.select(1000);
- } catch (final Exception e) {
- e.printStackTrace();
- }
- // 如果select返回大于0,处理事件
- if (n > 0) {
- for (final Iterator<SelectionKey> i = selector.selectedKeys()
- .iterator(); i.hasNext();) {
- // 得到下一个Key
- final SelectionKey sk = i.next();
- i.remove();
- // 检查其是否还有效
- if (!sk.isValid()) {
- continue;
- }
- if (sk.isReadable()) {
- byte[] message = rtspClient.receive();
- handle(message);
- }
- if (sk.isConnectable()) {
- try {
- rtspClient.reConnect(sk);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
- }
- }
- }
Test:测试类
- public class Test {
- public static void main(String[] args){
- PlayerClient player = new PlayerClient();
- player.init();
- try
- {
- player.run();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
只要在ReceiveSocket的run方法中打断点,你就会发现源源不断的数据向你发来,是不是感觉很爽,哈哈哈。