Re: [freenet-dev] [freenet-cvs] r19544 - trunk/freenet/src/f…

Top Page
Author: Matthew Toseland
Date:  
To: devl
Subject: Re: [freenet-dev] [freenet-cvs] r19544 - trunk/freenet/src/freenet/client
Delete this message
Reply to this message
gpg: Signature made Thu Apr 24 23:03:23 2008 UTC using DSA key ID E43DA450
gpg: Good signature from "Matthew John Toseland <toad@amphibian.dyndns.org>"
On Thursday 24 April 2008 17:19, nextgens@??? wrote:
> Author: nextgens
> Date: 2008-04-24 16:19:07 +0000 (Thu, 24 Apr 2008)
> New Revision: 19544
>
> Modified:
> trunk/freenet/src/freenet/client/FECCodec.java
> Log:
> Hopefully fix #2287: Freenet uses too many fd's when starting a 2G+ insert,

results in failure to persist
>
> Now we leave it up to the GC ASAP


This isn't going to work. Bucket.getOutputStream() doesn't append, at least,
that's always been my assumption. And Bucket.getInputStream() *ALWAYS* starts
at zero. I expect this commit to seriously break things - did you test it?

It is conceivable that we'd run out of threads on a quad core system (192 fd's
per encode/decode, 4 at once), but imho it's unlikely - if we're running out
of fd's, most likely there's a leak, surely?
>
> Modified: trunk/freenet/src/freenet/client/FECCodec.java
> ===================================================================
> --- trunk/freenet/src/freenet/client/FECCodec.java    2008-04-24 13:43:50 UTC

(rev 19543)
> +++ trunk/freenet/src/freenet/client/FECCodec.java    2008-04-24 16:19:07 UTC

