[Red5commits] [2432] Fixed APPSERVER-204. Changed FilePersistenceThread? to use scheduled executor. A

pgregoire luke at codegent.com
Wed Oct 24 22:30:14 PDT 2007


Fixed APPSERVER-204. Changed FilePersistenceThread? to use scheduled executor. Added cleanup for mina stats collector.


Timestamp: 10/25/07 00:24:37 EST (less than one hour ago) 
Change: 2432 
Author: pgregoire

Files (see diff or trac for details): 
java/server/trunk/conf/red5-common.xml
java/server/trunk/src/org/red5/server/jmx/JMXAgent.java
java/server/trunk/src/org/red5/server/net/rtmp/RTMPMinaTransport.java
java/server/trunk/src/org/red5/server/persistence/FilePersistence.java
java/server/trunk/src/org/red5/server/persistence/FilePersistenceThread.java
java/server/trunk/src/org/red5/server/war/WarLoaderServlet.java


Trac: http://mirror1.cvsdude.com/trac/osflash/red5/changeset/2432

Index: /java/server/trunk/conf/red5-common.xml
===================================================================
--- /java/server/trunk/conf/red5-common.xml (revision 2411)
+++ /java/server/trunk/conf/red5-common.xml (revision 2432)
@@ -12,5 +12,4 @@
 	<bean id="jmxAgent" class="org.red5.server.jmx.JMXAgent" init-method="init">
 		<!-- The RMI adapter allows remote connections to the MBeanServer -->
-
 		<property name="enableRmiAdapter" value="true"/>
 		<property name="rmiAdapterPort" value="9999"/>
Index: /java/server/trunk/src/org/red5/server/persistence/FilePersistence.java
===================================================================
--- /java/server/trunk/src/org/red5/server/persistence/FilePersistence.java (revision 2407)
+++ /java/server/trunk/src/org/red5/server/persistence/FilePersistence.java (revision 2432)
@@ -41,5 +41,5 @@
 
 /**
- * Simple file-based persistence for objects. Lowers memory usage if used instead of RAM memoty storage.
+ * Simple file-based persistence for objects. Lowers memory usage if used instead of RAM memory storage.
  * 
  * @author The Red5 Project (red5 at osflash.org)
Index: /java/server/trunk/src/org/red5/server/persistence/FilePersistenceThread.java
===================================================================
--- /java/server/trunk/src/org/red5/server/persistence/FilePersistenceThread.java (revision 2397)
+++ /java/server/trunk/src/org/red5/server/persistence/FilePersistenceThread.java (revision 2432)
@@ -24,4 +24,8 @@
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 
 import org.red5.server.api.persistence.IPersistable;
@@ -30,38 +34,46 @@
 
 /**
- * Thread that writes modified persistent objects to the filesystem periodically.
+ * Thread that writes modified persistent objects to the file system
+ * periodically.
  * 
  * @author The Red5 Project (red5 at osflash.org)
  * @author Joachim Bauch (jojo at struktur.de)
  */
