[freenet-cvs] r19857 - branches/saltedhashstore/freenet/src/…

Top Page
Delete this message
Reply to this message
Author: devl
Date:  
To: cvs
Subject: [freenet-cvs] r19857 - branches/saltedhashstore/freenet/src/freenet/store
Author: j16sdiz
Date: 2008-05-09 02:09:10 +0000 (Fri, 09 May 2008)
New Revision: 19857

Modified:
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
Log:
Initial datastore resize code


Modified: branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
===================================================================
--- branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java    2008-05-09 02:08:50 UTC (rev 19856)
+++ branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java    2008-05-09 02:09:10 UTC (rev 19857)
@@ -28,6 +28,9 @@
import freenet.support.io.FileUtil;
import freenet.support.math.RunningAverage;
import freenet.support.math.SimpleRunningAverage;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;

/**
* Index-less data store based on salted hash
@@ -36,7 +39,7 @@
*/
public class SaltedHashFreenetStore implements FreenetStore {
    private static final boolean OPTION_SAVE_PLAINKEY = true;
-    
+
    private static boolean logMINOR;
    private static boolean logDEBUG;

@@ -116,8 +119,12 @@
    }

    private Entry probeEntry(byte[] routingKey) throws IOException {
-        // TODO probe store resize
-        return probeEntry0(routingKey, storeSize);
+        Entry entry = probeEntry0(routingKey, storeSize);
+
+        if (entry == null && prevStoreSize != 0)
+            entry = probeEntry0(routingKey, prevStoreSize);
+
+        return entry;
    }

    private Entry probeEntry0(byte[] routingKey, long probeStoreSize) throws IOException {
@@ -132,7 +139,6 @@
            entry = readEntry(offset, routingKey);
        } catch (EOFException e) {
            // may occur on resize, silent it a bit
-            //TODO store resize
            Logger.error(this, "EOFException on probeEntry", e);
            return null;
        } finally {
@@ -252,7 +258,7 @@
            System.arraycopy(header, 0, this.header, 0, headerBlockLength);
            this.data = new byte[dataBlockLength];
            System.arraycopy(data, 0, this.data, 0, dataBlockLength);
-            
+
            if (OPTION_SAVE_PLAINKEY) {
                flag |= ENTRY_FLAG_PLAINKEY;
            }
@@ -260,6 +266,21 @@
            isEncrypted = false;
        }

+        /**
+         * @return the storeSize
+         */
+        protected long getStoreSize() {
+            return storeSize;
+        }
+
+        /**
+         * @param storeSize
+         * the storeSize to set
+         */
+        protected void setStoreSize(long storeSize) {
+            this.storeSize = storeSize;
+        }
+
        public Entry(ByteBuffer in) {
            assert in.remaining() == entryTotalLength;

@@ -276,7 +297,7 @@
                plainRoutingKey = new byte[0x20];
                in.get(plainRoutingKey);
            }
-            
+
            // reserved bytes
            in.position((int) ENTRY_HEADER_LENGTH);

@@ -300,10 +321,10 @@
            out.putLong(flag);
            out.putLong(storeSize);

-            if (OPTION_SAVE_PLAINKEY) {
+            if (OPTION_SAVE_PLAINKEY && plainRoutingKey != null) {
                out.put(plainRoutingKey);
            }
-            
+
            // reserved bytes
            out.position((int) ENTRY_HEADER_LENGTH);

@@ -356,7 +377,7 @@
                else
                    return false;
            }
-            
+
            if (plainRoutingKey != null) {
                // we knew the key
                if (!Arrays.equals(this.plainRoutingKey, routingKey)) {
@@ -416,6 +437,10 @@
                throw new RuntimeException(e);
            }
        }
