流媒体技术之RTSP

2016-06-19 18:48 38人阅读 评论(0) 收藏 举报
流媒体技术之RTSP 分类:
流媒体相关技术

版权声明:本文为博主原创文章,未经博主允许不得转载。

最近对于流媒体技术比较感兴趣,虽然读书的时候学过相关方面的基础知识,但是大学上课,你懂得,一方面理论与实际脱节很严重,另一方面考试完全就是突击。学了和没学一样。好了,吐槽结束,书归正文。

研究流媒体技术的前提是先明白三个协议,RTSP,RTCP和RTP。关于这三种协议具体的定义百度上可以说是一抓一大把。总的来说, RTSP控制负责控制,包括创建,播放和暂停等操作,RTCP和RTP可以认为是一种协议,最大的区别 是RTCP中没有负载(payload,也就是媒体数据流),RTP则包含了负载。RTCP主要负责传输server和client的状态,如已经接收了多少数据,时间戳是什么,而RTP主要作用就是传输流媒体数据。

大部分对于RTSP都提到了这一个词:“RTSP是文本协议”,这句话是什么意思?通俗点说,如果你想告诉服务器你的名字,你首先构建一个类似于name="xxxxx"的字符串,然后把这个字符串转成byte[],经过SOCKET传给服务器,服务器就能够知道你的名字了。与之形成对比的是RTCP,RTCP规定了每个比特的每一位都代表什么,例如一个RTCP包的第一个比特的前两位代表版本,第三位用来填充,而第二个比特代表这次会话的序列号。坦率的说,实现RTCP协议可比RTSP烧脑多了。

回到RTSP这个话题,RTSP协议包含以下几种操作,option,describe,setup,play,pause和teardown。option是询问服务器你能提供什么方法,describe则是获取服务器的详细信息,setup是与服务器建立连接,服务器返回一个sessionid用来之后进行鉴权,play就是通知服务器可以发数据了,pause则是通知服务器暂停发数据,teardown,挥泪告别,さようなら。

如果你在百度上搜索过如下的关键字:RTSP  Java。你会发现有人已经实现了RTSP协议,如果你真的使用了那份代码,恭喜你,你踩到坑啦。大部分转载的人并没有对转载的内容进行验证。我被网上的这份代码坑了号就,今天刚刚出坑,特此记录。

