[Red5commits] [982] support asynchronous remoting calls (Trac #17)

jbauch luke at codegent.com
Tue Jun 20 04:23:57 EDT 2006


support asynchronous remoting calls (Trac #17)


Timestamp: 06/19/06 08:29:05 (1 day ago) 
Change: 982 
Author: jbauch

Files (see diff or trac for details): 
java/server/trunk/conf/red5-common.xml
java/server/trunk/src/org/red5/server/net/remoting/IRemotingCallback.java
java/server/trunk/src/org/red5/server/net/remoting/RemotingClient.java


Trac: http://mirror1.cvsdude.com/trac/osflash/red5/changeset/982

Index: /java/server/trunk/conf/red5-common.xml
===================================================================
--- /java/server/trunk/conf/red5-common.xml (revision 956)
+++ /java/server/trunk/conf/red5-common.xml (revision 982)
@@ -53,3 +53,11 @@
     
     <bean id="schedulingService" class="org.red5.server.scheduling.QuartzSchedulingService" />
+    
+    <!-- threadpool settings for the remoting clients -->
+    <bean id="remotingPool" class="org.mortbay.thread.BoundedThreadPool"
+          init-method="start">
+        <property name="minThreads" value="4" />
+        <property name="maxThreads" value="16" />
+    </bean>
+    
 </beans>
Index: /java/server/trunk/src/org/red5/server/net/remoting/RemotingClient.java
===================================================================
--- /java/server/trunk/src/org/red5/server/net/remoting/RemotingClient.java (revision 971)
+++ /java/server/trunk/src/org/red5/server/net/remoting/RemotingClient.java (revision 982)
@@ -31,4 +31,5 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.mina.common.ByteBuffer;
+import org.mortbay.thread.ThreadPool;
 
 import org.red5.io.amf.Input;
@@ -36,4 +37,6 @@
 import org.red5.io.object.Deserializer;
 import org.red5.io.object.Serializer;
+import org.red5.server.api.IScope;
+import org.red5.server.api.Red5;
 import org.red5.server.net.servlet.ServletUtils;
 
@@ -55,4 +58,7 @@
 	private static final String CONTENT_TYPE = "application/x-amf";
 
+	/** Name of the bean defining the thread pool. */
+	private static final String POOL_BEAN_ID = "remotingPool";
+	
 	/** Manages HTTP connections. */
 	private static HttpConnectionManager connectionMgr = new MultiThreadedHttpConnectionManager();
@@ -101,5 +107,5 @@
 	 * @return
 	 */
-	private ByteBuffer encodeInvoke(String method, Object[] params) {
+	private synchronized ByteBuffer encodeInvoke(String method, Object[] params) {
 		ByteBuffer result = ByteBuffer.allocate(1024);
 		result.setAutoExpand(true);
@@ -199,5 +205,5 @@
 	 * @return
 	 */
-	private Object decodeResult(ByteBuffer data) {
+	private synchronized Object decodeResult(ByteBuffer data) {
 		processHeaders(data);
 		int count = data.getUnsignedShort();
@@ -220,5 +226,5 @@
 	 * @param password
 	 */
-	public void setCredentials(String userid, String password) {
+	public synchronized void setCredentials(String userid, String password) {
 		Map<String, String> data = new HashMap<String, String>();
 		data.put("userid", userid);
@@ -232,5 +238,5 @@
 	 *
 	 */
-	public void resetCredentials() {
+	public synchronized void resetCredentials() {
 		removeHeader(RemotingHeader.CREDENTIALS);
 	}
@@ -243,5 +249,5 @@
 	 * @param value
 	 */
-	public void addHeader(String name, boolean required, Object value) {
+	public synchronized void addHeader(String name, boolean required, Object value) {
 		RemotingHeader header = new RemotingHeader(name, required, value);
 		headers.put(name, header);
@@ -253,10 +259,10 @@
 	 * @param name
 	 */
-	public void removeHeader(String name) {
+	public synchronized void removeHeader(String name) {
 		headers.remove(name);
 	}
 	
 	/**
-	 * Invoke a method on the remoting server.
+	 * Invoke a method synchronously on the remoting server.
 	 * 
 	 * @param method
@@ -289,4 +295,45 @@
 	}
 	
+	/**
+	 * Invoke a method asynchronously on the remoting server.
+	 * 
+	 * @param method
+	 * @param params
+	 * @param callback
+	 */
+	public void invokeMethod(String method, Object[] params, IRemotingCallback callback) {
+		IScope scope = Red5.getConnectionLocal().getScope();
+		RemotingWorker worker = new RemotingWorker(this, method, params, callback);
+		
+		ThreadPool pool = (ThreadPool) scope.getContext().getBean(POOL_BEAN_ID);
+		pool.dispatch(worker);
+	}
+	
+	/**
+	 * Worker class that is used for asynchronous remoting calls.
+	 */
+	protected class RemotingWorker implements Runnable {
+		
+		protected RemotingClient client;
+		protected String method;
+		protected Object[] params;
+		protected IRemotingCallback callback;
+		
+		protected RemotingWorker(RemotingClient client, String method, Object[] params, IRemotingCallback callback) {
+			this.client = client;
+			this.method = method;
+			this.params = params;
+			this.callback = callback;
+		}
+		
+		public void run() {
+			try {
+				Object result = this.client.invokeMethod(method, params);
+				this.callback.resultReceived(this.client, method, params, result);
+			} catch (Exception err) {
+				this.callback.errorReceived(this.client, method, params, err);
+			}
+		}
+	}
 	
 }
Index: /java/server/trunk/src/org/red5/server/net/remoting/IRemotingCallback.java
===================================================================
--- /java/server/trunk/src/org/red5/server/net/remoting/IRemotingCallback.java (revision 982)
+++ /java/server/trunk/src/org/red5/server/net/remoting/IRemotingCallback.java (revision 982)
@@ -0,0 +1,51 @@
+package org.red5.server.net.remoting;
+
+/*
+ * RED5 Open Source Flash Server - http://www.osflash.org/red5
+ * 
+ * Copyright © 2006 by respective authors (see below). All rights reserved.
+ * 
+ * This library is free software; you can redistribute it and/or modify it under the 
+ * terms of the GNU Lesser General Public License as published by the Free Software 
+ * Foundation; either version 2.1 of the License, or (at your option) any later 
+ * version. 
+ * 
+ * This library is distributed in the hope that it will be useful, but WITHOUT ANY 
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License along 
+ * with this library; if not, write to the Free Software Foundation, Inc., 
+ * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 
+ */
+
+/**
+ * Callback for asynchronous remoting calls.
+ * 
+ * @author The Red5 Project (red5 at osflash.org)
+ * @author Joachim Bauch (jojo at struktur.de)
+ *
+ */
+public interface IRemotingCallback {
+
+	/**
+	 * The result of a remoting call has been received.
+	 *  
+	 * @param client
+	 * @param method
+	 * @param params
+	 * @param result
+	 */
+	public void resultReceived(RemotingClient client, String method, Object[] params, Object result);
+	
+	/**
+	 * An error occured while performing the remoting call.
+	 * 
+	 * @param client
+	 * @param method
+	 * @param params
+	 * @param error
+	 */
+	public void errorReceived(RemotingClient client, String method, Object[] params, Throwable error);
+	
+}


Note:
Diffs are chopped if more than 30k.
This is to get past the limit on the mailing list.



More information about the Red5commits mailing list