[Red5commits] [982] support asynchronous remoting calls (Trac #17)
jbauch
luke at codegent.com
Tue Jun 20 04:23:57 EDT 2006
support asynchronous remoting calls (Trac #17)
Timestamp: 06/19/06 08:29:05 (1 day ago)
Change: 982
Author: jbauch
Files (see diff or trac for details):
java/server/trunk/conf/red5-common.xml
java/server/trunk/src/org/red5/server/net/remoting/IRemotingCallback.java
java/server/trunk/src/org/red5/server/net/remoting/RemotingClient.java
Trac: http://mirror1.cvsdude.com/trac/osflash/red5/changeset/982
Index: /java/server/trunk/conf/red5-common.xml
===================================================================
--- /java/server/trunk/conf/red5-common.xml (revision 956)
+++ /java/server/trunk/conf/red5-common.xml (revision 982)
@@ -53,3 +53,11 @@
<bean id="schedulingService" class="org.red5.server.scheduling.QuartzSchedulingService" />
+
+ <!-- threadpool settings for the remoting clients -->
+ <bean id="remotingPool" class="org.mortbay.thread.BoundedThreadPool"
+ init-method="start">
+ <property name="minThreads" value="4" />
+ <property name="maxThreads" value="16" />
+ </bean>
+
</beans>
Index: /java/server/trunk/src/org/red5/server/net/remoting/RemotingClient.java
===================================================================
--- /java/server/trunk/src/org/red5/server/net/remoting/RemotingClient.java (revision 971)
+++ /java/server/trunk/src/org/red5/server/net/remoting/RemotingClient.java (revision 982)
@@ -31,4 +31,5 @@
import org.apache.commons.logging.LogFactory;
import org.apache.mina.common.ByteBuffer;
+import org.mortbay.thread.ThreadPool;
import org.red5.io.amf.Input;
@@ -36,4 +37,6 @@
import org.red5.io.object.Deserializer;
import org.red5.io.object.Serializer;
+import org.red5.server.api.IScope;
+import org.red5.server.api.Red5;
import org.red5.server.net.servlet.ServletUtils;
@@ -55,4 +58,7 @@
private static final String CONTENT_TYPE = "application/x-amf";
+ /** Name of the bean defining the thread pool. */
+ private static final String POOL_BEAN_ID = "remotingPool";
+
/** Manages HTTP connections. */
private static HttpConnectionManager connectionMgr = new MultiThreadedHttpConnectionManager();
@@ -101,5 +107,5 @@
* @return
*/
- private ByteBuffer encodeInvoke(String method, Object[] params) {
+ private synchronized ByteBuffer encodeInvoke(String method, Object[] params) {
ByteBuffer result = ByteBuffer.allocate(1024);
result.setAutoExpand(true);
@@ -199,5 +205,5 @@
* @return
*/
- private Object decodeResult(ByteBuffer data) {
+ private synchronized Object decodeResult(ByteBuffer data) {
processHeaders(data);
int count = data.getUnsignedShort();
@@ -220,5 +226,5 @@
* @param password
*/
- public void setCredentials(String userid, String password) {
+ public synchronized void setCredentials(String userid, String password) {
Map<String, String> data = new HashMap<String, String>();
data.put("userid", userid);
@@ -232,5 +238,5 @@
*
*/
- public void resetCredentials() {
+ public synchronized void resetCredentials() {
removeHeader(RemotingHeader.CREDENTIALS);
}
@@ -243,5 +249,5 @@
* @param value
*/
- public void addHeader(String name, boolean required, Object value) {
+ public synchronized void addHeader(String name, boolean required, Object value) {
RemotingHeader header = new RemotingHeader(name, required, value);
headers.put(name, header);
@@ -253,10 +259,10 @@
* @param name
*/
- public void removeHeader(String name) {
+ public synchronized void removeHeader(String name) {
headers.remove(name);
}
/**
- * Invoke a method on the remoting server.
+ * Invoke a method synchronously on the remoting server.
*
* @param method
@@ -289,4 +295,45 @@
}
+ /**
+ * Invoke a method asynchronously on the remoting server. The call is wrapped in a RemotingWorker and dispatched to the thread pool bean named by POOL_BEAN_ID.
+ *
+ * @param method name of the method, handed unchanged to the synchronous invokeMethod by the worker
+ * @param params parameters of the call, handed unchanged to the synchronous invokeMethod by the worker
+ * @param callback notified by the worker with the result of the call, or with the exception if it fails
+ */
+ public void invokeMethod(String method, Object[] params, IRemotingCallback callback) {
+ IScope scope = Red5.getConnectionLocal().getScope(); // NOTE(review): no null check - presumably requires a current connection on the calling thread; confirm
+ RemotingWorker worker = new RemotingWorker(this, method, params, callback);
+
+ ThreadPool pool = (ThreadPool) scope.getContext().getBean(POOL_BEAN_ID); // pool bean is resolved through the scope's context on every call
+ pool.dispatch(worker);
+ }
+
+ /**
+ * Worker class that is used for asynchronous remoting calls: performs one synchronous invokeMethod call on the client and reports the outcome to the callback.
+ */
+ protected class RemotingWorker implements Runnable {
+
+ protected RemotingClient client; // client the synchronous call is performed on
+ protected String method; // method name passed to client.invokeMethod
+ protected Object[] params; // parameters passed to client.invokeMethod
+ protected IRemotingCallback callback; // receives either the result or the error
+
+ protected RemotingWorker(RemotingClient client, String method, Object[] params, IRemotingCallback callback) {
+ this.client = client;
+ this.method = method;
+ this.params = params;
+ this.callback = callback;
+ }
+
+ public void run() {
+ try {
+ Object result = this.client.invokeMethod(method, params);
+ this.callback.resultReceived(this.client, method, params, result);
+ } catch (Exception err) { // the try also covers resultReceived, so an exception thrown by the callback ends up in errorReceived too
+ this.callback.errorReceived(this.client, method, params, err);
+ }
+ }
+ }
}
Index: /java/server/trunk/src/org/red5/server/net/remoting/IRemotingCallback.java
===================================================================
--- /java/server/trunk/src/org/red5/server/net/remoting/IRemotingCallback.java (revision 982)
+++ /java/server/trunk/src/org/red5/server/net/remoting/IRemotingCallback.java (revision 982)
@@ -0,0 +1,51 @@
+package org.red5.server.net.remoting;
+
+/*
+ * RED5 Open Source Flash Server - http://www.osflash.org/red5
+ *
+ * Copyright © 2006 by respective authors (see below). All rights reserved.
+ *
+ * This library is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free Software
+ * Foundation; either version 2.1 of the License, or (at your option) any later
+ * version.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License along
+ * with this library; if not, write to the Free Software Foundation, Inc.,
+ * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+/**
+ * Callback for asynchronous remoting calls.
+ *
+ * @author The Red5 Project (red5 at osflash.org)
+ * @author Joachim Bauch (jojo at struktur.de)
+ *
+ */
+public interface IRemotingCallback {
+
+ /**
+ * The result of a remoting call has been received.
+ *
+ * @param client the remoting client that performed the call
+ * @param method name of the method that was invoked
+ * @param params parameters that were passed to the call
+ * @param result value returned by the remoting call
+ */
+ public void resultReceived(RemotingClient client, String method, Object[] params, Object result);
+
+ /**
+ * An error occurred while performing the remoting call.
+ *
+ * @param client the remoting client that performed the call
+ * @param method name of the method that was invoked
+ * @param params parameters that were passed to the call
+ * @param error the exception that was raised while performing the call
+ */
+ public void errorReceived(RemotingClient client, String method, Object[] params, Throwable error);
+
+}
Note:
Diffs are chopped if more than 30k.
This is to get past the limit on the mailing list.
More information about the Red5commits
mailing list