-public class FilePersistenceThread extends Thread {
+public class FilePersistenceThread implements Runnable {
 
-    /**
-     * Logger
-     */
-    private Logger log = LoggerFactory.getLogger(FilePersistenceThread.class);
-    
+	/**
+	 * Logger
+	 */
+	private Logger log = LoggerFactory.getLogger(FilePersistenceThread.class);
+
 	/**
 	 * Interval to serialize modified objects in milliseconds.
-	 *
+	 * 
 	 */
-    // TODO: make this configurable
-	private int storeInterval = 10000;
-	
+	// TODO: make this configurable
+	private long storeInterval = 10000;
+
 	/**
 	 * Modified objects that need to be stored.
 	 */
-	private Map<IPersistable, FilePersistence> modifiedObjects = new HashMap<IPersistable, FilePersistence>();
-	
+	private Map<IPersistable, FilePersistence> modifiedObjects = new ConcurrentHashMap<IPersistable, FilePersistence>();
+
 	/**
 	 * Modified objects for each store.
 	 */
-	private Map<FilePersistence, Set<IPersistable>> objectStores = new HashMap<FilePersistence, Set<IPersistable>>();
-	
+	private Map<FilePersistence, Set<IPersistable>> objectStores = new ConcurrentHashMap<FilePersistence, Set<IPersistable>>();
+
 	/**
 	 * Singleton instance.
 	 */
 	private static volatile FilePersistenceThread instance = null;
-	
+
+	/**
+	 * Each FilePersistenceThread has its own scheduler and the executor is
+	 * guaranteed to only run a single task at a time.
+	 */
+	private final ScheduledExecutorService scheduler = Executors
+			.newSingleThreadScheduledExecutor();
+
 	/**
 	 * Return singleton instance of the thread.
@@ -70,17 +82,7 @@
 	 */
 	public static FilePersistenceThread getInstance() {
-		if (instance == null) {
-			// Only synchronize if thread doesn't exist yet.
-			synchronized (FilePersistenceThread.class) {
-				if (instance == null) {
-					instance = new FilePersistenceThread();
-					instance.start();
-				}
-			}
-		}
-		
 		return instance;
 	}
-	
+
 	/**
 	 * Create instance of the thread.
@@ -88,5 +90,11 @@
 	private FilePersistenceThread() {
 		super();
-		setName("FilePersistenceThread");
+		if (instance != null) {
+			log.error("Instance was not null, this is not a good sign");
+		}
+		instance = this;
+		final ScheduledFuture<?> instanceHandle = scheduler
+				.scheduleAtFixedRate(this, storeInterval, storeInterval,
+						java.util.concurrent.TimeUnit.MILLISECONDS);
 	}
 
@@ -98,19 +106,17 @@
 	 */
 	protected void modified(IPersistable object, FilePersistence store) {
-		FilePersistence previous;
-		synchronized (modifiedObjects) {
-			previous = modifiedObjects.put(object, store);
-			Set<IPersistable> objects = objectStores.get(store);
-			if (objects == null) {
-				objects = new HashSet<IPersistable>();
-				objectStores.put(store, objects);
-			}
-			objects.add(object);
+		FilePersistence previous = modifiedObjects.put(object, store);
+		Set<IPersistable> objects = objectStores.get(store);
+		if (objects == null) {
+			objects = new HashSet<IPersistable>();
+			objectStores.put(store, objects);
 		}
-		
+		objects.add(object);
+
 		if (previous != null && !previous.equals(store)) {
-			log.warn("Object " + object + " was also modified in " + previous + ", saving instantly");
+			log.warn("Object {} was also modified in {}, saving instantly",
+					new Object[] { object, previous });
 			previous.saveObject(object);
-			Set<IPersistable> objects = objectStores.get(previous);
+			objects = objectStores.get(previous);
 			if (objects != null) {
 				objects.remove(previous);
@@ -118,5 +124,5 @@
 		}
 	}
-	
+
 	/**
 	 * Write any pending objects for the given store to disk.
@@ -125,63 +131,55 @@
 	 */
 	protected void notifyClose(FilePersistence store) {
-		Set<IPersistable> objects;
 		// Get snapshot of currently modified objects.
-		synchronized (modifiedObjects) {
-			objects = objectStores.remove(store);
-			if (objects != null) {
-				for (IPersistable object: objects) {
-					modifiedObjects.remove(object);
-				}
+		Set<IPersistable> objects = objectStores.remove(store);
+		if (objects != null) {
+			for (IPersistable object : objects) {
+				modifiedObjects.remove(object);
 			}
 		}
-		
+
 		if (objects == null || objects.isEmpty()) {
 			return;
 		}
-		
+
 		// Store pending objects
-		for (IPersistable object: objects) {
+		for (IPersistable object : objects) {
 			try {
 				store.saveObject(object);
 			} catch (Throwable e) {
-				log.error("Error while saving " + object + " in " + store, e);
-			}
-		}
-	}
-	
-    /**
-     * Write modified objects to the filesystem periodically.
-     */
-	public void run() {
-		while (isAlive()) {
-			long start = System.currentTimeMillis();
-			if (!modifiedObjects.isEmpty()) {
-				Map<IPersistable, FilePersistence> objects;
-				// Get snapshot of currently modified objects.
-				synchronized (modifiedObjects) {
-					objects = new HashMap<IPersistable, FilePersistence>(modifiedObjects);
-					modifiedObjects.clear();
-					objectStores.clear();
-				}
-				
-				for (Map.Entry<IPersistable, FilePersistence> entry: objects.entrySet()) {
-					try {
-						entry.getValue().saveObject(entry.getKey());
-					} catch (Throwable e) {
-						log.error("Error while saving " + entry.getKey() + " in " + entry.getValue(), e);
-					}
-				}
-			}
-			long end = System.currentTimeMillis();
-			try {
-				long delay = storeInterval - (end - start);
-				if (delay > 0) {
-					Thread.sleep(delay);
-				}
-			} catch (InterruptedException e) {
-				// Ignore
+				log.error("Error while saving {} in {}. {}", new Object[] {
+						object, store, e });
 			}
 		}
 	}
 
+	/**
+	 * Write modified objects to the file system periodically.
+	 */
+	public void run() {
+		if (!modifiedObjects.isEmpty()) {
+			// Get snapshot of currently modified objects.
+			Map<IPersistable, FilePersistence> objects = new HashMap<IPersistable, FilePersistence>(
+					modifiedObjects);
+			modifiedObjects.clear();
+			objectStores.clear();
+			for (Map.Entry<IPersistable, FilePersistence> entry : objects
+					.entrySet()) {
+				try {
+					entry.getValue().saveObject(entry.getKey());
+				} catch (Throwable e) {
+					log.error("Error while saving {} in {}. {}", new Object[] {
+							entry.getKey(), entry.getValue(), e });
+				}
+			}
+		}
+	}
+
+	/**
+	 * Cleanly shutdown the tasks
+	 */
+	public void shutdown() {
+		scheduler.shutdown();
+	}
+	
 }
Index: /java/server/trunk/src/org/red5/server/net/rtmp/RTMPMinaTransport.java
===================================================================
--- /java/server/trunk/src/org/red5/server/net/rtmp/RTMPMinaTransport.java (revision 2402)
+++ /java/server/trunk/src/org/red5/server/net/rtmp/RTMPMinaTransport.java (revision 2432)
@@ -96,4 +96,6 @@
 	private IoHandlerAdapter ioHandler;
 
+	private IoServiceManager serviceManager;
+	
 	private int ioThreads = DEFAULT_IO_THREADS;
 
@@ -249,5 +251,5 @@
 		if (JMXAgent.isEnableMinaMonitor()) {
     		//add a service manager to allow for more introspection into the workings of mina
-    		IoServiceManager serviceManager = new IoServiceManager(acceptor);
+    		serviceManager = new IoServiceManager(acceptor);
     		//poll every second
     		serviceManager.startCollectingStats(jmxPollInterval);
@@ -267,4 +269,9 @@
 		JMXAgent.unregisterMBean(oName);
 		if (serviceManagerObjectName != null) {
+			//if the service manager (stats collector) is not null then clean up
+			if (serviceManager != null) {
+    			serviceManager.stopCollectingStats();
+    			serviceManager.closeAllSessions();
+			}
 			JMXAgent.unregisterMBean(serviceManagerObjectName);
 		}
Index: /java/server/trunk/src/org/red5/server/war/WarLoaderServlet.java
===================================================================
--- /java/server/trunk/src/org/red5/server/war/WarLoaderServlet.java (revision 2416)
+++ /java/server/trunk/src/org/red5/server/war/WarLoaderServlet.java (revision 2432)
@@ -37,4 +37,5 @@
 import org.red5.server.WebScope;
 import org.red5.server.jmx.JMXAgent;
+import org.red5.server.persistence.FilePersistenceThread;
 import org.red5.server.service.ServiceInvoker;
 import org.slf4j.Logger;
@@ -207,4 +208,9 @@
 				// shutdown jmx
 				JMXAgent.shutdown();
+				// shutdown the persistence thread
+				FilePersistenceThread persistenceThread = FilePersistenceThread.getInstance();
+				if (persistenceThread != null) {
+					persistenceThread.shutdown();
+				}
 				// shutdown spring
 				Object attr = ctx.getAttribute(WebApplicationContext.ROOT_WEB_APPLICATION_CONTEXT_ATTRIBUTE);
Index: /java/server/trunk/src/org/red5/server/jmx/JMXAgent.java
===================================================================
--- /java/server/trunk/src/org/red5/server/jmx/JMXAgent.java (revision 2414)
+++ /java/server/trunk/src/org/red5/server/jmx/JMXAgent.java (revision 2432)
@@ -101,5 +101,5 @@
 						"[\\.]", "");
 			}
-			log.debug("Register name: " + cName);
+			log.debug("Register name: {}", cName);
 			mbs.registerMBean(new StandardMBean(instance, interfaceClass),
 					new ObjectName(JMXFactory.getDefaultDomain() + ":type="
@@ -107,7 +107,7 @@
 			status = true;
 		} catch (InstanceAlreadyExistsException iaee) {
-			log.debug("Already registered: " + className);
+			log.debug("Already registered: {}", className);
 		} catch (Exception e) {
-			log.error("Could not register the " + className + " MBean", e);
+			log.error("Could not register the {} MBean. {}", className, e);
 		}
 		return status;
@@ -123,5 +123,5 @@
 						"[\\.]", "");
 			}
-			log.debug("Register name: " + cName);
+			log.debug("Register name: {}", cName);
 			mbs
 					.registerMBean(new StandardMBean(instance, interfaceClass),
@@ -129,7 +129,7 @@
 			status = true;
 		} catch (InstanceAlreadyExistsException iaee) {
-			log.debug("Already registered: " + className);
+			log.debug("Already registered: {}", className);
 		} catch (Exception e) {
-			log.error("Could not register the " + className + " MBean", e);
+			log.error("Could not register the {} MBean. {}", className, e);
 		}
 		return status;
@@ -145,5 +145,5 @@
 						"[\\.]", "");
 			}
-			log.debug("Register name: " + cName);
+			log.debug("Register name: {}", cName);
 			mbs.registerMBean(new StandardMBean(instance, interfaceClass),
 					new ObjectName(JMXFactory.getDefaultDomain() + ":type="
@@ -151,7 +151,7 @@
 			status = true;
 		} catch (InstanceAlreadyExistsException iaee) {
-			log.debug("Already registered: " + className);
+			log.debug("Already registered: {}", className);
 		} catch (Exception e) {
-			log.error("Could not register the " + className + " MBean", e);
+			log.error("Could not register the {} MBean. {}", className, e);
 		}
 		return status;
@@ -168,5 +168,5 @@
 				cs.stop();
 			} catch (Exception e) {
-				log.error("Exception stopping JMXConnector server", e);
+				log.error("Exception stopping JMXConnector server {}", e);
 			}
 		}
@@ -179,5 +179,5 @@
 			for (ObjectName oname : (Set<ObjectName>) mbs.queryNames(
 					new ObjectName(domain + ":*"), null)) {
-				log.debug("Bean domain: " + oname.getDomain());
+				log.debug("Bean domain: {}", oname.getDomain());
 				if (domain.equals(oname.getDomain())) {
 					unregisterMBean(oname);
@@ -185,5 +185,5 @@
 			}
 		} catch (Exception e) {
-			log.error("Exception unregistering mbeans", e);
+			log.error("Exception unregistering mbeans {}", e);
 		}
 
@@ -208,5 +208,5 @@
 				}
 			} catch (Exception e) {
-				log.warn("Exception unregistering mbean", e);
+				log.warn("Exception unregistering mbean {}", e);
 			}
 		}


Note:
Diffs are chopped if more than 25k.
This is to get past the limit on the mailing list.



More information about the Red5commits mailing list