[Rubyfreenet] r9311 - in trunk/apps/rubyFreenet: freenet/fcp…

Top Page
Delete this message
Reply to this message
Author: sitharus
Date:  
To: rubyfreenet
Subject: [Rubyfreenet] r9311 - in trunk/apps/rubyFreenet: freenet/fcp rqueue rqueue/templates
Author: sitharus
Date: 2006-06-19 01:18:27 +0000 (Mon, 19 Jun 2006)
New Revision: 9311

Added:
trunk/apps/rubyFreenet/rqueue/README
trunk/apps/rubyFreenet/rqueue/templates/
trunk/apps/rubyFreenet/rqueue/templates/index.rhtml
Modified:
trunk/apps/rubyFreenet/freenet/fcp/client.rb
trunk/apps/rubyFreenet/rqueue/rqueue
Log:
Added README for rqueue
Added callbacks to client for all messages and unknown messages
Added watch_global and remove_request to client


Modified: trunk/apps/rubyFreenet/freenet/fcp/client.rb
===================================================================
--- trunk/apps/rubyFreenet/freenet/fcp/client.rb    2006-06-19 01:02:29 UTC (rev 9310)
+++ trunk/apps/rubyFreenet/freenet/fcp/client.rb    2006-06-19 01:18:27 UTC (rev 9311)
@@ -73,6 +73,15 @@
# acquire locks with Message#lock before using, but don't forget to release with Message#unlock.
#
# Message#synchronize may be implemented later.
+ #
+ # == Unknown and All Message callbacks
+ #
+ # Client#all_messages takes a block that takes status, request and response and is called for
+ # all messages, including unknown ones. If the message is unknown then request is nil.
+ #
+ # Client#unknown_message takes a block that takes status and response and is called for all unknown messages.
+ #
+ # Both these callbacks are threaded.
class Client
include Logger::Severity

@@ -269,8 +278,10 @@
end

# Get the request status for a persistent request. Not useful in synchronous systems.
+ # This will output some PersistentGet/PersistentPut requests, which don't invoke callbacks,
+ # SimpleProgress if Verbosity=1, DataFound for all GET requests and AllData if ReturnType=direct
#
- # Any replies will be directed to the async message's callback
+ # Any replies will be directed to the async message's callback.
def request_status(identifier, global, only_data=true, block=true, &callback)
log(DEBUG, 'Requesting status')
message = Message.new('GetRequestStatus', nil, 'Identifier'=>identifier, 'Global'=>global, 'OnlyData'=>only_data)
@@ -278,14 +289,39 @@
block_message(message, block, &callback)
end

+ # Enable global queue watching. Messages that arrive for identifiers this client does not know about
+ # are passed to the Client#unknown_message and Client#all_messages callbacks.
def watch_global(enabled=true, verbosity=1)
message = Message.new('WatchGlobal', nil, 'Enabled'=>enabled, 'VerbosityMask'=>verbosity)
send(message)
end
+
+ # Modify a persistent request. Pass the options to change, though Identifier and Global
+ # must remain the same.
+ def modify_request(message, options)
+ options = options.merge('Identifier'=>message.identifier, 'Global'=>message.global)
+ message = Message.new('ModifyPersistentRequest', nil, options)
+ send(message)
+ end
+
+ # Remove a persistent request from the node. Call this once you've finished with a request.
+ def remove_request(message)
+ message = Message.new('RemovePersistentRequest', nil, 'Identifier'=>message.identifier, 'Global'=>message.items['Global'])
+ send(message)
+ end
+
+ def unknown_message(&callback)
+ @unknown_callback = callback
+ end

+ def all_messages(&callback)
+ @all_callback = callback
+ end
+
# Private calls
private

+ # Block the current thread until a response is received, then run callback.
def block_message(message, block, &callback)
unless block
message
@@ -424,9 +460,27 @@
original_message.unlock
end
@callback_threads << thread
+ if @all_callback
+ thread = Thread.new do
+ message.lock
+ status = message.status
+ @all_callback.call(status, original_message, message)
+ message.unlock
+ end
+ @callback_threads << thread
+ end
end
else
- log(WARN, "Got a message for an unknown identifier: #{message.identifier}. Did you forget to reload persistent requests?")
+ if @unknown_callback or @all_callback
+ thread = Thread.new do
+ message.lock
+ status = message.status
+ @unknown_callback.call(status, message) if @unknown_callback
+ @all_callback.call(status, nil, message) if @all_callback
+ message.unlock
+ end
+ @callback_threads << thread
+ end
end
else
log(DEBUG, 'Got message with no identifier')
@@ -447,6 +501,10 @@
unless message.load_only
message.write(@socket)
end
+
+ if message.type == 'RemovePersistentRequest'
+ @messages.delete(message.identifier)
+ end
end

