Author: sitharus Date: To: rubyfreenet Subject: [Rubyfreenet] r9400 - in trunk/apps/rubyFreenet: freenet/fcp rqueue
rqueue/templates
Author: sitharus
Date: 2006-06-29 09:18:56 +0000 (Thu, 29 Jun 2006)
New Revision: 9400
Modified:
trunk/apps/rubyFreenet/freenet/fcp/client.rb
trunk/apps/rubyFreenet/freenet/fcp/message.rb
trunk/apps/rubyFreenet/rqueue/queue_server.rb
trunk/apps/rubyFreenet/rqueue/templates/index.rhtml
Log:
Updated rqueue to monitor global get queue, now restart-safe!
Message is progressing towards immutable. Some changes still need to be made, but it is not locked anymore.
Client now calls all_messages callback correctly
Modified: trunk/apps/rubyFreenet/freenet/fcp/client.rb
===================================================================
--- trunk/apps/rubyFreenet/freenet/fcp/client.rb 2006-06-28 23:41:35 UTC (rev 9399)
+++ trunk/apps/rubyFreenet/freenet/fcp/client.rb 2006-06-29 09:18:56 UTC (rev 9400)
@@ -277,6 +277,15 @@
block_message(message, block, &callback)
end
+ # Put a 'complex' dir
+ def put_complex_dir(uri, details, block, options, &callback)
+ uri = uri.uri if uri.respond_to? :uri
+ message = Message.new('ClientPutComplexDir', nil, options, callback)
+ log(INFO, "#{message.identifier} Put complex dir queued")
+ send(message)
+ block_message(message, block, &callback)
+ end
+
# Get the request status for a persistent request. Not useful in synchronous systems.
# This will output some PersistentGet/PersistentPut requests, which don't invoke callbacks,
# SimpleProgress if Verbosity=1, DataFound for all GET requests and AllData if ReturnType=direct
@@ -310,6 +319,11 @@
send(message)
end
+ def list_persistent_requests()
+ message = Message.new('ListPersistentRequests', nil, nil)
+ send(message)
+ end
+
def unknown_message(&callback)
@unknown_callback = callback
end
@@ -327,18 +341,8 @@
message
else
loop do
- begin
- message.wait_for_response
- message.lock
- message.response.lock
- callback.call(message.status, message, message.response)
- message.response.unlock
- message.unlock
- rescue RequestFinished => e
- message.response.unlock
- message.unlock
- break
- end
+ message.wait_for_response
+ callback.call(message.status, message, message.response)
end
end
end
@@ -349,10 +353,6 @@
message = Message.new('ClientHello', nil, 'Name'=>@client_name, 'ExpectedVersion'=>'2.0')
send(message)
message.wait_for_response
- log(DEBUG, "Got NodeHello - Freenet #{message.response.items['Version']}")
- if message.response.items['Testnet'] == 'true'
- log(WARN, "Connected to Testnet, you have no anonymity!")
- end
end
# Logger utility method. Logger should be thread-safe
@@ -433,15 +433,8 @@
message.request = original_message
status = message.status
log(DEBUG, "Message found: #{original_message.type}. #{original_message.callback?} :#{status}")
- original_message.reply = message
+ original_message.continue_thread(message)
case status
- when :found
- log(INFO, 'Data found, requesting status')
- original_message.content_type = message.items['Metadata.ContentType']
- if original_message.items['Persistence'] != 'connection'
- original_message.data_found = true
- request_status(original_message.identifier, original_message.items['Global'] || false, false, false)
- end
when :failed
if message.items['Fatal'] == 'false'
if original_message.retries < 5
@@ -455,31 +448,24 @@
if original_message.callback?
thread = Thread.new do
- original_message.lock
- message.lock
original_message.callback(status)
- message.unlock
- original_message.unlock
end
@callback_threads << thread
- if @all_callback
- thread = Thread.new do
- message.lock
- status = message.status
- @all_callback.call(status, original_message, message)
- message.unlock
- end
- @callback_threads << thread
+ end
+ if @all_callback
+ thread = Thread.new do
+ status = message.status
+ @all_callback.call(status, original_message, message)
end
+ @callback_threads << thread
end
else
+ log(DEBUG, "Executing unknown callback")
if @unknown_callback or @all_callback
thread = Thread.new do
- message.lock
status = message.status
@unknown_callback.call(status, message) if @unknown_callback
@all_callback.call(status, nil, message) if @all_callback
- message.unlock
end
@callback_threads << thread
end
Modified: trunk/apps/rubyFreenet/freenet/fcp/message.rb
===================================================================
--- trunk/apps/rubyFreenet/freenet/fcp/message.rb 2006-06-28 23:41:35 UTC (rev 9399)
+++ trunk/apps/rubyFreenet/freenet/fcp/message.rb 2006-06-29 09:18:56 UTC (rev 9400)
@@ -7,6 +7,8 @@
# and thread safety. When a response is received the original FCP::Message gets its response attribute
# set to the response FCP::Message
#
+ # Please consider this object immutable. This status will be reinforced later.
+ #
# Attributes of interest
# [load_only] Only load in to the message queue, don't actually send. Used when restoring persistent
# requests if a client dies
@@ -38,32 +40,10 @@
end
- # Lock the object. Call before using in async situations
- def lock(delay = 5)
- until @mutex.try_lock
- sleep(delay)
- end
- end
-
- # Unlock. Call after using asychronously
- def unlock
- @mutex.unlock
- end
-
- def locked?
- @mutex.locked?
- end
-
- def try_lock
- @mutex.try_lock
- end
-
# Dispatch the callback. Private to FCP::Client
def callback(status)
@callback.call(status, self, @response) unless @callback.nil?
@@ -76,10 +56,8 @@
end
# Sets the reply. This is private to FCP::Client
- def reply=(response)
- lock
- @response = response
- unlock
+ def continue_thread(message)
+ @response = message
@this_thread.run if @this_thread
end
@@ -98,6 +76,8 @@
# [:error] An error in the FCP messages occured. This is caused by a bug in rubyFreenet
def status
case @type
+ when 'PersistentGet': :get
+ when 'PersistentPut': :put
when 'SSKKeypair': :keypair
when 'AllData': :finished
when 'PersistentGet': :pending
Modified: trunk/apps/rubyFreenet/rqueue/queue_server.rb
===================================================================
--- trunk/apps/rubyFreenet/rqueue/queue_server.rb 2006-06-28 23:41:35 UTC (rev 9399)
+++ trunk/apps/rubyFreenet/rqueue/queue_server.rb 2006-06-29 09:18:56 UTC (rev 9400)
@@ -14,6 +14,51 @@
def initialize
@client = Freenet::FCP::Client.new
@client.watch_global
+ @client.all_messages do |status, original_message, message|
+ begin
+ case status
+ when :get
+ uri = message.items['URI']
+ unless @status[uri]
+ @messages[uri] = message
+ new_uri = Freenet::URI.new(uri)
+ filename = new_uri.path.gsub('/','_')
+ @status_mutex.synchronize do
+ @status[uri] = {:uri=>uri, :status=>:started, :data=>nil, :total=>0, :required=>0, :failed=>0, :fatally_failed=>0,
+ :succeeded=>0, :finalised=>false, :progress=>0, :content_type=>'', :filename=>filename,
+ :added=>Time.now}
+ end
+ end
+ when :found
+ @status_mutex.synchronize do
+ begin
+ original_message = @messages.values.find{|f| f.identifier == message.identifier}
+ found = false
+ if original_message
+ status = @status[original_message.items['URI']]
+ found = true if status[:found]
+ puts 'sending request status'
+ unless found
+ @client.request_status(message.identifier, original_message.items['Global'], false, false)
+ end
+ status[:found] = true
+ end
+ rescue => e
+ puts "#{e}\n#{e.backtrace.join("\n")}"
+ puts "#{message}"
+ end
+ end
+ process_message(status, @messages.values.find{|f| f.identifier == message.identifier}, message)
+ else
+ request = @messages.values.find{|f| f.identifier == message.identifier}
+ process_message(status, request, message) if request
+ end
+ rescue => e
+ puts "#{e}\n#{e.backtrace.join("\n")}"
+ puts "#{message}"
+ end
+ end
+ @client.list_persistent_requests
@status = {}
@messages = {}
@status_mutex = Mutex.new
@@ -32,47 +77,7 @@
puts 'bad URI'
next
end
- @messages[uri] = @client.get(uri, false, 'Persistence'=>'reboot',
- 'Global'=>'true', 'Verbosity'=>'1', 'MaxRetries'=>'30') do |status, request, response|
- case status
- when :failed
- @status_mutex.lock
- @status[request.items['URI']][:status] = 'Failed'
- @client.remove_request(request)
- p response.items
- @status_mutex.unlock
- when :progress
- @status_mutex.lock
- data = @status[request.items['URI']]
- data[:total] = response.items['Total']
- data[:required] = response.items['Required']
- data[:failed] = response.items['Failed']
- data[:fatally_failed] = response.items['FatallyFailed']
- data[:succeeded] = response.items['Succeeded']
- data[:finalised] = response.items['Finalized']
- data[:content_type] = response.content_type
- data[:progress] = ((response.items['Suceeded'].to_f/response.items['Required'].to_f)*100).ceil
- @status_mutex.unlock
- when :found
- @status_mutex.lock
- @status[request.items['URI']][:status] = 'Data Found'
- @status_mutex.unlock
- when :finished
- @status_mutex.lock
- @status[request.items['URI']][:status] = 'Finished'
- @status[request.items['URI']][:progress] = 100
- if @temp_dir
- File.open(File.join(@temp_dir, @status[request.items['URI']][:filename]), 'w') do |file|
- file.write(response.data)
- end
- else
- @status[request.items['URI']][:data] = response.data
- end
- @status[request.items['URI']][:content_type] = response.items['Metadata.ContentType']
- @status_mutex.unlock
- @client.remove_request(request)
- end
- end
+ @messages[uri] = @client.get(uri, false, 'Persistence'=>'reboot', 'Global'=>'true', 'Verbosity'=>'1', 'MaxRetries'=>'30')
queued << uri
@status_mutex.lock
filename = new_uri.path.gsub('/','_')
@@ -84,6 +89,46 @@
end
queued
end
+
+ def process_message(status, request, response)
+ case status
+ when :failed
+ @status_mutex.lock
+ @status[request.items['URI']][:status] = 'Failed'
+ @client.remove_request(request)
+ p response.items
+ @status_mutex.unlock
+ when :progress
+ @status_mutex.lock
+ data = @status[request.items['URI']]
+ data[:total] = response.items['Total']
+ data[:required] = response.items['Required']
+ data[:failed] = response.items['Failed']
+ data[:fatally_failed] = response.items['FatallyFailed']
+ data[:succeeded] = response.items['Succeeded']
+ data[:finalised] = response.items['Finalized']
+ data[:content_type] = response.items['Metadata.ContentType']
+ data[:progress] = ((response.items['Succeeded'].to_f/response.items['Required'].to_f)*100).ceil
+ @status_mutex.unlock
+ when :found
+ @status_mutex.lock
+ @status[request.items['URI']][:status] = 'Data Found'
+ @status_mutex.unlock
+ when :finished
+ @status_mutex.lock
+ @status[request.items['URI']][:status] = 'Finished'
+ @status[request.items['URI']][:progress] = 100
+ if @temp_dir
+ File.open(File.join(@temp_dir, @status[request.items['URI']][:filename]), 'w') do |file|
+ file.write(response.data)
+ end
+ else
+ @status[request.items['URI']][:data] = response.data
+ end
+ @status[request.items['URI']][:content_type] = response.items['Metadata.ContentType']
+ @status_mutex.unlock
+ end
+ end
# Get a list of all URIs and their status, a hash in the format:
# uri => {:status=>(:started, 'Failed', 'Data Found', 'Finished), :data=>nil or file data, :total=># of split file blocks,
@@ -100,7 +145,7 @@
def remove(uri)
@status_mutex.lock
if @messages[uri]
- @client.remove_request(@messages[uri]) if @status[uri][:status] != 'Finished'
+ @client.remove_request(@messages[uri])
if @temp_dir
status = @status[uri]
begin