[Red5commits] [1585] Use only one lock for PlaylistSubscriberStream when playing. Fix APPSERVER-7

sgong luke at codegent.com
Wed Jan 24 10:59:57 EST 2007


Use only one lock for PlaylistSubscriberStream when playing. Fix APPSERVER-7
Removed the workaround in StreamService.play


Timestamp: 12/11/06 09:49:25 EST (1 month ago) 
Change: 1585 
Author: sgong

Files (see diff or trac for details): 
java/server/trunk/src/org/red5/server/stream/PlaylistSubscriberStream.java
java/server/trunk/src/org/red5/server/stream/StreamService.java


Trac: http://mirror1.cvsdude.com/trac/osflash/red5/changeset/1585

Index: /java/server/trunk/src/org/red5/server/stream/PlaylistSubscriberStream.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/PlaylistSubscriberStream.java (revision 1538)
+++ /java/server/trunk/src/org/red5/server/stream/PlaylistSubscriberStream.java (revision 1585)
@@ -110,35 +110,33 @@
 	}
 
-	public void play() {
-		synchronized (items) {
-			if (items.size() == 0) {
-				return;
-			}
-			if (currentItemIndex == -1) {
+	synchronized public void play() {
+		if (items.size() == 0) {
+			return;
+		}
+		if (currentItemIndex == -1) {
+			moveToNext();
+		}
+		IPlayItem item = items.get(currentItemIndex);
+		int count = items.size();
+		while (count-- > 0) {
+			try {
+				engine.play(item);
+				break;
+			} catch (StreamNotFoundException e) {
+				// go for next item
 				moveToNext();
-			}
-			IPlayItem item = items.get(currentItemIndex);
-			int count = items.size();
-			while (count-- > 0) {
-				try {
-					engine.play(item);
-					break;
-				} catch (StreamNotFoundException e) {
-					// go for next item
-					moveToNext();
-					if (currentItemIndex == -1) {
-						// we reaches the end.
-						break;
-					}
-					item = items.get(currentItemIndex);
-				} catch (IllegalStateException e) {
-					// an stream is already playing
-					break;
-				}
-			}
-		}
-	}
-
-	public void pause(int position) {
+				if (currentItemIndex == -1) {
+					// we reaches the end.
+					break;
+				}
+				item = items.get(currentItemIndex);
+			} catch (IllegalStateException e) {
+				// an stream is already playing
+				break;
+			}
+		}
+	}
+
+	synchronized public void pause(int position) {
 		try {
 			engine.pause(position);
@@ -147,5 +145,5 @@
 	}
 
-	public void resume(int position) {
+	synchronized public void resume(int position) {
 		try {
 			engine.resume(position);
@@ -154,5 +152,5 @@
 	}
 
-	public void stop() {
+	synchronized public void stop() {
 		try {
 			engine.stop();
@@ -161,5 +159,5 @@
 	}
 
-	public void seek(int position) {
+	synchronized public void seek(int position) {
 		try {
 			engine.seek(position);
@@ -168,5 +166,5 @@
 	}
 
-	public void close() {
+	synchronized public void close() {
 		engine.close();
 		flowControlService.releaseFlowControllable(this);
@@ -178,128 +176,114 @@
 	}
 
-	public void addItem(IPlayItem item) {
-		synchronized (items) {
-			items.add(item);
-		}
-	}
-
-	public void addItem(IPlayItem item, int index) {
-		synchronized (items) {
-			items.add(index, item);
-		}
-	}
-
-	public void removeItem(int index) {
-		synchronized (items) {
-			if (index < 0 || index >= items.size()) {
-				return;
-			}
-			int originSize = items.size();
-			items.remove(index);
-			if (currentItemIndex == index) {
-				// set the next item.
-				if (index == originSize - 1) {
-					currentItemIndex = index - 1;
-				}
-			}
-		}
-	}
-
-	public void removeAllItems() {
-		synchronized (items) {
-			// we try to stop the engine first
-			stop();
-			items.clear();
-		}
-	}
-
-	public void previousItem() {
-		synchronized (items) {
-			stop();
-			moveToPrevious();
-			if (currentItemIndex == -1) {
-				return;
-			}
-			IPlayItem item = items.get(currentItemIndex);
-			int count = items.size();
-			while (count-- > 0) {
-				try {
-					engine.play(item);
-					break;
-				} catch (StreamNotFoundException e) {
-					// go for next item
-					moveToPrevious();
-					if (currentItemIndex == -1) {
-						// we reaches the end.
-						break;
-					}
-					item = items.get(currentItemIndex);
-				} catch (IllegalStateException e) {
-					// an stream is already playing
-					break;
-				}
-			}
-		}
-	}
-
-	public void nextItem() {
-		synchronized (items) {
-			stop();
-			moveToNext();
-			boolean needPause = false;
-			if (currentItemIndex == -1) {
-				if (items.size() > 0) {
-					// move to the head of the list and pause at the beginning
-					moveToNext();
-					if (currentItemIndex >= 0) {
-						needPause = true;
-					} else {
-						return;
-					}
+	synchronized public void addItem(IPlayItem item) {
+		items.add(item);
+	}
+
+	synchronized public void addItem(IPlayItem item, int index) {
+		items.add(index, item);
+	}
+
+	synchronized public void removeItem(int index) {
+		if (index < 0 || index >= items.size()) {
+			return;
+		}
+		int originSize = items.size();
+		items.remove(index);
+		if (currentItemIndex == index) {
+			// set the next item.
+			if (index == originSize - 1) {
+				currentItemIndex = index - 1;
+			}
+		}
+	}
+
+	synchronized public void removeAllItems() {
+		// we try to stop the engine first
+		stop();
+		items.clear();
+	}
+
+	synchronized public void previousItem() {
+		stop();
+		moveToPrevious();
+		if (currentItemIndex == -1) {
+			return;
+		}
+		IPlayItem item = items.get(currentItemIndex);
+		int count = items.size();
+		while (count-- > 0) {
+			try {
+				engine.play(item);
+				break;
+			} catch (StreamNotFoundException e) {
+				// go for next item
+				moveToPrevious();
+				if (currentItemIndex == -1) {
+					// we reaches the end.
+					break;
+				}
+				item = items.get(currentItemIndex);
+			} catch (IllegalStateException e) {
+				// an stream is already playing
+				break;
+			}
+		}
+	}
+
+	synchronized public void nextItem() {
+		stop();
+		moveToNext();
+		boolean needPause = false;
+		if (currentItemIndex == -1) {
+			if (items.size() > 0) {
+				// move to the head of the list and pause at the beginning
+				moveToNext();
+				if (currentItemIndex >= 0) {
+					needPause = true;
 				} else {
 					return;
 				}
-			}
-			IPlayItem item = items.get(currentItemIndex);
-			int count = items.size();
-			while (count-- > 0) {
-				try {
-					engine.play(item);
-					if (needPause) {
-						engine.pause(0);
-					}
-					break;
-				} catch (StreamNotFoundException e) {
-					// go for next item
-					moveToNext();
-					if (currentItemIndex == -1) {
-						// we reaches the end.
-						break;
-					}
-					item = items.get(currentItemIndex);
-				} catch (IllegalStateException e) {
-					// an stream is already playing
-					break;
-				}
-			}
-		}
-	}
-
-	public void setItem(int index) {
-		synchronized (items) {
-			if (index < 0 || index >= items.size()) {
+			} else {
 				return;
 			}
-			stop();
-			currentItemIndex = index;
-			IPlayItem item = items.get(currentItemIndex);
+		}
+		IPlayItem item = items.get(currentItemIndex);
+		int count = items.size();
+		while (count-- > 0) {
 			try {
 				engine.play(item);
+				if (needPause) {
+					engine.pause(0);
+				}
+				break;
 			} catch (StreamNotFoundException e) {
-				// let the engine retain the STOPPED state
-				// and wait for control from outside
+				// go for next item
+				moveToNext();
+				if (currentItemIndex == -1) {
+					// we reaches the end.
+					break;
+				}
+				item = items.get(currentItemIndex);
 			} catch (IllegalStateException e) {
-
-			}
+				// an stream is already playing
+				break;
+			}
+		}
+	}
+
+	synchronized public void setItem(int index) {
+		if (index < 0 || index >= items.size()) {
+			return;
+		}
+		stop();
+		currentItemIndex = index;
+		IPlayItem item = items.get(currentItemIndex);
+		try {
+			engine.play(item);
+		} catch (StreamNotFoundException e) {
+			// let the engine retain the STOPPED state
+			// and wait for control from outside
+		} catch (IllegalStateException e) {
+
 		}
 	}
@@ -368,5 +352,5 @@
 	 * @param message
 	 */
-	public void written(Object message) {
+	synchronized public void written(Object message) {
 		engine.pullAndPush();
 	}
@@ -525,5 +509,5 @@
 		}
 
-		synchronized public void start() {
+		public void start() {
 			if (state != State.UNINIT) {
 				throw new IllegalStateException();
@@ -543,5 +527,5 @@
 		}
 
-		synchronized public void play(IPlayItem item)
+		public void play(IPlayItem item)
 				throws StreamNotFoundException, IllegalStateException {
 			if (state != State.STOPPED) {
@@ -650,7 +634,9 @@
 									public void execute(
 											ISchedulingService service) {
-										waitLiveJob = null;
-										isWaiting = false;
-										onItemEnd();
+										synchronized (PlaylistSubscriberStream.this) {
+											waitLiveJob = null;
+											isWaiting = false;
+											onItemEnd();
+										}
 									}
 								});
@@ -684,5 +670,5 @@
 		}
 
-		synchronized public void pause(int position)
+		public void pause(int position)
 				throws IllegalStateException {
 			if (state != State.PLAYING) {
@@ -700,5 +686,5 @@
 		}
 
-		synchronized public void resume(int position)
+		public void resume(int position)
 				throws IllegalStateException {
 			if (state != State.PAUSED) {
@@ -726,5 +712,5 @@
 		}
 
-		synchronized public void seek(int position)
+		public void seek(int position)
 				throws IllegalStateException {
 			if (state != State.PLAYING && state != State.PAUSED) {
@@ -779,5 +765,5 @@
 		}
 
-		synchronized public void stop() throws IllegalStateException {
+		public void stop() throws IllegalStateException {
 
 			if (state != State.PLAYING && state != State.PAUSED) {
@@ -800,5 +786,5 @@
 		}
 
-		synchronized public void close() {
+		public void close() {
 
 			if (state == State.PLAYING || state == State.PAUSED) {
@@ -816,5 +802,5 @@
 		}
 
-		synchronized private void pullAndPush() {
+		private void pullAndPush() {
 			if (state == State.PLAYING && isPullMode && !isWaitingForToken) {
 				int size;
@@ -871,8 +857,10 @@
 													public void execute(
 															ISchedulingService service) {
-														// OMFG: it works god dammit! now we stop it.
-														stop();
-														onItemEnd();
-														log.info("Stop");
+														synchronized (PlaylistSubscriberStream.this) {
+															// OMFG: it works god dammit! now we stop it.
+															stop();
+															onItemEnd();
+															log.info("Stop");
+														}
 													}
 												});
@@ -1163,5 +1151,5 @@
 		}
 
-		synchronized public void pushMessage(IPipe pipe, IMessage message) {
+		public void pushMessage(IPipe pipe, IMessage message) {
 			if (message instanceof ResetMessage) {
 				sendReset();
@@ -1213,14 +1201,16 @@
 		}
 
-		synchronized public void execute(ISchedulingService service) {
-			if (playLengthJob == null) {
-				return;
-			}
-			playLengthJob = null;
-			stop();
-			onItemEnd();
-		}
-
-		synchronized public void available(ITokenBucket bucket,
+		public void execute(ISchedulingService service) {
+			synchronized (PlaylistSubscriberStream.this) {
+				if (playLengthJob == null) {
+					return;
+				}
+				playLengthJob = null;
+				stop();
+				onItemEnd();
+			}
+		}
+
+		public void available(ITokenBucket bucket,
 				double tokenCount) {
 			isWaitingForToken = false;
Index: /java/server/trunk/src/org/red5/server/stream/StreamService.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/StreamService.java (revision 1570)
+++ /java/server/trunk/src/org/red5/server/stream/StreamService.java (revision 1585)
@@ -141,8 +141,4 @@
 
 	public void play(String name, int start, int length, boolean flushPlaylist) {
-		if (length == 0)
-			// Workaround for APPSERVER-7: ignore play requests with zero length
-			return;
-		
 		IConnection conn = Red5.getConnectionLocal();
 		if (!(conn instanceof IStreamCapableConnection)) {


Note:
Diffs are chopped if more than 25k.
This is to get past the limit on the mailing list.



More information about the Red5commits mailing list