RTSPProtocal:RTSP协议类,主要负责创建RTSP文本

  1. public class RTSPProtocal {
  2. public static byte[] encodeOption(String address, String VERSION, int seq) {
  3. StringBuilder sb = new StringBuilder();
  4. sb.append("OPTIONS ");
  5. sb.append(address.substring(0, address.lastIndexOf("/")));
  6. sb.append(VERSION);
  7. sb.append("Cseq: ");
  8. sb.append(seq);
  9. sb.append("\r\n");
  10. sb.append("\r\n");
  11. System.out.println(sb.toString());
  12. //send(sb.toString().getBytes());
  13. return sb.toString().getBytes();
  14. }
  15. public static byte[] encodeDescribe(String address, String VERSION, int seq) {
  16. StringBuilder sb = new StringBuilder();
  17. sb.append("DESCRIBE ");
  18. sb.append(address);
  19. sb.append(VERSION);
  20. sb.append("Cseq: ");
  21. sb.append(seq);
  22. sb.append("\r\n");
  23. sb.append("\r\n");
  24. System.out.println(sb.toString());
  25. //send(sb.toString().getBytes());
  26. return sb.toString().getBytes();
  27. }
  28. public static byte[] encodeSetup(String address, String VERSION, String sessionid,
  29. int portOdd, int portEven, int seq, String trackInfo) {
  30. StringBuilder sb = new StringBuilder();
  31. sb.append("SETUP ");
  32. sb.append(address);
  33. sb.append("/");
  34. sb.append(trackInfo);
  35. sb.append(VERSION);
  36. sb.append("Cseq: ");
  37. sb.append(seq++);
  38. sb.append("\r\n");
  39. //"50002-50003"
  40. sb.append("Transport: RTP/AVP;UNICAST;client_port="+portEven+"-"+portOdd+";mode=play\r\n");
  41. sb.append("\r\n");
  42. System.out.println(sb.toString());
  43. System.out.println(sb.toString());
  44. //send(sb.toString().getBytes());
  45. return sb.toString().getBytes();
  46. }
  47. public static byte[] encodePlay(String address, String VERSION, String sessionid, int seq) {
  48. StringBuilder sb = new StringBuilder();
  49. sb.append("PLAY ");
  50. sb.append(address);
  51. sb.append(VERSION);
  52. sb.append("Session: ");
  53. sb.append(sessionid);
  54. sb.append("Cseq: ");
  55. sb.append(seq);
  56. sb.append("\r\n");
  57. sb.append("Range: npt=0.000-");
  58. sb.append("\r\n");
  59. sb.append("\r\n");
  60. System.out.println(sb.toString());
  61. //send(sb.toString().getBytes());
  62. return sb.toString().getBytes();
  63. }
  64. public static byte[] encodePause(String address, String VERSION, String sessionid, int seq) {
  65. StringBuilder sb = new StringBuilder();
  66. sb.append("PAUSE ");
  67. sb.append(address);
  68. sb.append("/");
  69. sb.append(VERSION);
  70. sb.append("Cseq: ");
  71. sb.append(seq);
  72. sb.append("\r\n");
  73. sb.append("Session: ");
  74. sb.append(sessionid);
  75. sb.append("\r\n");
  76. System.out.println(sb.toString());
  77. //send(sb.toString().getBytes());
  78. return sb.toString().getBytes();
  79. }
  80. public static byte[] encodeTeardown(String address, String VERSION, String sessionid, int seq) {
  81. StringBuilder sb = new StringBuilder();
  82. sb.append("TEARDOWN ");
  83. sb.append(address);
  84. sb.append("/");
  85. sb.append(VERSION);
  86. sb.append("Cseq: ");
  87. sb.append(seq);
  88. sb.append("\r\n");
  89. sb.append("User-Agent: LibVLC/2.2.1 (LIVE555 Streaming Media v2014.07.25)\r\n");
  90. sb.append("Session: ");
  91. sb.append(sessionid);
  92. sb.append("\r\n");
  93. System.out.println(sb.toString());
  94. return sb.toString().getBytes();
  95. //send(sb.toString().getBytes());
  96. //
  97. }
  98. }

