[Red5commits] [1585] Use only one lock for PlaylistSubscriberStream? when playing. Fix APPSERVER-7Re
sgong
luke at codegent.com
Wed Jan 24 10:59:57 EST 2007
Use only one lock for PlaylistSubscriberStream? when playing. Fix APPSERVER-7
Removed the workaround in StreamService?.play
Timestamp: 12/11/06 09:49:25 EST (1 month ago)
Change: 1585
Author: sgong
Files (see diff or trac for details):
java/server/trunk/src/org/red5/server/stream/PlaylistSubscriberStream.java
java/server/trunk/src/org/red5/server/stream/StreamService.java
Trac: http://mirror1.cvsdude.com/trac/osflash/red5/changeset/1585
Index: /java/server/trunk/src/org/red5/server/stream/PlaylistSubscriberStream.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/PlaylistSubscriberStream.java (revision 1538)
+++ /java/server/trunk/src/org/red5/server/stream/PlaylistSubscriberStream.java (revision 1585)
@@ -110,35 +110,33 @@
}
- public void play() {
- synchronized (items) {
- if (items.size() == 0) {
- return;
- }
- if (currentItemIndex == -1) {
+ synchronized public void play() {
+ if (items.size() == 0) {
+ return;
+ }
+ if (currentItemIndex == -1) {
+ moveToNext();
+ }
+ IPlayItem item = items.get(currentItemIndex);
+ int count = items.size();
+ while (count-- > 0) {
+ try {
+ engine.play(item);
+ break;
+ } catch (StreamNotFoundException e) {
+ // go for next item
moveToNext();
- }
- IPlayItem item = items.get(currentItemIndex);
- int count = items.size();
- while (count-- > 0) {
- try {
- engine.play(item);
- break;
- } catch (StreamNotFoundException e) {
- // go for next item
- moveToNext();
- if (currentItemIndex == -1) {
- // we reaches the end.
- break;
- }
- item = items.get(currentItemIndex);
- } catch (IllegalStateException e) {
- // an stream is already playing
- break;
- }
- }
- }
- }
-
- public void pause(int position) {
+ if (currentItemIndex == -1) {
+ // we reaches the end.
+ break;
+ }
+ item = items.get(currentItemIndex);
+ } catch (IllegalStateException e) {
+ // an stream is already playing
+ break;
+ }
+ }
+ }
+
+ synchronized public void pause(int position) {
try {
engine.pause(position);
@@ -147,5 +145,5 @@
}
- public void resume(int position) {
+ synchronized public void resume(int position) {
try {
engine.resume(position);
@@ -154,5 +152,5 @@
}
- public void stop() {
+ synchronized public void stop() {
try {
engine.stop();
@@ -161,5 +159,5 @@
}
- public void seek(int position) {
+ synchronized public void seek(int position) {
try {
engine.seek(position);
@@ -168,5 +166,5 @@
}
- public void close() {
+ synchronized public void close() {
engine.close();
flowControlService.releaseFlowControllable(this);
@@ -178,128 +176,114 @@
}
- public void addItem(IPlayItem item) {
- synchronized (items) {
- items.add(item);
- }
- }
-
- public void addItem(IPlayItem item, int index) {
- synchronized (items) {
- items.add(index, item);
- }
- }
-
- public void removeItem(int index) {
- synchronized (items) {
- if (index < 0 || index >= items.size()) {
- return;
- }
- int originSize = items.size();
- items.remove(index);
- if (currentItemIndex == index) {
- // set the next item.
- if (index == originSize - 1) {
- currentItemIndex = index - 1;
- }
- }
- }
- }
-
- public void removeAllItems() {
- synchronized (items) {
- // we try to stop the engine first
- stop();
- items.clear();
- }
- }
-
- public void previousItem() {
- synchronized (items) {
- stop();
- moveToPrevious();
- if (currentItemIndex == -1) {
- return;
- }
- IPlayItem item = items.get(currentItemIndex);
- int count = items.size();
- while (count-- > 0) {
- try {
- engine.play(item);
- break;
- } catch (StreamNotFoundException e) {
- // go for next item
- moveToPrevious();
- if (currentItemIndex == -1) {
- // we reaches the end.
- break;
- }
- item = items.get(currentItemIndex);
- } catch (IllegalStateException e) {
- // an stream is already playing
- break;
- }
- }
- }
- }
-
- public void nextItem() {
- synchronized (items) {
- stop();
- moveToNext();
- boolean needPause = false;
- if (currentItemIndex == -1) {
- if (items.size() > 0) {
- // move to the head of the list and pause at the beginning
- moveToNext();
- if (currentItemIndex >= 0) {
- needPause = true;
- } else {
- return;
- }
+ synchronized public void addItem(IPlayItem item) {
+ items.add(item);
+ }
+
+ synchronized public void addItem(IPlayItem item, int index) {
+ items.add(index, item);
+ }
+
+ synchronized public void removeItem(int index) {
+ if (index < 0 || index >= items.size()) {
+ return;
+ }
+ int originSize = items.size();
+ items.remove(index);
+ if (currentItemIndex == index) {
+ // set the next item.
+ if (index == originSize - 1) {
+ currentItemIndex = index - 1;
+ }
+ }
+ }
+
+ synchronized public void removeAllItems() {
+ // we try to stop the engine first
+ stop();
+ items.clear();
+ }
+
+ synchronized public void previousItem() {
+ stop();
+ moveToPrevious();
+ if (currentItemIndex == -1) {
+ return;
+ }
+ IPlayItem item = items.get(currentItemIndex);
+ int count = items.size();
+ while (count-- > 0) {
+ try {
+ engine.play(item);
+ break;
+ } catch (StreamNotFoundException e) {
+ // go for next item
+ moveToPrevious();
+ if (currentItemIndex == -1) {
+ // we reaches the end.
+ break;
+ }
+ item = items.get(currentItemIndex);
+ } catch (IllegalStateException e) {
+ // an stream is already playing
+ break;
+ }
+ }
+ }
+
+ synchronized public void nextItem() {
+ stop();
+ moveToNext();
+ boolean needPause = false;
+ if (currentItemIndex == -1) {
+ if (items.size() > 0) {
+ // move to the head of the list and pause at the beginning
+ moveToNext();
+ if (currentItemIndex >= 0) {
+ needPause = true;
} else {
return;
}
- }
- IPlayItem item = items.get(currentItemIndex);
- int count = items.size();
- while (count-- > 0) {
- try {
- engine.play(item);
- if (needPause) {
- engine.pause(0);
- }
- break;
- } catch (StreamNotFoundException e) {
- // go for next item
- moveToNext();
- if (currentItemIndex == -1) {
- // we reaches the end.
- break;
- }
- item = items.get(currentItemIndex);
- } catch (IllegalStateException e) {
- // an stream is already playing
- break;
- }
- }
- }
- }
-
- public void setItem(int index) {
- synchronized (items) {
- if (index < 0 || index >= items.size()) {
+ } else {
return;
}
- stop();
- currentItemIndex = index;
- IPlayItem item = items.get(currentItemIndex);
+ }
+ IPlayItem item = items.get(currentItemIndex);
+ int count = items.size();
+ while (count-- > 0) {
try {
engine.play(item);
+ if (needPause) {
+ engine.pause(0);
+ }
+ break;
} catch (StreamNotFoundException e) {
- // let the engine retain the STOPPED state
- // and wait for control from outside
+ // go for next item
+ moveToNext();
+ if (currentItemIndex == -1) {
+ // we reaches the end.
+ break;
+ }
+ item = items.get(currentItemIndex);
} catch (IllegalStateException e) {
-
- }
+ // an stream is already playing
+ break;
+ }
+ }
+ }
+
+ synchronized public void setItem(int index) {
+ if (index < 0 || index >= items.size()) {
+ return;
+ }
+ stop();
+ currentItemIndex = index;
+ IPlayItem item = items.get(currentItemIndex);
+ try {
+ engine.play(item);
+ } catch (StreamNotFoundException e) {
+ // let the engine retain the STOPPED state
+ // and wait for control from outside
+ } catch (IllegalStateException e) {
+
}
}
@@ -368,5 +352,5 @@
* @param message
*/
- public void written(Object message) {
+ synchronized public void written(Object message) {
engine.pullAndPush();
}
@@ -525,5 +509,5 @@
}
- synchronized public void start() {
+ public void start() {
if (state != State.UNINIT) {
throw new IllegalStateException();
@@ -543,5 +527,5 @@
}
- synchronized public void play(IPlayItem item)
+ public void play(IPlayItem item)
throws StreamNotFoundException, IllegalStateException {
if (state != State.STOPPED) {
@@ -650,7 +634,9 @@
public void execute(
ISchedulingService service) {
- waitLiveJob = null;
- isWaiting = false;
- onItemEnd();
+ synchronized (PlaylistSubscriberStream.this) {
+ waitLiveJob = null;
+ isWaiting = false;
+ onItemEnd();
+ }
}
});
@@ -684,5 +670,5 @@
}
- synchronized public void pause(int position)
+ public void pause(int position)
throws IllegalStateException {
if (state != State.PLAYING) {
@@ -700,5 +686,5 @@
}
- synchronized public void resume(int position)
+ public void resume(int position)
throws IllegalStateException {
if (state != State.PAUSED) {
@@ -726,5 +712,5 @@
}
- synchronized public void seek(int position)
+ public void seek(int position)
throws IllegalStateException {
if (state != State.PLAYING && state != State.PAUSED) {
@@ -779,5 +765,5 @@
}
- synchronized public void stop() throws IllegalStateException {
+ public void stop() throws IllegalStateException {
if (state != State.PLAYING && state != State.PAUSED) {
@@ -800,5 +786,5 @@
}
- synchronized public void close() {
+ public void close() {
if (state == State.PLAYING || state == State.PAUSED) {
@@ -816,5 +802,5 @@
}
- synchronized private void pullAndPush() {
+ private void pullAndPush() {
if (state == State.PLAYING && isPullMode && !isWaitingForToken) {
int size;
@@ -871,8 +857,10 @@
public void execute(
ISchedulingService service) {
- // OMFG: it works god dammit! now we stop it.
- stop();
- onItemEnd();
- log.info("Stop");
+ synchronized (PlaylistSubscriberStream.this) {
+ // OMFG: it works god dammit! now we stop it.
+ stop();
+ onItemEnd();
+ log.info("Stop");
+ }
}
});
@@ -1163,5 +1151,5 @@
}
- synchronized public void pushMessage(IPipe pipe, IMessage message) {
+ public void pushMessage(IPipe pipe, IMessage message) {
if (message instanceof ResetMessage) {
sendReset();
@@ -1213,14 +1201,16 @@
}
- synchronized public void execute(ISchedulingService service) {
- if (playLengthJob == null) {
- return;
- }
- playLengthJob = null;
- stop();
- onItemEnd();
- }
-
- synchronized public void available(ITokenBucket bucket,
+ public void execute(ISchedulingService service) {
+ synchronized (PlaylistSubscriberStream.this) {
+ if (playLengthJob == null) {
+ return;
+ }
+ playLengthJob = null;
+ stop();
+ onItemEnd();
+ }
+ }
+
+ public void available(ITokenBucket bucket,
double tokenCount) {
isWaitingForToken = false;
Index: /java/server/trunk/src/org/red5/server/stream/StreamService.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/StreamService.java (revision 1570)
+++ /java/server/trunk/src/org/red5/server/stream/StreamService.java (revision 1585)
@@ -141,8 +141,4 @@
public void play(String name, int start, int length, boolean flushPlaylist) {
- if (length == 0)
- // Workaround for APPSERVER-7: ignore play requests with zero length
- return;
-
IConnection conn = Red5.getConnectionLocal();
if (!(conn instanceof IStreamCapableConnection)) {
Note:
Diffs are chopped if more than 25k.
This is to get past the limit on the mailing list.
More information about the Red5commits
mailing list