[Red5commits] [1578] Merged the code from APPSERVER-8 and some refactoring for FLVReader and related
sgong
luke at codegent.com
Wed Jan 24 10:58:59 EST 2007
Merged the code from APPSERVER-8 and some refactoring for FLVReader and related code.
(1) FLVReader now uses a prefetching buffer for file reading and the file won't be mapped to
memory at the beginning but read as needed. The prefetching buffer is mainly used for reducing
the disk IO overhead and the size of buffer can be configured in red5-common.xml as a static
property of FLVReader "bufferSize".
(2) Cache for FLVReader in FLV is temporarily disabled by returning null on FLVReader.getFileData().
We need to redesign the cache architecture to put cache logic beneath FLVReader so that both
tag caching and file caching are feasible.
Timestamp: 11/27/06 10:03:57 EST (2 months ago)
Change: 1578
Author: sgong
Files (see diff or trac for details):
java/server/trunk/conf/red5-common.xml
java/server/trunk/src/org/red5/io/flv/IKeyFrameDataAnalyzer.java
java/server/trunk/src/org/red5/io/flv/impl/FLV.java
java/server/trunk/src/org/red5/io/flv/impl/FLVReader.java
java/server/trunk/src/org/red5/io/mp3/impl/MP3Reader.java
java/server/trunk/src/org/red5/server/stream/provider/FileProvider.java
Trac: http://mirror1.cvsdude.com/trac/osflash/red5/changeset/1578
Index: /java/server/trunk/conf/red5-common.xml
===================================================================
--- /java/server/trunk/conf/red5-common.xml (revision 1557)
+++ /java/server/trunk/conf/red5-common.xml (revision 1578)
@@ -146,8 +146,8 @@
</property>
<property name="targetMethod">
- <value>setMaxBufferSize</value>
+ <value>setBufferSize</value>
</property>
<!-- Three buffer types are available 'auto', 'heap', and 'direct' -->
- <property name="arguments" value="10000000" />
+ <property name="arguments" value="4096" />
</bean>
Index: /java/server/trunk/src/org/red5/server/stream/provider/FileProvider.java
===================================================================
--- /java/server/trunk/src/org/red5/server/stream/provider/FileProvider.java (revision 1538)
+++ /java/server/trunk/src/org/red5/server/stream/provider/FileProvider.java (revision 1578)
@@ -80,5 +80,5 @@
}
- public IMessage pullMessage(IPipe pipe) {
+ synchronized public IMessage pullMessage(IPipe pipe) {
if (this.pipe != pipe) {
return null;
@@ -186,5 +186,5 @@
}
- private void uninit() {
+ synchronized private void uninit() {
if (this.reader != null) {
this.reader.close();
Index: /java/server/trunk/src/org/red5/io/flv/impl/FLV.java
===================================================================
--- /java/server/trunk/src/org/red5/io/flv/impl/FLV.java (revision 1556)
+++ /java/server/trunk/src/org/red5/io/flv/impl/FLV.java (revision 1578)
@@ -174,5 +174,5 @@
fileData = reader.getFileData();
// offer the uncached file to the cache
- if (cache.offer(fileName, fileData)) {
+ if (fileData != null && cache.offer(fileName, fileData)) {
if (log.isDebugEnabled()) {
log.debug("Item accepted by the cache: " + fileName);
Index: /java/server/trunk/src/org/red5/io/flv/impl/FLVReader.java
===================================================================
--- /java/server/trunk/src/org/red5/io/flv/impl/FLVReader.java (revision 1556)
+++ /java/server/trunk/src/org/red5/io/flv/impl/FLVReader.java (revision 1578)
@@ -22,6 +22,4 @@
import java.io.FileInputStream;
import java.io.IOException;
-import java.nio.ByteOrder;
-import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
@@ -42,5 +40,7 @@
/**
- * A Reader is used to read the contents of a FLV file
+ * A Reader is used to read the contents of a FLV file.
+ * NOTE: This class is not implemented as threading-safe. The caller
+ * should make sure the threading-safety.
*
* @author The Red5 Project (red5 at osflash.org)
@@ -58,6 +58,4 @@
private FileChannel channel;
- private MappedByteBuffer mappedFile;
-
private KeyFrameMeta keyframeMeta;
@@ -68,8 +66,8 @@
/** Position of first video tag. */
- private int firstVideoTag = -1;
+ private long firstVideoTag = -1;
/** Position of first audio tag. */
- private int firstAudioTag = -1;
+ private long firstAudioTag = -1;
/** Current tag. */
@@ -86,81 +84,194 @@
/** Buffer type / style to use **/
- private static String bufferType = "auto"; //Default
+ private static BufferType bufferType = BufferType.AUTO;
+
+ private static int bufferSize = 1024;
- private static int maxBufferSize = 0;
- private static volatile int bufferUsed = 0;
-
- private boolean useBuffer = false;
-
- FLVReader() {}
-
+ /** Use load buffer */
+ private boolean useLoadBuf = false;
+
+ FLVReader() {
+ }
+
public FLVReader(FileInputStream f) {
this(f, false);
}
+ /**
+ * Get the remaining bytes that could be read from a file or ByteBuffer
+ * @return
+ */
+ private long getRemainingBytes() {
+ if (!useLoadBuf) {
+ return in.remaining();
+ }
+
+ try {
+ return channel.size() - channel.position() + in.remaining();
+ } catch (Exception e) {
+ log.error("Error getRemainingBytes", e);
+ return 0;
+ }
+ }
+
+ /**
+ * Get the total readable bytes in a file or ByteBuffer
+ * @return
+ */
+ private long getTotalBytes() {
+ if (!useLoadBuf) {
+ return in.capacity();
+ }
+
+ try {
+ return channel.size();
+ } catch (Exception e) {
+ log.error("Error getTotalBytes", e);
+ return 0;
+ }
+ }
+
+ /**
+ * Get the current position in a file or ByteBuffer
+ * @return
+ */
+ private long getCurrentPosition() {
+ long pos = 0;
+
+ if (!useLoadBuf) {
+ return in.position();
+ }
+
+ try {
+ if (in != null) {
+ pos = (channel.position() - in.remaining());
+ } else {
+ pos = channel.position();
+ }
+ return pos;
+ } catch (Exception e) {
+ log.error("Error getCurrentPosition", e);
+ return 0;
+ }
+ }
+
+ private void setCurrentPosition(long pos) {
+ if (!useLoadBuf) {
+ in.position((int) pos);
+ return;
+ }
+
+ try {
+ if (pos >= (channel.position() - in.limit())
+ && pos < channel.position()) {
+ in.position((int) (pos - (channel.position() - in.limit())));
+ } else {
+ channel.position(pos);
+ fillBuffer(bufferSize, true);
+ }
+ } catch (Exception e) {
+ log.error("Error setCurrentPosition", e);
+ }
+
+ }
+
+ private void fillBuffer() {
+ fillBuffer(bufferSize, false);
+ }
+
+ /**
+ *
+ * @param amount
+ */
+ private void fillBuffer(long amount) {
+ fillBuffer(amount, false);
+ }
+
+ /**
+ * Load enough bytes from channel to buffer.
+ * After the loading process, the caller can make sure the amount
+ * in buffer is of size 'amount' if we haven't reached the end of channel.
+ * @param amount The amount of bytes in buffer after returning,
+ * no larger than bufferSize
+ * @param reload Whether to reload or append.
+ */
+ private void fillBuffer(long amount, boolean reload) {
+ try {
+ if (amount > bufferSize) {
+ amount = bufferSize;
+ }
+ // Read all remaining bytes if the requested amount reach the end
+ // of channel.
+ if (channel.size() - channel.position() < amount) {
+ amount = channel.size() - channel.position();
+ }
+
+ if (in == null) {
+ switch (bufferType) {
+ case HEAP:
+ in = ByteBuffer.allocate(bufferSize, false);
+ break;
+ case DIRECT:
+ in = ByteBuffer.allocate(bufferSize, true);
+ break;
+ case AUTO:
+ in = ByteBuffer.allocate(bufferSize);
+ break;
+ default:
+ in = ByteBuffer.allocate(bufferSize);
+ }
+ channel.read(in.buf());
+ in.flip();
+ useLoadBuf = true;
+ }
+
+ if (!useLoadBuf) {
+ return;
+ }
+
+ if (reload || in.remaining() < amount) {
+ long toRead = amount;
+ if (!reload) {
+ toRead = (bufferSize - in.remaining());
+ in.compact();
+ } else {
+ in.clear();
+ }
+
+ java.nio.ByteBuffer tmpbuf = java.nio.ByteBuffer
+ .allocate((int) toRead);
+ channel.read(tmpbuf);
+ tmpbuf.flip();
+ in.put(tmpbuf);
+ in.flip();
+ tmpbuf = null;
+ }
+
+ } catch (Exception e) {
+ log.error("Error fillBuffer", e);
+ }
+ }
+
+ private void postInitialize() {
+ if (log.isDebugEnabled()) {
+ log.debug("FLVReader 1 - Buffer size: " + getTotalBytes()
+ + " position: " + getCurrentPosition() + " remaining: "
+ + getRemainingBytes());
+ }
+ if (getRemainingBytes() >= 9) {
+ decodeHeader();
+ }
+ keyframeMeta = analyzeKeyFrames();
+ }
+
public FLVReader(FileInputStream f, boolean generateMetadata) {
this.fis = f;
this.generateMetadata = generateMetadata;
channel = fis.getChannel();
- try {
- mappedFile = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel
- .size());
- mappedFile.order(ByteOrder.BIG_ENDIAN);
- if (log.isDebugEnabled()) {
- log.debug("Mapped file capacity: " + mappedFile.capacity() + " Channel size: " + channel.size());
- }
- int bufferTypeHash = bufferType.hashCode();
- if (bufferTypeHash == 3198444 ||
- bufferTypeHash == -1331586071 ||
- bufferTypeHash == 3005871) {
- synchronized (FLVReader.class) {
- if (bufferUsed + mappedFile.capacity() <= maxBufferSize) {
- bufferUsed += mappedFile.capacity();
- useBuffer = true;
- }
- }
- }
- in = null;
- if (useBuffer) {
- switch (bufferTypeHash) {
- case 3198444: //heap
- //Get a heap buffer from buffer pool
- in = ByteBuffer.allocate(mappedFile.capacity(), false);
- break;
- case -1331586071: //direct
- //Get a direct buffer from buffer pool
- in = ByteBuffer.allocate(mappedFile.capacity(), true);
- break;
- case 3005871: //auto
- //Let MINA choose
- in = ByteBuffer.allocate(mappedFile.capacity());
- break;
- default:
- break;
- }
- }
- if (in != null) {
- //drop in the file
- in.put(mappedFile);
- in.flip();
- } else {
- in = ByteBuffer.wrap(mappedFile);
- }
- if (log.isDebugEnabled()) {
- log.debug("Direct buffer: " + in.isDirect() + " Read only: " + in.isReadOnly() + " Pooled: " + in.isPooled());
- }
- } catch (IOException e) {
- log.error("FLVReader :: FLVReader ::>\n", e);
- } catch (Exception e) {
- log.error("FLVReader :: FLVReader ::>\n", e);
- }
- if (log.isDebugEnabled()) {
- log.debug("FLVReader 1 - Buffer size: " + in.capacity() + " position: "
- + in.position() + " remaining: " + in.remaining());
- }
- if (in.remaining() >= 9) {
- decodeHeader();
- }
- keyframeMeta = analyzeKeyFrames();
+
+ in = null;
+ fillBuffer();
+
+ postInitialize();
}
@@ -174,28 +285,51 @@
this.generateMetadata = generateMetadata;
in = buffer;
- if (log.isDebugEnabled()) {
- log.debug("FLVReader 2 - Buffer size: " + in.capacity() + " position: "
- + in.position() + " remaining: " + in.remaining());
- }
- if (in.remaining() >= 9) {
- decodeHeader();
- }
- keyframeMeta = analyzeKeyFrames();
+
+ postInitialize();
}
public static String getBufferType() {
- return bufferType;
+ switch (bufferType) {
+ case AUTO:
+ return "auto";
+ case DIRECT:
+ return "direct";
+ case HEAP:
+ return "heap";
+ default:
+ return null;
+ }
}
public static void setBufferType(String bufferType) {
- FLVReader.bufferType = bufferType;
- }
-
- public static int getMaxBufferSize() {
- return maxBufferSize;
- }
-
- public static void setMaxBufferSize(int maxBufferSize) {
- FLVReader.maxBufferSize = maxBufferSize;
+ int bufferTypeHash = bufferType.hashCode();
+ switch (bufferTypeHash) {
+ case 3198444: //heap
+ //Get a heap buffer from buffer pool
+ FLVReader.bufferType = BufferType.HEAP;
+ break;
+ case -1331586071: //direct
+ //Get a direct buffer from buffer pool
+ FLVReader.bufferType = BufferType.DIRECT;
+ break;
+ case 3005871: //auto
+ //Let MINA choose
+ FLVReader.bufferType = BufferType.AUTO;
+ break;
+ default:
+ FLVReader.bufferType = BufferType.AUTO;
+ }
+ }
+
+ public static int getBufferSize() {
+ return bufferSize;
+ }
+
+ public static void setBufferSize(int bufferSize) {
+ // make sure buffer size is no less than 1024 bytes.
+ if (bufferSize < 1024) {
+ bufferSize = 1024;
+ }
+ FLVReader.bufferSize = bufferSize;
}
@@ -206,5 +340,9 @@
*/
public ByteBuffer getFileData() {
- return in.asReadOnlyBuffer();
+ // TODO as of now, return null will disable cache
+ // we need to redesign the cache architecture so that
+ // the cache is layed underneath FLVReader not above it,
+ // thus both tag cache and file cache are feasible.
+ return null;
}
@@ -212,4 +350,5 @@
// XXX check signature?
// SIGNATURE, lets just skip
+ fillBuffer(9);
FLVHeader header = new FLVHeader();
in.skip(3);
@@ -238,5 +377,5 @@
*/
public int getOffset() {
- //return header.getDataOffset();
+ // XXX what's the difference from getBytesRead
return 0;
}
@@ -247,9 +386,11 @@
* @see org.red5.io.flv.Reader#getBytesRead()
*/
- synchronized public long getBytesRead() {
- return in.position();
- }
-
- synchronized public long getDuration() {
+ public long getBytesRead() {
+ // XXX should summarize the total bytes read or
+ // just the current position?
+ return getCurrentPosition();
+ }
+
+ public long getDuration() {
return duration;
}
@@ -260,6 +401,6 @@
* @see org.red5.io.flv.Reader#hasMoreTags()
*/
- synchronized public boolean hasMoreTags() {
- return in.remaining() > 4;
+ public boolean hasMoreTags() {
+ return getRemainingBytes() > 4;
}
@@ -274,20 +415,22 @@
out.writeNumber(duration / 1000.0);
if (firstVideoTag != -1) {
- int old = in.position();
- in.position(firstVideoTag);
+ long old = getCurrentPosition();
+ setCurrentPosition(firstVideoTag);
readTagHeader();
+ fillBuffer(1);
byte frametype = in.get();
out.writePropertyName("videocodecid");
out.writeNumber(frametype & MASK_VIDEO_CODEC);
- in.position(old);
+ setCurrentPosition(old);
}
if (firstAudioTag != -1) {
- int old = in.position();
- in.position(firstAudioTag);
+ long old = getCurrentPosition();
+ setCurrentPosition(firstAudioTag);
readTagHeader();
+ fillBuffer(1);
byte frametype = in.get();
out.writePropertyName("audiocodecid");
out.writeNumber((frametype & MASK_SOUND_FORMAT) >> 4);
- in.position(old);
+ setCurrentPosition(old);
}
out.writePropertyName("canSeekToEnd");
@@ -308,5 +451,5 @@
*/
synchronized public ITag readTag() {
- int oldPos = in.position();
+ long oldPos = getCurrentPosition();
ITag tag = readTagHeader();
@@ -314,5 +457,5 @@
&& generateMetadata) {
// Generate initial metadata automatically
- in.position(oldPos);
+ setCurrentPosition(oldPos);
KeyFrameMeta meta = analyzeKeyFrames();
tagPosition++;
@@ -323,16 +466,26 @@
ByteBuffer body = ByteBuffer.allocate(tag.getBodySize());
- final int limit = in.limit();
+
// XXX Paul: this assists in 'properly' handling damaged FLV files
- int newPosition = in.position() + tag.getBodySize();
- if (newPosition <= limit) {
- in.limit(newPosition);
- body.put(in);
+ long newPosition = getCurrentPosition() + tag.getBodySize();
+ if (newPosition <= getTotalBytes()) {
+ int limit;
+ while (getCurrentPosition() < newPosition) {
+ fillBuffer(newPosition - getCurrentPosition());
+ if (getCurrentPosition() + in.remaining() > newPosition) {
+ limit = in.limit();
+ in.limit((int) (newPosition - getCurrentPosition()) + in.position());
+ body.put(in);
+ in.limit(limit);
+ } else {
+ body.put(in);
+ }
+ }
+
body.flip();
- in.limit(limit);
-
tag.setBody(body);
tagPosition++;
}
+
return tag;
}
@@ -343,25 +496,9 @@
* @see org.red5.io.flv.Reader#close()
*/
- synchronized public void close() {
+ public void close() {
log.debug("Reader close");
if (in != null) {
- if (useBuffer) {
- int bufferTypeHash = bufferType.hashCode();
- if (bufferTypeHash == 3198444 ||
- bufferTypeHash == -1331586071 ||
- bufferTypeHash == 3005871) {
- synchronized (FLVReader.class) {
- if (mappedFile != null) {
- bufferUsed -= mappedFile.capacity();
- }
- }
- }
- }
in.release();
in = null;
- }
- if (mappedFile != null) {
- mappedFile.clear();
- mappedFile = null;
}
if (channel != null) {
@@ -375,4 +512,8 @@
}
+ /**
+ * Key frames analysis may be used as a utility method so
+ * synchronize it.
+ */
synchronized public KeyFrameMeta analyzeKeyFrames() {
if (keyframeMeta != null) {
@@ -380,13 +521,13 @@
}
- List<Integer> positionList = new ArrayList<Integer>();
+ List<Long> positionList = new ArrayList<Long>();
List<Integer> timestampList = new ArrayList<Integer>();
- int origPos = in.position();
+ long origPos = getCurrentPosition();
// point to the first tag
- in.position(9);
+ setCurrentPosition(9);
posTagMap = new HashMap<Long, Integer>();
int idx = 0;
while (this.hasMoreTags()) {
- int pos = in.position();
+ long pos = getCurrentPosition();
posTagMap.put((long) pos, idx++);
ITag tmpTag = this.readTagHeader();
@@ -398,4 +539,5 @@
// Grab Frame type
+ fillBuffer(1);
byte frametype = in.get();
if (((frametype & MASK_VIDEO_FRAMETYPE) >> 4) == FLAG_FRAMETYPE_KEYFRAME) {
@@ -411,8 +553,8 @@
// XXX Paul: this 'properly' handles damaged FLV files - as far as
// duration/size is concerned
- int newPosition = (pos + tmpTag.getBodySize() + 15);
+ long newPosition = pos + tmpTag.getBodySize() + 15;
// log.debug("---->" + in.remaining() + " limit=" + in.limit() + "
// new pos=" + newPosition);
- if (newPosition >= in.limit()) {
+ if (newPosition >= getTotalBytes()) {
log.info("New position exceeds limit");
if (log.isDebugEnabled()) {
@@ -421,6 +563,6 @@
log.debug(" data type=" + tmpTag.getDataType()
+ " bodysize=" + tmpTag.getBodySize());
- log.debug(" remaining=" + in.remaining() + " limit="
- + in.limit() + " new pos=" + newPosition);
+ log.debug(" remaining=" + getRemainingBytes() + " limit="
+ + getTotalBytes() + " new pos=" + newPosition);
log.debug(" pos=" + pos);
log.debug("-----");
@@ -428,13 +570,13 @@
break;
} else {
- in.position(newPosition);
+ setCurrentPosition(newPosition);
}
}
// restore the pos
- in.position(origPos);
+ setCurrentPosition(origPos);
keyframeMeta = new KeyFrameMeta();
posTimeMap = new HashMap<Long, Long>();
- keyframeMeta.positions = new int[positionList.size()];
+ keyframeMeta.positions = new long[positionList.size()];
keyframeMeta.timestamps = new int[timestampList.size()];
for (int i = 0; i < keyframeMeta.positions.length; i++) {
@@ -447,8 +589,12 @@
}
- synchronized public void position(long pos) {
- // FIXME what if file size is larger than 2G?
- // TODO: adjust position to point to nearest keyframe
- in.position((int) pos);
+ /**
+ * Put the current position to pos.
+ * The caller must ensure the pos is a valid one
+ * (eg. not sit in the middle of a frame)
+ * @param pos
+ */
+ public void position(long pos) {
+ setCurrentPosition(pos);
// Make sure we have informations about the keyframes.
analyzeKeyFrames();
@@ -469,4 +615,5 @@
private ITag readTagHeader() {
// PREVIOUS TAG SIZE
+ fillBuffer(15);
int previousTagSize = in.getInt();
@@ -486,3 +633,9 @@
return new Tag(dataType, timestamp, bodySize, null, previousTagSize);
}
+
+ public enum BufferType {
+ AUTO,
+ DIRECT,
+ HEAP
+ }
}
Index: /java/server/trunk/src/org/red5/io/flv/IKeyFrameDataAnalyzer.java
===================================================================
--- /java/server/trunk/src/org/red5/io/flv/IKeyFrameDataAnalyzer.java (revision 1406)
+++ /java/server/trunk/src/org/red5/io/flv/IKeyFrameDataAnalyzer.java (revision 1578)
@@ -31,5 +31,5 @@
public int timestamps[];
- public int positions[];
+ public long positions[];
}
}
Index: /java/server/trunk/src/org/red5/io/mp3/impl/MP3Reader.java
===================================================================
--- /java/server/trunk/src/org/red5/io/mp3/impl/MP3Reader.java (revision 1406)
+++ /java/server/trunk/src/org/red5/io/mp3/impl/MP3Reader.java (revision 1578)
@@ -348,5 +348,5 @@
posTimeMap = new HashMap<Integer, Double>();
frameMeta = new KeyFrameMeta();
- frameMeta.positions = new int[positionList.size()];
+ frameMeta.positions = new long[positionList.size()];
frameMeta.timestamps = new int[timestampList.size()];
for (int i = 0; i < frameMeta.positions.length; i++) {
Note:
Diffs are chopped if more than 25k.
This is to get past the limit on the mailing list.
More information about the Red5commits
mailing list