RTSPClient:使用RTSPProtocal中的静态方法获取字符创,拥有发送和接收数据的功能

  1. import java.io.IOException;
  2. import java.net.InetSocketAddress;
  3. import java.net.Socket;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SelectionKey;
  6. import java.nio.channels.Selector;
  7. import java.nio.channels.SocketChannel;
  8. import java.util.HashMap;
  9. import java.util.Map;
  10. public class RTSPClient {
  11. private static final int BUFFER_SIZE = 8192;
  12. private String localIpAddress;
  13. private String remoteIpAddress;
  14. private int localPort;
  15. private int localPortOdd;
  16. private int localPortEven;
  17. private int remoteIPort;
  18. private int remotePortOdd;
  19. private int remotePortEven;
  20. private Map<Integer, ReceiveSocket> map = new HashMap<>();
  21. public int getRemotePortOdd() {
  22. return remotePortOdd;
  23. }
  24. public void setRemotePortOdd(int remotePortOdd) {
  25. this.remotePortOdd = remotePortOdd;
  26. }
  27. public int getRemotePortEven() {
  28. return remotePortEven;
  29. }
  30. public void setRemotePortEven(int remotePortEven) {
  31. this.remotePortEven = remotePortEven;
  32. }
  33. public void addSocket(Integer port, ReceiveSocket socket){
  34. map.put(port, socket);
  35. }
  36. private String rtspAddress;
  37. private Socket tcpSocket;
  38. private SocketChannel socketChannel;
  39. private Selector selector;
  40. public String getLocalIpAddress() {
  41. return localIpAddress;
  42. }
  43. public void setLocalIpAddress(String localIpAddress) {
  44. this.localIpAddress = localIpAddress;
  45. }
  46. public int getLocalPort() {
  47. return localPort;
  48. }
  49. public void setLocalPort(int localPort) {
  50. this.localPort = localPort;
  51. }
  52. public int getLocalPortOdd() {
  53. return localPortOdd;
  54. }
  55. public void setLocalPortOdd(int localPortOdd) {
  56. this.localPortOdd = localPortOdd;
  57. }
  58. public int getLocalPortEven() {
  59. return localPortEven;
  60. }
  61. public void setLocalPortEven(int localPortEven) {
  62. this.localPortEven = localPortEven;
  63. }
  64. public String getRtspAddress() {
  65. return rtspAddress;
  66. }
  67. public void setRtspAddress(String rtspAddress) {
  68. this.rtspAddress = rtspAddress;
  69. }
  70. public Socket getTcpSocket() {
  71. return tcpSocket;
  72. }
  73. public void setTcpSocket(Socket tcpSocket) {
  74. this.tcpSocket = tcpSocket;
  75. }
  76. public String getRemoteIpAddress() {
  77. return remoteIpAddress;
  78. }
  79. public void setRemoteIpAddress(String remoteIpAddress) {
  80. this.remoteIpAddress = remoteIpAddress;
  81. }
  82. public int getRemoteIPort() {
  83. return remoteIPort;
  84. }
  85. public void setRemoteIPort(int remoteIPort) {
  86. this.remoteIPort = remoteIPort;
  87. }
  88. public Selector getSelector() {
  89. return selector;
  90. }
  91. public void setSelector(Selector selector) {
  92. this.selector = selector;
  93. }
  94. //new InetSocketAddress(
  95. //remoteIp, 554),
  96. //new InetSocketAddress("192.168.31.106", 0),
  97. //"rtsp://218.204.223.237:554/live/1/66251FC11353191F/e7ooqwcfbqjoo80j.sdp"
  98. public void inital() throws IOException{
  99. socketChannel = SocketChannel.open();
  100. socketChannel.socket().setSoTimeout(30000);
  101. socketChannel.configureBlocking(false);
  102. InetSocketAddress localAddress = new InetSocketAddress(this.localIpAddress, localPort);
  103. InetSocketAddress remoteAddress=new InetSocketAddress(this.remoteIpAddress, 554);
  104. socketChannel.socket().bind(localAddress);
  105. if (socketChannel.connect(remoteAddress)) {
  106. System.out.println("开始建立连接:" + remoteAddress);
  107. }
  108. if (selector == null) {
  109. // 创建新的Selector
  110. try {
  111. selector = Selector.open();
  112. } catch (final IOException e) {
  113. e.printStackTrace();
  114. }
  115. }
  116. socketChannel.register(selector, SelectionKey.OP_CONNECT
  117. | SelectionKey.OP_READ | SelectionKey.OP_WRITE, this);
  118. System.out.println("端口打开成功");
  119. }
  120. public void write(byte[] out) throws IOException {
  121. if (out == null || out.length < 1) {
  122. return;
  123. }
  124. System.out.println(out.toString());
  125. ByteBuffer sendBuf = ByteBuffer.allocateDirect(BUFFER_SIZE);
  126. sendBuf.clear();
  127. sendBuf.put(out);
  128. sendBuf.flip();
  129. if (isConnected()) {
  130. try {
  131. socketChannel.write(sendBuf);
  132. } catch (final IOException e) {
  133. }
  134. } else {
  135. System.out.println("通道为空或者没有连接上");
  136. }
  137. }
  138. public boolean isConnected() {
  139. return socketChannel != null && socketChannel.isConnected();
  140. }
  141. public byte[] receive() {
  142. if (isConnected()) {
  143. try {
  144. int len = 0;
  145. int readBytes = 0;
  146. ByteBuffer receiveBuf = ByteBuffer.allocateDirect(BUFFER_SIZE);
  147. synchronized (receiveBuf) {
  148. receiveBuf.clear();
  149. try {
  150. while ((len = socketChannel.read(receiveBuf)) > 0) {
  151. readBytes += len;
  152. }
  153. } finally {
  154. receiveBuf.flip();
  155. }
  156. if (readBytes > 0) {
  157. final byte[] tmp = new byte[readBytes];
  158. receiveBuf.get(tmp);
  159. return tmp;
  160. } else {
  161. System.out.println("接收到数据为空,重新启动连接");
  162. return null;
  163. }
  164. }
  165. } catch (final IOException e) {
  166. System.out.println("接收消息错误:");
  167. }
  168. } else {
  169. System.out.println("端口没有连接");
  170. }
  171. return null;
  172. }
  173. /*
  174. * 非常重要
  175. * */
  176. public void sendBeforePlay(){
  177. ReceiveSocket socketEven = map.get(this.localPortEven);
  178. ReceiveSocket socketOdd = map.get(this.localPortOdd);
  179. if(socketEven == null){
  180. socketEven = new ReceiveSocket(this.localIpAddress,this.localPortEven);
  181. map.put(this.localPortEven, socketEven);
  182. }
  183. if(socketOdd == null){
  184. socketEven = new ReceiveSocket(this.localIpAddress, this.localPortOdd);
  185. map.put(this.localPortOdd, socketOdd);
  186. }
  187. byte[] bytes = new byte[1];
  188. bytes[0]=0;
  189. try {
  190. socketEven.send(bytes, this.remoteIpAddress, this.remotePortEven);
  191. socketOdd.send(bytes, this.remoteIpAddress, this.remotePortOdd);
  192. } catch (IOException e) {
  193. e.printStackTrace();
  194. }
  195. return;
  196. }
  197. public void reConnect(SelectionKey key) throws IOException {
  198. if (isConnected()) {
  199. return;
  200. }
  201. // 完成SocketChannel的连接
  202. socketChannel.finishConnect();
  203. while (!socketChannel.isConnected()) {
  204. try {
  205. Thread.sleep(300);
  206. } catch (final InterruptedException e) {
  207. e.printStackTrace();
  208. }
  209. socketChannel.finishConnect();
  210. }
  211. }
  212. }

