[Red5commits] [1576] committing merge results -r1569:1575
daccattato
luke at codegent.com
Wed Jan 24 10:58:42 EST 2007
committing merge results -r1569:1575
Timestamp: 11/25/06 23:47:18 EST (2 months ago)
Change: 1576
Author: daccattato
Files (see diff or trac for details):
java/server/branches/0.6_security/trunk/src/org/red5/server/api/so/IClientSharedObject.java
java/server/branches/0.6_security/trunk/src/org/red5/server/api/so/ISharedObject.java
java/server/branches/0.6_security/trunk/src/org/red5/server/api/so/ISharedObjectBase.java
java/server/branches/0.6_security/trunk/src/org/red5/server/api/so/ISharedObjectListener.java
java/server/branches/0.6_security/trunk/src/org/red5/server/net/rtmp/BaseRTMPHandler.java
java/server/branches/0.6_security/trunk/src/org/red5/server/net/rtmp/Channel.java
java/server/branches/0.6_security/trunk/src/org/red5/server/net/rtmp/DeferredResult.java
java/server/branches/0.6_security/trunk/src/org/red5/server/net/rtmp/RTMPClient.java
java/server/branches/0.6_security/trunk/src/org/red5/server/net/rtmp/RTMPConnection.java
java/server/branches/0.6_security/trunk/src/org/red5/server/net/rtmp/RTMPHandler.java
java/server/branches/0.6_security/trunk/src/org/red5/server/net/rtmp/codec/RTMPProtocolDecoder.java
java/server/branches/0.6_security/trunk/src/org/red5/server/net/rtmp/codec/RTMPProtocolEncoder.java
java/server/branches/0.6_security/trunk/src/org/red5/server/so/ClientSharedObject.java
java/server/branches/0.6_security/trunk/src/org/red5/server/so/SharedObject.java
java/server/branches/0.6_security/trunk/src/org/red5/server/so/SharedObjectScope.java
java/server/branches/0.6_security/trunk/src/org/red5/server/stream/StreamService.java
Trac: http://mirror1.cvsdude.com/trac/osflash/red5/changeset/1576
Index: /java/server/branches/0.6_security/trunk/src/org/red5/server/so/SharedObjectScope.java
===================================================================
--- /java/server/branches/0.6_security/trunk/src/org/red5/server/so/SharedObjectScope.java (revision 1533)
+++ /java/server/branches/0.6_security/trunk/src/org/red5/server/so/SharedObjectScope.java (revision 1576)
@@ -114,5 +114,5 @@
}
- public void beginUpdate() {
+ public synchronized void beginUpdate() {
if (!lock.isHeldByCurrentThread()) {
lock.lock();
@@ -121,5 +121,5 @@
}
- public void beginUpdate(IEventListener listener) {
+ public synchronized void beginUpdate(IEventListener listener) {
if (!lock.isHeldByCurrentThread()) {
lock.lock();
@@ -128,5 +128,5 @@
}
- public void endUpdate() {
+ public synchronized void endUpdate() {
so.endUpdate();
if (so.updateCounter == 0) {
Index: /java/server/branches/0.6_security/trunk/src/org/red5/server/so/ClientSharedObject.java
===================================================================
--- /java/server/branches/0.6_security/trunk/src/org/red5/server/so/ClientSharedObject.java (revision 1576)
+++ /java/server/branches/0.6_security/trunk/src/org/red5/server/so/ClientSharedObject.java (revision 1576)
@@ -0,0 +1,357 @@
+package org.red5.server.so;
+
+/*
+ * RED5 Open Source Flash Server - http://www.osflash.org/red5
+ *
+ * Copyright (c) 2006 by respective authors (see below). All rights reserved.
+ *
+ * This library is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free Software
+ * Foundation; either version 2.1 of the License, or (at your option) any later
+ * version.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License along
+ * with this library; if not, write to the Free Software Foundation, Inc.,
+ * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.red5.server.api.IConnection;
+import org.red5.server.api.event.IEvent;
+import org.red5.server.api.event.IEventDispatcher;
+import org.red5.server.api.event.IEventListener;
+import org.red5.server.api.so.IClientSharedObject;
+import org.red5.server.api.so.ISharedObjectListener;
+import org.red5.server.net.rtmp.Channel;
+import org.red5.server.net.rtmp.RTMPConnection;
+import org.red5.server.so.ISharedObjectEvent.Type;
+
+/**
+ * Client-side shared object. It attaches to a server through an RTMP
+ * connection, mirrors the attribute changes the server reports into the
+ * local data store and forwards local modification requests to the server.
+ *
+ * Registered {@link ISharedObjectListener}s are told about connect,
+ * disconnect, update, delete, clear and send events.
+ */
+public class ClientSharedObject extends SharedObject implements
+        IClientSharedObject, IEventDispatcher {
+
+    protected static Log log = LogFactory.getLog(ClientSharedObject.class.getName());
+
+    /** Becomes true when the initial data event arrives; reset by {@link #disconnect()}. */
+    private boolean initialSyncReceived = false;
+
+    /** Held between the outermost beginUpdate() and the matching endUpdate(). */
+    private final ReentrantLock lock = new ReentrantLock();
+
+    /** Listeners interested in changes of this shared object. */
+    private final Set<ISharedObjectListener> listeners = new HashSet<ISharedObjectListener>();
+
+    /** Service handlers by name; the empty string is the default handler. */
+    private final Map<String, Object> handlers = new HashMap<String, Object>();
+
+    /**
+     * Create a new, not yet connected shared object.
+     *
+     * @param name
+     *            name of the shared object
+     * @param persistent
+     *            whether the shared object is persistent
+     */
+    public ClientSharedObject(String name, boolean persistent) {
+        super();
+        this.name = name;
+        persistentSO = persistent;
+    }
+
+    /**
+     * Connect the shared object using the passed connection.
+     *
+     * @param conn
+     *            connection to use, must be an {@link RTMPConnection}
+     * @throws IllegalArgumentException
+     *             if the connection is not an RTMP connection
+     * @throws IllegalStateException
+     *             if the shared object is already connected
+     */
+    public void connect(IConnection conn) {
+        if (!(conn instanceof RTMPConnection)) {
+            throw new IllegalArgumentException("can only connect through RTMP connections");
+        }
+        if (isConnected()) {
+            throw new IllegalStateException("already connected");
+        }
+
+        source = conn;
+        SharedObjectMessage msg = new SharedObjectMessage(name, 0, isPersistentObject());
+        msg.addEvent(new SharedObjectEvent(Type.SERVER_CONNECT, null, null));
+        Channel c = ((RTMPConnection) conn).getChannel((byte) 3);
+        c.write(msg);
+    }
+
+    /**
+     * Disconnect the shared object: send the disconnect request over the
+     * connection passed to {@link #connect(IConnection)}, notify the
+     * listeners and mark the object as no longer connected.
+     */
+    public void disconnect() {
+        SharedObjectMessage msg = new SharedObjectMessage(name, 0, isPersistentObject());
+        msg.addEvent(new SharedObjectEvent(Type.SERVER_DISCONNECT, null, null));
+        Channel c = ((RTMPConnection) source).getChannel((byte) 3);
+        c.write(msg);
+        notifyDisconnect();
+        initialSyncReceived = false;
+    }
+
+    /**
+     * Check if the initial data event has been received.
+     *
+     * @return <code>true</code> once the initial sync arrived, otherwise <code>false</code>
+     */
+    public boolean isConnected() {
+        return initialSyncReceived;
+    }
+
+    /**
+     * Register a listener for events of this shared object.
+     *
+     * @param listener
+     *            listener to add
+     */
+    public void addSharedObjectListener(ISharedObjectListener listener) {
+        listeners.add(listener);
+    }
+
+    /**
+     * Remove a previously registered listener.
+     *
+     * @param listener
+     *            listener to remove
+     */
+    public void removeSharedObjectListener(ISharedObjectListener listener) {
+        listeners.remove(listener);
+    }
+
+    /**
+     * Apply the events of a shared object message to the local data and
+     * notify the listeners. Events that are not shared object messages are
+     * ignored.
+     *
+     * @param e
+     *            event to process
+     */
+    @SuppressWarnings("unchecked")
+    public void dispatchEvent(IEvent e) {
+        if (e.getType() != IEvent.Type.SHARED_OBJECT
+                || !(e instanceof ISharedObjectMessage)) {
+            // Don't know how to handle this event.
+            return;
+        }
+
+        ISharedObjectMessage msg = (ISharedObjectMessage) e;
+        if (msg.hasSource()) {
+            beginUpdate(msg.getSource());
+        } else {
+            beginUpdate();
+        }
+        try {
+            for (ISharedObjectEvent event : msg.getEvents()) {
+                switch (event.getType()) {
+                    case CLIENT_INITIAL_DATA:
+                        initialSyncReceived = true;
+                        notifyConnect();
+                        break;
+
+                    case CLIENT_CLEAR_DATA:
+                        data.clear();
+                        notifyClear();
+                        break;
+
+                    case CLIENT_DELETE_DATA:
+                    case CLIENT_DELETE_ATTRIBUTE:
+                        data.remove(event.getKey());
+                        notifyDelete(event.getKey());
+                        break;
+
+                    case CLIENT_SEND_MESSAGE:
+                        notifySendMessage(event.getKey(), (List) event.getValue());
+                        break;
+
+                    case CLIENT_UPDATE_DATA: {
+                        Map<String, Object> changes = (Map<String, Object>) event.getValue();
+                        data.putAll(changes);
+                        notifyUpdate(event.getKey(), changes);
+                        break;
+                    }
+
+                    case CLIENT_UPDATE_ATTRIBUTE:
+                        data.put(event.getKey(), event.getValue());
+                        notifyUpdate(event.getKey(), event.getValue());
+                        break;
+
+                    default:
+                        log.warn("Unknown SO event: " + event.getType());
+                }
+            }
+        } finally {
+            // Always close the update block, otherwise an exception thrown by
+            // a listener would leave the update lock held by this thread.
+            endUpdate();
+        }
+    }
+
+    /**
+     * Copy the registered listeners so that a callback may add or remove
+     * listeners without breaking the notification loop.
+     */
+    private ISharedObjectListener[] listenerSnapshot() {
+        return listeners.toArray(new ISharedObjectListener[listeners.size()]);
+    }
+
+    /** Tell all listeners that the initial data has been received. */
+    protected void notifyConnect() {
+        for (ISharedObjectListener listener : listenerSnapshot()) {
+            listener.onSharedObjectConnect(this);
+        }
+    }
+
+    /** Tell all listeners that the shared object has been disconnected. */
+    protected void notifyDisconnect() {
+        for (ISharedObjectListener listener : listenerSnapshot()) {
+            listener.onSharedObjectDisconnect(this);
+        }
+    }
+
+    /**
+     * Tell all listeners that a single attribute changed.
+     *
+     * @param key
+     *            name of the attribute
+     * @param value
+     *            new value of the attribute
+     */
+    protected void notifyUpdate(String key, Object value) {
+        for (ISharedObjectListener listener : listenerSnapshot()) {
+            listener.onSharedObjectUpdate(this, key, value);
+        }
+    }
+
+    /**
+     * Tell all listeners that several attributes changed. A map holding
+     * exactly one entry is reported as a single attribute update.
+     *
+     * @param key
+     *            key of the update event
+     * @param value
+     *            changed attributes
+     */
+    protected void notifyUpdate(String key, Map<String, Object> value) {
+        if (value.size() == 1) {
+            Map.Entry<String, Object> entry = value.entrySet().iterator().next();
+            notifyUpdate(entry.getKey(), entry.getValue());
+            return;
+        }
+        for (ISharedObjectListener listener : listenerSnapshot()) {
+            listener.onSharedObjectUpdate(this, key, value);
+        }
+    }
+
+    /**
+     * Tell all listeners that an attribute was deleted.
+     *
+     * @param key
+     *            name of the deleted attribute
+     */
+    protected void notifyDelete(String key) {
+        for (ISharedObjectListener listener : listenerSnapshot()) {
+            listener.onSharedObjectDelete(this, key);
+        }
+    }
+
+    /** Tell all listeners that the data has been cleared. */
+    protected void notifyClear() {
+        for (ISharedObjectListener listener : listenerSnapshot()) {
+            listener.onSharedObjectClear(this);
+        }
+    }
+
+    /**
+     * Tell all listeners that a message was sent through the shared object.
+     *
+     * @param method
+     *            name of the invoked method
+     * @param params
+     *            parameters of the call
+     */
+    protected void notifySendMessage(String method, List params) {
+        for (ISharedObjectListener listener : listenerSnapshot()) {
+            listener.onSharedObjectSend(this, method, params);
+        }
+    }
+
+    /**
+     * Queue a request to set an attribute. The local data is not modified
+     * here; it changes when a matching update event is dispatched.
+     *
+     * @param name
+     *            name of the attribute
+     * @param value
+     *            value to set
+     * @return always <code>true</code>
+     */
+    @Override
+    public synchronized boolean setAttribute(String name, Object value) {
+        // The value must travel with the event, otherwise only the name is sent.
+        ownerMessage.addEvent(Type.SERVER_SET_ATTRIBUTE, name, value);
+        notifyModified();
+        return true;
+    }
+
+    /**
+     * Queue a request to delete an attribute.
+     *
+     * @param name
+     *            name of the attribute
+     * @return always <code>true</code>
+     */
+    @Override
+    public synchronized boolean removeAttribute(String name) {
+        ownerMessage.addEvent(Type.SERVER_DELETE_ATTRIBUTE, name, null);
+        notifyModified();
+        return true;
+    }
+
+    /**
+     * Queue a request to send a message through the shared object.
+     *
+     * @param handler
+     *            name of the handler to call
+     * @param arguments
+     *            arguments of the call
+     */
+    @Override
+    public synchronized void sendMessage(String handler, List arguments) {
+        ownerMessage.addEvent(Type.SERVER_SEND_MESSAGE, handler, arguments);
+        notifyModified();
+    }
+
+    /**
+     * Queue one delete request for every attribute currently stored locally.
+     */
+    @Override
+    public synchronized void removeAttributes() {
+        // TODO: there must be a direct way to clear the SO on the client
+        // side...
+        for (Object key : data.keySet()) {
+            ownerMessage.addEvent(Type.SERVER_DELETE_ATTRIBUTE, (String) key, null);
+        }
+        notifyModified();
+    }
+
+    /**
+     * Take the update lock unless the current thread already owns it.
+     *
+     * Must not be called while holding this object's monitor: a thread
+     * blocking here with the monitor held would prevent the lock owner from
+     * entering the synchronized endUpdate() that releases the lock.
+     */
+    private void acquireUpdateLock() {
+        if (!lock.isHeldByCurrentThread()) {
+            lock.lock();
+        }
+    }
+
+    /**
+     * Start an update block. Blocks until no other thread is inside an
+     * update block.
+     */
+    @Override
+    public void beginUpdate() {
+        acquireUpdateLock();
+        synchronized (this) {
+            super.beginUpdate();
+        }
+    }
+
+    /**
+     * Start an update block on behalf of the given listener. Blocks until no
+     * other thread is inside an update block.
+     *
+     * @param listener
+     *            source of the update
+     */
+    @Override
+    public void beginUpdate(IEventListener listener) {
+        acquireUpdateLock();
+        synchronized (this) {
+            super.beginUpdate(listener);
+        }
+    }
+
+    /**
+     * End an update block. The update lock is released when the outermost
+     * block is closed, i.e. when the update counter is back at zero.
+     */
+    @Override
+    public synchronized void endUpdate() {
+        super.endUpdate();
+        if (updateCounter == 0) {
+            lock.unlock();
+        }
+    }
+
+    /** Acquire the update lock. */
+    public void lock() {
+        lock.lock();
+    }
+
+    /** Release the update lock. */
+    public void unlock() {
+        lock.unlock();
+    }
+
+    /**
+     * Check if the update lock is held by any thread.
+     *
+     * @return <code>true</code> if locked, otherwise <code>false</code>
+     */
+    public boolean isLocked() {
+        return lock.isLocked();
+    }
+
+    /**
+     * Register the default service handler (stored under the empty name).
+     *
+     * @param handler
+     *            handler object
+     */
+    public void registerServiceHandler(Object handler) {
+        registerServiceHandler("", handler);
+    }
+
+    /**
+     * Remove the service handler stored under the given name.
+     *
+     * @param name
+     *            name of the handler
+     */
+    public void unregisterServiceHandler(String name) {
+        handlers.remove(name);
+    }
+
+    /**
+     * Register a service handler under a name.
+     *
+     * @param name
+     *            name of the handler, <code>null</code> is treated as ""
+     * @param handler
+     *            handler object
+     */
+    public void registerServiceHandler(String name, Object handler) {
+        handlers.put(name == null ? "" : name, handler);
+    }
+
+    /**
+     * Look up a service handler.
+     *
+     * @param name
+     *            name of the handler, <code>null</code> is treated as ""
+     * @return the handler or <code>null</code> if none is registered
+     */
+    public Object getServiceHandler(String name) {
+        return handlers.get(name == null ? "" : name);
+    }
+
+    /**
+     * Get the names of all registered service handlers.
+     *
+     * @return unmodifiable view of the handler names
+     */
+    public Set<String> getServiceHandlerNames() {
+        return Collections.unmodifiableSet(handlers.keySet());
+    }
+
+    // Typed accessors: each one casts the result of getAttribute(name) and
+    // therefore throws ClassCastException if the stored value has another type.
+
+    public Boolean getBoolAttribute(String name) {
+        return (Boolean) getAttribute(name);
+    }
+
+    public Byte getByteAttribute(String name) {
+        return (Byte) getAttribute(name);
+    }
+
+    public Double getDoubleAttribute(String name) {
+        return (Double) getAttribute(name);
+    }
+
+    public Integer getIntAttribute(String name) {
+        return (Integer) getAttribute(name);
+    }
+
+    public List getListAttribute(String name) {
+        return (List) getAttribute(name);
+    }
+
+    public Long getLongAttribute(String name) {
+        return (Long) getAttribute(name);
+    }
+
+    public Map getMapAttribute(String name) {
+        return (Map) getAttribute(name);
+    }
+
+    public Set getSetAttribute(String name) {
+        return (Set) getAttribute(name);
+    }
+
+    public Short getShortAttribute(String name) {
+        return (Short) getAttribute(name);
+    }
+
+    public String getStringAttribute(String name) {
+        return (String) getAttribute(name);
+    }
+
+    /**
+     * Get an attribute, requesting it to be set to a default value first if
+     * it does not exist.
+     *
+     * @param name
+     *            name of the attribute
+     * @param defaultValue
+     *            value to request if the attribute is missing
+     * @return result of <code>getAttribute(name)</code> after the request
+     */
+    public synchronized Object getAttribute(String name, Object defaultValue) {
+        if (!hasAttribute(name)) {
+            setAttribute(name, defaultValue);
+        }
+
+        return getAttribute(name);
+    }
+
+}
Index: /java/server/branches/0.6_security/trunk/src/org/red5/server/so/SharedObject.java
===================================================================
--- /java/server/branches/0.6_security/trunk/src/org/red5/server/so/SharedObject.java (revision 1406)
+++ /java/server/branches/0.6_security/trunk/src/org/red5/server/so/SharedObject.java (revision 1576)
@@ -75,11 +75,11 @@
protected long lastModified = -1;
- private SharedObjectMessage ownerMessage;
-
- private LinkedList<ISharedObjectEvent> syncEvents = new LinkedList<ISharedObjectEvent>();
+ protected SharedObjectMessage ownerMessage;
+
+ protected LinkedList<ISharedObjectEvent> syncEvents = new LinkedList<ISharedObjectEvent>();
protected HashSet<IEventListener> listeners = new HashSet<IEventListener>();
- private IEventListener source = null;
+ protected IEventListener source = null;
public SharedObject() {
@@ -152,5 +152,5 @@
}
- private void sendUpdates() {
+ protected void sendUpdates() {
if (!ownerMessage.getEvents().isEmpty()) {
// Send update to "owner" of this update request
@@ -175,5 +175,5 @@
ownerMessage.getEvents().clear();
}
-
+
if (!syncEvents.isEmpty()) {
// Synchronize updates with all registered clients of this shared
@@ -217,5 +217,5 @@
}
- private void notifyModified() {
+ protected void notifyModified() {
if (updateCounter > 0) {
// we're inside a beginUpdate...endUpdate block
@@ -251,5 +251,5 @@
}
- public boolean setAttribute(String name, Object value) {
+ public synchronized boolean setAttribute(String name, Object value) {
ownerMessage.addEvent(Type.CLIENT_UPDATE_ATTRIBUTE, name, null);
Object old = data.get(name);
@@ -270,5 +270,5 @@
}
- public void setAttributes(Map values) {
+ public synchronized void setAttributes(Map values) {
if (values == null) {
return;
@@ -284,5 +284,5 @@
}
- public void setAttributes(IAttributeStore values) {
+ public synchronized void setAttributes(IAttributeStore values) {
if (values == null) {
return;
@@ -298,5 +298,5 @@
}
- public boolean removeAttribute(String name) {
+ public synchronized boolean removeAttribute(String name) {
boolean result = data.containsKey(name);
if (result) {
@@ -314,5 +314,5 @@
}
- public void sendMessage(String handler, List arguments) {
+ public synchronized void sendMessage(String handler, List arguments) {
ownerMessage.addEvent(Type.CLIENT_SEND_MESSAGE, handler, arguments);
syncEvents.add(new SharedObjectEvent(Type.CLIENT_SEND_MESSAGE, handler,
@@ -332,5 +332,5 @@
}
- public void removeAttributes() {
+ public synchronized void removeAttributes() {
// TODO: there must be a direct way to clear the SO on the client
// side...
@@ -348,5 +348,5 @@
}
- public void register(IEventListener listener) {
+ public synchronized void register(IEventListener listener) {
listeners.add(listener);
@@ -366,5 +366,5 @@
}
- public void unregister(IEventListener listener) {
+ public synchronized void unregister(IEventListener listener) {
listeners.remove(listener);
if (!isPersistentObject() && listeners.isEmpty()) {
@@ -388,10 +388,10 @@
}
- public void beginUpdate(IEventListener listener) {
+ public synchronized void beginUpdate(IEventListener listener) {
source = listener;
updateCounter += 1;
}
- public void endUpdate() {
+ public synchronized void endUpdate() {
updateCounter -= 1;
@@ -433,5 +433,5 @@
* @return true if successful; false otherwise
*/
- public boolean clear() {
+ public synchronized boolean clear() {
data.clear();
// Send confirmation to client
@@ -445,5 +445,5 @@
* shared object any longer.
*/
- public void close() {
+ public synchronized void close() {
// clear collections
data.clear();
Index: /java/server/branches/0.6_security/trunk/src/org/red5/server/stream/StreamService.java
===================================================================
--- /java/server/branches/0.6_security/trunk/src/org/red5/server/stream/StreamService.java (revision 1567)
+++ /java/server/branches/0.6_security/trunk/src/org/red5/server/stream/StreamService.java (revision 1576)
@@ -141,4 +141,8 @@
public void play(String name, int start, int length, boolean flushPlaylist) {
+ if (length == 0)
+ // Workaround for APPSERVER-7: ignore play requests with zero length
+ return;
+
IConnection conn = Red5.getConnectionLocal();
if (!(conn instanceof IStreamCapableConnection)) {
Index: /java/server/branches/0.6_security/trunk/src/org/red5/server/net/rtmp/DeferredResult.java
===================================================================
--- /java/server/branches/0.6_security/trunk/src/org/red5/server/net/rtmp/DeferredResult.java (revision 1576)
+++ /java/server/branches/0.6_security/trunk/src/org/red5/server/net/rtmp/DeferredResult.java (revision 1576)
@@ -0,0 +1,89 @@
+package org.red5.server.net.rtmp;
+
+/*
+ * RED5 Open Source Flash Server - http://www.osflash.org/red5
+ *
+ * Copyright (c) 2006 by respective authors (see below). All rights reserved.
+ *
+ * This library is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free Software
+ * Foundation; either version 2.1 of the License, or (at your option) any later
+ * version.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License along
+ * with this library; if not, write to the Free Software Foundation, Inc.,
+ * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+import java.lang.ref.WeakReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.red5.server.api.service.IPendingServiceCall;
+import org.red5.server.net.rtmp.event.Invoke;
+
+/**
+ * Can be returned to delay returning the result of invoked methods.
+ *
+ * @author The Red5 Project (red5 at osflash.org)
+ * @author Joachim Bauch (jojo at struktur.de)
+ */
+public class DeferredResult {
+
+    protected static Log log = LogFactory.getLog(DeferredResult.class.getName());
+
+    /** Channel the reply is written to; weakly referenced. */
+    private WeakReference<Channel> channel;
+
+    /** Pending call that receives the result. */
+    private IPendingServiceCall call;
+
+    /** Invoke id copied into the reply. */
+    private int invokeId;
+
+    /** Flipped by the first call to {@link #setResult(Object)}. */
+    private boolean resultSent = false;
+
+    /**
+     * Store the result in the pending call and write the reply to the
+     * channel. If the channel reference has been cleared, a warning is
+     * logged and nothing is written.
+     *
+     * @param result
+     *            deferred result of the method call
+     * @throws RuntimeException
+     *             if a result has already been set
+     */
+    public void setResult(Object result) {
+        if (resultSent) {
+            throw new RuntimeException("You can only set the result once.");
+        }
+        resultSent = true;
+
+        final Channel target = channel.get();
+        if (target != null) {
+            final Invoke reply = new Invoke();
+            call.setResult(result);
+            reply.setCall(call);
+            reply.setInvokeId(invokeId);
+            target.write(reply);
+            target.getConnection().unregisterDeferredResult(this);
+        } else {
+            log.warn("The client is no longer connected.");
+        }
+    }
+
+    /**
+     * Tell whether {@link #setResult(Object)} has been called.
+     *
+     * @return <code>true</code> after the first setResult call, otherwise <code>false</code>
+     */
+    public boolean wasSent() {
+        return resultSent;
+    }
+
+    /**
+     * @param id
+     *            invoke id to use for the reply
+     */
+    protected void setInvokeId(int id) {
+        invokeId = id;
+    }
+
+    /**
+     * @param call
+     *            pending call that will carry the result
+     */
+    protected void setServiceCall(IPendingServiceCall call) {
+        this.call = call;
+    }
+
+    /**
+     * @param channel
+     *            channel to reply on; only a weak reference is kept
+     */
+    protected void setChannel(Channel channel) {
+        this.channel = new WeakReference<Channel>(channel);
+    }
+}
Index: /java/server/branches/0.6_security/trunk/src/org/red5/server/net/rtmp/RTMPHandler.java
===================================================================
--- /java/server/branches/0.6_security/trunk/src/org/red5/server/net/rtmp/RTMPHandler.java (revision 1547)
+++ /java/server/branches/0.6_security/trunk/src/org/red5/server/net/rtmp/RTMPHandler.java (revision 1576)
@@ -23,12 +23,8 @@
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.Map;
-import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.mina.common.ByteBuffer;
import org.red5.server.api.IContext;
import org.red5.server.api.IGlobalScope;
@@ -36,8 +32,5 @@
import org.red5.server.api.IScopeHandler;
import org.red5.server.api.IServer;
-import org.red5.server.api.Red5;
-import org.red5.server.api.event.IEventDispatcher;
import org.red5.server.api.service.IPendingServiceCall;
-import org.red5.server.api.service.IPendingServiceCallback;
import org.red5.server.api.service.IServiceCall;
import org.red5.server.api.so.ISharedObject;
@@ -50,17 +43,9 @@
import org.red5.server.messaging.IConsumer;
import org.red5.server.messaging.OOBControlMessage;
-import org.red5.server.net.protocol.ProtocolState;
-import org.red5.server.net.rtmp.codec.RTMP;
-import org.red5.server.net.rtmp.event.BytesRead;
import org.red5.server.net.rtmp.event.ChunkSize;
-import org.red5.server.net.rtmp.event.IRTMPEvent;
import org.red5.server.net.rtmp.event.Invoke;
import org.red5.server.net.rtmp.event.Notify;
import org.red5.server.net.rtmp.event.Ping;
-import org.red5.server.net.rtmp.event.Unknown;
-import org.red5.server.net.rtmp.message.Constants;
import org.red5.server.net.rtmp.message.Header;
-import org.red5.server.net.rtmp.message.Packet;
-import org.red5.server.net.rtmp.status.StatusCodes;
import org.red5.server.net.rtmp.status.StatusObject;
import org.red5.server.net.rtmp.status.StatusObjectService;
@@ -72,8 +57,7 @@
import org.red5.server.stream.IBroadcastScope;
import org.red5.server.stream.IStreamFlow;
-import org.red5.server.stream.PlaylistSubscriberStream;
import org.red5.server.stream.StreamService;
-public class RTMPHandler implements IRTMPHandler, Constants, StatusCodes {
+public class RTMPHandler extends BaseRTMPHandler {
protected static Log log = LogFactory.getLog(RTMPHandler.class.getName());
@@ -82,7 +66,4 @@
protected IServer server;
-
- // XXX: HACK HACK HACK to support stream ids
- private static ThreadLocal<Integer> streamLocal = new ThreadLocal<Integer>();
public void setServer(IServer server) {
@@ -94,138 +75,5 @@
}
- // XXX: HACK HACK HACK to support stream ids
- public static int getStreamId() {
- return streamLocal.get().intValue();
- }
-
- private static void setStreamId(int id) {
- streamLocal.set(id);
- }
-
- public void connectionOpened(RTMPConnection conn, RTMP state) {
- // Nothing to do here...
- }
-
- public void messageReceived(RTMPConnection conn, ProtocolState state,
- Object in) throws Exception {
-
- IRTMPEvent message = null;
- try {
-
- final Packet packet = (Packet) in;
- message = packet.getMessage();
- final Header header = packet.getHeader();
- final Channel channel = conn.getChannel(header.getChannelId());
- final IClientStream stream = conn.getStreamById(header
- .getStreamId());
-
- if (log.isDebugEnabled()) {
- log.debug("Message recieved");
- log.debug("Stream Id: " + header);
- log.debug("Channel: " + channel);
- }
-
- // Thread local performance ? Should we benchmark
- Red5.setConnectionLocal(conn);
-
- // XXX: HACK HACK HACK to support stream ids
- RTMPHandler.setStreamId(header.getStreamId());
-
- // Increase number of received messages
- conn.messageReceived();
-
- //if (message instanceof IRTMPEvent) {
- message.setSource(conn);
- //}
-
- switch (header.getDataType()) {
- case TYPE_CHUNK_SIZE:
- onChunkSize(conn, channel, header, (ChunkSize) message);
- break;
-
- case TYPE_INVOKE:
- onInvoke(conn, channel, header, (Invoke) message);
- if(message.getHeader().getStreamId()!=0
- && ((Invoke)message).getCall().getServiceName()==null
- && ACTION_PUBLISH.equals(((Invoke)message).getCall().getServiceMethodName())) {
- IClientStream s = conn.getStreamById(header.getStreamId());
- if (s != null)
- // Only dispatch if stream really was created
- ((IEventDispatcher) s).dispatchEvent(message);
- }
- break;
-
- case TYPE_NOTIFY: // just like invoke, but does not return
- if (((Notify) message).getData() != null && stream != null) {
- // Stream metadata
- ((IEventDispatcher) stream).dispatchEvent(message);
- } else {
- onInvoke(conn, channel, header, (Notify) message);
- }
- break;
- case TYPE_PING:
- onPing(conn, channel, header, (Ping) message);
- break;
-
- case TYPE_BYTES_READ:
- onStreamBytesRead(conn, channel, header,
- (BytesRead) message);
- break;
-
- case TYPE_AUDIO_DATA:
- case TYPE_VIDEO_DATA:
- // log.info("in packet: "+source.getSize()+"
- // ts:"+source.getTimer());
-
- // NOTE: If we respond to "publish" with "NetStream.Publish.BadName",
- // the client sends a few st
Note:
Diffs are truncated when they exceed 25k.
This keeps messages below the mailing list's size limit.
More information about the Red5commits
mailing list