package RTPReceive;
import java.awt.*;
import java.net.*;
import java.awt.event.*;
import java.util.Vector;
import javax.media.*;
import javax.media.rtp.*;
import javax.media.rtp.event.*;
import javax.media.protocol.DataSource;
import javax.media.control.BufferControl;
// 接收RTP传输数据的类
public class RTPReceive implements ReceiveStreamListener, SessionListener,ControllerListener {
String sessions[] = null; // RTP会话字符串数组
RTPManager mgrs[] = null; // RTP管理器数组
Vector playerWindows = null; // 管理播放器窗口的向量
boolean dataReceived = false; // 是否接收到数据的标志
Object dataSync = new Object(); // 同步对象
// 构造函数
public RTPReceive(String sessions[]) {
this.sessions = sessions;
}
// 初始化RTP会话,准备接收数据
protected boolean initialize() {
try {
playerWindows = new Vector(); // 构造一个向量数组管理多个播放窗口
mgrs = new RTPManager[sessions.length]; // 为每一个RTP会话建立一个管理器
SessionLabel session;
for (int i = 0; i < sessions.length; i++) { // 对每一个RTP会话
try {
session = new SessionLabel(sessions[i]); // 解析RTP会话地址
}
catch (IllegalArgumentException e) {
System.err.println("Failed to parse the session address given: " + sessions[i]);
return false;
}
System.err.println(" - Open RTP session for: addr: " + session.addr + " port: " + session.port);
mgrs[i] = (RTPManager) RTPManager.newInstance(); // 为每一个RTP会话产生一个RTP管理器
mgrs[i].addSessionListener(this); // 添加会话监听
mgrs[i].addReceiveStreamListener(this); // 添加接收到数据的监听
InetAddress ipAddr = InetAddress.getByName(session.addr);
SessionAddress localAddr = null;
SessionAddress destAddr = null;
if( ipAddr.isMulticastAddress()) { // 对于组播,本地和目的地的IP地址相同,均采用组播地址
localAddr= new SessionAddress( ipAddr,session.port);
destAddr = new SessionAddress( ipAddr,session.port);
}
else {
localAddr= new SessionAddress( InetAddress.getLocalHost(),session.port); // 用本机IP地址和端口号构造源会话地址
destAddr = new SessionAddress( ipAddr, session.port); // 用目的机(发送端)的IP地址和端口号构造目的会话地址
}
mgrs[i].initialize( localAddr); // 将本机会话地址给RTP管理器
BufferControl bc = (BufferControl)mgrs[i].getControl("javax.media.control.BufferControl");
if (bc != null)
bc.setBufferLength(350); // 设置缓冲区大小(也可以使用其他值)
mgrs[i].addTarget(destAddr); // 加入目的会话地址
}
}
catch (Exception e){
System.err.println("Cannot create the RTP Session: " + e.getMessage());
return false;
}
// 等待数据到达
long then = System.currentTimeMillis();
long waitingPeriod = 60000; // 最多等待60秒
try{
synchronized (dataSync) {
while (!dataReceived && System.currentTimeMillis() - then < waitingPeriod) { // 等待上面所设定的时间
if (!dataReceived)
System.err.println(" - Waiting for RTP data to arrive...");
dataSync.wait(1000);
}
}
}
catch (Exception e) {}
if (!dataReceived) { // 在设定的时间内没有等到数据
System.err.println("No RTP data was received.");
close();
return false;
}
return true;
}
// 关闭播放器和会话管理器
protected void close() {
// 关闭播放窗口
for (int i = 0; i < playerWindows.size(); i++) {
try {
((PlayerWindow)playerWindows.elementAt(i)).close();
}
catch (Exception e) {}
}
playerWindows.removeAllElements(); // 删除所有播放窗口
// 关闭RTP会话
for (int i = 0; i < mgrs.length; i++) {
if (mgrs[i] != null) {
mgrs[i].removeTargets( "Closing session from RTPReceive");
mgrs[i].dispose(); // 关闭RTP会话管理器
mgrs[i] = null;
}
}
}
// 判断数据是否接收完成
public boolean isDone() {
return playerWindows.size() == 0;
}
// 通过播放器查找播放窗口
PlayerWindow find(Player p) {
for (int i = 0; i < playerWindows.size(); i++) {
PlayerWindow pw = (PlayerWindow)playerWindows.elementAt(i);
if (pw.player == p)
return pw;
}
return null;
}
// 通过接收数据流查找播放窗口
PlayerWindow find(ReceiveStream strm) {
for (int i = 0; i < playerWindows.size(); i++) {
PlayerWindow pw = (PlayerWindow)playerWindows.elementAt(i);
if (pw.stream == strm)
return pw;
}
return null;
}
/***************************************************************
* 下面这个函数实现了 ReceiveStreamListener 接口
****************************************************************/
public synchronized void update( ReceiveStreamEvent evt) {
RTPManager mgr = (RTPManager)evt.getSource();
Participant participant = evt.getParticipant(); // 得到加入者(发送者)
ReceiveStream stream = evt.getReceiveStream(); // 得到接收到的数据流
if (evt instanceof NewReceiveStreamEvent) { // 接收到新的数据流
try {
stream = ((NewReceiveStreamEvent)evt).getReceiveStream(); // 得到新数据流
DataSource ds = stream.getDataSource(); // 得到数据源
RTPControl ctl = (RTPControl)ds.getControl("javax.media.rtp.RTPControl"); // 得到RTP控制器
if (ctl != null)
System.err.println(" - Recevied new RTP stream: " + ctl.getFormat()); // 得到接收数据的格式
else
System.err.println(" - Recevied new RTP stream");
if (participant == null)
System.err.println(" The sender of this stream had yet to be identified.");
else
System.err.println(" The stream comes from: " + participant.getCNAME());
Player p = javax.media.Manager.createPlayer(ds); // 通过数据源构造一个媒体播放器
if (p == null)
return;
p.addControllerListener(this); // 给播放器添加控制器监听
p.realize(); // 实现播放器
PlayerWindow pw = new PlayerWindow(p, stream); // 通过播放器和数据流构造一个播放窗口
playerWindows.addElement(pw); // 将该播放窗口加入向量数组中
// 通知initialize()函数中的等待过程:已经接收到了一个新数据流
synchronized (dataSync) {
dataReceived = true;
dataSync.notifyAll();
}
}
catch (Exception e) {
System.err.println("NewReceiveStreamEvent exception " + e.getMessage());
return;
}
}
else if (evt instanceof StreamMappedEvent) { // 数据流映射事件
if (stream != null && stream.getDataSource() != null) {
DataSource ds = stream.getDataSource();
RTPControl ctl = (RTPControl)ds.getControl("javax.media.rtp.RTPControl");
System.err.println(" - The previously unidentified stream ");
if (ctl != null)
System.err.println(" " + ctl.getFormat()); // 得到格式
System.err.println(" had now been identified as sent by: " + participant.getCNAME());
}
}
else if (evt instanceof ByeEvent) { // 数据接收完毕
System.err.println(" - Got \"bye\" from: " + participant.getCNAME());
PlayerWindow pw = find(stream);
if (pw != null) {
pw.close(); // 关闭播放窗口
playerWindows.removeElement(pw); // 从向量中去掉该播放窗口
}
}
}
/***************************************************************
* 下面这个函数实现了 SessionListener 接口
****************************************************************/
public synchronized void updat