+
+        public boolean isFree() {
+            return (flag & ENTRY_FLAG_OCCUPIED) == 0;
+        }
    }

    /**
@@ -435,13 +460,13 @@
            storeFiles[i] = new File(baseDir, name + ".data-" + fmt.format(i));

            storeRAF[i] = new RandomAccessFile(storeFiles[i], "rw");
-            //TODO store resize
-            if (storeRAF[i].length() == 0) { // New file?
-                storeRAF[i].setLength(entryTotalLength * (storeSize / FILE_SPLIT + 1));
-            }
+
            storeFC[i] = storeRAF[i].getChannel();
            storeFC[i].lock();
        }
+
+        long storeFileSize = Math.max(storeSize, prevStoreSize);
+        setStoreFileSize(storeFileSize);
    }

    /**
@@ -637,6 +662,11 @@
    private Cleaner cleanerThread;

    private class Cleaner extends Thread {
+        /**
+         * How often the cleaner should run, in milliseconds
+         */
+        private static final int CLEANER_PERIOD = 10 * 60 * 1000; // 10 minutes
+
        public Cleaner() {
            setName("Store-" + name + "-Cleaner");
            setPriority(MIN_PRIORITY);
@@ -645,14 +675,15 @@

        public void run() {
            while (!shutdown) {
-                if (prevStoreSize != 0)
-                    moveOldEntries();
-                else
-                    estimateStoreSize();
+                synchronized (cleanerLock) {
+                    if (prevStoreSize != 0)
+                        resizeStore();
+                    else
+                        estimateStoreSize();

-                synchronized (cleanerLock) {
+                    cleanerLock.notifyAll();
                    try {
-                        cleanerLock.wait(10 * 60 * 1000); // 10 minutes
+                        cleanerLock.wait(CLEANER_PERIOD);
                    } catch (InterruptedException e) {
                        Logger.debug(this, "interrupted", e);
                    }
@@ -661,19 +692,227 @@
        }

        /**
-         * Move old entries to new location
+         * Maximum memory to be used on resize
         */
-        private void moveOldEntries() {
-            Logger.minor(this, "move old entries");
-            prevStoreSize = 0;
+        private static final int RESIZE_MEMORY = 2 * 1024 * 1024; // 2MiB
+        /**
+         * Number of phase 1 rounds; later rounds queue conflicting old entries
+         */
+        private static final int RESIZE_PHASE1_ROUND = 12;
+        /**
+         * Maximum resize round
+         */
+        private static final int RESIZE_MAX_ROUND = 16;
+
+        /**
+         * Are we shrinking the store?
+         */
+        private boolean shrinking;
+        private long newEntries;
+        private long oldEntries;
+        private long freeEntries;
+        private long resolvedEntries;
+        private long droppedEntries;
+        private long maxOldItemOffset;
+
+        /**
+         * Number of resize rounds that have run so far
+         */
+        private int resizeRound;
+
+        /**
+         * Move old entries to new location and resize store
+         */
+        private void resizeStore() {
+            ++resizeRound;
+            Logger.normal(this, "Starting datastore resize (round " + resizeRound + ")");
+
+            if (resizeRound == 1) { // first round
+                if (storeSize < prevStoreSize) {
+                    shrinking = true;
+                } else {
+                    setStoreFileSize(storeSize);
+                }
+                maxOldItemOffset = prevStoreSize - 1;
+            }
+
+            moveOldEntry0(resizeRound > RESIZE_PHASE1_ROUND);
+
+            if (logMINOR)
+                Logger.minor(this, "Finished resize round " + resizeRound + ": newEntries=" + newEntries
+                 + ", oldEntries=" + oldEntries + ", freeEntries=" + freeEntries + ", resolvedEntries="
+                 + resolvedEntries + ", droppedEntries=" + droppedEntries);
+
+            if (shutdown)
+                return;
+
+            // report key count
+            estimatedCount.report(newEntries + oldEntries - droppedEntries);
+
+            // Shrink store file size
+            if (shrinking)
+                setStoreFileSize(Math.max(storeSize, maxOldItemOffset));
+
+            // Check finished
+            if (resizeRound >= RESIZE_MAX_ROUND || oldEntries == 0 || resolvedEntries + droppedEntries >= oldEntries) {
+                // Finished
+                Logger.normal(this, "Datastore resize finished (total " + resizeRound + "rounds)");
+
+                prevStoreSize = 0;
+                resizeRound = 0;
+            }
        }

        /**
-         * Sample to take at a time
+         * Scan all entries and try to move them
         */
-        private static final double SAMPLE_RATE = 0.05; // 5%
+        private void moveOldEntry0(boolean queueItem) {
+            newEntries = 0;
+            oldEntries = 0;
+            freeEntries = 0;
+            resolvedEntries = 0;
+            droppedEntries = 0;

+            List oldItems = null;
+            if (queueItem) {
+                oldItems = new ArrayList();
+            }
+
+            long maxOffset = maxOldItemOffset;
+            maxOldItemOffset = 0;
+            for (long offset = 0; offset <= maxOffset; offset++) {
+                if (logDEBUG && offset % 1024 == 0) {
+                    Logger.debug(this, "Resize progress: newEntries=" + newEntries + ", oldEntries=" + oldEntries
+                     + ", freeEntries=" + freeEntries + ", resolvedEntries=" + resolvedEntries
+                     + ", droppedEntries=" + droppedEntries);
+                }
+
+                if (shutdown)
+                    return;
+
+                if (!lockEntry(offset)) //lock
+                    continue;
+                try {
+                    Entry entry = readEntry(offset, null);
+
+                    if (entry.isFree()) {
+                        // free block
+                        freeEntries++;
+                    } else if (entry.getStoreSize() == storeSize) {
+                        // new store size entries
+                        maxOldItemOffset = offset;
+                        newEntries++;
+                    } else { // if (entry.getStoreSize() == prevStoreSize)
+                        // old store size entries, try to move them
+                        oldEntries++;
+                        maxOldItemOffset = offset;
+
+                        entry.setStoreSize(storeSize);
+                        long newOffset = entry.getOffset();
+
+                        if (newOffset == offset) { // lucky!
+                            writeEntry(entry); // write back entry storeSize
+                            resolvedEntries++;
+                            continue;
+                        }
+
+                        if (!lockEntry(newOffset)) // lock
+                            continue;
+                        try {
+                            // see what's in the new offset
+                            Entry newOffsetEntry = readEntry(newOffset, null);
+
+                            if (newOffsetEntry.isFree()) {
+                                // the new offset is freeeeeeee..
+                                writeEntry(entry);
+                                freeOffset(offset);
+                                resolvedEntries++;
+                            } else if (newOffsetEntry.getStoreSize() == storeSize) {
+                                // new offset already have a new entry, free old entry
+                                freeOffset(offset);
+                                droppedEntries++;
+                            } else if (Arrays.equals(entry.digestedRoutingKey, newOffsetEntry.digestedRoutingKey)) {
+                                // same digested routing key, free the old entry
+                                freeOffset(offset);
+                                resolvedEntries++;
+                            } else if (queueItem) {
+                                // break tie by moving old item to queue
+                                if (oldItems.size() * entryTotalLength < RESIZE_MEMORY) {
+                                    oldItems.add(newOffsetEntry);
+                                    if (newOffset > offset) {
+                                        oldEntries++; // newOffset wasn't counted count it
+                                    }
+
+                                    writeEntry(entry);
+                                    freeOffset(offset);
+                                    resolvedEntries++;
+                                }
+                            }
+                        } finally {
+                            unlockEntry(newOffset);
+                        }
+                    }
+                } catch (IOException e) {
+                    Logger.debug(this, "IOExcception on moveOldEntries0", e);
+                } finally {
+                    unlockEntry(offset);
+                }
+            }
+
+            if (queueItem) {
+                putBackOldItems(oldItems);
+            }
+        }
+
        /**
+         * Put back oldItems with best effort
+         */
+        private void putBackOldItems(List oldItems) {
+            Iterator it = oldItems.iterator();
+            while (it.hasNext()) {
+                boolean done = false;
+
+                Entry entry = (Entry) it.next();
+                entry.setStoreSize(storeSize);
+
+                long newOffset = entry.getOffset();
+
+                if (!lockEntry(newOffset)) // lock
+                    continue;
+                try {
+                    if (isFree(newOffset)) {
+                        if (logDEBUG)
+                            Logger.debug(this, "Put back old item: " + HexUtil.bytesToHex(entry.digestedRoutingKey));
+                        writeEntry(entry);
+                        done = true;
+                    } else {
+                        if (logDEBUG)
+                            Logger.debug(this, "Drop old item: " + HexUtil.bytesToHex(entry.digestedRoutingKey));
+                    }
+                } catch (IOException e) {
+                    Logger.debug(this, "IOExcception on putBackOldItems", e);
+                } finally {
+                    unlockEntry(newOffset);
+                    it.remove();
+
+                    if (done)
+                        resolvedEntries++;
+                    else
+                        droppedEntries++;
+                }
+            }
+            oldItems.clear();
+        }
+
+        /**
+         * Fraction of the store to sample on key count estimation
+         */
+        private static final double SAMPLE_RATE = 0.05; // 5%
+        /**
+         * Upper bound on samples per key count estimation (applied via Math.min, despite the name)
+         */
+        private static final int MIN_SAMPLE = 10000;
+        /**
         * Last sample position
         */
        private long samplePos = 0;
@@ -683,7 +922,7 @@
         */
        private void estimateStoreSize() {
            Logger.minor(this, "start estimating key count");
-            long numSample = Math.min((long) (SAMPLE_RATE * storeSize), 10000);
+            long numSample = Math.min((long) (SAMPLE_RATE * storeSize), MIN_SAMPLE);
            long sampled = 0;
            long occupied = 0;
            while (sampled < numSample) {
@@ -712,9 +951,32 @@
        }
    }

-    public void setMaxKeys(long maxStoreKeys, boolean shrinkNow) throws IOException {
-        // TODO
-        // NO-OP now
+    public void setMaxKeys(long newStoreSize, boolean shrinkNow) throws IOException {
+        Logger.normal(this, "[" + name + "] Resize newStoreSize=" + newStoreSize + ", shinkNow=" + shrinkNow);
+
+        assert newStoreSize > 0;
+
+        synchronized (cleanerLock) {
+            if (newStoreSize == this.storeSize)
+                return;
+
+            if (prevStoreSize != 0) {
+                if (shrinkNow) {
+                    // TODO shrink now
+                } else {
+                    Logger.normal(this, "[" + name + "] resize already in progress, ignore resize request");
+                    return;
+                }
+            }
+
+            prevStoreSize = storeSize;
+            storeSize = newStoreSize;
+            cleanerLock.notifyAll();
+
+            if (shrinkNow) {
+                // TODO shrink now
+            }
+        }
    }

    // ------------- Locking
@@ -941,4 +1203,19 @@
    public long getMaxKeys() {
        return storeSize;
    }
+
+    /**
+     * Change the on-disk store file size
+     *
+     * @param storeFileSize
+     */
+    private void setStoreFileSize(long storeFileSize) {
+        for (int i = 0; i < FILE_SPLIT; i++) {
+            try {
+                storeRAF[i].setLength(entryTotalLength * (storeFileSize / FILE_SPLIT + 1));
+            } catch (IOException e) {
+                Logger.error(this, "error resizing store file", e);
+            }
+        }
+    }
}