[Red5commits] [1956] moved BaseConnectionMBean to rtmp as base is not a concrete class
pgregoire
luke at codegent.com
Fri May 4 03:10:14 EDT 2007
moved BaseConnectionMBean to rtmp as base is not a concrete class
Timestamp: 05/04/07 02:01:51 EST (less than one hour ago)
Change: 1956
Author: pgregoire
Files (see diff or trac for details):
java/server/trunk/src/org/red5/server/BaseConnection.java
java/server/trunk/src/org/red5/server/BaseConnectionMBean.java
java/server/trunk/src/org/red5/server/net/rtmp/RTMPConnection.java
java/server/trunk/src/org/red5/server/net/rtmp/RTMPMinaConnection.java
java/server/trunk/src/org/red5/server/net/rtmp/RTMPMinaConnectionMBean.java
Trac: http://mirror1.cvsdude.com/trac/osflash/red5/changeset/1956
Index: /java/server/trunk/src/org/red5/server/net/rtmp/RTMPConnection.java
===================================================================
--- /java/server/trunk/src/org/red5/server/net/rtmp/RTMPConnection.java (revision 1914)
+++ /java/server/trunk/src/org/red5/server/net/rtmp/RTMPConnection.java (revision 1956)
@@ -29,12 +29,14 @@
import java.util.concurrent.ConcurrentHashMap;
+import javax.management.ObjectName;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.mina.common.ByteBuffer;
import org.red5.server.BaseConnection;
+import org.red5.server.api.IBWControllable;
import org.red5.server.api.IBandwidthConfigure;
import org.red5.server.api.IConnectionBWConfig;
import org.red5.server.api.IContext;
-import org.red5.server.api.IBWControllable;
import org.red5.server.api.IScope;
import org.red5.server.api.Red5;
@@ -77,135 +79,140 @@
public abstract class RTMPConnection extends BaseConnection implements
IStreamCapableConnection, IServiceCapableConnection {
- /**
- * Logger
- */
+ /**
+ * Logger
+ */
protected static Log log = LogFactory
.getLog(RTMPConnection.class.getName());
- /**
- * Video codec factory constant
- */
- private static final String VIDEO_CODEC_FACTORY = "videoCodecFactory";
+ /**
+ * Video codec factory constant
+ */
+ private static final String VIDEO_CODEC_FACTORY = "videoCodecFactory";
// private Context context;
- /**
- * Connection channels
- *
- * @see org.red5.server.net.rtmp.Channel
- */
- private Map<Integer, Channel> channels = new ConcurrentHashMap<Integer, Channel>();
-
- /**
- * Client streams
- *
- * @see org.red5.server.api.stream.IClientStream
- */
- private Map<Integer, IClientStream> streams = new ConcurrentHashMap<Integer, IClientStream>();
+ /**
+ * Connection channels
+ *
+ * @see org.red5.server.net.rtmp.Channel
+ */
+ private Map<Integer, Channel> channels = new ConcurrentHashMap<Integer, Channel>();
+
+ /**
+ * Client streams
+ *
+ * @see org.red5.server.api.stream.IClientStream
+ */
+ private Map<Integer, IClientStream> streams = new ConcurrentHashMap<Integer, IClientStream>();
private Map<Integer, Boolean> reservedStreams = new ConcurrentHashMap<Integer, Boolean>();
- /**
- * Identifier for remote calls
- */
- protected Integer invokeId = 1;
-
- /**
- * Hash map that stores pending calls and ids as pairs.
- */
- protected Map<Integer, IPendingServiceCall> pendingCalls = new ConcurrentHashMap<Integer, IPendingServiceCall>();
-
- /**
- * Deferred results set
- *
- * @see org.red5.server.net.rtmp.DeferredResult
- */
- protected HashSet<DeferredResult> deferredResults = new HashSet<DeferredResult>();
-
- /**
- * Last ping timestamp
- */
- protected int lastPingTime = -1;
-
- /**
- * Timestamp when last ping command was sent.
- */
- protected long lastPingSent;
-
- /**
- * Timestamp when last ping result was received.
- */
- protected long lastPongReceived;
-
- /**
- * Name of quartz job that keeps connection alive
- */
- protected String keepAliveJobName;
-
- /**
- * Ping interval in ms to detect dead clients
- */
- protected int pingInterval = 5000;
-
- /**
- * Max. time in ms after a client is disconnected because of inactivity
- */
- protected int maxInactivity = 60000;
-
- /**
- * Data read interval
- */
- private int bytesReadInterval = 120*1024;
-
- /**
- * Previously number of bytes read from connection.
- */
- private long lastBytesRead = 0;
-
- /**
- * Number of bytes to read next
- */
- private int nextBytesRead = 120*1024;
-
- /**
- * Number of bytes the client reported to have received
- */
- private int clientBytesRead = 0;
-
- /**
- * Bandwidth configure
- */
- private IConnectionBWConfig bwConfig;
-
- /**
- * Bandwidth context used by bandwidth controller
- */
- private IBWControlContext bwContext;
-
- /**
- * Map for pending video packets and stream IDs
- */
- private Map<Integer, Integer> pendingVideos = new ConcurrentHashMap<Integer, Integer>();
-
- /**
- * Number of streams used
- */
- private int usedStreams;
-
- /**
- * AMF version, AMF0 by default
- */
- protected Encoding encoding = Encoding.AMF0;
-
- /**
- * Remembered stream buffer durations
- */
- protected Map<Integer, Integer> streamBuffers = new HashMap<Integer, Integer>();
-
- /**
- * Creates anonymous RTMP connection without scope
- * @param type Connection type
- */
- public RTMPConnection(String type) {
+ /**
+ * Identifier for remote calls
+ */
+ protected Integer invokeId = 1;
+
+ /**
+ * Hash map that stores pending calls and ids as pairs.
+ */
+ protected Map<Integer, IPendingServiceCall> pendingCalls = new ConcurrentHashMap<Integer, IPendingServiceCall>();
+
+ /**
+ * Deferred results set
+ *
+ * @see org.red5.server.net.rtmp.DeferredResult
+ */
+ protected HashSet<DeferredResult> deferredResults = new HashSet<DeferredResult>();
+
+ /**
+ * Last ping timestamp
+ */
+ protected int lastPingTime = -1;
+
+ /**
+ * Timestamp when last ping command was sent.
+ */
+ protected long lastPingSent;
+
+ /**
+ * Timestamp when last ping result was received.
+ */
+ protected long lastPongReceived;
+
+ /**
+ * Name of quartz job that keeps connection alive
+ */
+ protected String keepAliveJobName;
+
+ /**
+ * Ping interval in ms to detect dead clients
+ */
+ protected int pingInterval = 5000;
+
+ /**
+ * Max. time in ms after a client is disconnected because of inactivity
+ */
+ protected int maxInactivity = 60000;
+
+ /**
+ * Data read interval
+ */
+ private int bytesReadInterval = 120 * 1024;
+
+ /**
+ * Previously number of bytes read from connection.
+ */
+ private long lastBytesRead = 0;
+
+ /**
+ * Number of bytes to read next
+ */
+ private int nextBytesRead = 120 * 1024;
+
+ /**
+ * Number of bytes the client reported to have received
+ */
+ private int clientBytesRead = 0;
+
+ /**
+ * Bandwidth configure
+ */
+ private IConnectionBWConfig bwConfig;
+
+ /**
+ * Bandwidth context used by bandwidth controller
+ */
+ private IBWControlContext bwContext;
+
+ /**
+ * Map for pending video packets and stream IDs
+ */
+ private Map<Integer, Integer> pendingVideos = new ConcurrentHashMap<Integer, Integer>();
+
+ /**
+ * Number of streams used
+ */
+ private int usedStreams;
+
+ /**
+ * AMF version, AMF0 by default
+ */
+ protected Encoding encoding = Encoding.AMF0;
+
+ /**
+ * Remembered stream buffer durations
+ */
+ protected Map<Integer, Integer> streamBuffers = new HashMap<Integer, Integer>();
+
+ /**
+ * MBean object name used for de/registration purposes.
+ */
+ protected ObjectName oName;
+
+ /**
+ * Creates anonymous RTMP connection without scope
+ * @param type Connection type
+ */
+ public RTMPConnection(String type) {
// We start with an anonymous connection without a scope.
// These parameters will be set during the call of "connect" later.
@@ -213,12 +220,12 @@
super(type, null, null, 0, null, null, null);
}
-
- @Override
+
+ @Override
public boolean connect(IScope newScope, Object[] params) {
boolean success = super.connect(newScope, params);
if (success) {
- // XXX Bandwidth control service should not be bound to
- // a specific scope because it's designed to control
- // the bandwidth system-wide.
+ // XXX Bandwidth control service should not be bound to
+ // a specific scope because it's designed to control
+ // the bandwidth system-wide.
if (getScope() != null && getScope().getContext() != null) {
IBWControlService bwController = (IBWControlService) getScope()
@@ -231,12 +238,12 @@
/**
- * Initialize connection
- *
- * @param host Connection host
- * @param path Connection path
- * @param sessionId Connection session id
- * @param params Params passed from client
- */
- public void setup(String host, String path, String sessionId,
+ * Initialize connection
+ *
+ * @param host Connection host
+ * @param path Connection path
+ * @param sessionId Connection session id
+ * @param params Params passed from client
+ */
+ public void setup(String host, String path, String sessionId,
Map<String, Object> params) {
this.host = host;
@@ -248,16 +255,17 @@
}
- /**
- * Return AMF protocol encoding used by this connection
- * @return AMF encoding used by connection
- */
- public Encoding getEncoding() {
+ /**
+ * Return AMF protocol encoding used by this connection
+ * @return AMF encoding used by connection
+ */
+ public Encoding getEncoding() {
return encoding;
}
- /**
- * Getter for next available channel id
- *
- * @return Next available channel id
- */
+
+ /**
+ * Getter for next available channel id
+ *
+ * @return Next available channel id
+ */
public synchronized int getNextAvailableChannelId() {
int result = 4;
@@ -267,48 +275,48 @@
}
- /**
- * Checks whether channel is used
- * @param channelId Channel id
- * @return <code>true</code> if channel is in use, <code>false</code> otherwise
- */
- public boolean isChannelUsed(int channelId) {
+ /**
+ * Checks whether channel is used
+ * @param channelId Channel id
+ * @return <code>true</code> if channel is in use, <code>false</code> otherwise
+ */
+ public boolean isChannelUsed(int channelId) {
return channels.get(channelId) != null;
}
- /**
- * Return channel by id
- * @param channelId Channel id
- * @return Channel by id
- */
- public Channel getChannel(int channelId) {
- synchronized (channels) {
- Channel result = channels.get(channelId);
- if (result == null) {
- result = new Channel(this, channelId);
- channels.put(channelId, result);
- }
- return result;
- }
- }
-
- /**
- * Closes channel
- * @param channelId Channel id
- */
- public void closeChannel(int channelId) {
+ /**
+ * Return channel by id
+ * @param channelId Channel id
+ * @return Channel by id
+ */
+ public Channel getChannel(int channelId) {
+ synchronized (channels) {
+ Channel result = channels.get(channelId);
+ if (result == null) {
+ result = new Channel(this, channelId);
+ channels.put(channelId, result);
+ }
+ return result;
+ }
+ }
+
+ /**
+ * Closes channel
+ * @param channelId Channel id
+ */
+ public void closeChannel(int channelId) {
channels.remove(channelId);
}
/**
- * Getter for client streams
- *
- * @return Client streams as array
- */
- protected Collection<IClientStream> getStreams() {
+ * Getter for client streams
+ *
+ * @return Client streams as array
+ */
+ protected Collection<IClientStream> getStreams() {
return streams.values();
}
/** {@inheritDoc} */
- public int reserveStreamId() {
+ public int reserveStreamId() {
int result = -1;
synchronized (reservedStreams) {
@@ -325,12 +333,12 @@
}
- /**
- * Creates output stream object from stream id. Output stream consists of audio, data and video channels.
- *
- * @see org.red5.server.stream.OutputStream
- * @param streamId Stream id
- * @return Output stream object
- */
- public OutputStream createOutputStream(int streamId) {
+ /**
+ * Creates output stream object from stream id. Output stream consists of audio, data and video channels.
+ *
+ * @see org.red5.server.stream.OutputStream
+ * @param streamId Stream id
+ * @return Output stream object
+ */
+ public OutputStream createOutputStream(int streamId) {
int channelId = (4 + ((streamId - 1) * 5));
final Channel data = getChannel(channelId++);
@@ -343,9 +351,9 @@
/**
- * Getter for video codec factory
- *
- * @return Video codec factory
- */
- public VideoCodecFactory getVideoCodecFactory() {
+ * Getter for video codec factory
+ *
+ * @return Video codec factory
+ */
+ public VideoCodecFactory getVideoCodecFactory() {
final IContext context = scope.getContext();
ApplicationContext appCtx = context.getApplicationContext();
@@ -358,6 +366,6 @@
/** {@inheritDoc} */
- public IClientBroadcastStream newBroadcastStream(int streamId) {
- Boolean value = reservedStreams.get(streamId - 1);
+ public IClientBroadcastStream newBroadcastStream(int streamId) {
+ Boolean value = reservedStreams.get(streamId - 1);
if (value == null || !value) {
// StreamId has not been reserved before
@@ -370,8 +378,8 @@
return null;
}
- ApplicationContext appCtx =
- scope.getContext().getApplicationContext();
- ClientBroadcastStream cbs = (ClientBroadcastStream)
- appCtx.getBean("clientBroadcastStream");
+ ApplicationContext appCtx = scope.getContext()
+ .getApplicationContext();
+ ClientBroadcastStream cbs = (ClientBroadcastStream) appCtx
+ .getBean("clientBroadcastStream");
/**
* Picking up the ClientBroadcastStream defined as a spring prototype
@@ -392,8 +400,8 @@
}
- /** {@inheritDoc}
- * To be implemented.
- */
- public ISingleItemSubscriberStream newSingleItemSubscriberStream(
+ /** {@inheritDoc}
+ * To be implemented.
+ */
+ public ISingleItemSubscriberStream newSingleItemSubscriberStream(
int streamId) {
// TODO implement it
@@ -402,6 +410,6 @@
/** {@inheritDoc} */
- public IPlaylistSubscriberStream newPlaylistSubscriberStream(int streamId) {
- Boolean value = reservedStreams.get(streamId - 1);
+ public IPlaylistSubscriberStream newPlaylistSubscriberStream(int streamId) {
+ Boolean value = reservedStreams.get(streamId - 1);
if (value == null || !value) {
// StreamId has not been reserved before
@@ -414,12 +422,12 @@
return null;
}
- ApplicationContext appCtx =
- scope.getContext().getApplicationContext();
+ ApplicationContext appCtx = scope.getContext()
+ .getApplicationContext();
/**
* Picking up the PlaylistSubscriberStream defined as a spring prototype
* in red5-common.xml
*/
- PlaylistSubscriberStream pss = (PlaylistSubscriberStream)
- appCtx.getBean("playlistSubscriberStream");
+ PlaylistSubscriberStream pss = (PlaylistSubscriberStream) appCtx
+ .getBean("playlistSubscriberStream");
Integer buffer = streamBuffers.get(streamId - 1);
if (buffer != null)
@@ -436,14 +444,14 @@
/**
- * Getter for used stream count
- *
- * @return Value for property 'usedStreamCount'.
- */
- protected int getUsedStreamCount() {
+ * Getter for used stream count
+ *
+ * @return Value for property 'usedStreamCount'.
+ */
+ protected int getUsedStreamCount() {
return usedStreams;
}
/** {@inheritDoc} */
- public IClientStream getStreamById(int id) {
+ public IClientStream getStreamById(int id) {
if (id <= 0) {
return null;
@@ -453,10 +461,10 @@
}
- /**
- * Return stream id for given channel id
- * @param channelId Channel id
- * @return ID of stream that channel belongs to
- */
- public int getStreamIdForChannel(int channelId) {
+ /**
+ * Return stream id for given channel id
+ * @param channelId Channel id
+ * @return ID of stream that channel belongs to
+ */
+ public int getStreamIdForChannel(int channelId) {
if (channelId < 4) {
return 0;
@@ -466,10 +474,10 @@
}
- /**
- * Return stream for given channel id
- * @param channelId Channel id
- * @return Stream that channel belongs to
- */
- public IClientStream getStreamByChannelId(int channelId) {
+ /**
+ * Return stream for given channel id
+ * @param channelId Channel id
+ * @return Stream that channel belongs to
+ */
+ public IClientStream getStreamByChannelId(int channelId) {
if (channelId < 4) {
return null;
@@ -480,9 +488,9 @@
/** {@inheritDoc} */
- @Override
+ @Override
public void close() {
if (keepAliveJobName != null) {
- ISchedulingService schedulingService =
- (ISchedulingService) getScope().getContext().getBean(ISchedulingService.BEAN_NAME);
+ ISchedulingService schedulingService = (ISchedulingService) getScope()
+ .getContext().getBean(ISchedulingService.BEAN_NAME);
schedulingService.removeScheduledJob(keepAliveJobName);
keepAliveJobName = null;
@@ -493,9 +501,12 @@
if (streamService != null) {
synchronized (streams) {
- for (Map.Entry<Integer, IClientStream> entry: streams.entrySet()) {
+ for (Map.Entry<Integer, IClientStream> entry : streams
+ .entrySet()) {
IClientStream stream = entry.getValue();
if (stream != null) {
if (log.isDebugEnabled()) {
- log.debug("Closing stream: " + stream.getStreamId());
+ log
+ .debug("Closing stream: "
+ + stream.getStreamId());
}
streamService.deleteStream(this, stream.getStreamId());
@@ -508,5 +519,6 @@
channels.clear();
- if (bwContext != null && getScope() != null && getScope().getContext() != null) {
+ if (bwContext != null && getScope() != null
+ && getScope().getContext() != null) {
IBWControlService bwController = (IBWControlService) getScope()
.getContext().getBean(IBWControlService.KEY);
@@ -518,5 +530,5 @@
/** {@inheritDoc} */
- public void unreserveStreamId(int streamId) {
+ public void unreserveStreamId(int streamId) {
deleteStreamById(streamId);
if (streamId > 0) {
@@ -526,5 +538,5 @@
/** {@inheritDoc} */
- public void deleteStreamById(int streamId) {
+ public void deleteStreamById(int streamId) {
if (streamId > 0) {
if (streams.get(streamId - 1) != null) {
@@ -539,28 +551,28 @@
}
- /**
- * Handler for ping event
- * @param ping Ping event context
- */
- public void ping(Ping ping) {
+ /**
+ * Handler for ping event
+ * @param ping Ping event context
+ */
+ public void ping(Ping ping) {
getChannel((byte) 2).write(ping);
}
- /**
- * Write raw byte buffer
- * @param out Byte buffer
- */
- public abstract void rawWrite(ByteBuffer out);
-
- /**
- * Write packet
- * @param out Packet
- */
- public abstract void write(Packet out);
-
- /**
- * Update number of bytes to read next value
- */
- protected void updateBytesRead() {
+ /**
+ * Write raw byte buffer
+ * @param out Byte buffer
+ */
+ public abstract void rawWrite(ByteBuffer out);
+
+ /**
+ * Write packet
+ * @param out Packet
+ */
+ public abstract void write(Packet out);
+
+ /**
+ * Update number of bytes to read next value
+ */
+ protected void updateBytesRead() {
long bytesRead = getReadBytes();
if (bytesRead >= nextBytesRead) {
@@ -572,9 +584,9 @@
}
- /**
- * Read number of recieved bytes
- * @param bytes Number of bytes
- */
- public void receivedBytesRead(int bytes) {
+ /**
+ * Read number of recieved bytes
+ * @param bytes Number of bytes
+ */
+ public void receivedBytesRead(int bytes) {
log.info("Client received " + bytes + " bytes, written "
+ getWrittenBytes() + " bytes, " + getPendingMessages()
@@ -583,33 +595,33 @@
}
- /**
- * Get number of bytes the client reported to have received.
- *
- * @return number of bytes
- */
- public int getClientBytesRead() {
- return clientBytesRead;
- }
-
- /** {@inheritDoc} */
- public void invoke(IServiceCall call) {
+ /**
+ * Get number of bytes the client reported to have received.
+ *
+ * @return number of bytes
+ */
+ public int getClientBytesRead() {
+ return clientBytesRead;
+ }
+
+ /** {@inheritDoc} */
+ public void invoke(IServiceCall call) {
invoke(call, (byte) 3);
}
/**
- * Generate next invoke id
- *
- * @return Next invoke id for RPC
- */
- protected synchronized int getInvokeId() {
+ * Generate next invoke id
+ *
+ * @return Next invoke id for RPC
+ */
+ protected synchronized int getInvokeId() {
return invokeId++;
}
- /**
- * Register pending call (remote function call that is yet to finish)
- * @param invokeId Deferred operation id
- * @param call Call service
- */
- protected void registerPendingCall(int invokeId, IPendingServiceCall call) {
+ /**
+ * Register pending call (remote function call that is yet to finish)
+ * @param invokeId Deferred operation id
+ * @param call Call service
+ */
+ protected void registerPendingCall(int invokeId, IPendingServiceCall call) {
synchronized (pendingCalls) {
pendingCalls.put(invokeId, call);
@@ -618,5 +630,5 @@
/** {@inheritDoc} */
- public void invoke(IServiceCall call, byte channel) {
+ public void invoke(IServiceCall call, byte channel) {
// We need to use Invoke for all calls to the client
Invoke invoke = new Invoke();
@@ -624,5 +636,6 @@
invoke.setInvokeId(getInvokeId());
if (call instanceof IPendingServiceCall) {
- registerPendingCall(invoke.getInvokeId(), (IPendingServiceCall) call);
+ registerPendingCall(invoke.getInvokeId(),
+ (IPendingServiceCall) call);
}
getChannel(channel).write(invoke);
@@ -630,20 +643,20 @@
/** {@inheritDoc} */
- public void invoke(String method) {
+ public void invoke(String method) {
invoke(method, null, null);
}
/** {@inheritDoc} */
- public void invoke(String method, Object[] params) {
+ public void invoke(String method, Object[] params) {
invoke(method, params, null);
}
/** {@inheritDoc} */
- public void invoke(String method, IPendingServiceCallback callback) {
+ public void invoke(String method, IPendingServiceCallback callback) {
invoke(method, null, callback);
}
/** {@inheritDoc} */
- public void invoke(String method, Object[] params,
+ public void invoke(String method, Object[] params,
IPendingServiceCallback callback) {
IPendingServiceCall call = new PendingCall(method, params);
@@ -656,10 +669,10 @@
/** {@inheritDoc} */
- public void notify(IServiceCall call) {
+ public void notify(IServiceCall call) {
notify(call, (byte) 3);
}
/** {@inheritDoc} */
- public void notify(IServiceCall call, byte channel) {
+ public void notify(IServiceCall call, byte channel) {
Notify notify = new Notify();
notify.setCall(call);
@@ -668,10 +681,10 @@
/** {@inheritDoc} */
- public void notify(String method) {
+ public void notify(String method) {
notify(method, null);
}
/** {@inheritDoc} */
- public void notify(String method, Object[] params) {
+ public void notify(String method, Object[] params) {
IServiceCall call = new Call(method, params);
notify(call);
@@ -679,19 +692,19 @@
/** {@inheritDoc} */
- public IBandwidthConfigure getBandwidthConfigure() {
+ public IBandwidthConfigure getBandwidthConfigure() {
return bwConfig;
}
/** {@inheritDoc} */
- public IBWControllable getParentBWControllable() {
- // TODO return the client object
+ public IBWControllable getParentBWControllable() {
+ // TODO return the client object
return null;
}
/** {@inheritDoc} */
- public void setBandwidthConfigure(IBandwidthConfigure config) {
- if (!(config instanceof IConnectionBWConfig)) {
- return;
- }
+ public void setBandwidthConfigure(IBandwidthConfigure config) {
+ if (!(config instanceof IConnectionBWConfig)) {
+ return;
+ }
this.bwConfig = (IConnectionBWConfig) config;
@@ -712,6 +725,6 @@
}
if (bwContext != null) {
- IBWControlService bwControll
Note:
Diffs are chopped if more than 25k.
This is to get past the limit on the mailing list.
More information about the Red5commits
mailing list