# Reads a message from the server and returns an FCP::Message. Does not handle processing

Added: trunk/apps/rubyFreenet/rqueue/README
===================================================================
--- trunk/apps/rubyFreenet/rqueue/README    2006-06-19 01:02:29 UTC (rev 9310)
+++ trunk/apps/rubyFreenet/rqueue/README    2006-06-19 01:18:27 UTC (rev 9311)
@@ -0,0 +1,13 @@
+= Freenet Queue Downloader =
+
+Simply run ./rqueue
+
+All requests are stored in the global queue; completed requests are removed from the queue.
+
+== Web Interface ==
+The web interface listens on all IP addresses, not just localhost. Point your web browser to http://127.0.0.1:9877/
+
+Paste URLs into the box and they will be downloaded. Click "Download" to get a file through your browser; "Remove" removes it from the list of files.
+
+== DRuby Interface ==
+The DRb interface listens on druby://127.0.0.1:9876. Run rdoc over rqueue to get documentation.
\ No newline at end of file

Modified: trunk/apps/rubyFreenet/rqueue/rqueue
===================================================================
--- trunk/apps/rubyFreenet/rqueue/rqueue    2006-06-19 01:02:29 UTC (rev 9310)
+++ trunk/apps/rubyFreenet/rqueue/rqueue    2006-06-19 01:18:27 UTC (rev 9311)
@@ -1,44 +1,145 @@
#!/usr/bin/ruby
+require 'drb'
+require 'erb'
+require 'thread'
+require 'webrick'
+require 'cgi'
+require '../freenet'
+include WEBrick

