[freenet-cvs] r19557 - trunk/freenet/src/freenet/store

Top Page
Delete this message
Reply to this message
Author: devl
Date:  
To: cvs
Subject: [freenet-cvs] r19557 - trunk/freenet/src/freenet/store
Author: j16sdiz
Date: 2008-04-25 16:21:35 +0000 (Fri, 25 Apr 2008)
New Revision: 19557

Modified:
trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java
Log:
Convert BDBFS to use FileChannel


Modified: trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java
===================================================================
--- trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java    2008-04-25 15:05:10 UTC (rev 19556)
+++ trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java    2008-04-25 16:21:35 UTC (rev 19557)
@@ -4,6 +4,8 @@
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -79,6 +81,9 @@
    private RandomAccessFile storeRAF;
    private RandomAccessFile keysRAF;
    private RandomAccessFile lruRAF;
+    private FileChannel storeFC;
+    private FileChannel keysFC;
+    private FileChannel lruFC;
    private final SortedLongSet freeBlocks;
    private final String name;
    /** Callback which translates records to blocks and back, specifies the size of blocks etc. */
@@ -252,17 +257,20 @@
                if(!storeFile.createNewFile())
                    throw new IOException("Can't create a new file " + storeFile + " !");
            storeRAF = new RandomAccessFile(storeFile,"rw");
+            storeFC = storeRAF.getChannel();

            if(!lruFile.exists())
                if(!lruFile.createNewFile())
                    throw new IOException("Can't create a new file " + lruFile + " !");
            lruRAF = new RandomAccessFile(lruFile,"rw");
