if (entry.isFree()) {
// free block
freeEntries++;
- } else if (entry.getStoreSize() == storeSize) {
+ continue LOOP_ENTRIES;
+ }
+ if (entry.getStoreSize() == storeSize) {
// new store size entries
maxOldItemOffset = offset;
newEntries++;
- } else { // if (entry.getStoreSize() == prevStoreSize)
- // old store size entries, try to move them
- oldEntries++;
- maxOldItemOffset = offset;
+ continue LOOP_ENTRIES;
+ }
- entry.setStoreSize(storeSize);
- long newOffset = entry.getOffset();
+ // if (entry.getStoreSize() == prevStoreSize)
+ // old store size entries, try to move them
+ maxOldItemOffset = offset;
+ oldEntries++;
- if (newOffset == offset) { // lucky!
- lockAndWrite(entry); // write back entry storeSize
+ entry.setStoreSize(storeSize);
+ long[] newOffset = entry.getOffset();
+
+ // Check if I can keep my current offset
+ for (int i = 0; i < newOffset.length; i++) {
+ if (newOffset[i] == offset) { // lucky!
+ writeEntry(entry, offset); // write back entry storeSize
resolvedEntries++;
- continue;
+
+ if (logDEBUG)
+ Logger.debug(this, "old entry " + HexUtil.bytesToHex(entry.getDigestedRoutingKey())
+ + " resolved without moving");
+
+ continue LOOP_ENTRIES;
}
+ }
- if (!lockEntry(newOffset)) // lock
- continue;
- try {
+ boolean[] locked = new boolean[newOffset.length];
+ try {
+ // Lock all possible slots first
+ for (int i = 0; i < newOffset.length; i++) {
+ if (lockEntry(newOffset[i])) { // lock
+ locked[i] = true;
+ } else if (shutdown) { // oops
+ return;
+ }
+ }
+
+ // Probe for a free slot
+ for (int i = 0; i < newOffset.length; i++) {
// see what's in the new offset
- Entry newOffsetEntry = readEntry(newOffset, null);
+ Entry newOffsetEntry = readEntry(newOffset[i], null);
+ // Free slot
if (newOffsetEntry.isFree()) {
// the new offset is freeeeeeee..
- lockAndWrite(entry);
+ writeEntry(entry, newOffset[i]);
freeOffset(offset);
resolvedEntries++;
- } else if (newOffsetEntry.getStoreSize() == storeSize) {
- // new offset already have a new entry, free old entry
+
+ if (logDEBUG)
+ Logger.debug(this, "old entry " + HexUtil.bytesToHex(entry.getDigestedRoutingKey())
+ + " resolved by moving to free block");
+
+ continue LOOP_ENTRIES;
+ }
+
+ // Same digested key: same routing key or SHA-256 collision
+ byte[] digestedRoutingKey = entry.getDigestedRoutingKey();
+ byte[] digestedRoutingKey2 = newOffsetEntry.getDigestedRoutingKey();
+ if (Arrays.equals(digestedRoutingKey, digestedRoutingKey2)) {
+ // assume same routing key, drop this as duplicate
freeOffset(offset);
droppedEntries++;
- } else if (Arrays.equals(entry.digestedRoutingKey, newOffsetEntry.digestedRoutingKey)) {
- // same digested routing key, free the old entry
- freeOffset(offset);
- resolvedEntries++;
- } else if (queueItem) {
- // break tie by moveing old item to queue
+
if (logDEBUG)
- Logger.debug(this, "Write item "
- + HexUtil.bytesToHex(newOffsetEntry.digestedRoutingKey)
- + " to old item file");
- writeOldItem(oldItemsFC, newOffsetEntry);
- if (newOffset > offset) {
- oldEntries++; // newOffset wasn't counted count it
- }
+ Logger.debug(this, "old entry " + HexUtil.bytesToHex(entry.getDigestedRoutingKey())
+ + " dropped duplicate");
- lockAndWrite(entry);
- freeOffset(offset);
- resolvedEntries++;
+ continue LOOP_ENTRIES;
}
- } finally {
- unlockEntry(newOffset);
}
+
+ if (queueItem) {
+ if (logDEBUG)
+ Logger.debug(this, "old entry " + HexUtil.bytesToHex(entry.getDigestedRoutingKey())
+ + " queued");
+ writeOldItem(oldItemsFC, entry);
+ freeOffset(offset);
+ }
+ } finally {
+ // unlock all entries
+ for (int i = 0; i < newOffset.length; i++) {
+ if (locked[i]) {
+ unlockEntry(newOffset[i]);
+ }
+ }
}
} catch (IOException e) {
Logger.debug(this, "IOExcception on moveOldEntries0", e);
@@ -951,40 +984,41 @@
* Put back oldItems with best effort
*
* @throws IOException
- *
+ */
private void putBackOldItems(FileChannel oldItems) throws IOException {
- while (true) {
+ LOOP_ITEMS: while (true) {
Entry entry = readOldItem(oldItems);
if (entry == null)
break;
entry.setStoreSize(storeSize);
- long newOffset = entry.getOffset();
+ long[] newOffset = entry.getOffset();
- if (!lockEntry(newOffset)) // lock
- continue;
- boolean done = false;
- try {
- if (isFree(newOffset)) {
- if (logDEBUG)
- Logger.debug(this, "Put back old item: " + HexUtil.bytesToHex(entry.digestedRoutingKey));
- lockAndWrite(entry);
- done = true;
- } else {
- if (logDEBUG)
- Logger.debug(this, "Drop old item: " + HexUtil.bytesToHex(entry.digestedRoutingKey));
+ for (int i = 0; i < newOffset.length; i++) {
+ if (!lockEntry(newOffset[i])) // lock
+ continue;
+ try {
+ if (isFree(newOffset[i], entry)) {
+ if (logDEBUG)
+ Logger
+ .debug(this, "Put back old item: "
+ + HexUtil.bytesToHex(entry.digestedRoutingKey));
+ writeEntry(entry, newOffset[i]);
+ resolvedEntries++;
+ continue LOOP_ITEMS;
+ } else {
+ if (logDEBUG)
+ Logger.debug(this, "Drop old item: " + HexUtil.bytesToHex(entry.digestedRoutingKey));
+ }
+ } catch (IOException e) {
+ Logger.debug(this, "IOExcception on putBackOldItems", e);
+ } finally {
+ unlockEntry(newOffset[i]);
}
- } catch (IOException e) {
- Logger.debug(this, "IOExcception on putBackOldItems", e);
- } finally {
- unlockEntry(newOffset);
+ }
/**
* Samples to take on key count estimation
@@ -1056,6 +1093,7 @@
Logger.normal(this, "[" + name + "] Resize newStoreSize=" + newStoreSize + ", shinkNow=" + shrinkNow);
assert newStoreSize > 0;
+ // TODO assert newStoreSize > (141 * (3 * 3) + 13 * 3) * 2; // store size too small
synchronized (cleanerLock) {
if (newStoreSize == this.storeSize)
@@ -1088,8 +1126,8 @@
/**
* Lock the entry
*
- * This lock is <strong>not</strong> reentrance. No threads except Cleaner should hold more
- * then one lock at a time (or deadlock may occur).
+ * This lock is <strong>not</strong> reentrant. No threads except Cleaner should hold more than
+ * one lock at a time (or deadlock may occur).
*/
private boolean lockEntry(long offset) {
if (logDEBUG && logLOCK)
@@ -1140,7 +1178,7 @@
* Use this method to stop all read / write before database shutdown.
*
* @param timeout
- * the maximum time to wait in milliseconds.
+ * the maximum time to wait in milliseconds.
*/
private boolean lockGlobal(long timeout) {
synchronized (lockMap) {