[Red5devs] My first patch! - StreamingProxy Patch - imdt.com.br.StreamingProxy (modified version of red5 SP)

Steven Gong steven.gong at gmail.com
Wed Apr 15 09:34:10 PDT 2009


Your solution looks fine with me and I agree there is some refactoring
needed to the patch.

On Wed, Apr 15, 2009 at 10:49 AM, Tiago Daniel Jacobs - iMDT <
tiago at imdt.com.br> wrote:

>      When using StreamingProxy for live comunication, any delay on the
> network get added, generating a BIG delay.
>
>     Example: With a few minutes of transfer (few small delays something
> like 200ms per minute), we have something like 10 - 30 seconds of total
> delay, obviously, depending on latency of connection.
>
>     How to reproduce the bug: Use something to "slow down" your connection
> (like NetLimiter Trial) for 5 seconds. The other side will not receive your
> data, for 5 seconds, when you remove the limit, the stream get working, but
> the other side, will have the entire Queue of packets to receive, generating
> a offset between the sending people and receiving people. All latency get
> added.
>
>     My solution is drop video packets using VideoPacketDropper (to drop
> interframes also). So, the audio never get dropped, but the video packets
> that cannot be sent, got dropped when the network things go ugly.
>
>     If I can not explain ask again that I'll try more examples, but the
> whole thing is that.
>
>     Yes, I can make the diff path, but first I`ll need to refactor the code
> and see if this approach is interesting for red5 project, specifically for
> StreamingProxy. For my use it's ok, will only refactor if we can put this
> into red5 core.
>
>     Also, do you think this is the best approach?
>
> Thanks,
> Tiago
>
>
>
> Steven Gong escreveu:
>
> Hi Tiago,
> Can you explain in detail what problem you want to solve from this patch?
> And it would be more clear if you can provide the diff patch instead of the
> updated class. Thanks.
>
> On Wed, Apr 15, 2009 at 10:24 AM, Tiago Daniel Jacobs - iMDT <
> tiago at imdt.com.br> wrote:
>
>> Hey, i have made a patch for StreamingProxy, I used the VideoPacketDropper
>> for drop VideoPackets when the system get "Late".
>>
>> I don´t know if it´s a desirable approach for everyone, so, might I create
>> a new class StreamingProxyLiveApproach, or create a optional method that
>> turns on this operating mode? What do you think?
>>
>> The current dirty, but functional, new code is attached.
>>
>> I think to clean up this code and put inside red5, anyone want to do this?
>> (More time left for me for working on new patches).
>>
>> Thanks!
>> --
>>
>> package br.com.imdt.live.stream;
>>
>> import java.io.IOException;
>> import java.util.Map;
>> import java.util.UUID;
>>
>> import org.red5.server.net.rtmp.RTMPClient;
>> import org.red5.server.net.rtmp.INetStreamEventHandler;
>> import org.red5.server.net.rtmp.RTMPConnection;
>> import org.red5.io.utils.ObjectMap;
>> import org.red5.server.api.IConnection;
>> import org.red5.server.api.Red5;
>> import org.red5.server.api.service.IPendingServiceCall;
>> import org.red5.server.api.service.IPendingServiceCallback;
>> import org.red5.server.messaging.IMessage;
>> import org.red5.server.messaging.IMessageComponent;
>> import org.red5.server.messaging.IPipe;
>> import org.red5.server.messaging.IPipeConnectionListener;
>> import org.red5.server.messaging.IPushableConsumer;
>> import org.red5.server.messaging.OOBControlMessage;
>> import org.red5.server.messaging.PipeConnectionEvent;
>> import org.red5.server.net.rtmp.event.AudioData;
>> import org.red5.server.net.rtmp.event.IRTMPEvent;
>> import org.red5.server.net.rtmp.event.Notify;
>> import org.red5.server.net.rtmp.event.VideoData;
>> import org.red5.server.net.rtmp.status.StatusCodes;
>> import org.red5.server.stream.IBroadcastScope;
>> import org.red5.server.stream.IFrameDropper;
>> import org.red5.server.stream.IProviderService;
>> import org.red5.server.stream.IStreamData;
>> import org.red5.server.stream.VideoFrameDropper;
>> import org.red5.server.stream.message.RTMPMessage;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> import br.com.imdt.live.RunningClusterNode;
>>
>> /**
>>  * A proxy to publish stream from server to server.
>>  * TODO: Use timer to monitor the connect/stream creation.
>>  * @author Steven Gong (steven.gong at gmail.com) / modified by T. D. Jacobs
>> (tiago at imdt.com.br)
>>  */
>> public class StreamingProxy
>> implements IPushableConsumer, IPipeConnectionListener,
>>                INetStreamEventHandler, IPendingServiceCallback {
>>        private static final int STOPPED = 0;
>>        private static final int CONNECTING = 1;
>>        private static final int STREAM_CREATING = 2;
>>        private static final int PUBLISHING = 3;
>>        private static final int PUBLISHED = 4;
>>
>>        private String host;
>>        private int port;
>>        private String app;
>>        private RTMPClient rtmpClient;
>>        private int state;
>>        private String publishName;
>>        private int streamId;
>>
>>        private UUID sourceServerUUID;
>>
>>        private Logger log = LoggerFactory.getLogger(StreamingProxy.class);
>>        private UUID roomUUID;
>>
>>        //Usado apenas para desconexão futura
>>        private IBroadcastScope sourceBroadcastScope;
>>
>>        private Thread th_StreamingProxyStatus;
>>
>>        //TDJ: dropper de video para atraso (LIVE STREAMING)
>>        private IFrameDropper videoFrameDropper = new VideoFrameDropper();
>>        private int timestampOffset = 0;
>>
>>        private long bufferCheckInterval  = 5000, nextCheckBufferUnderrun;
>>
>>        private String realUserID; //UUID do usuario real ( o que está
>> sendo streamado por este)
>>        public void init(UUID sourceServerUUID, UUID roomUUID) {
>>                log.info("init");
>>                this.sourceServerUUID = sourceServerUUID;
>>                this.roomUUID = roomUUID;
>>                rtmpClient = new RTMPClient();
>>                state = STOPPED;
>>                th_StreamingProxyStatus = new Thread(new
>> StreamingProxyStatus());
>>                th_StreamingProxyStatus.start();
>>
>>        }
>>
>>        synchronized public void start(String publishName) {
>>                log.info("start");
>>                state = CONNECTING;
>>                this.publishName = publishName;
>>                Map<String,Object> connectionParams =
>> rtmpClient.makeDefaultConnectionParams(host, port, app);
>>
>>                connectionParams.put("tcUrl",
>> ((String)connectionParams.get("tcUrl")).concat("/" + roomUUID.toString()));
>>                connectionParams.put("app", app +"/" +
>> roomUUID.toString());
>>
>>                rtmpClient.connect(host, port, connectionParams, this, new
>> Object[] {"STREAMING_PROXY", sourceServerUUID.toString(), realUserID} );
>>        }
>>
>>        synchronized public void stop() {
>>                log.info("stop");
>>                if (state >= STREAM_CREATING) {
>>                        rtmpClient.disconnect();
>>
>>                }
>>                state = STOPPED;
>>        }
>>
>>        public void onPipeConnectionEvent(PipeConnectionEvent event) {
>>                // nothing to do
>>        }
>>
>>        synchronized public void pushMessage(IPipe pipe, IMessage message)
>> throws IOException {
>>                if (state >= PUBLISHED && message instanceof RTMPMessage) {
>>                        RTMPMessage rtmpMsg = (RTMPMessage) message;
>>                        IRTMPEvent body = rtmpMsg.getBody();
>>                        RTMPConnection rtmpClientConnection =
>> rtmpClient.getConnManager().getConnection();
>>
>>                        if(rtmpClientConnection == null) {
>>                                return;
>>                        }
>>
>>                        Long pendingVideos =
>> rtmpClientConnection.getPendingVideoMessages(streamId);
>>
>>
>>                        if (body instanceof VideoData) {
>>                                if
>> (!videoFrameDropper.canSendPacket(rtmpMsg, pendingVideos)) {
>>                                        // Bloqueia frames que dependam de
>> outros frames que foram bloqueados anteriormente.
>>                                        System.out.println("canSendPacket =
>> false");
>>                                        return;
>>                                }
>>
>>                                if (pendingVideos > 1) {
>>                                        long now =
>> System.currentTimeMillis();
>>
>> //                                      if (bufferCheckInterval > 0
>> //                                                      && now >=
>> nextCheckBufferUnderrun) {
>>                                                log.info("Pacote perdido
>> na comunicação com outro nó do cluster");
>>                                                System.out.println("Pacote
>> perdido na comunicação com outro nó do cluster");
>> //
>> //                                              nextCheckBufferUnderrun =
>> now + bufferCheckInterval;
>> //                                      }
>>
>>
>>  videoFrameDropper.dropPacket(rtmpMsg);
>>                                        return;
>>                                }
>>
>>                                videoFrameDropper.sendPacket(rtmpMsg);
>>                        }
>>
>>
>>                        rtmpClient.publishStreamData(streamId, rtmpMsg);
>>
>>                }
>>        }
>>
>>        public void onOOBControlMessage(IMessageComponent source, IPipe
>> pipe,
>>                        OOBControlMessage oobCtrlMsg) {
>>                // TODO Auto-generated method stub
>>
>>        }
>>
>>        public void setHost(String host) {
>>                this.host = host;
>>        }
>>
>>        public void setPort(int port) {
>>                this.port = port;
>>        }
>>
>>        public void setApp(String app) {
>>                this.app = app;
>>        }
>>
>>        synchronized public void onStreamEvent(Notify notify) {
>>                log.info("onStreamEvent - {}", notify);
>>                log.info("onStreamEvent - call - {}", notify.getCall());
>>                log.info("onStreamEvent - call - arguments - {}",
>> notify.getCall().getArguments());
>>
>>                ObjectMap map = (ObjectMap)
>> notify.getCall().getArguments()[0];
>>                String code = (String) map.get("code");
>>                if (StatusCodes.NS_PUBLISH_START.equals(code)) {
>>                        state = PUBLISHED;
>>                }
>>        }
>>
>>        synchronized public void resultReceived(IPendingServiceCall call) {
>>                log.info("resultReceived - {} ",
>> call.getServiceMethodName());
>>                log.info("resultReceived - result = {} ",
>> call.getResult());
>>                if ("connect".equals(call.getServiceMethodName())) {
>>                        state = STREAM_CREATING;
>>                        rtmpClient.createStream(this);
>>                } else if
>> ("createStream".equals(call.getServiceMethodName())) {
>>                        state = PUBLISHING;
>>                        Object result = call.getResult();
>>                        if (result instanceof Integer) {
>>                                Integer streamIdInt = (Integer) result;
>>                                streamId = streamIdInt.intValue();
>>                                rtmpClient.publish(streamIdInt.intValue(),
>> publishName, "live", this);
>>                                videoFrameDropper.reset();
>>                        } else {
>>                                rtmpClient.disconnect();
>>                                state = STOPPED;
>>                        }
>>                }
>>        }
>>
>>
>>        /**
>>         * @param sourceBroadcast the sourceBroadcast to set
>>         */
>>        public void setSourceBroadcastScope(IBroadcastScope
>> sourceBroadcastScope) {
>>                this.sourceBroadcastScope = sourceBroadcastScope;
>>        }
>>
>>        /**
>>         * @return the sourceBroadcast
>>         */
>>        public IBroadcastScope getSourceBroadcastScope() {
>>                return sourceBroadcastScope;
>>        }
>>
>>
>>        /**
>>         * @param realUserID the realUserID to set
>>         */
>>        public void setRealUserID(String realUserID) {
>>                this.realUserID = realUserID;
>>        }
>>
>>        /**
>>         * @return the realUserID
>>         */
>>        public String getRealUserID() {
>>                return realUserID;
>>        }
>>
>>
>>        private class StreamingProxyStatus implements Runnable{
>>                private Logger log;
>>
>>                @Override
>>                public synchronized void run() {
>>                        /*while(true) {
>>                                try {
>>                                        Thread.sleep(4000);
>>                                } catch (InterruptedException e) {
>>                                        // TODO Auto-generated catch block
>>                                        e.printStackTrace();
>>                                }
>>                                System.out.println("StreamingProxyStatus:
>> getPendingVideoMessages: " +
>> rtmpClient.getConnManager().getConnection().getPendingVideoMessages(streamId));
>>                        }*/
>>                }
>>        }
>> }
>>
>> _______________________________________________
>> Red5devs mailing list
>> Red5devs at osflash.org
>> http://osflash.org/mailman/listinfo/red5devs_osflash.org
>>
>>
>
>
> --
> Best Regards
> Steven Gong
>
> InfraRed5 Red5 Consultant: http://www.infrared5.com, steven at infrared5.com
>
> Red5 Developer: http://osflash.org/red5,
> http://jira.red5.org/confluence/display/~steven/Home<http://jira.red5.org/confluence/display/%7Esteven/Home>
>
> Javaflash Project Founder and Maintainer: http://osflash.org/javaflash
>
> ------------------------------
>
> _______________________________________________
> Red5devs mailing listRed5devs at osflash.orghttp://osflash.org/mailman/listinfo/red5devs_osflash.org
>
>
>
> _______________________________________________
> Red5devs mailing list
> Red5devs at osflash.org
> http://osflash.org/mailman/listinfo/red5devs_osflash.org
>
>


-- 
Best Regards
Steven Gong

InfraRed5 Red5 Consultant: http://www.infrared5.com, steven at infrared5.com

Red5 Developer: http://osflash.org/red5,
http://jira.red5.org/confluence/display/~steven/Home

Javaflash Project Founder and Maintainer: http://osflash.org/javaflash
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://osflash.org/pipermail/red5devs_osflash.org/attachments/20090415/b5b75150/attachment-0001.html>


More information about the Red5devs mailing list