[Red5commits] [2457] 1. Use a central connection manager to manage all connections on Edge 2. Improve

sgong luke at codegent.com
Mon Nov 5 08:20:15 PST 2007


1. Use a central connection manager to manage all connections on Edge
2. Improve MRTMP to support both RTMP and RTMPT Edge
3. Improve RTMPOriginConnection to support duplicate client id from different Edges with static connection (RTMP connection)
4. Quick Enable RTMP Edge (might break RTMPT Edge, need more refinement and cleanup)
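
Item 1 above could be pictured roughly as follows. This is only a minimal sketch, not the RTMPConnManager committed here (its diff is not reproduced in this mail). The class name ConnManagerSketch, the Conn and ConnType types, and the createConnection and removeConnection methods are all hypothetical; only a getConnection(clientId) lookup is visible in the diff below.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch of a central connection registry.
 * It is NOT the real org.red5.server.net.rtmp.RTMPConnManager.
 */
public class ConnManagerSketch {

    /** Assumed connection kinds (RTMP is persistent, RTMPT is polling). */
    public enum ConnType { PERSISTENT, POLLING }

    /** Stand-in for a connection object that carries its own id. */
    public static final class Conn {
        public final int id;
        public final ConnType type;

        Conn(int id, ConnType type) {
            this.id = id;
            this.type = type;
        }
    }

    // One shared map so every handler resolves client ids the same way.
    private final ConcurrentMap<Integer, Conn> conns =
            new ConcurrentHashMap<Integer, Conn>();
    private final AtomicInteger nextId = new AtomicInteger(1);

    /** Creates a connection, assigns a fresh id and registers it. */
    public Conn createConnection(ConnType type) {
        Conn conn = new Conn(nextId.getAndIncrement(), type);
        conns.put(conn.id, conn);
        return conn;
    }

    /** Returns the connection for the id, or null if it is unknown or closed. */
    public Conn getConnection(int clientId) {
        return conns.get(clientId);
    }

    /** Unregisters and returns the connection, or null if it was not registered. */
    public Conn removeConnection(int clientId) {
        return conns.remove(clientId);
    }

    public static void main(String[] args) {
        ConnManagerSketch manager = new ConnManagerSketch();
        Conn rtmp = manager.createConnection(ConnType.PERSISTENT);
        Conn rtmpt = manager.createConnection(ConnType.POLLING);
        System.out.println("registered ids: " + rtmp.id + ", " + rtmpt.id);
        manager.removeConnection(rtmp.id);
        System.out.println("still known: " + (manager.getConnection(rtmp.id) != null));
    }
}
```

Keeping the id-to-connection map in one injected object is what would let both the RTMP and the RTMPT handlers look up a client without going through the servlet.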


Timestamp: 11/05/07 10:26:20 EST
Change: 2457 
Author: sgong

Files (see diff or trac for details): 
java/server/branches/clustering/clustering_note.txt
java/server/branches/clustering/conf/red5-edge-core.xml
java/server/branches/clustering/conf/red5-rtmpt.xml
java/server/branches/clustering/src/org/red5/server/net/mrtmp/EdgeMRTMPHandler.java
java/server/branches/clustering/src/org/red5/server/net/mrtmp/IMRTMPEdgeManager.java
java/server/branches/clustering/src/org/red5/server/net/mrtmp/IMRTMPManager.java
java/server/branches/clustering/src/org/red5/server/net/mrtmp/IMRTMPOriginManager.java
java/server/branches/clustering/src/org/red5/server/net/mrtmp/MRTMPPacket.java
java/server/branches/clustering/src/org/red5/server/net/mrtmp/OriginMRTMPHandler.java
java/server/branches/clustering/src/org/red5/server/net/mrtmp/SimpleMRTMPEdgeManager.java
java/server/branches/clustering/src/org/red5/server/net/mrtmp/SimpleMRTMPOriginManager.java
java/server/branches/clustering/src/org/red5/server/net/mrtmp/codec/MRTMPProtocolDecoder.java
java/server/branches/clustering/src/org/red5/server/net/mrtmp/codec/MRTMPProtocolEncoder.java
java/server/branches/clustering/src/org/red5/server/net/rtmp/EdgeRTMPHandler.java
java/server/branches/clustering/src/org/red5/server/net/rtmp/EdgeRTMPMinaConnection.java
java/server/branches/clustering/src/org/red5/server/net/rtmp/EdgeRTMPMinaIoHandler.java
java/server/branches/clustering/src/org/red5/server/net/rtmp/IRTMPConnManager.java
java/server/branches/clustering/src/org/red5/server/net/rtmp/RTMPConnManager.java
java/server/branches/clustering/src/org/red5/server/net/rtmp/RTMPConnection.java
java/server/branches/clustering/src/org/red5/server/net/rtmp/RTMPMinaIoHandler.java
java/server/branches/clustering/src/org/red5/server/net/rtmp/RTMPOriginConnection.java
java/server/branches/clustering/src/org/red5/server/net/rtmpt/EdgeRTMPTConnection.java
java/server/branches/clustering/src/org/red5/server/net/rtmpt/EdgeRTMPTHandler.java
java/server/branches/clustering/src/org/red5/server/net/rtmpt/EdgeRTMPTServlet.java
java/server/branches/clustering/src/org/red5/server/net/rtmpt/RTMPTConnection.java
java/server/branches/clustering/src/org/red5/server/net/rtmpt/RTMPTServlet.java


Trac: http://mirror1.cvsdude.com/trac/osflash/red5/changeset/2457

Index: /java/server/branches/clustering/clustering_note.txt
===================================================================
--- /java/server/branches/clustering/clustering_note.txt (revision 2457)
+++ /java/server/branches/clustering/clustering_note.txt (revision 2457)
@@ -0,0 +1,11 @@
+1. event object serialization support
+2. add id to RTMPConnection
+3. put raw buffer handling to a common place
+RTMPHandler is designed to handle non-raw buffer
+Enhance RTMPHandler to handle raw buffer or override it to handle
+raw buffer?
+4. Use a common object to manage connections:
+create/delete connections
+get connection object by id
+inject the RTMPServlet and RTMPMinaIoHandler to use this common obj
+5. Use Enum for the type of connection
Index: /java/server/branches/clustering/conf/red5-edge-core.xml
===================================================================
--- /java/server/branches/clustering/conf/red5-edge-core.xml (revision 2273)
+++ /java/server/branches/clustering/conf/red5-edge-core.xml (revision 2457)
@@ -23,20 +23,26 @@
 		<property name="location" value="classpath:/red5.properties" />
 	</bean>
-	   
-	<!-- RTMP Handler 
+	
+	<bean id="rtmpConnManager"
+		class="org.red5.server.net.rtmp.RTMPConnManager">
+	</bean>
+	
+	<!-- RTMP Handler -->
 	<bean id="rtmpHandler"
-		class="org.red5.server.net.rtmp.RTMPHandler">
+		class="org.red5.server.net.rtmp.EdgeRTMPHandler">
 		<property name="server" ref="red5.server" />
 		<property name="statusObjectService" ref="statusObjectService" />
-	</bean> -->
-	
-	<!-- RTMP Mina IO Handler 
+		<property name="MRTMPManager" ref="mrtmpEdgeManager" />
+	</bean>
+	
+	<!-- RTMP Mina IO Handler -->
 	<bean id="rtmpMinaIoHandler"
-		class="org.red5.server.net.rtmp.RTMPMinaIoHandler">
+		class="org.red5.server.net.rtmp.EdgeRTMPMinaIoHandler">
 		<property name="handler" ref="rtmpHandler" />
 		<property name="codecFactory" ref="rtmpCodecFactory" />
-	</bean> -->
-	
-	<!-- RTMP Mina Transport 
+		<property name="rtmpConnManager" ref="rtmpConnManager" />
+	</bean>
+	
+	<!-- RTMP Mina Transport -->
 	<bean id="rtmpTransport" class="org.red5.server.net.rtmp.RTMPMinaTransport" init-method="start" destroy-method="stop">
 		<property name="ioHandler" ref="rtmpMinaIoHandler" />
@@ -49,16 +55,16 @@
 		<property name="eventThreadsQueue" value="${rtmp.event_threads_queue}" />
 		<property name="eventThreadsKeepalive" value="${rtmp.event_threads_keepalive}" />
-	</bean> -->
-	
-	<!-- RTMP Mina Connection 
-	<bean id="rtmpMinaConnection" scope="prototype"
-		class="org.red5.server.net.rtmp.RTMPMinaConnection"> -->
-		<!-- Ping clients every X ms. Set to 0 to disable ghost detection code. 
-		<property name="pingInterval" value="${rtmp.ping_interval}" /> -->
-		<!-- Disconnect client after X ms of not responding. 
-		<property name="maxInactivity" value="${rtmp.max_inactivity}" /> -->
-		<!-- Max. time in milliseconds to wait for a valid handshake.
+	</bean>
+		
+	<!-- RTMP Mina Connection -->
+	<bean id="rtmpEdgeMinaConnection" scope="prototype"
+		class="org.red5.server.net.rtmp.EdgeRTMPMinaConnection">
+		<!-- Ping clients every X ms. Set to 0 to disable ghost detection code. -->
+		<property name="pingInterval" value="${rtmp.ping_interval}" />
+		<!-- Disconnect client after X ms of not responding. -->
+		<property name="maxInactivity" value="${rtmp.max_inactivity}" />
+		<!-- Max. time in milliseconds to wait for a valid handshake. -->
 		<property name="maxHandshakeTimeout" value="5000" />
-	</bean> -->
+	</bean>
 	
 	<bean id="mrtmpCodecFactory"
@@ -66,6 +72,7 @@
 	
 	<bean id="mrtmpHandler" class="org.red5.server.net.mrtmp.EdgeMRTMPHandler">
-		<property name="mrtmpMananger" ref="mrtmpEdgeManager"/>
+		<property name="mrtmpManager" ref="mrtmpEdgeManager"/>
 		<property name="codecFactory" ref="mrtmpCodecFactory" />
+		<property name="rtmpConnManager" ref="rtmpConnManager" />
 	</bean>
 	
@@ -79,5 +86,4 @@
 	<bean id="mrtmpEdgeManager"
 		class="org.red5.server.net.mrtmp.SimpleMRTMPEdgeManager">
-		<property name="RTMPTServlet" ref="rtmptServlet" />
 	</bean>
 	
@@ -91,10 +97,10 @@
 	<!-- Use injection to store RTMPT handler in servlet -->
 	<bean id="rtmptServlet"
-		class="org.red5.server.net.rtmpt.RTMPTServlet">
+		class="org.red5.server.net.rtmpt.EdgeRTMPTServlet">
 		<property name="handler" ref="rtmptHandler" />
 	</bean>
 	
 	<!-- RTMPT Connection -->
-	<bean id="rtmptConnection" scope="prototype"
+	<bean id="rtmptEdgeConnection" scope="prototype"
 		class="org.red5.server.net.rtmpt.EdgeRTMPTConnection">
 		<!-- Ping clients every X ms. Set to 0 to disable ghost detection code. -->
Index: /java/server/branches/clustering/conf/red5-rtmpt.xml
===================================================================
--- /java/server/branches/clustering/conf/red5-rtmpt.xml (revision 2133)
+++ /java/server/branches/clustering/conf/red5-rtmpt.xml (revision 2457)
@@ -121,5 +121,5 @@
 							                           <bean class="org.mortbay.jetty.servlet.ServletHolder">
 							                               <property name="name" value="RTMPTServlet" />
-							                               <property name="className" value="org.red5.server.net.rtmpt.RTMPTServlet" />
+							                               <property name="className" value="org.red5.server.net.rtmpt.EdgeRTMPTServlet" />
 							                           </bean>
 							                           <bean class="org.mortbay.jetty.servlet.ServletHolder">
Index: /java/server/branches/clustering/src/org/red5/server/net/mrtmp/EdgeMRTMPHandler.java
===================================================================
--- /java/server/branches/clustering/src/org/red5/server/net/mrtmp/EdgeMRTMPHandler.java (revision 2235)
+++ /java/server/branches/clustering/src/org/red5/server/net/mrtmp/EdgeMRTMPHandler.java (revision 2457)
@@ -10,7 +10,8 @@
 import org.red5.server.net.mrtmp.MRTMPPacket.RTMPBody;
 import org.red5.server.net.mrtmp.MRTMPPacket.RTMPHeader;
+import org.red5.server.net.rtmp.IRTMPConnManager;
+import org.red5.server.net.rtmp.RTMPConnection;
 import org.red5.server.net.rtmp.event.Invoke;
 import org.red5.server.net.rtmp.message.Constants;
-import org.red5.server.net.rtmpt.EdgeRTMPTConnection;
 import org.red5.server.net.rtmpt.codec.EdgeRTMP;
 import org.red5.server.service.Call;
@@ -20,4 +21,5 @@
 	private static final Log log = LogFactory.getLog(EdgeMRTMPHandler.class);
 
+	private IRTMPConnManager rtmpConnManager;
 	private IMRTMPEdgeManager mrtmpManager;
 	private ProtocolCodecFactory codecFactory;
@@ -27,6 +29,10 @@
 	}
 
-	public void setMrtmpMananger(IMRTMPEdgeManager mrtmpMananger) {
-		this.mrtmpManager = mrtmpMananger;
+	public void setMrtmpManager(IMRTMPEdgeManager mrtmpManager) {
+		this.mrtmpManager = mrtmpManager;
+	}
+
+	public void setRtmpConnManager(IRTMPConnManager rtmpConnManager) {
+		this.rtmpConnManager = rtmpConnManager;
 	}
 
@@ -35,5 +41,5 @@
 		MRTMPPacket mrtmpPacket = (MRTMPPacket) message;
 		int clientId = mrtmpPacket.getHeader().getClientId();
-		EdgeRTMPTConnection conn = mrtmpManager.lookupRTMPTConnection(clientId);
+		RTMPConnection conn = rtmpConnManager.getConnection(clientId);
 		if (conn == null) {
 			log.debug("Client " + clientId + " is already closed.");
Index: /java/server/branches/clustering/src/org/red5/server/net/mrtmp/IMRTMPManager.java
===================================================================
--- /java/server/branches/clustering/src/org/red5/server/net/mrtmp/IMRTMPManager.java (revision 2235)
+++ /java/server/branches/clustering/src/org/red5/server/net/mrtmp/IMRTMPManager.java (revision 2457)
@@ -1,11 +1,17 @@
 package org.red5.server.net.mrtmp;
+
+import org.red5.server.net.rtmp.RTMPConnection;
 
 public interface IMRTMPManager {
 	/**
-	 * Map a client to an Origin MRTMP connection.
-	 * @param clientId
+	 * Map a client to an Edge/Origin MRTMP connection.
+	 * On Edge, the server will find an Origin connection per routing logic.
+	 * On Origin, the server will send to the original in-coming connection
+	 * if the client connection type is persistent. Or the latest in-coming
+	 * connection will be used.
+	 * @param conn
 	 * @return
 	 */
-	IMRTMPConnection lookupMRTMPConnection(int clientId);
+	IMRTMPConnection lookupMRTMPConnection(RTMPConnection conn);
 	
 	/**
@@ -13,11 +19,13 @@
 	 * been looked up.
 	 * @param conn
+	 * @return whether the registration is successful
 	 */
-	void registerConnection(IMRTMPConnection conn);
+	boolean registerConnection(IMRTMPConnection conn);
 	
 	/**
 	 * Unregister a MRTMP connection.
 	 * @param conn
+	 * @return whether the registration is successful
 	 */
-	void unregisterConnection(IMRTMPConnection conn);
+	boolean unregisterConnection(IMRTMPConnection conn);
 }
Index: /java/server/branches/clustering/src/org/red5/server/net/mrtmp/OriginMRTMPHandler.java
===================================================================
--- /java/server/branches/clustering/src/org/red5/server/net/mrtmp/OriginMRTMPHandler.java (revision 2235)
+++ /java/server/branches/clustering/src/org/red5/server/net/mrtmp/OriginMRTMPHandler.java (revision 2457)
@@ -13,4 +13,5 @@
 import org.apache.mina.filter.codec.ProtocolCodecFactory;
 import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.red5.server.api.IConnection;
 import org.red5.server.net.rtmp.IRTMPHandler;
 import org.red5.server.net.rtmp.RTMPOriginConnection;
@@ -22,6 +23,8 @@
 	private ProtocolCodecFactory codecFactory;
 	private IRTMPHandler handler;
-	private Map<Integer, RTMPOriginConnection> connMap =
+	private Map<Integer, RTMPOriginConnection> dynConnMap =
 		new HashMap<Integer, RTMPOriginConnection>();
+	private Map<StaticConnId, RTMPOriginConnection> statConnMap =
+		new HashMap<StaticConnId, RTMPOriginConnection>();
 	private ReadWriteLock lock = new ReentrantReadWriteLock();
 	
@@ -48,19 +51,39 @@
 		}
 		int clientId = header.getClientId();
+		int sessionId = getSessionId(session);
 		MRTMPOriginConnection mrtmpConn = (MRTMPOriginConnection) session.getAttachment();
-		// set the afinity so that the follow-up packets can be sent
-		// via this mrtmp connection.
-		mrtmpManager.setAfinity(mrtmpConn, clientId);
+		RTMPOriginConnection conn = null;
 		switch (packet.getHeader().getType()) {
 			case MRTMPPacket.CONNECT:
 				lock.writeLock().lock();
 				try {
-					if (!connMap.containsKey(clientId)) {
-						RTMPOriginConnection conn = new RTMPOriginConnection(header.getClientId());
-						conn.setMrtmpManager(mrtmpManager);
-						conn.setHandler(this);
-						connMap.put(clientId, conn);
+					if (header.isDynamic()) {
+						if (!dynConnMap.containsKey(clientId)) {
+							conn = new RTMPOriginConnection(
+									IConnection.POLLING,
+									header.getClientId()
+									);
+							conn.setMrtmpManager(mrtmpManager);
+							conn.setHandler(this);
+							dynConnMap.put(clientId, conn);
+						} else {
+							log.warn("Open an already existing RTMPT origin connection!");
+						}
 					} else {
-						log.warn("Open an already existing origin connection!");
+						StaticConnId connId = new StaticConnId();
+						connId.clientId = header.getClientId();
+						connId.sessionId = sessionId;
+						if (!statConnMap.containsKey(connId)) {
+							conn = new RTMPOriginConnection(
+									IConnection.PERSISTENT,
+									header.getClientId(),
+									sessionId
+									);
+							conn.setMrtmpManager(mrtmpManager);
+							conn.setHandler(this);
+							statConnMap.put(connId, conn);
+						} else {
+							log.warn("Open an already existing RTMP origin connection!");
+						}
 					}
 				} finally {
@@ -69,18 +92,28 @@
 				break;
 			case MRTMPPacket.CLOSE:
-				closeConnection(clientId);
-				break;
 			case MRTMPPacket.RTMP:
 				lock.readLock().lock();
 				try {
-					if (connMap.containsKey(clientId)) {
-						RTMPOriginConnection conn = connMap.get(clientId);
-						MRTMPPacket.RTMPBody rtmpBody = (MRTMPPacket.RTMPBody) body;
-						handler.messageReceived(conn, conn.getState(), rtmpBody.getRtmpPacket());
+					if (header.isDynamic()) {
+						conn = dynConnMap.get(clientId);
 					} else {
-						log.warn("Handle on a non-existent origin connection!");
+						StaticConnId connId = new StaticConnId();
+						connId.clientId = header.getClientId();
+						connId.sessionId = sessionId;
+						conn = statConnMap.get(connId);
 					}
 				} finally {
 					lock.readLock().unlock();
+				}
+				if (conn != null) {
+					if (packet.getHeader().getType() == MRTMPPacket.CLOSE) {
+						closeConnection(conn);
+						conn = null;
+					} else {
+						MRTMPPacket.RTMPBody rtmpBody = (MRTMPPacket.RTMPBody) body;
+						handler.messageReceived(conn, conn.getState(), rtmpBody.getRtmpPacket());
+					}
+				} else {
+					log.warn("Handle on a non-existent origin connection!");
 				}
 				break;
@@ -88,4 +121,7 @@
 				log.warn("Unknown mrtmp packet received!");
 				break;
+		}
+		if (conn != null) {
+			mrtmpManager.associate(conn, mrtmpConn);
 		}
 	}
@@ -120,17 +156,69 @@
 	}
 
-	public void closeConnection(int clientId) {
+	public void closeConnection(RTMPOriginConnection conn) {
+		boolean dynamic = !conn.getType().equals(IConnection.PERSISTENT);
 		lock.writeLock().lock();
 		try {
-			if (connMap.containsKey(clientId)) {
-				RTMPOriginConnection conn = connMap.get(clientId);
-				connMap.remove(clientId);
-				conn.realClose();
+			if (dynamic) {
+				if (dynConnMap.containsKey(conn.getId())) {
+					dynConnMap.remove(conn.getId());
+					conn.realClose();
+				} else {
+					log.warn("Close a non-existent origin connection!");
+				}
 			} else {
-				log.warn("Close a non-existent origin connection!");
+				StaticConnId connId = new StaticConnId();
+				connId.clientId = conn.getId();
+				connId.sessionId = conn.getIoSessionId();
+				if (statConnMap.containsKey(connId)) {
+					statConnMap.remove(connId);
+					conn.realClose();
+				} else {
+					log.warn("Close a non-existent origin connection!");
+				}
 			}
 		} finally {
 			lock.writeLock().unlock();
 		}
+		mrtmpManager.dissociate(conn);
+	}
+	
+	protected int getSessionId(IoSession session) {
+		MRTMPOriginConnection mrtmpConn = (MRTMPOriginConnection) session.getAttachment();
+		if (mrtmpConn != null) {
+			return mrtmpConn.hashCode();
+		}
+		return 0;
+	}
+	
+	private class StaticConnId {
+		public int sessionId;
+		public int clientId;
+		
+		@Override
+		public int hashCode() {
+			final int PRIME = 31;
+			int result = 1;
+			result = PRIME * result + clientId;
+			result = PRIME * result + sessionId;
+			return result;
+		}
+		
+		@Override
+		public boolean equals(Object obj) {
+			if (this == obj)
+				return true;
+			if (obj == null)
+				return false;
+			if (getClass() != obj.getClass())
+				return false;
+			final StaticConnId other = (StaticConnId) obj;
+			if (clientId != other.clientId)
+				return false;
+			if (sessionId != other.sessionId)
+				return false;
+			return true;
+		}
+		
 	}
 }
Index: /java/server/branches/clustering/src/org/red5/server/net/mrtmp/MRTMPPacket.java
===================================================================
--- /java/server/branches/clustering/src/org/red5/server/net/mrtmp/MRTMPPacket.java (revision 2235)
+++ /java/server/branches/clustering/src/org/red5/server/net/mrtmp/MRTMPPacket.java (revision 2457)
@@ -5,10 +5,12 @@
 
 public class MRTMPPacket {
-	public static final int CONNECT = 0;
-	public static final int CLOSE = 1;
-	public static final int RTMP = 2;
+	public static final short CONNECT = 0;
+	public static final short CLOSE = 1;
+	public static final short RTMP = 2;
 	
-	public static final int COMMON_HEADER_LENGTH = 16;
-	public static final int RTMP_HEADER_LENGTH = 20;
+	public static final short JAVA_ENCODING = 0;
+	
+	public static final int COMMON_HEADER_LENGTH = 20;
+	public static final int RTMP_HEADER_LENGTH = COMMON_HEADER_LENGTH + 4;
 	
 	private Header header;
@@ -16,5 +18,7 @@
 	
 	static public class Header {
-		private int type;
+		private short type;
+		private short bodyEncoding;
+		private boolean dynamic;
 		private int clientId;
 		private int headerLength;
@@ -45,11 +49,28 @@
 		}
 		
-		public int getType() {
+		public short getType() {
 			return type;
 		}
 		
-		public void setType(int type) {
+		public void setType(short type) {
 			this.type = type;
 		}
+
+		public short getBodyEncoding() {
+			return bodyEncoding;
+		}
+
+		public void setBodyEncoding(short bodyEncoding) {
+			this.bodyEncoding = bodyEncoding;
+		}
+
+		public boolean isDynamic() {
+			return dynamic;
+		}
+
+		public void setDynamic(boolean dynamic) {
+			this.dynamic = dynamic;
+		}
+		
 	}
 	
@@ -124,4 +145,5 @@
 				break;
 		}
+		buf.append(",isDynamic=" + header.isDynamic());
 		buf.append(",clientId=" + header.getClientId());
 		if (header.getType() == RTMP) {
Index: /java/server/branches/clustering/src/org/red5/server/net/mrtmp/IMRTMPEdgeManager.java
===================================================================
--- /java/server/branches/clustering/src/org/red5/server/net/mrtmp/IMRTMPEdgeManager.java (revision 2235)
+++ /java/server/branches/clustering/src/org/red5/server/net/mrtmp/IMRTMPEdgeManager.java (revision 2457)
@@ -1,12 +1,9 @@
 package org.red5.server.net.mrtmp;
 
-import org.red5.server.net.rtmpt.EdgeRTMPTConnection;
-
+/**
+ * A tag interface.
+ * @author Steven Gong (steven.gong at gmail.com)
+ * @version $Id$
+ */
 public interface IMRTMPEdgeManager extends IMRTMPManager {
-	/**
-	 * Look up the RTMPT connection of Edge.
-	 * @param clientId
-	 * @return
-	 */
-	EdgeRTMPTConnection lookupRTMPTConnection(int clientId);
 }
Index: /java/server/branches/clustering/src/org/red5/server/net/mrtmp/SimpleMRTMPEdgeManager.java
===================================================================
--- /java/server/branches/clustering/src/org/red5/server/net/mrtmp/SimpleMRTMPEdgeManager.java (revision 2235)
+++ /java/server/branches/clustering/src/org/red5/server/net/mrtmp/SimpleMRTMPEdgeManager.java (revision 2457)
@@ -4,12 +4,18 @@
 import java.util.List;
 
-import org.red5.server.net.rtmpt.EdgeRTMPTConnection;
-import org.red5.server.net.rtmpt.RTMPTServlet;
+import org.red5.server.net.rtmp.RTMPConnection;
 
 public class SimpleMRTMPEdgeManager implements IMRTMPEdgeManager {
-	private RTMPTServlet servlet;
 	private List<IMRTMPConnection> connList = new ArrayList<IMRTMPConnection>();
 	
-	public IMRTMPConnection lookupMRTMPConnection(int clientId) {
+	public boolean registerConnection(IMRTMPConnection conn) {
+		return connList.add(conn);
+	}
+
+	public boolean unregisterConnection(IMRTMPConnection conn) {
+		return connList.remove(conn);
+	}
+
+	public IMRTMPConnection lookupMRTMPConnection(RTMPConnection conn) {
 		if (connList.size() > 0) {
 			return connList.get(0);
@@ -18,19 +24,4 @@
 		}
 	}
-
-	public void registerConnection(IMRTMPConnection conn) {
-		connList.add(conn);
-	}
-
-	public void unregisterConnection(IMRTMPConnection conn) {
-		connList.remove(conn);
-	}
-
-	public EdgeRTMPTConnection lookupRTMPTConnection(int clientId) {
-		return (EdgeRTMPTConnection) servlet.lookupConnection(clientId);
-	}
 	
-	public void setRTMPTServlet(RTMPTServlet servlet) {
-		this.servlet = servlet;
-	}
 }
Index: /java/server/branches/clustering/src/org/red5/server/net/mrtmp/IMRTMPOriginManager.java
===================================================================
--- /java/server/branches/clustering/src/org/red5/server/net/mrtmp/IMRTMPOriginManager.java (revision 2235)
+++ /java/server/branches/clustering/src/org/red5/server/net/mrtmp/IMRTMPOriginManager.java (revision 2457)
@@ -1,14 +1,24 @@
 package org.red5.server.net.mrtmp;
+
+import org.red5.server.net.rtmp.RTMPConnection;
 
 public interface IMRTMPOriginManager extends IMRTMPManager {
 	/**
-	 * Set the afinity of a RTMPT client with a MRTMP connection
-	 * so that the specified MRTMP connection will get higher
-	 * priority for selection as the output for packets.
-	 * Note it is implementation specific to choose which MRTMP
-	 * connection for which RTMPT client.
+	 * Associate the client to a MRTMP connection so that the packet
+	 * will be sent via this MRTMP connection.
+	 * The association has different impacts on persistent and polling
+	 * connections. For persistent connection, the mapping is static while
+	 * for polling connection, the mapping is dynamic and might not be
+	 * honored.
+	 * @param clientId
 	 * @param conn
-	 * @param clientId
 	 */
-	void setAfinity(IMRTMPConnection conn, int clientId);
+	void associate(RTMPConnection rtmpConn, IMRTMPConnection mrtmpConn);
+	
+	/**
+	 * Deassociate the client from the MRTMP connection previously
+	 * associated to.
+	 * @param rtmpConn
+	 */
+	void dissociate(RTMPConnection rtmpConn);
 }
Index: /java/server/branches/clustering/src/org/red5/server/net/mrtmp/SimpleMRTMPOriginManager.java
===================================================================
--- /java/server/branches/clustering/src/org/red5/server/net/mrtmp/SimpleMRTMPOriginManager.java (revision 2236)
+++ /java/server/branches/clustering/src/org/red5/server/net/mrtmp/SimpleMRTMPOriginManager.java (revision 2457)
@@ -6,7 +6,9 @@
 import java.util.Map;
 import java.util.Set;
-import java.util.WeakHashMap;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.red5.server.api.IConnection;
+import org.red5.server.net.rtmp.RTMPConnection;
 
 public class SimpleMRTMPOriginManager implements IMRTMPOriginManager {
@@ -14,5 +16,5 @@
 	private ReadWriteLock lock = new ReentrantReadWriteLock();
 	private Set<IMRTMPConnection> connSet = new HashSet<IMRTMPConnection>();
-	private Map<Integer, IMRTMPConnection> clientToConnMap;
+	private Map<RTMPConnection, IMRTMPConnection> clientToConnMap;
 	
 	public SimpleMRTMPOriginManager() {
@@ -21,20 +23,44 @@
 		// integration.
 		clientToConnMap = Collections.synchronizedMap(
-				new HashMap<Integer, IMRTMPConnection>());
-	}
-	
-	public void setAfinity(IMRTMPConnection conn, int clientId) {
-		clientToConnMap.put(clientId, conn);
+				new HashMap<RTMPConnection, IMRTMPConnection>());
 	}
 
-	public IMRTMPConnection lookupMRTMPConnection(int clientId) {
+	public boolean registerConnection(IMRTMPConnection conn) {
+		lock.writeLock().lock();
+		try {
+			return connSet.add(conn);
+		} finally {
+			lock.writeLock().unlock();
+		}
+	}
+
+	public boolean unregisterConnection(IMRTMPConnection conn) {
+		lock.writeLock().lock();
+		try {
+			return connSet.remove(conn);
+		} finally {
+			lock.writeLock().unlock();
+		}
+	}
+
+	public void associate(RTMPConnection rtmpConn, IMRTMPConnection mrtmpConn) {
+		clientToConnMap.put(rtmpConn, mrtmpConn);
+	}
+
+	public void dissociate(RTMPConnection rtmpConn) {
+		clientToConnMap.remove(rtmpConn);
+	}
+
+	public IMRTMPConnection lookupMRTMPConnection(RTMPConnection rtmpConn) {
 		lock.readLock().lock();
 		try {
-			IMRTMPConnection conn = clientToConnMap.get(clientId);
+			IMRTMPConnection conn = clientToConnMap.get(rtmpConn);
 			if (conn != null && !connSet.contains(conn)) {
-				clientToConnMap.remove(clientId);
+				clientToConnMap.remove(rtmpConn);
 				conn = null;
 			}
-			if (conn == null) {
+			// mrtmp connection not found, we locate the next mrtmp connection
+			// when the connection is not persistent.
+			if (conn == null && !rtmpConn.getType().equals(IConnection.PERSISTENT)) {
 				if (connSet.size() > 0) {
 					conn = connSet.iterator().next();
@@ -48,21 +74,3 @@
 	}
 
-	public void registerConnection(IMRTMPConnection conn) {
-		lock.writeLock().lock();
-		try {
-			connSet.add(conn);
-		} finally {
-			lock.writeLock().unlock();
-		}
-	}
-
-	public void unregisterConnection(IMRTMPConnection conn) {
-		lock.writeLock().lock();
-		try {
-			connSet.remove(conn);
-		} finally {
-			lock.writeLock().unlock();
-		}
-	}
-
 }
Index: /java/server/branches/clustering/src/org/red5/server/net/mrtmp/codec/MRTMPProtocolEncoder.java
===================================================================
--- /java/server/branches/clustering/src/org/red5/server/net/mrtmp/codec/MRTMPProtocolEncoder.java (revision 2235)
+++ /java/server/branches/clustering/src/org/red5/server/net/mrtmp/codec/MRTMPProtocolEncoder.java (revision 2457)
@@ -38,5 +38,8 @@
 			return;
 		}
-		buf.putInt(header.getType());
+		buf.putShort(header.getType());
+		buf.putS

Note:
Diffs are chopped if more than 25k.
This is to get past the limit on the mailing list.
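
The OriginMRTMPHandler change above keys static (RTMP) connections by client id plus Edge session, so the same client id arriving from two Edges no longer collides. The idea can be sketched in isolation as below. This is a simplified illustration under stated assumptions: StaticConnKeySketch and its connect and lookup methods are hypothetical, connections are represented by plain strings, and the read/write locking used in the real handler is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical sketch of the two-map lookup: dynamic (polling) clients are
 * keyed by client id alone, static (persistent) clients by (session, client).
 * Not thread-safe; the committed handler guards its maps with a ReadWriteLock.
 */
public class StaticConnKeySketch {

    /** Immutable composite key, safe to use in a HashMap. */
    public static final class StaticConnId {
        final int sessionId;
        final int clientId;

        StaticConnId(int sessionId, int clientId) {
            this.sessionId = sessionId;
            this.clientId = clientId;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) return true;
            if (!(obj instanceof StaticConnId)) return false;
            StaticConnId other = (StaticConnId) obj;
            return sessionId == other.sessionId && clientId == other.clientId;
        }

        @Override
        public int hashCode() {
            return Objects.hash(sessionId, clientId);
        }
    }

    private final Map<Integer, String> dynConnMap = new HashMap<Integer, String>();
    private final Map<StaticConnId, String> statConnMap = new HashMap<StaticConnId, String>();

    /** Registers a connection; returns false if one already exists for the key. */
    public boolean connect(boolean dynamic, int sessionId, int clientId, String conn) {
        if (dynamic) {
            return dynConnMap.putIfAbsent(clientId, conn) == null;
        }
        return statConnMap.putIfAbsent(new StaticConnId(sessionId, clientId), conn) == null;
    }

    /** Returns the registered connection, or null if there is none. */
    public String lookup(boolean dynamic, int sessionId, int clientId) {
        if (dynamic) {
            return dynConnMap.get(clientId);
        }
        return statConnMap.get(new StaticConnId(sessionId, clientId));
    }

    public static void main(String[] args) {
        StaticConnKeySketch origin = new StaticConnKeySketch();
        // Two Edges (sessions 1 and 2) both report client id 7.
        origin.connect(false, 1, 7, "conn-from-edge-1");
        origin.connect(false, 2, 7, "conn-from-edge-2");
        System.out.println(origin.lookup(false, 1, 7));
        System.out.println(origin.lookup(false, 2, 7));
    }
}
```

The key here is immutable and built through a constructor, which avoids accidentally mutating a key after it has been put into the map; the committed StaticConnId uses public mutable fields instead.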


