[Red5commits] [2371] added stream listeners that can get notified about received packets
jbauch
luke at codegent.com
Tue Oct 9 13:00:12 PDT 2007
added stream listeners that can get notified about received packets
Timestamp: 10/09/07 14:51:10 EST (less than one hour ago)
Change: 2371
Author: jbauch
Files (see diff or trac for details):
doc/trunk/changelog.txt
java/server/trunk/src/org/red5/server/api/stream/IBroadcastStream.java
java/server/trunk/src/org/red5/server/api/stream/IStreamListener.java
java/server/trunk/src/org/red5/server/api/stream/IStreamPacket.java
java/server/trunk/src/org/red5/server/net/rtmp/event/AudioData.java
java/server/trunk/src/org/red5/server/net/rtmp/event/Notify.java
java/server/trunk/src/org/red5/server/net/rtmp/event/VideoData.java
java/server/trunk/src/org/red5/server/stream/ClientBroadcastStream.java
java/server/trunk/src/org/red5/server/stream/ServerStream.java
java/server/trunk/test/org/red5/server/stream/NoSyncServerStream.java
Trac: http://mirror1.cvsdude.com/trac/osflash/red5/changeset/2371
Index: /java/server/trunk/test/org/red5/server/stream/NoSyncServerStream.java
===================================================================
--- /java/server/trunk/test/org/red5/server/stream/NoSyncServerStream.java (revision 1984)
+++ /java/server/trunk/test/org/red5/server/stream/NoSyncServerStream.java (revision 2371)
@@ -22,8 +22,11 @@
import java.io.File;
import java.io.IOException;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.commons.logging.Log;
@@ -37,4 +40,6 @@
import org.red5.server.api.stream.IServerStream;
import org.red5.server.api.stream.IStreamFilenameGenerator;
+import org.red5.server.api.stream.IStreamListener;
+import org.red5.server.api.stream.IStreamPacket;
import org.red5.server.api.stream.ResourceExistException;
import org.red5.server.api.stream.ResourceNotFoundException;
@@ -200,4 +205,7 @@
private long vodStartTS;
+ /** Listeners to get notified about received packets. */
+ private Set<IStreamListener> listeners = new CopyOnWriteArraySet<IStreamListener>();
+
/** Constructs a new ServerStream. */
public NoSyncServerStream() {
@@ -477,4 +485,18 @@
msgOut.pushMessage(message);
recordPipe.pushMessage(message);
+
+ // Notify listeners about received packet
+ if (message instanceof RTMPMessage) {
+ final IRTMPEvent rtmpEvent = ((RTMPMessage) message).getBody();
+ if (rtmpEvent instanceof IStreamPacket) {
+ for (IStreamListener listener: getStreamListeners()) {
+ try {
+ listener.packetReceived(this, (IStreamPacket) rtmpEvent);
+ } catch (Exception e) {
+ log.error("Error while notifying listener " + listener, e);
+ }
+ }
+ }
+ }
}
@@ -849,3 +871,18 @@
}
+ /** {@inheritDoc} */
+ public void addStreamListener(IStreamListener listener) {
+ listeners.add(listener);
+ }
+
+ /** {@inheritDoc} */
+ public Collection<IStreamListener> getStreamListeners() {
+ return listeners;
+ }
+
+ /** {@inheritDoc} */
+ public void removeStreamListener(IStreamListener listener) {
+ listeners.remove(listener);
+ }
+
}
Index: /java/server/trunk/src/org/red5/server/stream/ServerStream.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/ServerStream.java (revision 2162)
+++ /java/server/trunk/src/org/red5/server/stream/ServerStream.java (revision 2371)
@@ -23,7 +23,10 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.commons.logging.Log;
@@ -38,4 +41,6 @@
import org.red5.server.api.stream.IStreamAwareScopeHandler;
import org.red5.server.api.stream.IStreamFilenameGenerator;
+import org.red5.server.api.stream.IStreamListener;
+import org.red5.server.api.stream.IStreamPacket;
import org.red5.server.api.stream.ResourceExistException;
import org.red5.server.api.stream.ResourceNotFoundException;
@@ -179,4 +184,7 @@
private RTMPMessage nextRTMPMessage;
+ /** Listeners to get notified about received packets. */
+ private Set<IStreamListener> listeners = new CopyOnWriteArraySet<IStreamListener>();
+
/** Constructs a new ServerStream. */
public ServerStream() {
@@ -604,4 +612,18 @@
msgOut.pushMessage(message);
recordPipe.pushMessage(message);
+
+ // Notify listeners about received packet
+ if (message instanceof RTMPMessage) {
+ final IRTMPEvent rtmpEvent = ((RTMPMessage) message).getBody();
+ if (rtmpEvent instanceof IStreamPacket) {
+ for (IStreamListener listener: getStreamListeners()) {
+ try {
+ listener.packetReceived(this, (IStreamPacket) rtmpEvent);
+ } catch (Exception e) {
+ log.error("Error while notifying listener " + listener, e);
+ }
+ }
+ }
+ }
}
@@ -865,3 +887,15 @@
}
}
+
+ public void addStreamListener(IStreamListener listener) {
+ listeners.add(listener);
+ }
+
+ public Collection<IStreamListener> getStreamListeners() {
+ return listeners;
+ }
+
+ public void removeStreamListener(IStreamListener listener) {
+ listeners.remove(listener);
+ }
}
Index: /java/server/trunk/src/org/red5/server/stream/ClientBroadcastStream.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/ClientBroadcastStream.java (revision 2158)
+++ /java/server/trunk/src/org/red5/server/stream/ClientBroadcastStream.java (revision 2371)
@@ -22,6 +22,9 @@
import java.io.File;
import java.io.IOException;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
import javax.management.ObjectName;
@@ -43,4 +46,6 @@
import org.red5.server.api.stream.IStreamCodecInfo;
import org.red5.server.api.stream.IStreamFilenameGenerator;
+import org.red5.server.api.stream.IStreamListener;
+import org.red5.server.api.stream.IStreamPacket;
import org.red5.server.api.stream.IVideoStreamCodec;
import org.red5.server.api.stream.ResourceExistException;
@@ -187,4 +192,7 @@
private int videoTime = -1;
+ /** Listeners to get notified about received packets. */
+ private Set<IStreamListener> listeners = new CopyOnWriteArraySet<IStreamListener>();
+
/**
* Check and send notification if necessary
@@ -327,4 +335,15 @@
sendRecordFailedNotify(err.getMessage());
stop();
+ }
+
+ // Notify listeners about received packet
+ if (rtmpEvent instanceof IStreamPacket) {
+ for (IStreamListener listener: getStreamListeners()) {
+ try {
+ listener.packetReceived(this, (IStreamPacket) rtmpEvent);
+ } catch (Exception e) {
+ log.error("Error while notifying listener " + listener, e);
+ }
+ }
}
}
@@ -774,3 +793,18 @@
}
+ /** {@inheritDoc} */
+ public void addStreamListener(IStreamListener listener) {
+ listeners.add(listener);
+ }
+
+ /** {@inheritDoc} */
+ public Collection<IStreamListener> getStreamListeners() {
+ return listeners;
+ }
+
+ /** {@inheritDoc} */
+ public void removeStreamListener(IStreamListener listener) {
+ listeners.remove(listener);
+ }
+
}
Index: /java/server/trunk/src/org/red5/server/net/rtmp/event/VideoData.java
===================================================================
--- /java/server/trunk/src/org/red5/server/net/rtmp/event/VideoData.java (revision 1781)
+++ /java/server/trunk/src/org/red5/server/net/rtmp/event/VideoData.java (revision 2371)
@@ -22,4 +22,5 @@
import org.apache.mina.common.ByteBuffer;
import org.red5.io.IoConstants;
+import org.red5.server.api.stream.IStreamPacket;
import org.red5.server.stream.IStreamData;
@@ -27,5 +28,5 @@
* Video data event
*/
-public class VideoData extends BaseEvent implements IoConstants, IStreamData {
+public class VideoData extends BaseEvent implements IoConstants, IStreamData, IStreamPacket {
/**
Index: /java/server/trunk/src/org/red5/server/net/rtmp/event/Notify.java
===================================================================
--- /java/server/trunk/src/org/red5/server/net/rtmp/event/Notify.java (revision 1698)
+++ /java/server/trunk/src/org/red5/server/net/rtmp/event/Notify.java (revision 2371)
@@ -24,4 +24,5 @@
import org.apache.mina.common.ByteBuffer;
import org.red5.server.api.service.IServiceCall;
+import org.red5.server.api.stream.IStreamPacket;
import org.red5.server.stream.IStreamData;
@@ -29,5 +30,5 @@
* Stream notification event
*/
-public class Notify extends BaseEvent implements IStreamData {
+public class Notify extends BaseEvent implements IStreamData, IStreamPacket {
/**
* Service call
Index: /java/server/trunk/src/org/red5/server/net/rtmp/event/AudioData.java
===================================================================
--- /java/server/trunk/src/org/red5/server/net/rtmp/event/AudioData.java (revision 1781)
+++ /java/server/trunk/src/org/red5/server/net/rtmp/event/AudioData.java (revision 2371)
@@ -21,7 +21,8 @@
import org.apache.mina.common.ByteBuffer;
+import org.red5.server.api.stream.IStreamPacket;
import org.red5.server.stream.IStreamData;
-public class AudioData extends BaseEvent implements IStreamData {
+public class AudioData extends BaseEvent implements IStreamData, IStreamPacket {
protected ByteBuffer data;
Index: /java/server/trunk/src/org/red5/server/api/stream/IStreamListener.java
===================================================================
--- /java/server/trunk/src/org/red5/server/api/stream/IStreamListener.java (revision 2371)
+++ /java/server/trunk/src/org/red5/server/api/stream/IStreamListener.java (revision 2371)
@@ -0,0 +1,40 @@
+package org.red5.server.api.stream;
+
+/*
+ * RED5 Open Source Flash Server - http://www.osflash.org/red5
+ *
+ * Copyright (c) 2006-2007 by respective authors (see below). All rights reserved.
+ *
+ * This library is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free Software
+ * Foundation; either version 2.1 of the License, or (at your option) any later
+ * version.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License along
+ * with this library; if not, write to the Free Software Foundation, Inc.,
+ * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+/**
+ * Listener that is notified about packets received from a stream.
+ *
+ * @author The Red5 Project (red5 at osflash.org)
+ * @author Joachim Bauch (jojo at struktur.de)
+ */
+public interface IStreamListener {
+
+ /**
+ * A packet has been received from a stream.
+ *
+ * @param stream
+ * the stream the packet has been received for
+ * @param packet
+ * the packet received
+ */
+ public void packetReceived(IBroadcastStream stream, IStreamPacket packet);
+
+}
Index: /java/server/trunk/src/org/red5/server/api/stream/IBroadcastStream.java
===================================================================
--- /java/server/trunk/src/org/red5/server/api/stream/IBroadcastStream.java (revision 1719)
+++ /java/server/trunk/src/org/red5/server/api/stream/IBroadcastStream.java (revision 2371)
@@ -21,4 +21,5 @@
import java.io.IOException;
+import java.util.Collection;
import org.red5.server.messaging.IProvider;
@@ -84,3 +85,27 @@
*/
void setPublishedName(String name);
+
+ /**
+ * Add a listener to be notified about received packets.
+ *
+ * @param listener
+ * the listener to add
+ */
+ public void addStreamListener(IStreamListener listener);
+
+ /**
+ * Remove a listener from being notified about received packets.
+ *
+ * @param listener
+ * the listener to remove
+ */
+ public void removeStreamListener(IStreamListener listener);
+
+ /**
+ * Return registered stream listeners.
+ *
+ * @return the registered listeners
+ */
+ public Collection<IStreamListener> getStreamListeners();
+
}
Index: /java/server/trunk/src/org/red5/server/api/stream/IStreamPacket.java
===================================================================
--- /java/server/trunk/src/org/red5/server/api/stream/IStreamPacket.java (revision 2371)
+++ /java/server/trunk/src/org/red5/server/api/stream/IStreamPacket.java (revision 2371)
@@ -0,0 +1,53 @@
+package org.red5.server.api.stream;
+
+/*
+ * RED5 Open Source Flash Server - http://www.osflash.org/red5
+ *
+ * Copyright (c) 2006-2007 by respective authors (see below). All rights reserved.
+ *
+ * This library is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free Software
+ * Foundation; either version 2.1 of the License, or (at your option) any later
+ * version.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License along
+ * with this library; if not, write to the Free Software Foundation, Inc.,
+ * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+import org.apache.mina.common.ByteBuffer;
+
+/**
+ * Packet containing stream data.
+ *
+ * @author The Red5 Project (red5 at osflash.org)
+ * @author Joachim Bauch (jojo at struktur.de)
+ */
+public interface IStreamPacket {
+
+ /**
+ * Type of this packet. This is one of the <code>TYPE_</code> constants.
+ *
+ * @return the type
+ */
+ public byte getDataType();
+
+ /**
+ * Timestamp of this packet.
+ *
+ * @return the timestamp in milliseconds
+ */
+ public int getTimestamp();
+
+ /**
+ * Packet contents.
+ *
+ * @return the contents
+ */
+ public ByteBuffer getData();
+
+}
Index: /doc/trunk/changelog.txt
===================================================================
--- /doc/trunk/changelog.txt (revision 2348)
+++ /doc/trunk/changelog.txt (revision 2371)
@@ -7,4 +7,7 @@
Red5 0.6.4 (unreleased)
------------------------
+New Features:
+- added stream listeners that can get notified about received packets
+
Bugfixes:
- pause near end of buffered streams works as expected (Jira APPSERVER-199)
Note:
Diffs are chopped if more than 25k.
This is to get past the limit on the mailing list.
More information about the Red5commits
mailing list