ReceiveSocket:用来接收服务器发来的RTP和RTCP协议数据,只是简单地对UDP进行了包装而已

  1. import java.io.IOException;
  2. import java.net.DatagramPacket;
  3. import java.net.DatagramSocket;
  4. import java.net.InetAddress;
  5. import java.net.InetSocketAddress;
  6. import java.net.SocketException;
  7. import java.net.UnknownHostException;
  8. public class ReceiveSocket implements Runnable{
  9. private DatagramSocket ds;
  10. public ReceiveSocket(String localAddress, int port){
  11. try {
  12. InetSocketAddress addr = new InetSocketAddress("192.168.31.106", port);
  13. ds = new DatagramSocket(addr);//监听16264端口
  14. } catch (SocketException e) {
  15. e.printStackTrace();
  16. }
  17. }
  18. @Override
  19. public void run() {
  20. // TODO Auto-generated method stub
  21. while(true){
  22. byte[] buf = new byte[20];
  23. DatagramPacket dp = new DatagramPacket(buf,buf.length);
  24. try
  25. {
  26. ds.receive(dp);
  27. String ip = dp.getAddress().getHostAddress();   //数据提取
  28. String data = new String(dp.getData(),0,dp.getLength());
  29. int port = dp.getPort();
  30. System.out.println(data+"."+port+".."+ip);
  31. } catch (IOException e) {
  32. // TODO Auto-generated catch block
  33. e.printStackTrace();
  34. }
  35. }
  36. }
  37. public void send(byte[] buf, String ip, int rec_port) throws IOException {
  38. // TODO Auto-generated method stub
  39. DatagramPacket dp = new DatagramPacket(buf,buf.length,InetAddress.getByName(ip),rec_port);//10000为定义的端口
  40. ds.send(dp);
  41. //ds.close();
  42. }
  43. }

