[Red5commits] [656] lhubbard

luke@codegent.com luke at codegent.com
Sat Apr 8 19:00:05 EDT 2006


--


Timestamp: 04/09/06 07:09:43 (2 hours ago) 
Change: 656 
Author: lhubbard

Files (see diff or trac for details): 
java/server/trunk/src/org/red5/server/Context.java
java/server/trunk/src/org/red5/server/api/IContext.java
java/server/trunk/src/org/red5/server/api/persistance/IPersistenceStore.java
java/server/trunk/src/org/red5/server/net/remoting/Connection.java
java/server/trunk/src/org/red5/server/net/remoting/RemotingHandler.java
java/server/trunk/src/org/red5/server/stream/BaseStreamSink.java
java/server/trunk/src/org/red5/server/stream/DownStreamSink.java
java/server/trunk/src/org/red5/server/stream/FileStreamSink.java
java/server/trunk/src/org/red5/server/stream/FileStreamSource.java
java/server/trunk/src/org/red5/server/stream/ISeekableStreamSource.java
java/server/trunk/src/org/red5/server/stream/ISinkContainer.java
java/server/trunk/src/org/red5/server/stream/IStream.java
java/server/trunk/src/org/red5/server/stream/IStreamSink.java
java/server/trunk/src/org/red5/server/stream/IStreamSource.java
java/server/trunk/src/org/red5/server/stream/IVideoStreamCodec.java
java/server/trunk/src/org/red5/server/stream/MultiStreamSink.java
java/server/trunk/src/org/red5/server/stream/Stream.java
java/server/trunk/src/org/red5/server/stream/StreamManager.java
java/server/trunk/src/org/red5/server/stream/TemporaryDownStream.java
java/server/trunk/src/org/red5/server/stream/TemporaryStream.java
java/server/trunk/src/org/red5/server/stream/VideoCodecFactory.java
java/server/trunk/src/org/red5/server/zcontext


Trac: http://mirror1.cvsdude.com/trac/osflash/red5/changeset/656

