[Red5commits] [656] lhubbard
luke@codegent.com
luke at codegent.com
Sat Apr 8 19:00:05 EDT 2006
--
Timestamp: 04/09/06 07:09:43 (2 hours ago)
Change: 656
Author: lhubbard
Files (see diff or trac for details):
java/server/trunk/src/org/red5/server/Context.java
java/server/trunk/src/org/red5/server/api/IContext.java
java/server/trunk/src/org/red5/server/api/persistance/IPersistenceStore.java
java/server/trunk/src/org/red5/server/net/remoting/Connection.java
java/server/trunk/src/org/red5/server/net/remoting/RemotingHandler.java
java/server/trunk/src/org/red5/server/stream/BaseStreamSink.java
java/server/trunk/src/org/red5/server/stream/DownStreamSink.java
java/server/trunk/src/org/red5/server/stream/FileStreamSink.java
java/server/trunk/src/org/red5/server/stream/FileStreamSource.java
java/server/trunk/src/org/red5/server/stream/ISeekableStreamSource.java
java/server/trunk/src/org/red5/server/stream/ISinkContainer.java
java/server/trunk/src/org/red5/server/stream/IStream.java
java/server/trunk/src/org/red5/server/stream/IStreamSink.java
java/server/trunk/src/org/red5/server/stream/IStreamSource.java
java/server/trunk/src/org/red5/server/stream/IVideoStreamCodec.java
java/server/trunk/src/org/red5/server/stream/MultiStreamSink.java
java/server/trunk/src/org/red5/server/stream/Stream.java
java/server/trunk/src/org/red5/server/stream/StreamManager.java
java/server/trunk/src/org/red5/server/stream/TemporaryDownStream.java
java/server/trunk/src/org/red5/server/stream/TemporaryStream.java
java/server/trunk/src/org/red5/server/stream/VideoCodecFactory.java
java/server/trunk/src/org/red5/server/zcontext
Trac: http://mirror1.cvsdude.com/trac/osflash/red5/changeset/656
Index: /va/server/trunk/src/org/red5/server/net/remoting/Connection.java
===================================================================
--- /java/server/trunk/src/org/red5/server/net/remoting/Connection.java (revision 651)
+++ (revision )
@@ -1,17 +1,0 @@
-package org.red5.server.net.remoting;
-
-import javax.servlet.http.HttpSession;
-
-import org.apache.mina.common.IoSession;
-import org.red5.server.zcontext.AppContext;
-
-public class Connection {
-
-
- protected HttpSession httpSession;
- protected IoSession ioSession;
- protected AppContext appCtx;
-
- //
-
-}
Index: /va/server/trunk/src/org/red5/server/net/remoting/RemotingHandler.java
===================================================================
--- /java/server/trunk/src/org/red5/server/net/remoting/RemotingHandler.java (revision 651)
+++ (revision )
@@ -1,92 +1,0 @@
-package org.red5.server.net.remoting;
-
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.LoggingFilter;
-import org.apache.mina.filter.codec.ProtocolCodecFactory;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.red5.server.net.remoting.message.RemotingCall;
-import org.red5.server.net.remoting.message.RemotingPacket;
-import org.red5.server.service.ServiceInvoker;
-import org.red5.server.zcontext.GlobalContext;
-//import org.red5.server.net.remoting.message.RemotingResponse;
-
-public class RemotingHandler extends IoHandlerAdapter {
-
- protected static Log log =
- LogFactory.getLog(RemotingHandler.class.getName());
-
- private ProtocolCodecFactory codecFactory = null;
- private ServiceInvoker serviceInvoker = null;
- private GlobalContext globalContext = null;
-
- public void setGlobalContext(GlobalContext globalContext) {
- this.globalContext = globalContext;
- }
-
- public void setCodecFactory(ProtocolCodecFactory codecFactory) {
- this.codecFactory = codecFactory;
- }
-
- public void setServiceInvoker(ServiceInvoker serviceInvoker) {
- this.serviceInvoker = serviceInvoker;
- }
-
- public void exceptionCaught(IoSession arg0, Throwable arg1) throws Exception {
- // TODO Auto-generated method stub
- super.exceptionCaught(arg0, arg1);
- }
-
- public void messageReceived(IoSession session, Object message) throws Exception {
- log.info("Message recieved: "+message);
- if(!(message instanceof RemotingPacket))
- return;
- RemotingPacket req = (RemotingPacket) message;
- Iterator it = req.getCalls().iterator();
- while(it.hasNext()){
- RemotingCall call = (RemotingCall) it.next();
- //serviceInvoker.invoke(call, globalContext);
- call.setResult(call.getArguments()[0]);
- call.setStatus(RemotingCall.STATUS_SUCCESS_RESULT);
- }
- //RemotingResponse resp = new RemotingResponse(req.getCalls());
- //session.write(resp).join(); // wait for it to write
- }
-
- public void messageSent(IoSession arg0, Object arg1) throws Exception {
- // TODO Auto-generated method stub
- super.messageSent(arg0, arg1);
- }
-
- public void sessionClosed(IoSession arg0) throws Exception {
- // TODO Auto-generated method stub
- super.sessionClosed(arg0);
- }
-
- public void sessionCreated(IoSession session) throws Exception {
- // TODO Auto-generated method stub
-
- session.getFilterChain().addFirst(
- "protocolFilter",new ProtocolCodecFilter(codecFactory) );
- session.getFilterChain().addLast(
- "logger", new LoggingFilter() );
-
- }
-
- public void sessionIdle(IoSession arg0, IdleStatus arg1) throws Exception {
- // TODO Auto-generated method stub
- super.sessionIdle(arg0, arg1);
- }
-
- public void sessionOpened(IoSession arg0) throws Exception {
- // TODO Auto-generated method stub
- super.sessionOpened(arg0);
- }
-
-
-}
Index: /java/server/trunk/src/org/red5/server/stream/IStream.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/IStream.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/IStream.java (revision 656)
@@ -0,0 +1,19 @@
+package org.red5.server.stream;
+
+import org.red5.server.net.rtmp.message.Message;
+
+public interface IStream {
+
+ // start pull downstream
+ public abstract void start();
+
+ // stop pull downsteam
+ public abstract void stop();
+
+ // pull down stream, this is where we pause
+ public abstract void written(Message message);
+
+ // push up stream
+ public abstract void publish(Message message);
+
+}
Index: /java/server/trunk/src/org/red5/server/stream/DownStreamSink.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/DownStreamSink.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/DownStreamSink.java (revision 656)
@@ -0,0 +1,61 @@
+package org.red5.server.stream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.red5.server.net.rtmp.Channel;
+import org.red5.server.net.rtmp.message.Constants;
+import org.red5.server.net.rtmp.message.Message;
+
+public class DownStreamSink extends BaseStreamSink implements IStreamSink, Constants {
+
+ protected static Log log =
+ LogFactory.getLog(DownStreamSink.class.getName());
+
+ private Channel video;
+ private Channel audio;
+ private Channel data;
+
+ public DownStreamSink(Channel video, Channel audio, Channel data){
+ this.video = video;
+ this.audio = audio;
+ this.data = data;
+ }
+
+ public void close() {
+ this.video.close();
+ this.audio.close();
+ this.data.close();
+ super.close();
+ }
+
+ public void enqueue(Message message){
+ //log.info("out ts:"+message.getTimestamp());
+ switch(message.getDataType()){
+ case TYPE_VIDEO_DATA:
+ case TYPE_STREAM_METADATA:
+ //log.debug("write video");
+ video.write(message);
+ break;
+ case TYPE_AUDIO_DATA:
+ audio.write(message);
+ //log.debug("write audio");
+ break;
+ default:
+ data.write(message);
+ //log.debug("write other");
+ break;
+ }
+ }
+
+ public Channel getAudio() {
+ return audio;
+ }
+
+ public Channel getData() {
+ return data;
+ }
+
+ public Channel getVideo() {
+ return video;
+ }
+}
Index: /java/server/trunk/src/org/red5/server/stream/BaseStreamSink.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/BaseStreamSink.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/BaseStreamSink.java (revision 656)
@@ -0,0 +1,34 @@
+package org.red5.server.stream;
+
+import org.red5.server.net.rtmp.message.Constants;
+import org.red5.server.net.rtmp.message.Message;
+
+public class BaseStreamSink implements IStreamSink, Constants {
+
+ protected ISinkContainer sinkContainer = null;
+ protected IVideoStreamCodec videoCodec = null;
+
+ public boolean canAccept() {
+ return true;
+ }
+
+ public void enqueue(Message message) {
+ // override this...
+ }
+
+ public void close() {
+ if (this.sinkContainer != null)
+ this.sinkContainer.disconnect(this);
+
+ this.videoCodec = null;
+ }
+
+ public void setVideoCodec(IVideoStreamCodec codec) {
+ this.videoCodec = codec;
+ }
+
+ public void setSinkContainer(ISinkContainer container) {
+ this.sinkContainer = container;
+ }
+
+}
Index: /java/server/trunk/src/org/red5/server/stream/FileStreamSource.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/FileStreamSource.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/FileStreamSource.java (revision 656)
@@ -0,0 +1,85 @@
+package org.red5.server.stream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.red5.io.flv.IFLV;
+import org.red5.io.flv.IReader;
+import org.red5.io.flv.ITag;
+import org.red5.io.flv.IKeyFrameDataAnalyzer.KeyFrameMeta;
+import org.red5.server.net.rtmp.message.AudioData;
+import org.red5.server.net.rtmp.message.Constants;
+import org.red5.server.net.rtmp.message.Invoke;
+import org.red5.server.net.rtmp.message.Message;
+import org.red5.server.net.rtmp.message.Notify;
+import org.red5.server.net.rtmp.message.Unknown;
+import org.red5.server.net.rtmp.message.VideoData;
+
+public class FileStreamSource implements ISeekableStreamSource, Constants {
+
+ protected static Log log =
+ LogFactory.getLog(FileStreamSource.class.getName());
+
+ private IReader reader = null;
+ private KeyFrameMeta keyFrameMeta = null;
+
+ public FileStreamSource(IReader reader){
+ this.reader = reader;
+ }
+
+ public void close() {
+ reader.close();
+ }
+
+ public Message dequeue() {
+
+ if(!reader.hasMoreTags()) return null;
+ ITag tag = reader.readTag();
+
+ Message msg = null;
+ switch(tag.getDataType()){
+ case TYPE_AUDIO_DATA:
+ msg = new AudioData();
+ break;
+ case TYPE_VIDEO_DATA:
+ msg = new VideoData();
+ break;
+ case TYPE_INVOKE:
+ msg = new Invoke();
+ break;
+ case TYPE_NOTIFY:
+ msg = new Notify();
+ break;
+ default:
+ log.warn("Unexpected type? "+tag.getDataType());
+ msg = new Unknown(tag.getDataType());
+ break;
+ }
+ msg.setData(tag.getBody());
+ msg.setTimestamp(tag.getTimestamp());
+ msg.setSealed(true);
+ return msg;
+ }
+
+ public boolean hasMore() {
+ return reader.hasMoreTags();
+ }
+
+ synchronized public int seek(int ts) {
+ if (keyFrameMeta == null) {
+ keyFrameMeta = reader.analyzeKeyFrames();
+ }
+ if (keyFrameMeta.positions.length == 0) {
+ // no video keyframe metainfo, it's an audio-only FLV
+ // we skip the seek for now.
+ // TODO add audio-seek capability
+ return ts;
+ }
+ int frame = 0;
+ for (int i = 0; i < keyFrameMeta.positions.length; i++) {
+ if (keyFrameMeta.timestamps[i] > ts) break;
+ frame = i;
+ }
+ reader.position(keyFrameMeta.positions[frame]);
+ return keyFrameMeta.timestamps[frame];
+ }
+}
Index: /java/server/trunk/src/org/red5/server/stream/ISinkContainer.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/ISinkContainer.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/ISinkContainer.java (revision 656)
@@ -0,0 +1,9 @@
+package org.red5.server.stream;
+
+public interface ISinkContainer {
+
+ public void connect(IStreamSink stream);
+
+ public void disconnect(IStreamSink stream);
+
+}
Index: /java/server/trunk/src/org/red5/server/stream/FileStreamSink.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/FileStreamSink.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/FileStreamSink.java (revision 656)
@@ -0,0 +1,39 @@
+package org.red5.server.stream;
+
+import java.io.IOException;
+
+import org.red5.io.flv.ITag;
+import org.red5.io.flv.IWriter;
+import org.red5.io.flv.impl.Tag;
+import org.red5.server.net.rtmp.message.Message;
+
+public class FileStreamSink extends BaseStreamSink implements IStreamSink {
+
+ private IWriter writer;
+
+ public FileStreamSink(IWriter writer){
+ this.writer = writer;
+ }
+
+ public void close() {
+ writer.close();
+ super.close();
+ }
+
+ public void enqueue(Message message) {
+
+ ITag tag = new Tag();
+
+ tag.setDataType(message.getDataType());
+ tag.setTimestamp(message.getTimestamp());
+ tag.setBodySize(message.getData().limit());
+ tag.setBody(message.getData());
+
+ try {
+ writer.writeTag(tag);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+}
Index: /java/server/trunk/src/org/red5/server/stream/ISeekableStreamSource.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/ISeekableStreamSource.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/ISeekableStreamSource.java (revision 656)
@@ -0,0 +1,15 @@
+package org.red5.server.stream;
+
+/**
+ * Stream source that can be seeked in timeline
+ * @author The Red5 Project (red5 at osflash.org)
+ * @author Steven Gong (steven.gong at gmail.com)
+ */
+public interface ISeekableStreamSource extends IStreamSource {
+ /**
+ * Seek the stream source to timestamp ts
+ * @param ts Timestamp to seek to
+ * @return Actual timestamp seeked to
+ */
+ int seek(int ts);
+}
Index: /java/server/trunk/src/org/red5/server/stream/IStreamSource.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/IStreamSource.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/IStreamSource.java (revision 656)
@@ -0,0 +1,13 @@
+package org.red5.server.stream;
+
+import org.red5.server.net.rtmp.message.Message;
+
+public interface IStreamSource {
+
+ public abstract boolean hasMore();
+
+ public abstract Message dequeue();
+
+ public abstract void close();
+
+}
Index: /java/server/trunk/src/org/red5/server/stream/TemporaryDownStream.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/TemporaryDownStream.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/TemporaryDownStream.java (revision 656)
@@ -0,0 +1,8 @@
+package org.red5.server.stream;
+
+public class TemporaryDownStream extends DownStreamSink {
+
+ public TemporaryDownStream() {
+ super(null, null, null);
+ }
+}
Index: /java/server/trunk/src/org/red5/server/stream/StreamManager.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/StreamManager.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/StreamManager.java (revision 656)
@@ -0,0 +1,134 @@
+package org.red5.server.stream;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.red5.io.flv.IFLV;
+import org.red5.io.flv.IFLVService;
+import org.red5.io.flv.IWriter;
+import org.red5.server.net.rtmp.message.Status;
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.core.io.Resource;
+
+public class StreamManager implements ApplicationContextAware {
+
+ protected static Log log =
+ LogFactory.getLog(StreamManager.class.getName());
+
+ private ApplicationContext appCtx = null;
+ private String streamDir = "streams";
+ private HashMap published = new HashMap();
+ private IFLVService flvService;
+
+ public void setApplicationContext(ApplicationContext appCtx) throws BeansException {
+ this.appCtx = appCtx;
+ }
+
+ public void setFlvService(IFLVService flvService) {
+ this.flvService = flvService;
+ }
+
+ public void publishStream(Stream stream){
+
+ // If we have a read mode stream, we shouldnt be publishing return
+ if(stream.getMode().equals(Stream.MODE_READ)) return;
+
+ MultiStreamSink multi = (MultiStreamSink) published.get(stream.getName());
+ if (multi == null)
+ // sink doesn't exist, create new
+ multi = new MultiStreamSink();
+
+ stream.setUpstream(multi);
+ published.put(stream.getName(),multi);
+
+ // If the mode is live, we dont need to do anything else
+ if(stream.getMode().equals(Stream.MODE_LIVE)) return;
+
+ // The mode must be record or append
+ try {
+ Resource res = appCtx.getResource("streams/" + stream.getName()+".flv");
+ if(stream.getMode().equals(Stream.MODE_RECORD) && res.exists())
+ res.getFile().delete();
+ if(!res.exists()) res = appCtx.getResource("streams/").createRelative(stream.getName()+".flv");
+ if(!res.exists()) res.getFile().createNewFile();
+ File file = res.getFile();
+ IFLV flv = flvService.getFLV(file);
+ IWriter writer = null;
+ if(stream.getMode().equals(Stream.MODE_RECORD))
+ writer = flv.writer();
+ else if(stream.getMode().equals(Stream.MODE_APPEND))
+ writer = flv.append();
+ multi.connect(new FileStreamSink(writer));
+ } catch (IOException e) {
+ log.error("Error recording stream: "+stream, e);
+ }
+ }
+
+ public void deleteStream(Stream stream){
+ if (stream.getUpstream() != null && published.containsKey(stream.getName())) {
+ // Notify all clients that stream is no longer published
+ MultiStreamSink multi = (MultiStreamSink) published.get(stream.getName());
+ Status unpublish = new Status(Status.NS_PLAY_UNPUBLISHNOTIFY);
+ unpublish.setClientid(stream.getStreamId());
+ unpublish.setDetails(stream.getName());
+ Iterator it = multi.streams.iterator();
+ while (it.hasNext()) {
+ Stream s = (Stream) it.next();
+ s.getDownstream().getData().sendStatus(unpublish);
+ }
+ published.remove(stream.getName());
+ }
+ stream.close();
+ }
+
+ public boolean isPublishedStream(String name){
+ return published.containsKey(name);
+ }
+
+ public boolean isFileStream(String name) {
+ if (this.isPublishedStream(name))
+ // A stream cannot be published and file based at the same time
+ return false;
+
+ try {
+ File file = appCtx.getResources("streams/" + name)[0].getFile();
+ if (file.exists())
+ return true;
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ return false;
+ }
+
+ public void connectToPublishedStream(Stream stream){
+ MultiStreamSink multi = (MultiStreamSink) published.get(stream.getName());
+ multi.connect(stream);
+ }
+
+ public IStreamSource lookupStreamSource(String name){
+ return createFileStreamSource(name);
+ }
+
+ protected IStreamSource createFileStreamSource(String name){
+ Resource[] resource = null;
+ FileStreamSource source = null;
+ try {
+ File file = appCtx.getResources("streams/" + name)[0].getFile();
+ IFLV flv = flvService.getFLV(file);
+ source = new FileStreamSource(flv.reader());
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ return source;
+ }
+
+}
Index: /java/server/trunk/src/org/red5/server/stream/IVideoStreamCodec.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/IVideoStreamCodec.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/IVideoStreamCodec.java (revision 656)
@@ -0,0 +1,27 @@
+package org.red5.server.stream;
+
+import org.apache.mina.common.ByteBuffer;
+
+public interface IVideoStreamCodec {
+
+ /*
+ * Reset the codec to its initial state.
+ */
+ public void reset();
+
+ /*
+ * Returns true if the codec knows how to handle the passed
+ * stream data.
+ */
+ public boolean canHandleData(ByteBuffer data);
+
+ /*
+ * Update the state of the codec with the passed data.
+ */
+ public boolean addData(ByteBuffer data);
+
+ /*
+ * Return the data for a keyframe.
+ */
+ public ByteBuffer getKeyframe();
+}
Index: /java/server/trunk/src/org/red5/server/stream/IStreamSink.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/IStreamSink.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/IStreamSink.java (revision 656)
@@ -0,0 +1,18 @@
+package org.red5.server.stream;
+
+import org.red5.server.net.rtmp.message.Message;
+
+public interface IStreamSink {
+
+ public abstract boolean canAccept();
+
+ // push down stream
+ public abstract void enqueue(Message message);
+
+ public abstract void close();
+
+ public abstract void setVideoCodec(IVideoStreamCodec codec);
+
+ public abstract void setSinkContainer(ISinkContainer container);
+
+}
Index: /java/server/trunk/src/org/red5/server/stream/Stream.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/Stream.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/Stream.java (revision 656)
@@ -0,0 +1,329 @@
+package org.red5.server.stream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.mina.common.ByteBuffer;
+import org.red5.server.net.rtmp.BaseConnection;
+import org.red5.server.net.rtmp.Channel;
+import org.red5.server.net.rtmp.message.AudioData;
+import org.red5.server.net.rtmp.message.Constants;
+import org.red5.server.net.rtmp.message.Message;
+import org.red5.server.net.rtmp.message.Ping;
+import org.red5.server.net.rtmp.message.Status;
+import org.red5.server.net.rtmp.message.StreamBytesRead;
+import org.red5.server.net.rtmp.message.VideoData;
+
+public class Stream extends BaseStreamSink implements Constants, IStream, IStreamSink {
+
+ public static final String MODE_READ = "read";
+ public static final String MODE_RECORD = "record";
+ public static final String MODE_APPEND = "append";
+ public static final String MODE_LIVE = "live";
+
+ protected static Log log =
+ LogFactory.getLog(Stream.class.getName());
+
+ private int writeQueue = 0;
+
+ private long startTime = 0;
+ private long startTS = 0;
+ private long currentTS = 0;
+ private int playLength = -1;
+ private String name = "";
+ private boolean paused = false;
+ private String mode = MODE_READ;
+
+ private DownStreamSink downstream = null;
+ private IStreamSink upstream = null;
+ private IStreamSource source = null;
+ private VideoCodecFactory videoCodecFactory = null;
+
+ private int streamId = 0;
+ private boolean initialMessage = true;
+
+ private BaseConnection conn;
+
+ public Stream(BaseConnection conn){
+ this.conn = conn;
+ }
+
+ public Stream(BaseConnection conn, String type) {
+ this.conn = conn;
+ this.mode = type;
+ }
+
+ public int getStreamId() {
+ return streamId;
+ }
+
+ public void setStreamId(int streamId) {
+ this.streamId = streamId;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getMode() {
+ return mode;
+ }
+
+ public void setMode(String mode) {
+ this.mode = mode;
+ }
+
+ public DownStreamSink getDownstream() {
+ return downstream;
+ }
+
+ public void setDownstream(DownStreamSink downstream) {
+ this.downstream = downstream;
+ }
+
+ public IStreamSource getSource() {
+ return source;
+ }
+
+ public void setSource(IStreamSource source) {
+ this.source = source;
+ }
+
+ public IStreamSink getUpstream() {
+ return upstream;
+ }
+
+ public void setUpstream(IStreamSink upstream) {
+ this.upstream = upstream;
+ }
+
+ public void setVideoCodecFactory(VideoCodecFactory factory) {
+ this.videoCodecFactory = factory;
+ }
+
+ protected int bytesReadInterval = 125000;
+ protected int bytesRead = 0;
+
+ public void publish(){
+ Status publish = new Status(Status.NS_PUBLISH_START);
+ publish.setClientid(streamId);
+ publish.setDetails(name);
+ Channel data = downstream.getData();
+ if (data != null)
+ // temporary streams don't have a data channel so check for it
+ data.sendStatus(publish);
+
+ initialMessage = true;
+ }
+
+ public void pause(){
+ paused = true;
+ Status pause = new Status("NetStream.Pause.Notify");
+ pause.setClientid(1);
+ pause.setDetails(name);
+ downstream.getData().sendStatus(pause);
+ }
+
+ public void resume(int resumeTS){
+ if (!paused) return;
+ paused = false;
+ if (!(source instanceof ISeekableStreamSource)) return;
+ ISeekableStreamSource sss = (ISeekableStreamSource) source;
+ int ts = sss.seek(resumeTS);
+
+ Ping ping = new Ping();
+ ping.setValue1((short) 4);
+ ping.setValue2(streamId);
+
+ conn.ping(ping);
+
+ Ping ping2 = new Ping();
+ ping2.setValue1((short) 0);
+ ping2.setValue2(streamId);
+
+ conn.ping(ping2);
+
+ Status play = new Status("NetStream.Unpause.Notify");
+ play.setClientid(1);
+ play.setDetails(name);
+ downstream.getData().sendStatus(play);
+
+ AudioData blankAudio = new AudioData();
+ blankAudio.setTimestamp(ts);
+ downstream.getData().write(blankAudio);
+ }
+
+ public void seek(int time) {
+ if (!(source instanceof ISeekableStreamSource)) return;
+ ISeekableStreamSource sss = (ISeekableStreamSource) source;
+ int ts = sss.seek(time);
+
+ if (!paused) {
+ // seems not necessary, but seen in FMS's dump
+ Ping ping0 = new Ping();
+ ping0.setValue1((short) 1);
+ ping0.setValue2(streamId);
+ conn.ping(ping0);
+ }
+
+ Ping ping1 = new Ping();
+ ping1.setValue1((short) 4);
+ ping1.setValue2(streamId);
+
+ conn.ping(ping1);
+
+ Ping ping2 = new Ping();
+ ping2.setValue1((short) 0);
+ ping2.setValue2(streamId);
+
+ conn.ping(ping2);
+
+ Status play = new Status("NetStream.Seek.Notify");
+ play.setClientid(1);
+ play.setDetails(name);
+ downstream.getData().sendStatus(play);
+
+ AudioData blankAudio = new AudioData();
+ blankAudio.setTimestamp(ts);
+ downstream.getData().write(blankAudio);
+
+ if (paused) {
+ if(source !=null && source.hasMore()){
+ write(source.dequeue());
+ }
+ }
+ }
+
+ public void start(int startTS, int length) {
+ startTime = System.currentTimeMillis();
+ playLength = length;
+
+ if (startTS > 0 && source instanceof ISeekableStreamSource) {
+ ((ISeekableStreamSource) source).seek(startTS);
+ }
+
+ Status reset = new Status(Status.NS_PLAY_RESET);
+ Status start = new Status(Status.NS_PLAY_START);
+ reset.setClientid(streamId);
+ start.setClientid(streamId);
+ reset.setDetails(name);
+ start.setDetails(name);
+
+ // This hack fixes the on meta data problem
+ // TODO: Perhaps its a good idea to init each channel with a blank audio packet
+ AudioData blankAudio = new AudioData();
+ downstream.getData().write(blankAudio);
+
+ downstream.getData().sendStatus(reset);
+ downstream.getVideo().sendStatus(start);
+
+ initialMessage = true;
+ }
+
+ public void start(){
+ start(0, -1);
+ }
+
+ public void stop(){
+
+ }
+
+ public long getBufferLength(){
+ final long now = System.currentTimeMillis();
+ final long time = now - startTime;
+ final long sentTS = currentTS - startTS;
+ return time - sentTS;
+ }
+
+ public boolean canAccept(){
+ return downstream.canAccept();
+ }
+
+ public void enqueue(Message message){
+ write(message);
+ }
+
+ protected void write(Message message){
+ if (downstream.canAccept()){
+ if (initialMessage) {
+ initialMessage = false;
+ startTS = message.getTimestamp();
+ if (this.videoCodec != null) {
+ ByteBuffer keyframe = this.videoCodec.getKeyframe();
+ if (keyframe != null) {
+ // Send initial keyframe to client
+ Message msg = new VideoData();
+ msg.setTimestamp(message.getTimestamp()-1);
+ msg.setData(keyframe);
+ msg.setSealed(true);
+ this.write(msg);
+ }
+ }
+ }
+ currentTS = message.getTimestamp();
+ if (playLength >= 0 && currentTS - startTS > playLength) return;
+
+ if(log.isDebugEnabled())
+ log.debug("Sending downstream: " + message.getTimestamp());
+ //writeQueue++;
+
+ downstream.enqueue(message);
+ }
+ }
+
+ private int ts = 0;
+ private int bytesReadPacketCount = 0;
+
+ public void publish(Message message){
+ ByteBuffer data = message.getData();
+ if (this.initialMessage && (message instanceof VideoData)) {
+ this.initialMessage = false;
+
+ if (this.videoCodecFactory != null) {
+ this.videoCodec = this.videoCodecFactory.getVideoCodec(data);
+ if (this.upstream != null)
+ this.upstream.setVideoCodec(this.videoCodec);
+ }
+ }
+
+ if (this.videoCodec != null)
+ this.videoCodec.addData(data);
+
+ ts += message.getTimestamp();
+ bytesRead += message.getData().limit();
+ if(bytesReadPacketCount < Math.floor(bytesRead / bytesReadInterval)){
+ bytesReadPacketCount++;
+ StreamBytesRead streamBytesRead = new StreamBytesRead();
+ streamBytesRead.setBytesRead(bytesRead);
+ log.debug(streamBytesRead);
+ conn.getChannel((byte)2).write(streamBytesRead);
+ }
+ message.setTimestamp(ts);
+ if(upstream != null && upstream.canAccept()){
+ upstream.enqueue(message);
+ } else {
+ log.warn("No where for upstream packet to go :(");
+ }
+ }
+
+ public void written(Message message){
+ if(paused) return;
+ writeQueue--;
+ synchronized (this) {
+ if(source !=null && source.hasMore()){
+ write(source.dequeue());
+ }
+ }
+ }
+
+ public void close(){
+ if(upstream!=null) upstream.close();
+ if(downstream!=null) downstream.close();
+ if(source!=null) source.close();
+ super.close();
+ }
+
+}
Index: /java/server/trunk/src/org/red5/server/stream/MultiStreamSink.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/MultiStreamSink.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/MultiStreamSink.java (revision 656)
@@ -0,0 +1,74 @@
+package org.red5.server.stream;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.red5.server.net.rtmp.message.Message;
+
+public class MultiStreamSink extends BaseStreamSink implements IStreamSink, ISinkContainer {
+
+ protected static Log log =
+ LogFactory.getLog(MultiStreamSink.class.getName());
+
+ protected LinkedList streams = new LinkedList();
+
+ public void connect(IStreamSink stream){
+ synchronized (streams) {
+ streams.add(stream);
+ }
+ stream.setVideoCodec(this.videoCodec);
+ stream.setSinkContainer(this);
+ }
+
+ public void disconnect(IStreamSink stream) {
+ synchronized (streams) {
+ streams.remove(stream);
+ }
+ stream.setSinkContainer(null);
+ }
+
+ public void setVideoCodec(IVideoStreamCodec codec) {
+ super.setVideoCodec(codec);
+
+ // Update already connected streams
+ synchronized (streams) {
+ Iterator it = streams.iterator();
+ while (it.hasNext()) {
+ IStreamSink stream = (IStreamSink) it.next();
+ stream.setVideoCodec(codec);
+ }
+ }
+ }
+
+ // push message to all connected streams
+ public void enqueue(Message message) {
+ synchronized (streams) {
+ final Iterator it = streams.iterator();
+ while (it.hasNext()){
+ IStreamSink stream = (IStreamSink) it.next();
+ if (log.isDebugEnabled())
+ log.debug("Sending");
+ if (stream.canAccept()){
+ stream.enqueue(message);
+ } else {
+ log.warn("Out cant accept");
+ }
+ }
+ }
+ }
+
+ public void close(){
+ synchronized (streams) {
+ final Iterator it = streams.iterator();
+ while (it.hasNext()){
+ IStreamSink stream = (IStreamSink) it.next();
+ stream.close();
+ }
+ streams.clear();
+ }
+
+ super.close();
+ }
+}
Index: /java/server/trunk/src/org/red5/server/stream/VideoCodecFactory.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/VideoCodecFactory.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/VideoCodecFactory.java (revision 656)
@@ -0,0 +1,46 @@
+package org.red5.server.stream;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.mina.common.ByteBuffer;
+
+public class VideoCodecFactory {
+
+ private Log log = LogFactory.getLog(VideoCodecFactory.class.getName());
+
+ private List codecs = new ArrayList();
+
+ public void setCodecs(List codecs) {
+ this.codecs = codecs;
+ }
+
+ public IVideoStreamCodec getVideoCodec(ByteBuffer data) {
+ IVideoStreamCodec result = null;
+ Iterator it = this.codecs.iterator();
+ while (it.hasNext()) {
+ IVideoStreamCodec codec;
+ IVideoStreamCodec storedCodec = (IVideoStreamCodec) it.next();
+ // XXX: this is a bit of a hack to create new instances of the configured
+ // video codec for each stream
+ try {
+ codec = (IVideoStreamCodec) storedCodec.getClass().newInstance();
+ } catch (Exception e) {
+ log.error("Could not create video codec instance.", e);
+ continue;
+ }
+
+ log.info("Trying codec " + codec);
+ if (codec.canHandleData(data)) {
+ result = codec;
+ break;
+ }
+ }
+
+ // No codec for this video data
+ return result;
+ }
+}
Index: /java/server/trunk/src/org/red5/server/stream/TemporaryStream.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/TemporaryStream.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/TemporaryStream.java (revision 656)
@@ -0,0 +1,25 @@
+package org.red5.server.stream;
+
+import org.red5.server.net.rtmp.message.Message;
+
+// Very simple class that can be used as temporary stream while waiting for
+// the "real" published live stream.
+public class TemporaryStream extends Stream implements IStreamSource {
+
+ public TemporaryStream(String name, String mode) {
+ super(null, mode);
+ this.setName(name);
+ this.setSource(this);
+ this.setDownstream(new TemporaryDownStream());
+ }
+
+ public boolean hasMore() {
+ return false;
+ }
+
+ public Message dequeue() {
+ return null;
+ }
+
+ public void close() {}
+}
Index: /java/server/trunk/src/org/red5/server/Context.java
===================================================================
--- /java/server/trunk/src/org/red5/server/Context.java (revision 651)
+++ /java/server/trunk/src/org/red5/server/Context.java (revision 656)
@@ -9,5 +9,5 @@
import org.red5.server.api.IScopeHandler;
import org.red5.server.api.IScopeResolver;
-import org.red5.server.api.persistance.IPersistanceStore;
+import org.red5.server.api.persistance.IPersistenceStore;
import org.red5.server.api.service.IServiceInvoker;
import org.red5.server.exception.ScopeHandlerNotFoundException;
@@ -26,5 +26,5 @@
private IServiceInvoker serviceInvoker;
private IMappingStrategy mappingStrategy;
- private IPersistanceStore persistanceStore;
+ private IPersistenceStore persistanceStore;
public Context(){
@@ -61,9 +61,9 @@
}
- public IPersistanceStore getPersistanceStore() {
+ public IPersistenceStore getPersistanceStore() {
return persistanceStore;
}
- public void setPersistanceStore(IPersistanceStore persistanceStore) {
+ public void setPersistanceStore(IPersistenceStore persistanceStore) {
this.persistanceStore = persistanceStore;
}
Index: /java/server/trunk/src/org/red5/server/api/persistance/IPersistenceStore.java
===================================================================
--- /java/server/trunk/src/org/red5/server/api/persistance/IPersistenceStore.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/api/persistance/IPersistenceStore.java (revision 656)
@@ -0,0 +1,8 @@
+package org.red5.server.api.persistance;
+
+public interface IPersistenceStore {
+
+ public boolean save(IPersistable obj);
+ public boolean load(IPersistable obj);
+
+}
Index: /va/server/trunk/src/org/red5/server/api/persistance/IPersistanceStore.java
===================================================================
--- /java/server/trunk/src/org/red5/server/api/persistance/IPersistanceStore.java (revision 651)
+++ (revision )
@@ -1,8 +1,0 @@
-package org.red5.server.api.persistance;
-
-public interface IPersistanceStore {
-
- public boolean save(IPersistable obj);
- public boolean load(IPersistable obj);
-
-}
Index: /java/server/trunk/src/org/red5/server/api/IContext.java
===================================================================
--- /java/server/trunk/src/org/red5/server/api/IContext.java (revision 651)
+++ /java/server/trunk/src/org/red5/server/api/IContext.java (revision 656)
@@ -1,5 +1,5 @@
package org.red5.server.api;
-import org.red5.server.api.persistance.IPersistanceStore;
+import org.red5.server.api.persistance.IPersistenceStore;
import org.red5.server.api.service.IServiceInvoker;
import org.springframework.context.ApplicationContext;
@@ -20,5 +20,5 @@
public IClientRegistry getClientRegistry();
public IServiceInvoker getServiceInvoker();
- public IPersistanceStore getPersistanceStore();
+ public IPersistenceStore getPersistanceStore();
public Object lookupService(String serviceName);
public IScopeHandler lookupScopeHandler(String path);
More information about the Red5commits
mailing list