PlayerClient:播放类,通过不同状态之间的相互转化完成RTSP协议的交互工作。这里有一点非常关键:请注意setup这个状态,在和服务器建立连接之后,如果直接发送PLAY请求,服务器不会向指定的端口发送RTCP数据(这个问题困扰了我一晚上)。因此在发送PLAY请求之前,client接收RTCP和RTP的两个端口必须先向服务器的RTCP和RTP端口发送任意的数据,发送方式为UDP,服务器在setup操作时已经返回RTCP和RTP的端口信息。具体的实现参考sendBeforePlay()。我在网上没有找到这么操作的原因,这还是通过wireshark对VLC进行抓包才发现这个隐藏逻辑。

  1. import java.io.IOException;
  2. import java.nio.channels.SelectionKey;
  3. import java.nio.channels.Selector;
  4. import java.util.Iterator;
  5. public class PlayerClient {
  6. private RTSPClient rtspClient = new RTSPClient();
  7. private static final String VERSION = " RTSP/1.0\r\n";
  8. private static final String RTSP_OK = "RTSP/1.0 200 OK";
  9. private Selector selector;
  10. private enum Status {
  11. init, options, describe, setup, play, pause, teardown
  12. }
  13. private Status sysStatus = Status.init;
  14. private String rtspAddress = "rtsp://218.204.223.237:554/live/1/66251FC11353191F/e7ooqwcfbqjoo80j.sdp";
  15. private String localAddress = "192.168.31.106";
  16. private int localPort=0;
  17. private String remoteAddress = "218.204.223.237";
  18. private int count=0;
  19. private String sessionid;
  20. private String trackInfo;
  21. private boolean isSended=true;
  22. private int localPortOdd=50002;
  23. private int localPortEven=50003;
  24. private ReceiveSocket socket1 = new ReceiveSocket(localAddress,localPortOdd);
  25. private ReceiveSocket socket2 = new ReceiveSocket(localAddress,localPortEven);
  26. public void init(){
  27. rtspClient.setLocalIpAddress(localAddress);
  28. rtspClient.setLocalPort(localPort);
  29. rtspClient.setRemoteIpAddress(remoteAddress);
  30. rtspClient.setRemoteIPort(554);
  31. rtspClient.setRtspAddress(rtspAddress);
  32. rtspClient.setLocalPortEven(this.localPortEven);
  33. rtspClient.setLocalPortOdd(this.localPortOdd);
  34. rtspClient.addSocket(this.localPortOdd, socket1);
  35. rtspClient.addSocket(this.localPortEven, socket2);
  36. try
  37. {
  38. rtspClient.inital();
  39. } catch (IOException e) {
  40. e.printStackTrace();
  41. }
  42. this.selector = rtspClient.getSelector();
  43. new Thread(socket1).start();
  44. new Thread(socket2).start();
  45. }
  46. public void run() throws IOException{
  47. int seq=2;
  48. while(true){
  49. if(rtspClient.isConnected() && isSended){
  50. switch (sysStatus) {
  51. case init:
  52. byte[] message = RTSPProtocal.encodeOption(this.rtspAddress, this.VERSION, seq);
  53. this.rtspClient.write(message);
  54. break;
  55. case options:
  56. seq++;
  57. message = RTSPProtocal.encodeDescribe(this.rtspAddress, this.VERSION, seq);
  58. this.rtspClient.write(message);
  59. break;
  60. case describe:
  61. seq++;
  62. message = RTSPProtocal.encodeSetup(this.rtspAddress, VERSION, sessionid,
  63. localPortEven, localPortOdd,seq, trackInfo);
  64. this.rtspClient.write(message);
  65. break;
  66. case setup:
  67. if(sessionid==null&&sessionid.length()>0){
  68. System.out.println("setup还没有正常返回");
  69. }else{
  70. seq++;
  71. message = RTSPProtocal.encodePlay(this.rtspAddress, VERSION, sessionid, seq);
  72. this.rtspClient.write(message);
  73. }
  74. break;
  75. case play:
  76. count++;
  77. System.out.println("count: "+count);
  78. break;
  79. case pause:
  80. break;
  81. default:
  82. break;
  83. }
  84. isSended=false;
  85. }
  86. else{
  87. }
  88. select();
  89. }
  90. }
  91. private void handle(byte[] msg) {
  92. String tmp = new String(msg);
  93. System.out.println("返回内容:"+tmp);
  94. if (tmp.startsWith(RTSP_OK)) {
  95. switch (sysStatus) {
  96. case init:
  97. sysStatus = Status.options;
  98. System.out.println("option ok");
  99. isSended=true;
  100. break;
  101. case options:
  102. sysStatus = Status.describe;
  103. trackInfo=tmp.substring(tmp.indexOf("trackID"));
  104. System.out.println("describe ok");
  105. isSended=true;
  106. break;
  107. case describe:
  108. sessionid = tmp.substring(tmp.indexOf("Session: ") + 9, tmp
  109. .indexOf("Date:"));
  110. int index = tmp.indexOf("server_port=");
  111. String serverPort1 = tmp.substring(tmp.indexOf("server_port=") + 12, tmp
  112. .indexOf("-", index));
  113. String serverPort2 = tmp.substring(tmp.indexOf("-", index) + 1, tmp
  114. .indexOf("\r\n", index));
  115. this.rtspClient.setRemotePortEven(Integer.valueOf(serverPort1));
  116. this.rtspClient.setRemotePortOdd(Integer.valueOf(serverPort2));
  117. if(sessionid!=null&&sessionid.length()>0){
  118. sysStatus = Status.setup;
  119. System.out.println("setup ok");
  120. }
  121. isSended=true;
  122. break;
  123. case setup:
  124. sysStatus = Status.play;
  125. System.out.println("play ok");
  126. this.rtspClient.sendBeforePlay();
  127. this.rtspClient.sendBeforePlay();
  128. isSended=true;
  129. break;
  130. case play:
  131. //sysStatus = Status.pause;
  132. System.out.println("pause ok");
  133. isSended=true;
  134. break;
  135. case pause:
  136. sysStatus = Status.teardown;
  137. System.out.println("teardown ok");
  138. isSended=true;
  139. //shutdown.set(true);
  140. break;
  141. case teardown:
  142. sysStatus = Status.init;
  143. System.out.println("exit start");
  144. isSended=true;
  145. break;
  146. default:
  147. break;
  148. }
  149. } else {
  150. System.out.println("返回错误:" + tmp);
  151. }
  152. }
  153. private void select() {
  154. int n = 0;
  155. try
  156. {
  157. if (selector == null) {
  158. return;
  159. }
  160. n = selector.select(1000);
  161. } catch (final Exception e) {
  162. e.printStackTrace();
  163. }
  164. // 如果select返回大于0,处理事件
  165. if (n > 0) {
  166. for (final Iterator<SelectionKey> i = selector.selectedKeys()
  167. .iterator(); i.hasNext();) {
  168. // 得到下一个Key
  169. final SelectionKey sk = i.next();
  170. i.remove();
  171. // 检查其是否还有效
  172. if (!sk.isValid()) {
  173. continue;
  174. }
  175. if (sk.isReadable()) {
  176. byte[] message = rtspClient.receive();
  177. handle(message);
  178. }
  179. if (sk.isConnectable()) {
  180. try {
  181. rtspClient.reConnect(sk);
  182. } catch (IOException e) {
  183. // TODO Auto-generated catch block
  184. e.printStackTrace();
  185. }
  186. }
  187. }
  188. }
  189. }
  190. }

Test:测试类

  1. public class Test {
  2. public static void main(String[] args){
  3. PlayerClient player = new PlayerClient();
  4. player.init();
  5. try
  6. {
  7. player.run();
  8. } catch (IOException e) {
  9. e.printStackTrace();
  10. }
  11. }
  12. }

只要在ReceiveSocket的run方法中打断点,你就会发现源源不断的数据向你发来,是不是感觉很爽,哈哈哈。

上一篇:SQL Server 全局变量


下一篇:windows版爬取csdn