[Red5devs] My first patch! - StreamingProxy Patch - imdt.com.br.StreamingProxy (modified version of red5 SP)
Steven Gong
steven.gong at gmail.com
Wed Apr 15 09:34:10 PDT 2009
Your solution looks fine to me, and I agree that the patch needs some
refactoring.
On Wed, Apr 15, 2009 at 10:49 AM, Tiago Daniel Jacobs - iMDT <
tiago at imdt.com.br> wrote:
> When using StreamingProxy for live communication, every delay on the
> network accumulates, producing a large overall delay.
>
> Example: after a few minutes of transfer (with a few small delays,
> around 200 ms per minute), we end up with roughly 10 to 30 seconds of total
> delay, depending on the latency of the connection.
>
> How to reproduce the bug: use something to "slow down" your connection
> (like NetLimiter Trial) for 5 seconds. The other side will not receive your
> data for those 5 seconds. When you remove the limit, the stream starts working
> again, but the other side still has the entire queue of packets to receive,
> which creates an offset between the sender and the receiver. All of the
> latency accumulates.
>
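The accumulation described above can be illustrated with a toy model. This is only a sketch under stated assumptions: the class `BacklogDelayDemo`, its method `finalDelay`, and the one-packet-per-tick rates are hypothetical and are not part of red5. It assumes the link forwards at most one packet per tick, so a backlog built up during a stall is never worked off.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical toy model, not red5 code: one packet is produced per tick and
// the link forwards at most one packet per tick, except while it is stalled.
final class BacklogDelayDemo {

    /** Returns the delay, in ticks, of the last packet that was delivered. */
    static int finalDelay(int totalTicks, int stallStart, int stallLength,
                          boolean dropWhenLate) {
        Deque<Integer> queue = new ArrayDeque<>(); // creation tick of each queued packet
        int lastDelay = 0;
        for (int tick = 0; tick < totalTicks; tick++) {
            boolean stalled = tick >= stallStart && tick < stallStart + stallLength;
            if (dropWhenLate && !queue.isEmpty()) {
                // Live mode: discard stale packets instead of queueing behind them.
                queue.clear();
            }
            queue.addLast(tick);
            if (!stalled) {
                int created = queue.removeFirst();
                lastDelay = tick - created;
            }
        }
        return lastDelay;
    }

    public static void main(String[] args) {
        // A 5-tick stall without dropping leaves a permanent 5-tick delay.
        System.out.println(finalDelay(100, 10, 5, false)); // prints 5
        // Dropping stale packets brings the delay back to zero.
        System.out.println(finalDelay(100, 10, 5, true));  // prints 0
    }
}
```

In this model the delay after the stall equals the stall length and never shrinks, which matches the behaviour reported with NetLimiter.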
> My solution is to drop video packets using VideoFrameDropper (which also
> drops interframes). The audio is never dropped, but video packets that
> cannot be sent are dropped when network conditions get bad.
>
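A minimal sketch of the drop policy described above, independent of red5. The class `LiveDropPolicy`, the `Kind` enum, and the `maxPendingVideo` threshold are hypothetical names chosen for illustration; the real VideoFrameDropper tracks more frame types than this simplified stand-in.

```java
// Hypothetical stand-in for the policy in the patch, not the red5 API.
final class LiveDropPolicy {

    enum Kind { AUDIO, VIDEO_KEYFRAME, VIDEO_INTERFRAME }

    private final long maxPendingVideo;  // backlog allowed before dropping starts
    private boolean waitingForKeyframe;  // true after a video packet was dropped

    LiveDropPolicy(long maxPendingVideo) {
        this.maxPendingVideo = maxPendingVideo;
    }

    /** Returns true if the packet should be forwarded, false if it is dropped. */
    boolean shouldSend(Kind kind, long pendingVideo) {
        if (kind == Kind.AUDIO) {
            return true; // audio is never dropped
        }
        if (pendingVideo > maxPendingVideo) {
            // The connection is falling behind: drop this video packet. Later
            // interframes depend on it, so they are dropped as well.
            waitingForKeyframe = true;
            return false;
        }
        if (waitingForKeyframe) {
            if (kind == Kind.VIDEO_INTERFRAME) {
                return false; // cannot be decoded without the dropped frame
            }
            waitingForKeyframe = false; // a keyframe resynchronizes the stream
        }
        return true;
    }
}
```

With `new LiveDropPolicy(1)`, an interframe arriving while two video messages are pending is dropped, and video resumes only at the next keyframe once the backlog has cleared.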
> If this explanation is not clear, ask again and I'll try more examples, but
> that is the whole idea.
>
> Yes, I can make the diff patch, but first I'll need to refactor the code
> and see whether this approach is interesting for the red5 project, specifically
> for StreamingProxy. For my own use it's fine as is; I will only refactor it if
> we can put this into the red5 core.
>
> Also, do you think this is the best approach?
>
> Thanks,
> Tiago
>
>
>
> Steven Gong wrote:
>
> Hi Tiago,
> Can you explain in detail what problem you want to solve with this patch?
> It would also be clearer if you could provide a diff instead of the
> updated class. Thanks.
>
> On Wed, Apr 15, 2009 at 10:24 AM, Tiago Daniel Jacobs - iMDT <
> tiago at imdt.com.br> wrote:
>
>> Hey, I have made a patch for StreamingProxy. I used the VideoFrameDropper
>> to drop video packets when the system gets "late".
>>
>> I don't know if this is a desirable approach for everyone, so should I create
>> a new class, StreamingProxyLiveApproach, or add an optional method that
>> turns on this operating mode? What do you think?
>>
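One way the "optional method" variant could look, as a hedged sketch only: the class `ProxyModeSketch` and the methods `setLiveMode` and `shouldForwardVideo` are hypothetical and do not exist in red5. The threshold of 1 pending video message mirrors the check in the attached code.

```java
// Hypothetical sketch of an opt-in live mode; not part of red5.
final class ProxyModeSketch {

    // Off by default, so existing users keep the queue-everything behaviour.
    private boolean liveMode = false;

    void setLiveMode(boolean liveMode) {
        this.liveMode = liveMode;
    }

    /** Decides whether a video packet is forwarded given the current backlog. */
    boolean shouldForwardVideo(long pendingVideo) {
        if (!liveMode) {
            return true; // default mode never drops
        }
        return pendingVideo <= 1; // live mode drops once the backlog builds up
    }
}
```

A flag keeps a single class to maintain, while a separate StreamingProxyLiveApproach class would keep the original code path untouched; either choice is a matter of project preference.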
>> The current dirty, but functional, new code is attached.
>>
>> I think this code should be cleaned up and put into red5. Does anyone want to do this?
>> (That would leave me more time to work on new patches.)
>>
>> Thanks!
>> --
>>
>> package br.com.imdt.live.stream;
>>
>> import java.io.IOException;
>> import java.util.Map;
>> import java.util.UUID;
>>
>> import org.red5.server.net.rtmp.RTMPClient;
>> import org.red5.server.net.rtmp.INetStreamEventHandler;
>> import org.red5.server.net.rtmp.RTMPConnection;
>> import org.red5.io.utils.ObjectMap;
>> import org.red5.server.api.IConnection;
>> import org.red5.server.api.Red5;
>> import org.red5.server.api.service.IPendingServiceCall;
>> import org.red5.server.api.service.IPendingServiceCallback;
>> import org.red5.server.messaging.IMessage;
>> import org.red5.server.messaging.IMessageComponent;
>> import org.red5.server.messaging.IPipe;
>> import org.red5.server.messaging.IPipeConnectionListener;
>> import org.red5.server.messaging.IPushableConsumer;
>> import org.red5.server.messaging.OOBControlMessage;
>> import org.red5.server.messaging.PipeConnectionEvent;
>> import org.red5.server.net.rtmp.event.AudioData;
>> import org.red5.server.net.rtmp.event.IRTMPEvent;
>> import org.red5.server.net.rtmp.event.Notify;
>> import org.red5.server.net.rtmp.event.VideoData;
>> import org.red5.server.net.rtmp.status.StatusCodes;
>> import org.red5.server.stream.IBroadcastScope;
>> import org.red5.server.stream.IFrameDropper;
>> import org.red5.server.stream.IProviderService;
>> import org.red5.server.stream.IStreamData;
>> import org.red5.server.stream.VideoFrameDropper;
>> import org.red5.server.stream.message.RTMPMessage;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> import br.com.imdt.live.RunningClusterNode;
>>
>> /**
>> * A proxy to publish stream from server to server.
>> * TODO: Use timer to monitor the connect/stream creation.
>> * @author Steven Gong (steven.gong at gmail.com) / modified by T. D. Jacobs
>> * (tiago at imdt.com.br)
>> */
>> public class StreamingProxy
>> implements IPushableConsumer, IPipeConnectionListener,
>> INetStreamEventHandler, IPendingServiceCallback {
>> private static final int STOPPED = 0;
>> private static final int CONNECTING = 1;
>> private static final int STREAM_CREATING = 2;
>> private static final int PUBLISHING = 3;
>> private static final int PUBLISHED = 4;
>>
>> private String host;
>> private int port;
>> private String app;
>> private RTMPClient rtmpClient;
>> private int state;
>> private String publishName;
>> private int streamId;
>>
>> private UUID sourceServerUUID;
>>
>> private Logger log = LoggerFactory.getLogger(StreamingProxy.class);
>> private UUID roomUUID;
>>
>> // Used only for later disconnection
>> private IBroadcastScope sourceBroadcastScope;
>>
>> private Thread th_StreamingProxyStatus;
>>
>> // TDJ: video frame dropper used to handle delay (LIVE STREAMING)
>> private IFrameDropper videoFrameDropper = new VideoFrameDropper();
>> private int timestampOffset = 0;
>>
>> private long bufferCheckInterval = 5000, nextCheckBufferUnderrun;
>>
>> // UUID of the real user (the one being streamed by this proxy)
>> private String realUserID;
>>
>> public void init(UUID sourceServerUUID, UUID roomUUID) {
>> log.info("init");
>> this.sourceServerUUID = sourceServerUUID;
>> this.roomUUID = roomUUID;
>> rtmpClient = new RTMPClient();
>> state = STOPPED;
>> th_StreamingProxyStatus = new Thread(new
>> StreamingProxyStatus());
>> th_StreamingProxyStatus.start();
>>
>> }
>>
>> synchronized public void start(String publishName) {
>> log.info("start");
>> state = CONNECTING;
>> this.publishName = publishName;
>> Map<String,Object> connectionParams =
>> rtmpClient.makeDefaultConnectionParams(host, port, app);
>>
>> connectionParams.put("tcUrl",
>> ((String)connectionParams.get("tcUrl")).concat("/" + roomUUID.toString()));
>> connectionParams.put("app", app +"/" +
>> roomUUID.toString());
>>
>> rtmpClient.connect(host, port, connectionParams, this, new
>> Object[] {"STREAMING_PROXY", sourceServerUUID.toString(), realUserID} );
>> }
>>
>> synchronized public void stop() {
>> log.info("stop");
>> if (state >= STREAM_CREATING) {
>> rtmpClient.disconnect();
>>
>> }
>> state = STOPPED;
>> }
>>
>> public void onPipeConnectionEvent(PipeConnectionEvent event) {
>> // nothing to do
>> }
>>
>> synchronized public void pushMessage(IPipe pipe, IMessage message)
>> throws IOException {
>> if (state >= PUBLISHED && message instanceof RTMPMessage) {
>> RTMPMessage rtmpMsg = (RTMPMessage) message;
>> IRTMPEvent body = rtmpMsg.getBody();
>> RTMPConnection rtmpClientConnection =
>> rtmpClient.getConnManager().getConnection();
>>
>> if(rtmpClientConnection == null) {
>> return;
>> }
>>
>> // Number of video messages still queued on the outgoing connection
>> long pendingVideos =
>> rtmpClientConnection.getPendingVideoMessages(streamId);
>>
>> if (body instanceof VideoData) {
>> if (!videoFrameDropper.canSendPacket(rtmpMsg, pendingVideos)) {
>> // Block frames that depend on frames that were dropped earlier.
>> log.debug("canSendPacket = false");
>> return;
>> }
>>
>> if (pendingVideos > 1) {
>> // The connection is falling behind: drop this video packet.
>> long now = System.currentTimeMillis();
>>
>> // if (bufferCheckInterval > 0 && now >= nextCheckBufferUnderrun) {
>> log.info("Packet lost in communication with another cluster node");
>> // nextCheckBufferUnderrun = now + bufferCheckInterval;
>> // }
>>
>> videoFrameDropper.dropPacket(rtmpMsg);
>> return;
>> }
>>
>> videoFrameDropper.sendPacket(rtmpMsg);
>> }
>>
>>
>> rtmpClient.publishStreamData(streamId, rtmpMsg);
>>
>> }
>> }
>>
>> public void onOOBControlMessage(IMessageComponent source, IPipe
>> pipe,
>> OOBControlMessage oobCtrlMsg) {
>> // TODO Auto-generated method stub
>>
>> }
>>
>> public void setHost(String host) {
>> this.host = host;
>> }
>>
>> public void setPort(int port) {
>> this.port = port;
>> }
>>
>> public void setApp(String app) {
>> this.app = app;
>> }
>>
>> synchronized public void onStreamEvent(Notify notify) {
>> log.info("onStreamEvent - {}", notify);
>> log.info("onStreamEvent - call - {}", notify.getCall());
>> log.info("onStreamEvent - call - arguments - {}",
>> notify.getCall().getArguments());
>>
>> ObjectMap map = (ObjectMap)
>> notify.getCall().getArguments()[0];
>> String code = (String) map.get("code");
>> if (StatusCodes.NS_PUBLISH_START.equals(code)) {
>> state = PUBLISHED;
>> }
>> }
>>
>> synchronized public void resultReceived(IPendingServiceCall call) {
>> log.info("resultReceived - {} ",
>> call.getServiceMethodName());
>> log.info("resultReceived - result = {} ",
>> call.getResult());
>> if ("connect".equals(call.getServiceMethodName())) {
>> state = STREAM_CREATING;
>> rtmpClient.createStream(this);
>> } else if
>> ("createStream".equals(call.getServiceMethodName())) {
>> state = PUBLISHING;
>> Object result = call.getResult();
>> if (result instanceof Integer) {
>> Integer streamIdInt = (Integer) result;
>> streamId = streamIdInt.intValue();
>> rtmpClient.publish(streamIdInt.intValue(),
>> publishName, "live", this);
>> videoFrameDropper.reset();
>> } else {
>> rtmpClient.disconnect();
>> state = STOPPED;
>> }
>> }
>> }
>>
>>
>> /**
>> * @param sourceBroadcastScope the source broadcast scope to set
>> */
>> public void setSourceBroadcastScope(IBroadcastScope
>> sourceBroadcastScope) {
>> this.sourceBroadcastScope = sourceBroadcastScope;
>> }
>>
>> /**
>> * @return the source broadcast scope
>> */
>> public IBroadcastScope getSourceBroadcastScope() {
>> return sourceBroadcastScope;
>> }
>>
>>
>> /**
>> * @param realUserID the realUserID to set
>> */
>> public void setRealUserID(String realUserID) {
>> this.realUserID = realUserID;
>> }
>>
>> /**
>> * @return the realUserID
>> */
>> public String getRealUserID() {
>> return realUserID;
>> }
>>
>>
>> private class StreamingProxyStatus implements Runnable{
>> private Logger log;
>>
>> @Override
>> public synchronized void run() {
>> /*while(true) {
>> try {
>> Thread.sleep(4000);
>> } catch (InterruptedException e) {
>> // TODO Auto-generated catch block
>> e.printStackTrace();
>> }
>> System.out.println("StreamingProxyStatus:
>> getPendingVideoMessages: " +
>> rtmpClient.getConnManager().getConnection().getPendingVideoMessages(streamId));
>> }*/
>> }
>> }
>> }
>>
>> _______________________________________________
>> Red5devs mailing list
>> Red5devs at osflash.org
>> http://osflash.org/mailman/listinfo/red5devs_osflash.org
>>
>>
>
>
> --
> Best Regards
> Steven Gong
>
> InfraRed5 Red5 Consultant: http://www.infrared5.com, steven at infrared5.com
>
> Red5 Developer: http://osflash.org/red5,
> http://jira.red5.org/confluence/display/~steven/Home
>
> Javaflash Project Founder and Maintainer: http://osflash.org/javaflash
>
--
Best Regards
Steven Gong
InfraRed5 Red5 Consultant: http://www.infrared5.com, steven at infrared5.com
Red5 Developer: http://osflash.org/red5,
http://jira.red5.org/confluence/display/~steven/Home
Javaflash Project Founder and Maintainer: http://osflash.org/javaflash