Index: /va/server/trunk/src/org/red5/server/net/remoting/Connection.java
===================================================================
--- /java/server/trunk/src/org/red5/server/net/remoting/Connection.java (revision 651)
+++  (revision )
@@ -1,17 +1,0 @@
-package org.red5.server.net.remoting;
-
-import javax.servlet.http.HttpSession;
-
-import org.apache.mina.common.IoSession;
-import org.red5.server.zcontext.AppContext;
-
-public class Connection {
-
-	
-	protected HttpSession httpSession;
-	protected IoSession ioSession;
-	protected AppContext appCtx;
-	
-	// 
-	
-}
Index: /va/server/trunk/src/org/red5/server/net/remoting/RemotingHandler.java
===================================================================
--- /java/server/trunk/src/org/red5/server/net/remoting/RemotingHandler.java (revision 651)
+++  (revision )
@@ -1,92 +1,0 @@
-package org.red5.server.net.remoting;
-
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.LoggingFilter;
-import org.apache.mina.filter.codec.ProtocolCodecFactory;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.red5.server.net.remoting.message.RemotingCall;
-import org.red5.server.net.remoting.message.RemotingPacket;
-import org.red5.server.service.ServiceInvoker;
-import org.red5.server.zcontext.GlobalContext;
-//import org.red5.server.net.remoting.message.RemotingResponse;
-
-public class RemotingHandler extends IoHandlerAdapter {
-
-	protected static Log log =
-        LogFactory.getLog(RemotingHandler.class.getName());
-	
-	private ProtocolCodecFactory codecFactory = null;
-	private ServiceInvoker serviceInvoker = null;
-	private GlobalContext globalContext = null;
-
-	public void setGlobalContext(GlobalContext globalContext) {
-		this.globalContext = globalContext;
-	}
-	
-	public void setCodecFactory(ProtocolCodecFactory codecFactory) {
-		this.codecFactory = codecFactory;
-	}
-	
-	public void setServiceInvoker(ServiceInvoker serviceInvoker) {
-		this.serviceInvoker = serviceInvoker;
-	}
-
-	public void exceptionCaught(IoSession arg0, Throwable arg1) throws Exception {
-		// TODO Auto-generated method stub
-		super.exceptionCaught(arg0, arg1);
-	}
-
-	public void messageReceived(IoSession session, Object message) throws Exception {
-		log.info("Message recieved: "+message);
-		if(!(message instanceof RemotingPacket))
-			return;
-		RemotingPacket req = (RemotingPacket) message;
-		Iterator it = req.getCalls().iterator();
-		while(it.hasNext()){
-			RemotingCall call = (RemotingCall) it.next();
-			//serviceInvoker.invoke(call, globalContext);
-			call.setResult(call.getArguments()[0]);
-			call.setStatus(RemotingCall.STATUS_SUCCESS_RESULT);
-		}
-		//RemotingResponse resp = new RemotingResponse(req.getCalls());
-		//session.write(resp).join(); // wait for it to write
-	}
-
-	public void messageSent(IoSession arg0, Object arg1) throws Exception {
-		// TODO Auto-generated method stub
-		super.messageSent(arg0, arg1);
-	}
-
-	public void sessionClosed(IoSession arg0) throws Exception {
-		// TODO Auto-generated method stub
-		super.sessionClosed(arg0);
-	}
-
-	public void sessionCreated(IoSession session) throws Exception {
-		// TODO Auto-generated method stub
-		
-		session.getFilterChain().addFirst(
-                "protocolFilter",new ProtocolCodecFilter(codecFactory) );
-        session.getFilterChain().addLast(
-                "logger", new LoggingFilter() );
-		
-	}
-
-	public void sessionIdle(IoSession arg0, IdleStatus arg1) throws Exception {
-		// TODO Auto-generated method stub
-		super.sessionIdle(arg0, arg1);
-	}
-
-	public void sessionOpened(IoSession arg0) throws Exception {
-		// TODO Auto-generated method stub
-		super.sessionOpened(arg0);
-	}
-
-	
-}
Index: /java/server/trunk/src/org/red5/server/stream/IStream.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/IStream.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/IStream.java (revision 656)
@@ -0,0 +1,19 @@
+package org.red5.server.stream;
+
+import org.red5.server.net.rtmp.message.Message;
+
+public interface IStream {
+
+	// start pull downstream
+	public abstract void start();
+
+	// stop pull downsteam
+	public abstract void stop();
+	
+	// pull down stream, this is where we pause
+	public abstract void written(Message message);
+	
+	// push up stream
+	public abstract void publish(Message message);	
+	
+}
Index: /java/server/trunk/src/org/red5/server/stream/DownStreamSink.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/DownStreamSink.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/DownStreamSink.java (revision 656)
@@ -0,0 +1,61 @@
+package org.red5.server.stream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.red5.server.net.rtmp.Channel;
+import org.red5.server.net.rtmp.message.Constants;
+import org.red5.server.net.rtmp.message.Message;
+
+public class DownStreamSink extends BaseStreamSink implements IStreamSink, Constants {
+
+	protected static Log log =
+        LogFactory.getLog(DownStreamSink.class.getName());
+	
+	private Channel video;
+	private Channel audio;
+	private Channel data;
+	
+	public DownStreamSink(Channel video, Channel audio, Channel data){
+		this.video = video;
+		this.audio = audio;
+		this.data = data;
+	}
+	
+	public void close() {
+		this.video.close();
+		this.audio.close();
+		this.data.close();
+		super.close();
+	}
+	
+	public void enqueue(Message message){
+		//log.info("out ts:"+message.getTimestamp());
+		switch(message.getDataType()){
+		case TYPE_VIDEO_DATA:
+		case TYPE_STREAM_METADATA:
+			//log.debug("write video");
+			video.write(message);
+			break;
+		case TYPE_AUDIO_DATA:
+			audio.write(message);
+			//log.debug("write audio");
+			break;
+		default:
+			data.write(message);
+			//log.debug("write other");
+			break;
+		}
+	}
+
+	public Channel getAudio() {
+		return audio;
+	}
+
+	public Channel getData() {
+		return data;
+	}
+
+	public Channel getVideo() {
+		return video;
+	}
+}
Index: /java/server/trunk/src/org/red5/server/stream/BaseStreamSink.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/BaseStreamSink.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/BaseStreamSink.java (revision 656)
@@ -0,0 +1,34 @@
+package org.red5.server.stream;
+
+import org.red5.server.net.rtmp.message.Constants;
+import org.red5.server.net.rtmp.message.Message;
+
+public class BaseStreamSink implements IStreamSink, Constants {
+
+	protected ISinkContainer sinkContainer = null;
+	protected IVideoStreamCodec videoCodec = null;
+	
+	public boolean canAccept() {
+		return true;
+	}
+
+	public void enqueue(Message message) {
+		// override this...
+	}
+
+	public void close() {
+		if (this.sinkContainer != null)
+			this.sinkContainer.disconnect(this);
+		
+		this.videoCodec = null;
+	}
+
+	public void setVideoCodec(IVideoStreamCodec codec) {
+		this.videoCodec = codec;
+	}
+
+	public void setSinkContainer(ISinkContainer container) {
+		this.sinkContainer = container;
+	}
+
+}
Index: /java/server/trunk/src/org/red5/server/stream/FileStreamSource.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/FileStreamSource.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/FileStreamSource.java (revision 656)
@@ -0,0 +1,85 @@
+package org.red5.server.stream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.red5.io.flv.IFLV;
+import org.red5.io.flv.IReader;
+import org.red5.io.flv.ITag;
+import org.red5.io.flv.IKeyFrameDataAnalyzer.KeyFrameMeta;
+import org.red5.server.net.rtmp.message.AudioData;
+import org.red5.server.net.rtmp.message.Constants;
+import org.red5.server.net.rtmp.message.Invoke;
+import org.red5.server.net.rtmp.message.Message;
+import org.red5.server.net.rtmp.message.Notify;
+import org.red5.server.net.rtmp.message.Unknown;
+import org.red5.server.net.rtmp.message.VideoData;
+
+public class FileStreamSource implements ISeekableStreamSource, Constants {
+
+	protected static Log log =
+        LogFactory.getLog(FileStreamSource.class.getName());
+	
+	private IReader reader = null;
+	private KeyFrameMeta keyFrameMeta = null;
+	
+	public FileStreamSource(IReader reader){
+		this.reader = reader;
+	}
+	
+	public void close() {
+		reader.close();
+	}
+
+	public Message dequeue() {
+		
+		if(!reader.hasMoreTags()) return null;
+		ITag tag = reader.readTag();
+		
+		Message msg = null;
+		switch(tag.getDataType()){
+		case TYPE_AUDIO_DATA:
+			msg = new AudioData();
+			break;
+		case TYPE_VIDEO_DATA:
+			msg = new VideoData();
+			break;
+		case TYPE_INVOKE:
+			msg = new Invoke();
+			break;
+		case TYPE_NOTIFY:
+			msg = new Notify();
+			break;
+		default:
+			log.warn("Unexpected type? "+tag.getDataType());
+			msg = new Unknown(tag.getDataType());
+			break;
+		}
+		msg.setData(tag.getBody());
+		msg.setTimestamp(tag.getTimestamp());
+		msg.setSealed(true);
+		return msg;
+	}
+
+	public boolean hasMore() {
+		return reader.hasMoreTags();
+	}
+
+	synchronized public int seek(int ts) {
+		if (keyFrameMeta == null) {
+			keyFrameMeta = reader.analyzeKeyFrames();
+		}
+		if (keyFrameMeta.positions.length == 0) {
+			// no video keyframe metainfo, it's an audio-only FLV
+			// we skip the seek for now.
+			// TODO add audio-seek capability
+			return ts;
+		}
+		int frame = 0;
+		for (int i = 0; i < keyFrameMeta.positions.length; i++) {
+			if (keyFrameMeta.timestamps[i] > ts) break;
+			frame = i;
+		}
+		reader.position(keyFrameMeta.positions[frame]);
+		return keyFrameMeta.timestamps[frame];
+	}
+}
Index: /java/server/trunk/src/org/red5/server/stream/ISinkContainer.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/ISinkContainer.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/ISinkContainer.java (revision 656)
@@ -0,0 +1,9 @@
+package org.red5.server.stream;
+
+public interface ISinkContainer {
+
+	public void connect(IStreamSink stream);
+
+	public void disconnect(IStreamSink stream);
+
+}
Index: /java/server/trunk/src/org/red5/server/stream/FileStreamSink.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/FileStreamSink.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/FileStreamSink.java (revision 656)
@@ -0,0 +1,39 @@
+package org.red5.server.stream;
+
+import java.io.IOException;
+
+import org.red5.io.flv.ITag;
+import org.red5.io.flv.IWriter;
+import org.red5.io.flv.impl.Tag;
+import org.red5.server.net.rtmp.message.Message;
+
+public class FileStreamSink extends BaseStreamSink implements IStreamSink {
+
+	private IWriter writer;
+	
+	public FileStreamSink(IWriter writer){
+		this.writer = writer;
+	}
+	
+	public void close() {
+		writer.close();
+		super.close();
+	}
+
+	public void enqueue(Message message) {
+		
+		ITag tag = new Tag();
+		
+		tag.setDataType(message.getDataType());
+		tag.setTimestamp(message.getTimestamp());
+		tag.setBodySize(message.getData().limit());
+		tag.setBody(message.getData());
+		
+		try {
+			writer.writeTag(tag);
+		} catch (IOException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}	
+	}
+}
Index: /java/server/trunk/src/org/red5/server/stream/ISeekableStreamSource.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/ISeekableStreamSource.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/ISeekableStreamSource.java (revision 656)
@@ -0,0 +1,15 @@
+package org.red5.server.stream;
+
+/**
+ * Stream source that can be seeked in timeline
+ * @author The Red5 Project (red5 at osflash.org)
+ * @author Steven Gong (steven.gong at gmail.com)
+ */
+public interface ISeekableStreamSource extends IStreamSource {
+	/**
+	 * Seek the stream source to timestamp ts
+	 * @param ts Timestamp to seek to
+	 * @return Actual timestamp seeked to
+	 */
+	int seek(int ts);
+}
Index: /java/server/trunk/src/org/red5/server/stream/IStreamSource.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/IStreamSource.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/IStreamSource.java (revision 656)
@@ -0,0 +1,13 @@
+package org.red5.server.stream;
+
+import org.red5.server.net.rtmp.message.Message;
+
+public interface IStreamSource {
+
+	public abstract boolean hasMore();
+	
+	public abstract Message dequeue();
+	
+	public abstract void close();
+	
+}
Index: /java/server/trunk/src/org/red5/server/stream/TemporaryDownStream.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/TemporaryDownStream.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/TemporaryDownStream.java (revision 656)
@@ -0,0 +1,8 @@
+package org.red5.server.stream;
+
+public class TemporaryDownStream extends DownStreamSink {
+
+	public TemporaryDownStream() {
+		super(null, null, null);
+	}
+}
Index: /java/server/trunk/src/org/red5/server/stream/StreamManager.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/StreamManager.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/StreamManager.java (revision 656)
@@ -0,0 +1,134 @@
+package org.red5.server.stream;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.red5.io.flv.IFLV;
+import org.red5.io.flv.IFLVService;
+import org.red5.io.flv.IWriter;
+import org.red5.server.net.rtmp.message.Status;
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.core.io.Resource;
+
+public class StreamManager implements ApplicationContextAware {
+
+	protected static Log log =
+        LogFactory.getLog(StreamManager.class.getName());
+	
+	private ApplicationContext appCtx = null;
+	private String streamDir = "streams";
+	private HashMap published = new HashMap();
+	private IFLVService flvService;
+
+	public void setApplicationContext(ApplicationContext appCtx) throws BeansException {
+		this.appCtx = appCtx;
+	}
+	
+	public void setFlvService(IFLVService flvService) {
+		this.flvService = flvService;
+	}
+
+	public void publishStream(Stream stream){
+		
+		// If we have a read mode stream, we shouldnt be publishing return
+		if(stream.getMode().equals(Stream.MODE_READ)) return;
+		
+		MultiStreamSink multi = (MultiStreamSink) published.get(stream.getName());
+		if (multi == null)
+			// sink doesn't exist, create new
+			multi = new MultiStreamSink();
+			
+		stream.setUpstream(multi);
+		published.put(stream.getName(),multi);
+		
+		// If the mode is live, we dont need to do anything else
+		if(stream.getMode().equals(Stream.MODE_LIVE)) return;
+		
+		// The mode must be record or append
+		try {				
+			Resource res = appCtx.getResource("streams/" + stream.getName()+".flv");
+			if(stream.getMode().equals(Stream.MODE_RECORD) && res.exists()) 
+				res.getFile().delete();
+			if(!res.exists()) res = appCtx.getResource("streams/").createRelative(stream.getName()+".flv");
+			if(!res.exists()) res.getFile().createNewFile(); 
+			File file = res.getFile();
+			IFLV flv = flvService.getFLV(file);
+			IWriter writer = null; 
+			if(stream.getMode().equals(Stream.MODE_RECORD)) 
+				writer = flv.writer();
+			else if(stream.getMode().equals(Stream.MODE_APPEND))
+				writer = flv.append();
+			multi.connect(new FileStreamSink(writer));
+		} catch (IOException e) {
+			log.error("Error recording stream: "+stream, e);
+		}
+	}
+	
+	public void deleteStream(Stream stream){
+		if (stream.getUpstream() != null && published.containsKey(stream.getName())) {
+			// Notify all clients that stream is no longer published
+			MultiStreamSink multi = (MultiStreamSink) published.get(stream.getName());
+			Status unpublish = new Status(Status.NS_PLAY_UNPUBLISHNOTIFY);
+			unpublish.setClientid(stream.getStreamId());
+			unpublish.setDetails(stream.getName());
+			Iterator it = multi.streams.iterator();
+			while (it.hasNext()) {
+				Stream s = (Stream) it.next();
+				s.getDownstream().getData().sendStatus(unpublish);
+			}
+			published.remove(stream.getName());
+		}
+		stream.close();
+	}
+	
+	public boolean isPublishedStream(String name){
+		return published.containsKey(name);
+	}
+	
+	public boolean isFileStream(String name) {
+		if (this.isPublishedStream(name))
+			// A stream cannot be published and file based at the same time
+			return false;
+		
+		try {
+			File file = appCtx.getResources("streams/" + name)[0].getFile();
+			if (file.exists())
+				return true;
+		} catch (IOException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+		
+		return false;
+	}
+	
+	public void connectToPublishedStream(Stream stream){
+		MultiStreamSink multi = (MultiStreamSink) published.get(stream.getName());
+		multi.connect(stream);
+	}
+	
+	public IStreamSource lookupStreamSource(String name){
+		return createFileStreamSource(name);
+	}
+
+	protected IStreamSource createFileStreamSource(String name){
+		Resource[] resource = null;
+		FileStreamSource source = null;
+		try {
+			File file = appCtx.getResources("streams/" + name)[0].getFile();
+			IFLV flv = flvService.getFLV(file);
+			source = new FileStreamSource(flv.reader());
+		} catch (IOException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+		return source;
+	}
+	
+}
Index: /java/server/trunk/src/org/red5/server/stream/IVideoStreamCodec.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/IVideoStreamCodec.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/IVideoStreamCodec.java (revision 656)
@@ -0,0 +1,27 @@
+package org.red5.server.stream;
+
+import org.apache.mina.common.ByteBuffer;
+
+public interface IVideoStreamCodec {
+
+	/*
+	 * Reset the codec to its initial state.
+	 */
+	public void reset();
+	
+	/*
+	 * Returns true if the codec knows how to handle the passed
+	 * stream data.
+	 */
+	public boolean canHandleData(ByteBuffer data);
+
+	/*
+	 * Update the state of the codec with the passed data.
+	 */
+	public boolean addData(ByteBuffer data);
+
+	/*
+	 * Return the data for a keyframe.
+	 */
+	public ByteBuffer getKeyframe();
+}
Index: /java/server/trunk/src/org/red5/server/stream/IStreamSink.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/IStreamSink.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/IStreamSink.java (revision 656)
@@ -0,0 +1,18 @@
+package org.red5.server.stream;
+
+import org.red5.server.net.rtmp.message.Message;
+
+public interface IStreamSink {
+
+	public abstract boolean canAccept();
+	
+	// push down stream
+	public abstract void enqueue(Message message);
+	
+	public abstract void close();
+	
+	public abstract void setVideoCodec(IVideoStreamCodec codec);
+	
+	public abstract void setSinkContainer(ISinkContainer container);
+	
+}
Index: /java/server/trunk/src/org/red5/server/stream/Stream.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/Stream.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/Stream.java (revision 656)
@@ -0,0 +1,329 @@
+package org.red5.server.stream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.mina.common.ByteBuffer;
+import org.red5.server.net.rtmp.BaseConnection;
+import org.red5.server.net.rtmp.Channel;
+import org.red5.server.net.rtmp.message.AudioData;
+import org.red5.server.net.rtmp.message.Constants;
+import org.red5.server.net.rtmp.message.Message;
+import org.red5.server.net.rtmp.message.Ping;
+import org.red5.server.net.rtmp.message.Status;
+import org.red5.server.net.rtmp.message.StreamBytesRead;
+import org.red5.server.net.rtmp.message.VideoData;
+
+public class Stream extends BaseStreamSink implements Constants, IStream, IStreamSink {
+	
+	public static final String MODE_READ = "read";
+	public static final String MODE_RECORD = "record";
+	public static final String MODE_APPEND = "append";
+	public static final String MODE_LIVE = "live";
+	
+	protected static Log log =
+        LogFactory.getLog(Stream.class.getName());
+	
+	private int writeQueue = 0;
+	
+	private long startTime = 0;
+	private long startTS = 0;
+	private long currentTS = 0;
+	private int playLength = -1;
+	private String name = "";
+	private boolean paused = false;
+	private String mode = MODE_READ;
+	
+	private DownStreamSink downstream = null;
+	private IStreamSink upstream = null;
+	private IStreamSource source = null;
+	private VideoCodecFactory videoCodecFactory = null;
+	
+	private int streamId = 0;
+	private boolean initialMessage = true;
+	
+	private BaseConnection conn;
+	
+	public Stream(BaseConnection conn){
+		this.conn = conn;
+	}
+	
+	public Stream(BaseConnection conn, String type) {
+		this.conn = conn;
+		this.mode = type;
+	}
+	
+	public int getStreamId() {
+		return streamId;
+	}
+	
+	public void setStreamId(int streamId) {
+		this.streamId = streamId;
+	}
+	
+	public String getName() {
+		return name;
+	}
+
+	public void setName(String name) {
+		this.name = name;
+	}
+
+	public String getMode() {
+		return mode;
+	}
+
+	public void setMode(String mode) {
+		this.mode = mode;
+	}
+
+	public DownStreamSink getDownstream() {
+		return downstream;
+	}
+
+	public void setDownstream(DownStreamSink downstream) {
+		this.downstream = downstream;
+	}
+
+	public IStreamSource getSource() {
+		return source;
+	}
+
+	public void setSource(IStreamSource source) {
+		this.source = source;
+	}
+
+	public IStreamSink getUpstream() {
+		return upstream;
+	}
+
+	public void setUpstream(IStreamSink upstream) {
+		this.upstream = upstream;
+	}
+
+	public void setVideoCodecFactory(VideoCodecFactory factory) {
+		this.videoCodecFactory = factory;
+	}
+
+	protected int bytesReadInterval = 125000;
+	protected int bytesRead = 0;
+	
+	public void publish(){
+		Status publish = new Status(Status.NS_PUBLISH_START);
+		publish.setClientid(streamId);
+		publish.setDetails(name);
+		Channel data = downstream.getData();
+		if (data != null)
+			// temporary streams don't have a data channel so check for it
+			data.sendStatus(publish);
+		
+		initialMessage = true;
+	}
+	
+	public void pause(){
+		paused = true;
+		Status pause  = new Status("NetStream.Pause.Notify");
+		pause.setClientid(1);
+		pause.setDetails(name);
+		downstream.getData().sendStatus(pause);
+	}
+	
+	public void resume(int resumeTS){
+		if (!paused) return;
+		paused = false;
+		if (!(source instanceof ISeekableStreamSource)) return;
+		ISeekableStreamSource sss = (ISeekableStreamSource) source;
+		int ts = sss.seek(resumeTS);
+		
+		Ping ping = new Ping();
+		ping.setValue1((short) 4);
+		ping.setValue2(streamId);
+		
+		conn.ping(ping);
+		
+		Ping ping2 = new Ping();
+		ping2.setValue1((short) 0);
+		ping2.setValue2(streamId);
+		
+		conn.ping(ping2);
+		
+		Status play  = new Status("NetStream.Unpause.Notify");
+		play.setClientid(1);
+		play.setDetails(name);
+		downstream.getData().sendStatus(play);
+		
+		AudioData blankAudio = new AudioData();
+		blankAudio.setTimestamp(ts);
+		downstream.getData().write(blankAudio);
+	}
+	
+	public void seek(int time) {
+		if (!(source instanceof ISeekableStreamSource)) return;
+		ISeekableStreamSource sss = (ISeekableStreamSource) source;
+		int ts = sss.seek(time);
+		
+		if (!paused) {
+			// seems not necessary, but seen in FMS's dump
+			Ping ping0 = new Ping();
+			ping0.setValue1((short) 1);
+			ping0.setValue2(streamId);
+			conn.ping(ping0);
+		}
+		
+		Ping ping1 = new Ping();
+		ping1.setValue1((short) 4);
+		ping1.setValue2(streamId);
+		
+		conn.ping(ping1);
+		
+		Ping ping2 = new Ping();
+		ping2.setValue1((short) 0);
+		ping2.setValue2(streamId);
+		
+		conn.ping(ping2);
+		
+		Status play  = new Status("NetStream.Seek.Notify");
+		play.setClientid(1);
+		play.setDetails(name);
+		downstream.getData().sendStatus(play);
+		
+		AudioData blankAudio = new AudioData();
+		blankAudio.setTimestamp(ts);
+		downstream.getData().write(blankAudio);
+		
+		if (paused) {
+			if(source !=null && source.hasMore()){
+				write(source.dequeue());
+			}
+		}
+	}
+	
+	public void start(int startTS, int length) {
+		startTime = System.currentTimeMillis();
+		playLength = length;
+		
+		if (startTS > 0 && source instanceof ISeekableStreamSource) {
+			((ISeekableStreamSource) source).seek(startTS);
+		}
+		
+		Status reset = new Status(Status.NS_PLAY_RESET);
+		Status start = new Status(Status.NS_PLAY_START);
+		reset.setClientid(streamId);
+		start.setClientid(streamId);
+		reset.setDetails(name);
+		start.setDetails(name);
+		
+		// This hack fixes the on meta data problem
+		// TODO: Perhaps its a good idea to init each channel with a blank audio packet
+		AudioData blankAudio = new AudioData();
+		downstream.getData().write(blankAudio);
+				
+		downstream.getData().sendStatus(reset);
+		downstream.getVideo().sendStatus(start);
+		
+		initialMessage = true;
+	}
+	
+	public void start(){
+		start(0, -1);
+	}
+	
+	public void stop(){
+		
+	}
+
+	public long getBufferLength(){
+		final long now = System.currentTimeMillis();
+		final long time = now - startTime;
+		final long sentTS = currentTS - startTS;
+		return time - sentTS;
+	}
+	
+	public boolean canAccept(){
+		return downstream.canAccept();
+	}
+	
+	public void enqueue(Message message){ 
+		write(message);
+	}
+	
+	protected void write(Message message){
+		if (downstream.canAccept()){
+			if (initialMessage) {
+				initialMessage = false;
+				startTS = message.getTimestamp();
+				if (this.videoCodec != null) {
+					ByteBuffer keyframe = this.videoCodec.getKeyframe();
+					if (keyframe != null) {
+						// Send initial keyframe to client
+						Message msg = new VideoData();
+						msg.setTimestamp(message.getTimestamp()-1);
+						msg.setData(keyframe);
+						msg.setSealed(true);
+						this.write(msg);
+					}
+				}
+			}
+			currentTS = message.getTimestamp();
+			if (playLength >= 0 && currentTS - startTS > playLength) return;
+			
+			if(log.isDebugEnabled())
+				log.debug("Sending downstream: " + message.getTimestamp());
+			//writeQueue++;
+			
+			downstream.enqueue(message);
+		}
+	}
+	
+	private int ts = 0;
+	private int bytesReadPacketCount = 0;
+	
+	public void publish(Message message){
+		ByteBuffer data = message.getData();
+		if (this.initialMessage && (message instanceof VideoData)) {
+			this.initialMessage = false;
+			
+			if (this.videoCodecFactory != null) {
+				this.videoCodec = this.videoCodecFactory.getVideoCodec(data);
+				if (this.upstream != null)
+					this.upstream.setVideoCodec(this.videoCodec);
+			}
+		}
+		
+		if (this.videoCodec != null)
+			this.videoCodec.addData(data);
+		
+		ts += message.getTimestamp();
+		bytesRead += message.getData().limit();
+		if(bytesReadPacketCount < Math.floor(bytesRead / bytesReadInterval)){
+			bytesReadPacketCount++;
+			StreamBytesRead streamBytesRead = new StreamBytesRead();
+			streamBytesRead.setBytesRead(bytesRead);
+			log.debug(streamBytesRead);
+			conn.getChannel((byte)2).write(streamBytesRead);
+		}
+		message.setTimestamp(ts);
+		if(upstream != null && upstream.canAccept()){
+			upstream.enqueue(message);
+		} else {
+			log.warn("No where for upstream packet to go :(");
+		}
+	}
+	
+	public void written(Message message){
+		if(paused) return;
+		writeQueue--;
+		synchronized (this) {
+			if(source !=null && source.hasMore()){
+				write(source.dequeue());
+			}
+		}
+	}
+	
+	public void close(){
+		if(upstream!=null) upstream.close();
+		if(downstream!=null) downstream.close();
+		if(source!=null) source.close();
+		super.close();
+	}
+	
+}
Index: /java/server/trunk/src/org/red5/server/stream/MultiStreamSink.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/MultiStreamSink.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/MultiStreamSink.java (revision 656)
@@ -0,0 +1,74 @@
+package org.red5.server.stream;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.red5.server.net.rtmp.message.Message;
+
+public class MultiStreamSink extends BaseStreamSink implements IStreamSink, ISinkContainer {
+
+	protected static Log log =
+        LogFactory.getLog(MultiStreamSink.class.getName());
+	
+	protected LinkedList streams = new LinkedList();
+
+	public void connect(IStreamSink stream){
+		synchronized (streams) {
+			streams.add(stream);
+		}
+		stream.setVideoCodec(this.videoCodec);
+		stream.setSinkContainer(this);
+	}
+
+	public void disconnect(IStreamSink stream) {
+		synchronized (streams) {
+			streams.remove(stream);
+		}
+		stream.setSinkContainer(null);
+	}
+	
+	public void setVideoCodec(IVideoStreamCodec codec) {
+		super.setVideoCodec(codec);
+		
+		// Update already connected streams
+		synchronized (streams) {
+			Iterator it = streams.iterator();
+			while (it.hasNext()) {
+				IStreamSink stream = (IStreamSink) it.next();
+				stream.setVideoCodec(codec);
+			}
+		}
+	}
+
+	// push message to all connected streams
+	public void enqueue(Message message) {
+		synchronized (streams) {
+			final Iterator it = streams.iterator();
+			while (it.hasNext()){
+				IStreamSink stream = (IStreamSink) it.next();
+				if (log.isDebugEnabled())
+					log.debug("Sending");
+				if (stream.canAccept()){
+					stream.enqueue(message);
+				} else {
+					log.warn("Out cant accept");
+				}
+			}
+		}
+	}
+
+	public void close(){
+		synchronized (streams) {
+			final Iterator it = streams.iterator();
+			while (it.hasNext()){
+				IStreamSink stream = (IStreamSink) it.next();
+				stream.close();
+			}
+			streams.clear();
+		}
+		
+		super.close();
+	}
+}
Index: /java/server/trunk/src/org/red5/server/stream/VideoCodecFactory.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/VideoCodecFactory.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/VideoCodecFactory.java (revision 656)
@@ -0,0 +1,46 @@
+package org.red5.server.stream;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.mina.common.ByteBuffer;
+
+public class VideoCodecFactory {
+
+	private Log log = LogFactory.getLog(VideoCodecFactory.class.getName());
+
+	private List codecs = new ArrayList();
+	
+	public void setCodecs(List codecs) {
+		this.codecs = codecs;
+	}
+	
+	public IVideoStreamCodec getVideoCodec(ByteBuffer data) {
+		IVideoStreamCodec result = null;
+		Iterator it = this.codecs.iterator();
+		while (it.hasNext()) {
+			IVideoStreamCodec codec;
+			IVideoStreamCodec storedCodec = (IVideoStreamCodec) it.next();
+			// XXX: this is a bit of a hack to create new instances of the configured
+			//      video codec for each stream
+			try {
+				codec = (IVideoStreamCodec) storedCodec.getClass().newInstance();
+			} catch (Exception e) {
+				log.error("Could not create video codec instance.", e);
+				continue;
+			}
+			
+			log.info("Trying codec " + codec);
+			if (codec.canHandleData(data)) {
+				result = codec;
+				break;
+			}
+		}
+		
+		// No codec for this video data
+		return result;
+	}
+}
Index: /java/server/trunk/src/org/red5/server/stream/TemporaryStream.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/TemporaryStream.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/stream/TemporaryStream.java (revision 656)
@@ -0,0 +1,25 @@
+package org.red5.server.stream;
+
+import org.red5.server.net.rtmp.message.Message;
+
+// Very simple class that can be used as temporary stream while waiting for
+// the "real" published live stream.
+public class TemporaryStream extends Stream implements IStreamSource {
+
+	public TemporaryStream(String name, String mode) {
+		super(null, mode);
+		this.setName(name);
+		this.setSource(this);
+		this.setDownstream(new TemporaryDownStream());
+	}
+	
+	public boolean hasMore() {
+		return false;
+	}
+	
+	public Message dequeue() {
+		return null;
+	}
+	
+	public void close() {}
+}
Index: /java/server/trunk/src/org/red5/server/Context.java
===================================================================
--- /java/server/trunk/src/org/red5/server/Context.java (revision 651)
+++ /java/server/trunk/src/org/red5/server/Context.java (revision 656)
@@ -9,5 +9,5 @@
 import org.red5.server.api.IScopeHandler;
 import org.red5.server.api.IScopeResolver;
-import org.red5.server.api.persistance.IPersistanceStore;
+import org.red5.server.api.persistance.IPersistenceStore;
 import org.red5.server.api.service.IServiceInvoker;
 import org.red5.server.exception.ScopeHandlerNotFoundException;
@@ -26,5 +26,5 @@
 	private IServiceInvoker serviceInvoker;
 	private IMappingStrategy mappingStrategy;
-	private IPersistanceStore persistanceStore;
+	private IPersistenceStore persistanceStore;
 	
 	public Context(){
@@ -61,9 +61,9 @@
 	}
 
-	public IPersistanceStore getPersistanceStore() {
+	public IPersistenceStore getPersistanceStore() {
 		return persistanceStore; 
 	}
 
-	public void setPersistanceStore(IPersistanceStore persistanceStore) {
+	public void setPersistanceStore(IPersistenceStore persistanceStore) {
 		this.persistanceStore = persistanceStore;
 	}
Index: /java/server/trunk/src/org/red5/server/api/persistance/IPersistenceStore.java
===================================================================
--- /java/server/trunk/src/org/red5/server/api/persistance/IPersistenceStore.java (revision 656)
+++ /java/server/trunk/src/org/red5/server/api/persistance/IPersistenceStore.java (revision 656)
@@ -0,0 +1,8 @@
+package org.red5.server.api.persistance;
+
+public interface IPersistenceStore {
+	
+	public boolean save(IPersistable obj);
+	public boolean load(IPersistable obj);
+	
+}
Index: /va/server/trunk/src/org/red5/server/api/persistance/IPersistanceStore.java
===================================================================
--- /java/server/trunk/src/org/red5/server/api/persistance/IPersistanceStore.java (revision 651)
+++  (revision )
@@ -1,8 +1,0 @@
-package org.red5.server.api.persistance;
-
-public interface IPersistanceStore {
-	
-	public boolean save(IPersistable obj);
-	public boolean load(IPersistable obj);
-	
-}
Index: /java/server/trunk/src/org/red5/server/api/IContext.java
===================================================================
--- /java/server/trunk/src/org/red5/server/api/IContext.java (revision 651)
+++ /java/server/trunk/src/org/red5/server/api/IContext.java (revision 656)
@@ -1,5 +1,5 @@
 package org.red5.server.api;
 
-import org.red5.server.api.persistance.IPersistanceStore;
+import org.red5.server.api.persistance.IPersistenceStore;
 import org.red5.server.api.service.IServiceInvoker;
 import org.springframework.context.ApplicationContext;
@@ -20,5 +20,5 @@
 	public IClientRegistry getClientRegistry();
 	public IServiceInvoker getServiceInvoker();
-	public IPersistanceStore getPersistanceStore();
+	public IPersistenceStore getPersistanceStore();
 	public Object lookupService(String serviceName);
 	public IScopeHandler lookupScopeHandler(String path);



More information about the Red5commits mailing list