-require '../freenet'
-puts 'Connecting to node, please wait...'
-client = Freenet::FCP::Client.new
-client.watch_global
-puts 'Client connected'
-loop do
- command = STDIN.readline
- case command.strip
- when 'exit', 'quit'
- exit 0
- else
- begin
- u = Freenet::URI.new(command)
-
- client.get(u, false, 'Persistence'=>'reboot', 'Global'=>'true', 'Verbosity'=>'1') do |status, request, response|
- puts status
- case status
- when :finished
- puts 'Data finished'
- if u.path
- path = u.path.gsub('/','_').strip
- path = 'download_'+random(100000).to_s if path == ''
- File.open(path, 'w') do |f|
- f.write(response.data)
- end
+# DRuby server
+class QueueServer
+ def initialize
+ @client = Freenet::FCP::Client.new
+ @client.watch_global
+ @status = {}
+ @status_mutex = Mutex.new
+ end
+
+ # Add URIs to the queue. Takes a variable argument list, each argument should be one URI. Returns the list
+ # of URIs actually added.
+ def add_uris(*args)
+ queued = []
+ args.each do |uri|
+ begin
+ next if uri == ''
+ @client.get(uri, false, 'Persistence'=>'reboot', 'Global'=>'true', 'Verbosity'=>'1') do |status, request, response|
+ case status
+ when :failed
+ @status_mutex.lock
+ @status[request.items['URI']][:status] = 'Failed'
+ @status_mutex.unlock
+ when :progress
+ @status_mutex.lock
+ data = @status[request.items['URI']]
+ data[:total] = request.items['Total']
+ data[:required] = request.items['Required']
+ data[:failed] = request.items['Failed']
+ data[:fatally_failed] = request.items['FatallyFailed']
+ data[:succeeded] = request.items['Succeeded']
+ data[:finalised] = request.items['Finalized']
+ data[:content_type] = response.content_type
+ @status_mutex.unlock
+ when :found
+ @status_mutex.lock
+ @status[request.items['URI']][:status] = 'Data Found'
+ @status_mutex.unlock
+ when :finished
+ @status_mutex.lock
+ @status[request.items['URI']][:status] = 'Finished'
+ @status[request.items['URI']][:data] = response.data
+ @status[request.items['URI']][:content_type] = response.content_type
+ @status_mutex.unlock
+ @client.remove_request(request)
end
- raise Freenet::FCP::RequestFinished.new
- when :failed
- raise Freenet::FCP::RequestFinished.new
- when :error
- puts response
- raise Freenet::FCP::RequestFinished.new
- when :status
- puts response.items
end
- puts 'Callback finished'
+ queued << uri
+ @status_mutex.lock
+ @status[uri] = {:status=>:started, :data=>nil, :total=>0, :required=>0, :failed=>0, :fatally_failed=>0,
+ :succeeded=>0, :finalised=>false, :content_type=>''}
+ @status_mutex.unlock
+ rescue Freenet::URIError
end
- rescue Freenet::URIError => e
- puts 'Invalid URI'
end
+ queued
end
-end
\ No newline at end of file
+
+ # Get a list of all URIs and their status, a hash in the format:
+ # uri => {:status=>(:started, 'Failed', 'Data Found', 'Finished'), :data=>nil or file data, :total=># of split file blocks,
+ # :failed=>Blocks failed, :fatally_failed=>Blocks that cannot be retrieved, :succeeded=>Blocks downloaded,
+ # :finalised=>True if the block count is final, false if it may change, :content_type=>data's type}
+ def status
+ @status_mutex.lock
+ s = @status.dup
+ @status_mutex.unlock
+ s
+ end
+
+ # Remove the URI from the queue.
+ def remove(uri)
+ @status_mutex.lock
+ if @status[uri]
+ @client.remove_request(@status[uri]) if @status[uri][:status] != 'Finished'
+ @status.delete(uri)
+ end
+ @status_mutex.unlock
+ true
+ end
+end
+
+class QueueWeb < HTTPServlet::AbstractServlet
+ def initialize(server)
+ super(server)
+ DRb.start_service()
+ @drb = DRbObject.new(nil, 'druby://localhost:9876')
+ File.open('templates/index.rhtml') do |f|
+ @template = ERB.new(f.read, nil, '-')
+ end
+ end
+
+ def do_GET(req, res)
+ case req.path
+ when '/download'
+ params = CGI::parse(req.query_string)
+ status = @drb.status
+ item = status[params['uri'][0]]
+ res['Content-Type'] = item[:content_type] if item[:content_type] != ''
+ res.body = item[:data]
+ when '/remove'
+ params = CGI::parse(req.query_string)
+ @drb.remove(params['uri'][0])
+ res.status = 302
+ res['Location'] = '/'
+ else
+ res['Content-Type'] = 'text/html'
+ @status = @drb.status
+ res.body = @template.result(binding)
+ end
+ end
+
+ def do_POST(req, res)
+ case req.path
+ when '/add_uris'
+ params = CGI::parse(req.body)
+ params['uris'].each do |uris|
+ @drb.add_uris(*(uris.split(/\r|\n/)))
+ end
+ res.status = 302
+ res['Location'] = '/'
+ else
+ end
+ end
+end
+
+server = QueueServer.new
+DRb.start_service('druby://localhost:9876', server)
+
+s = HTTPServer.new(:Port=>9877)
+s.mount('/', QueueWeb)
+trap("INT") do
+ s.shutdown
+ DRb.stop_service
+end
+s.start
+
+DRb.thread.join if DRb.thread
\ No newline at end of file

Added: trunk/apps/rubyFreenet/rqueue/templates/index.rhtml
===================================================================
--- trunk/apps/rubyFreenet/rqueue/templates/index.rhtml    2006-06-19 01:02:29 UTC (rev 9310)
+++ trunk/apps/rubyFreenet/rqueue/templates/index.rhtml    2006-06-19 01:18:27 UTC (rev 9311)
@@ -0,0 +1,41 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
+    "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
+<html>
+    <head>
+        <title>QueueWeb</title>
+    </head>
+    <body>
+        <h1>QueueWeb</h1>
+        <h2>Current Requests</h2>
+        <table>
+            <thead>
+                <tr>
+                    <th>URI</th>
+                    <th>Status</th>
+                    <th>Download</th>
+                    <th>Remove</th>
+                </tr>
+            </thead>
+            <tbody>
+                <% @status.each do |uri, status| %>
+                <tr>
+                    <td><%=uri%></td>
+                    <td><%=status[:status]%></td>
+                    <td><%if status[:status] == 'Finished'%>
+                        <a href="/download?uri=<%=CGI::escape(uri)%>">Download</a>
+                    <%end%></td>
+                    <td><%if ['Finished', 'Failed'].include? status[:status] %>
+                        <a href="/remove?uri=<%=CGI::escape(uri)%>">Remove</a>
+                    <%end%></td>
+                </tr>
+                <% end %>
+            </tbody>
+        </table>
+        
+        <h2>Add URIs</h2>
+        <form action="/add_uris" method="post" accept-charset="utf-8">
+            <textarea name="uris" rows="8" cols="40"></textarea><br />
+            <input type="submit" value="Continue &rarr;" />
+        </form>
+    </body>
+</html>
\ No newline at end of file