+            lruFC = lruRAF.getChannel();

            if(keysFile != null) {
                if(!keysFile.exists())
                    if(!keysFile.createNewFile())
                        throw new IOException("Can't create a new file " + keysFile + " !");
                keysRAF = new RandomAccessFile(keysFile,"rw");
+                keysFC = keysRAF.getChannel();
            } else keysRAF = null;

            if (wipe) {
@@ -622,18 +630,15 @@
                boolean readLRU = false;
                boolean readKey = false;
                try {
-                    storeRAF.seek(entry * (headerBlockSize + dataBlockSize));
-                    storeRAF.readFully(buf);
+                    storeFC.read(ByteBuffer.wrap(buf), entry * (headerBlockSize + dataBlockSize));
                    lruValue = 0;
                    if(lruRAF.length() > ((entry + 1) * 8)) {
                        readLRU = true;
-                        lruRAF.seek(entry * 8);
-                        lruValue = lruRAF.readLong();
+                        lruValue = fcReadLRU(entry);
                    }
                    if(keysRAF != null && keysRAF.length() > ((entry + 1) * keyLength)) {
                        readKey = true;
-                        keysRAF.seek(entry * keyLength);
-                        keysRAF.readFully(keyBuf);
+                        fcReadKey(entry, keyBuf);
                    }
                } catch (EOFException e) {
                    System.err.println("Was reading "+wantedBlock+" to write to "+unwantedBlock);
@@ -642,15 +647,12 @@
                    throw e;
                }
                entry = unwantedBlock.longValue();
-                storeRAF.seek(entry * (headerBlockSize + dataBlockSize));
-                storeRAF.write(buf);
+                storeFC.write(ByteBuffer.wrap(buf), entry * (headerBlockSize + dataBlockSize));
                if(readLRU) {
-                    lruRAF.seek(entry * 8);
-                    lruRAF.writeLong(lruValue);
+                    fcWriteLRU(entry, lruValue);
                }
                if(readKey) {
-                    keysRAF.seek(entry * keyLength);
-                    keysRAF.write(keyBuf);
+                    fcWriteKey(entry, keyBuf);
                }
                
                // Update the database w.r.t. the old block.
@@ -1158,11 +1160,7 @@
                byte[] header = new byte[headerBlockSize];
                byte[] data = new byte[dataBlockSize];
                try {
-                    synchronized(storeRAF) {
-                        storeRAF.seek(storeBlock.offset*(long)(dataBlockSize+headerBlockSize));
-                        storeRAF.readFully(header);
-                        storeRAF.readFully(data);
-                    }
+                    fcReadStore(storeBlock.offset, header, data);
                } catch (EOFException e) {
                    Logger.error(this, "No block");
                    c.close();
@@ -1201,16 +1199,15 @@
                        keysDB.put(t,routingkeyDBE,blockDBE);
                        if(fullKey == null)
                            fullKey = block.getFullKey();
-                        synchronized(storeRAF) {
+                        
                            if(keysRAF != null) {
-                                keysRAF.seek(storeBlock.offset * keyLength);
-                                keysRAF.write(fullKey);
+                                fcWriteKey(storeBlock.offset, fullKey);
                                if(logDEBUG)
                                    Logger.debug(this, "Written full key length "+fullKey.length+" to block "+storeBlock.offset+" at "+(storeBlock.offset * keyLength)+" for "+callback);
                            } else if(logDEBUG) {
                                Logger.debug(this, "Not writing full key length "+fullKey.length+" for block "+storeBlock.offset+" for "+callback);
                            }
-                        }
+                        
                    } catch (DatabaseException e) {
                        Logger.error(this, "Caught database exception "+e+" while replacing element");
                        addFreeBlock(storeBlock.offset, true, "Bogus key");
@@ -1237,10 +1234,7 @@
                    c = null;
                    t.commit();
                    t = null;
-                    synchronized(storeRAF) {
-                        lruRAF.seek(storeBlock.offset * 8);
-                        lruRAF.writeLong(storeBlock.recentlyUsed);
-                    }
+                    fcWriteLRU(storeBlock.offset, storeBlock.recentlyUsed);
                } else {
                    c.close();
                    c = null;
@@ -1258,16 +1252,14 @@
                synchronized(this) {
                    misses++;
                }
-                synchronized(storeRAF) {
+                
                    // Clear the key in the keys file.
                    byte[] buf = new byte[keyLength];
                    for(int i=0;i<buf.length;i++) buf[i] = 0; // FIXME unnecessary?
                    if(keysRAF != null) {
-                        keysRAF.seek(storeBlock.offset * keyLength);
-                        keysRAF.write(buf);
+                        fcWriteKey(storeBlock.offset, buf);
                    }
-                }
-
+                
                keysDB.delete(t, routingkeyDBE);
                
                // Insert the block into the index with a random key, so that it's part of the LRU.
@@ -1394,14 +1386,9 @@

            StoreBlock storeBlock = (StoreBlock) storeBlockTupleBinding.entryToObject(blockDBE);
                        
-            synchronized(storeRAF) {
-                storeRAF.seek(storeBlock.offset*(long)(dataBlockSize+headerBlockSize));
-                storeRAF.write(header);
-                storeRAF.write(data);
-                if(keysRAF != null) {
-                    keysRAF.seek(storeBlock.offset * keyLength);
-                    keysRAF.write(fullKey);
-                }
+            fcWriteStore(storeBlock.offset, header, data);
+            if (keysRAF != null) {
+                fcWriteKey(storeBlock.offset, fullKey);
            }
            
            // Unlock record.
@@ -1522,22 +1509,17 @@
        DatabaseEntry blockDBE = new DatabaseEntry();
        storeBlockTupleBinding.objectToEntry(storeBlock, blockDBE);
        keysDB.put(t,routingkeyDBE,blockDBE);
-        synchronized(storeRAF) {
-            storeRAF.seek(storeBlock.getOffset()*(long)(dataBlockSize+headerBlockSize));
-            storeRAF.write(header);
-            storeRAF.write(data);
-            lruRAF.seek(storeBlock.getOffset() * 8);
-            lruRAF.writeLong(storeBlock.recentlyUsed);
-            if(keysRAF != null) {
-                keysRAF.seek(storeBlock.getOffset() * keyLength);
-                keysRAF.write(fullKey);
-            }
+        
+        fcWriteStore(storeBlock.getOffset(), header, data);
+        fcWriteLRU( storeBlock.getOffset(),storeBlock.recentlyUsed);
+        if (keysRAF != null)
+            fcWriteKey(storeBlock.getOffset(), fullKey);
+        synchronized (this) {
            writes++;
        }
    }

    private boolean writeNewBlock(long blockNum, byte[] header, byte[] data, Transaction t, DatabaseEntry routingkeyDBE, byte[] fullKey) throws DatabaseException, IOException {
-        long byteOffset = blockNum*(dataBlockSize+headerBlockSize);
        StoreBlock storeBlock = new StoreBlock(this, blockNum);
        long lruValue = storeBlock.recentlyUsed;
        DatabaseEntry blockDBE = new DatabaseEntry();
@@ -1562,31 +1544,20 @@
                throw e;
            }
        }
-        synchronized(storeRAF) {
-            try {
-                storeRAF.seek(byteOffset);
-            } catch (IOException ioe) {
-                if(byteOffset > (2l*1024*1024*1024)) {
-                    Logger.error(this, "Environment does not support files bigger than 2 GB?");
-                    System.out.println("Environment does not support files bigger than 2 GB? (exception to follow)");
-                }
-                Logger.error(this, "Caught IOException on storeRAF.seek("+byteOffset+ ')');
-                throw ioe;
-            }
-            storeRAF.write(header);
-            storeRAF.write(data);
-            lruRAF.seek(blockNum * 8);
-            lruRAF.writeLong(lruValue);
+        
+            fcWriteStore(blockNum, header, data);
+            fcWriteLRU(blockNum, lruValue);
            if(keysRAF != null) {
-                keysRAF.seek(blockNum * keyLength);
-                keysRAF.write(fullKey);
+                fcWriteKey(blockNum, fullKey);
                if(logDEBUG)
                    Logger.debug(this, "Written full key length "+fullKey.length+" to block "+blockNum+" at "+(blockNum * keyLength)+" for "+callback);
            } else if(logDEBUG) {
                Logger.debug(this, "Not writing full key length "+fullKey.length+" for block "+blockNum+" for "+callback);
            }
+            synchronized (this) {
            writes++;
        }
+        
        return true;
    }

@@ -1802,7 +1773,7 @@
    private static void flushAndCloseRAF(RandomAccessFile file) {
        try {
            if (file != null)
-                file.getFD().sync();
+                file.getChannel().force(true);
        } catch (IOException e) {
            // ignore
        }
@@ -2144,6 +2115,46 @@
        return db;
    }

+    /**
+     * Writes the LRU value of one slot to the LRU file.
+     * Each slot occupies 8 bytes, so the value lands at byte offset entry * 8.
+     *
+     * @param entry slot index
+     * @param data value to store
+     * @throws IOException if the channel write fails
+     */
+    private void fcWriteLRU(long entry, long data) throws IOException {
+        // Heap buffer: direct buffers are costly to allocate and release, so
+        // creating a fresh one for every 8-byte write is wasteful.
+        ByteBuffer bf = ByteBuffer.allocate(8);
+        bf.putLong(data);
+        bf.flip();
+        long offset = entry * 8;
+        // write() returns the number of bytes it consumed; keep going until
+        // the whole value has been handed to the channel.
+        while (bf.hasRemaining())
+            lruFC.write(bf, offset + bf.position());
+    }
+    /**
+     * Reads the LRU value of one slot from the LRU file.
+     * Each slot occupies 8 bytes, so the value is read from byte offset entry * 8.
+     *
+     * @param entry slot index
+     * @return the stored 8-byte value
+     * @throws EOFException if the file ends before 8 bytes could be read
+     * @throws IOException if the channel read fails
+     */
+    private long fcReadLRU(long entry) throws IOException {
+        ByteBuffer bf = ByteBuffer.allocate(8);
+        long offset = entry * 8;
+        // A channel read may return fewer bytes than requested without being
+        // at end-of-file, so only a negative return is treated as EOF.
+        while (bf.hasRemaining()) {
+            if (lruFC.read(bf, offset + bf.position()) < 0)
+                throw new EOFException();
+        }
+        bf.flip();
+        return bf.getLong();
+    }
+    /**
+     * Fills {@code data} with bytes from the keys file, starting at byte
+     * offset entry * keyLength.
+     *
+     * @param entry slot index
+     * @param data destination array; it is filled completely
+     * @throws EOFException if the file ends before the array is full
+     * @throws IOException if the channel read fails
+     */
+    private void fcReadKey(long entry, byte[] data) throws IOException {
+        ByteBuffer bf = ByteBuffer.wrap(data);
+        long offset = entry * keyLength;
+        // A single read may legitimately return a short count; loop until the
+        // array is full and treat only a negative return as end-of-file.
+        while (bf.hasRemaining()) {
+            if (keysFC.read(bf, offset + bf.position()) < 0)
+                throw new EOFException();
+        }
+    }
+    /**
+     * Writes {@code data} to the keys file, starting at byte offset
+     * entry * keyLength.
+     *
+     * @param entry slot index
+     * @param data bytes to write; the whole array is written
+     * @throws IOException if the channel write fails
+     */
+    private void fcWriteKey(long entry, byte[] data) throws IOException {
+        ByteBuffer bf = ByteBuffer.wrap(data);
+        long offset = entry * keyLength;
+        // write() returns the number of bytes it consumed; keep going until
+        // the whole array has been handed to the channel.
+        while (bf.hasRemaining())
+            keysFC.write(bf, offset + bf.position());
+    }
+    /**
+     * Writes one block (header followed by data) to the store file.
+     * The block is placed at byte offset (headerBlockSize + dataBlockSize) * entry.
+     *
+     * @param entry slot index
+     * @param header header bytes, written first
+     * @param data data bytes, written directly after the header
+     * @throws IOException if the channel write fails
+     */
+    private void fcWriteStore(long entry, byte[] header, byte[] data) throws IOException {
+        int blockSize = headerBlockSize + dataBlockSize;
+        // Heap buffer: direct buffers are costly to allocate and release, so
+        // creating a block-sized one on every write is wasteful.
+        ByteBuffer bf = ByteBuffer.allocate(blockSize);
+        bf.put(header);
+        bf.put(data);
+        bf.flip();
+        long offset = blockSize * entry;
+        // write() returns the number of bytes it consumed; keep going until
+        // the whole block has been handed to the channel.
+        while (bf.hasRemaining())
+            storeFC.write(bf, offset + bf.position());
+    }
+    /**
+     * Reads one block from the store file and splits it into header and data.
+     * The block is read from byte offset (headerBlockSize + dataBlockSize) * entry.
+     *
+     * @param entry slot index
+     * @param header destination for the leading bytes of the block
+     * @param data destination for the bytes that follow the header
+     * @throws EOFException if the file ends before a whole block could be read
+     * @throws IOException if the channel read fails
+     */
+    private void fcReadStore(long entry,byte[] header, byte[] data ) throws IOException {
+        int blockSize = headerBlockSize + dataBlockSize;
+        ByteBuffer bf = ByteBuffer.allocate(blockSize);
+        long offset = blockSize * entry;
+        // A single read may legitimately return a short count; loop until the
+        // block is complete and treat only a negative return as end-of-file.
+        while (bf.hasRemaining()) {
+            if (storeFC.read(bf, offset + bf.position()) < 0)
+                throw new EOFException();
+        }
+
+        bf.flip();
+        bf.get(header);
+        bf.get(data);
+    }
+    
public void handleOOM() throws Exception {
        if (storeRAF != null)
            storeRAF.getFD().sync();