(rev 19544)
> @@ -7,7 +7,6 @@
> import java.io.IOException;
> import java.io.OutputStream;
> import java.util.LinkedList;
> -import java.util.NoSuchElementException;
>
> import com.onionnetworks.fec.FECCode;
> import com.onionnetworks.util.Buffer;
> @@ -20,6 +19,7 @@
> import freenet.support.io.BucketTools;
> import freenet.support.io.Closer;
> import freenet.support.io.NativeThread;
> +import java.util.ArrayList;
>
> /**
> * FEC (forward error correction) handler.
> @@ -97,12 +97,10 @@
>             throw new IllegalArgumentException();
>         Buffer[] packets = new Buffer[k];
>         Bucket[] buckets = new Bucket[n];
> -        DataInputStream[] readers = new DataInputStream[n];
> -        OutputStream[] writers = new OutputStream[k];
>         int numberToDecode = 0; // can be less than n-k
> +        ArrayList existingReaders = new ArrayList();
> +        ArrayList existingWriters = new ArrayList();
>
> -        try {
> -
>             byte[] realBuffer = new byte[k * STRIPE_SIZE];
>
>             int[] packetIndexes = new int[k];
> @@ -114,15 +112,14 @@
>             for(int i = 0; i < k; i++)
>                 packets[i] = new Buffer(realBuffer, i * STRIPE_SIZE,
>                     STRIPE_SIZE);
> -
> +            
>             for(int i = 0; i < dataBlockStatus.length; i++) {
>                 buckets[i] = dataBlockStatus[i].getData();
>                 if(buckets[i] == null) {
>                     buckets[i] = bf.makeBucket(blockLength);
> -                    writers[i] = buckets[i].getOutputStream();
> +                    existingWriters.add(new Integer(i));
>                     if(logMINOR)
>                         Logger.minor(this, "writers[" + i + "] != null");
> -                    readers[i] = null;
>                     numberToDecode++;
>                 }
>                 else {
> @@ -137,17 +134,14 @@
>                     }
>                     if(logMINOR)
>                         Logger.minor(this, "writers[" + i + "] = null (already filled)");
> -                    writers[i] = null;
> -                    readers[i] = new DataInputStream(buckets[i].getInputStream());
> +                    existingReaders.add(new Integer(i));
>                     packetIndexes[idx++] = i;
>                 }
>             }
>             for(int i = 0; i < checkBlockStatus.length; i++) {
>                 buckets[i + k] = checkBlockStatus[i].getData();
> -                if(buckets[i + k] == null)
> -                    readers[i + k] = null;
> -                else {
> -                    readers[i + k] = new DataInputStream(buckets[i + k].getInputStream());
> +                if(buckets[i + k] != null) {
> +                    existingReaders.add(new Integer(i+k));
>                     if(idx < k)
>                         packetIndexes[idx++] = i + k;
>                 }
> @@ -160,15 +154,18 @@
>                 for(int i = 0; i < packetIndexes.length; i++)
>                     Logger.minor(this, "[" + i + "] = " + packetIndexes[i]);
>
> -            if(numberToDecode > 0)
> +            if(numberToDecode > 0) {
>                 // Do the (striped) decode
>
>                 for(int offset = 0; offset < blockLength; offset += STRIPE_SIZE) {
>                     // Read the data in first
>                     for(int i = 0; i < k; i++) {
>                         int x = packetIndexes[i];
> -                        readers[x].readFully(realBuffer, i * STRIPE_SIZE,
> -                            STRIPE_SIZE);
> +                        if(existingReaders.contains(new Integer(x))) {
> +                            DataInputStream dis = new

DataInputStream(buckets[i].getInputStream());
> +                            dis.readFully(realBuffer, i * STRIPE_SIZE, STRIPE_SIZE);
> +                            Closer.close(dis);
> +                        }
>                     }
>                     // Do the decode
>                     // Not shuffled
> @@ -178,19 +175,16 @@
>                     fec.decode(packets, disposableIndexes);
>                     // packets now contains an array of decoded blocks, in order
>                     // Write the data out
> -                    for(int i = 0; i < k; i++)
> -                        if(writers[i] != null)
> -                            writers[i].write(realBuffer, i * STRIPE_SIZE,
> -                                STRIPE_SIZE);
> +                    for(int i = 0; i < k; i++) {
> +                        if(existingWriters.contains(new Integer(i))) {
> +                            OutputStream dos = buckets[i].getOutputStream();
> +                            dos.write(realBuffer, i * STRIPE_SIZE, STRIPE_SIZE);
> +                        }
> +                    }
>                 }
> +            }
>
> -        }
> -        finally {
> -            for(int i = 0; i < k; i++)
> -                Closer.close(writers[i]);
> -            for(int i = 0; i < n; i++)
> -                Closer.close(readers[i]);
> -        }
> +                    
>         // Set new buckets only after have a successful decode.
>         // Note that the last data bucket will be overwritten padded.
>         for(int i = 0; i < dataBlockStatus.length; i++) {
> @@ -222,11 +216,7 @@
>         Buffer[] dataPackets = new Buffer[k];
>         Buffer[] checkPackets = new Buffer[n - k];
>         Bucket[] buckets = new Bucket[n];
> -        DataInputStream[] readers = new DataInputStream[k];
> -        OutputStream[] writers = new OutputStream[n - k];
>
> -        try {
> -
>             int[] toEncode = new int[n - k];
>             int numberToEncode = 0; // can be less than n-k
>
> @@ -250,20 +240,18 @@
>                     else
>                         throw new IllegalArgumentException("Too big: " + sz + " bigger than "

+ blockLength);
>                 }
> -                readers[i] = new DataInputStream(buckets[i].getInputStream());
>             }
> -
> +            
> +            ArrayList existingWriters = new ArrayList();
>             for(int i = 0; i < checkBlockStatus.length; i++) {
>                 buckets[i + k] = checkBlockStatus[i];
>                 if(buckets[i + k] == null) {
>                     buckets[i + k] = bf.makeBucket(blockLength);
> -                    writers[i] = buckets[i + k].getOutputStream();
>                     toEncode[numberToEncode++] = i + k;
> -                }
> -                else
> -                    writers[i] = null;
> +                    existingWriters.add(new Integer(i));
> +                }    
>             }
> -
> +            
>             //            Runtime.getRuntime().gc();
> //            Runtime.getRuntime().runFinalization();
> //            Runtime.getRuntime().gc();
> @@ -280,9 +268,11 @@
>                     if(logMINOR)
>                         Logger.minor(this, "Memory in use before read: " +

memUsedBeforeRead);
>                     // Read the data in first
> -                    for(int i = 0; i < k; i++)
> -                        readers[i].readFully(realBuffer, i * STRIPE_SIZE,
> -                            STRIPE_SIZE);
> +                    for(int i = 0; i < k; i++) {
> +                        DataInputStream dis = new

DataInputStream(buckets[i].getInputStream());
> +                        dis.readFully(realBuffer, i * STRIPE_SIZE, STRIPE_SIZE);
> +                        Closer.close(dis);
> +                    }
>                     // Do the encode
>                     // Not shuffled
>                     long startTime = System.currentTimeMillis();
> @@ -306,19 +296,18 @@
>                         Logger.minor(this, "Stripe encode took " + (endTime - startTime)

+ "ms for k=" + k + ", n=" + n + ", stripeSize=" + STRIPE_SIZE);
>                     // packets now contains an array of decoded blocks, in order
>                     // Write the data out
> -                    for(int i = k; i < n; i++)
> -                        if(writers[i - k] != null)
> -                            writers[i - k].write(realBuffer, i * STRIPE_SIZE,
> -                                STRIPE_SIZE);
> +                    
> +                    for(int i = k; i < n; i++) {
> +                        int index = i-k;
> +                        
> +                        if(existingWriters.contains(new Integer(index))) {
> +                            OutputStream os = buckets[i].getOutputStream();
> +                            os.write(realBuffer, i * STRIPE_SIZE, STRIPE_SIZE);
> +                            Closer.close(os);
> +                        }
> +                    }
>                 }
>
> -        }
> -        finally {
> -            for(int i = 0; i < k; i++)
> -                Closer.close(readers[i]);
> -            for(int i = 0; i < n - k; i++)
> -                Closer.close(writers[i]);
> -        }
>         // Set new buckets only after have a successful decode.
>         for(int i = 0; i < checkBlockStatus.length; i++) {
>             Bucket data = buckets[i + k];
>
> _______________________________________________
> cvs mailing list
> cvs@???
> http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs
>
>