Author: sitharus
To: rubyfreenet
Subject: [Rubyfreenet] r9166 - in trunk/apps/rubyFreenet: freenet freenet/fcp indexr
Author: sitharus
Date: 2006-06-12 10:21:47 +0000 (Mon, 12 Jun 2006)
New Revision: 9166
Modified:
trunk/apps/rubyFreenet/freenet/fcp/client.rb
trunk/apps/rubyFreenet/freenet/fcp/message.rb
trunk/apps/rubyFreenet/freenet/uri.rb
trunk/apps/rubyFreenet/indexr/index.rhtml
trunk/apps/rubyFreenet/indexr/indexr.rb
Log:
Added optional timeouts to messages, for cases where you want one.
Some changes to indexr; it might work properly some day.
Modified: trunk/apps/rubyFreenet/freenet/fcp/client.rb
===================================================================
--- trunk/apps/rubyFreenet/freenet/fcp/client.rb 2006-06-11 22:51:44 UTC (rev 9165)
+++ trunk/apps/rubyFreenet/freenet/fcp/client.rb 2006-06-12 10:21:47 UTC (rev 9166)
@@ -176,7 +176,7 @@
async = true if callback
options = {'IgnoreDS'=>false,
'DSOnly'=>false,
- 'Verbosity'=>1,
+ 'Verbosity'=>0,
'ReturnType'=>'direct',
'PriorityClass'=>1,
'Persistence'=>'reboot',
@@ -184,6 +184,7 @@
options['Persistence'] = 'connection' unless async
options['URI'] = uri
message = Message.new('ClientGet', nil, options, callback)
+ log(INFO, "#{message.identifier} GET queued")
send(message, async)
if async
message
@@ -235,7 +236,7 @@
uri = uri.uri if uri.respond_to? :uri
async = true if callback
options = {'Metadata.ContentType' => 'application/octet-stream',
- 'Verbosity' => 1,
+ 'Verbosity' => 0,
'MaxRetries' => 10,
'Persistence' => 'reboot',
'UploadFrom' => 'direct'}.merge(options || {})
@@ -243,6 +244,7 @@
options['URI'] = uri
message = Message.new('ClientPut', data, options, callback)
message.content-type = options['Metadata.ContentType']
+ log(INFO, "#{message.identifier} PUT queued")
send(message, async)
if async
message
@@ -269,7 +271,7 @@
def putdir(uri, dir, async=false, options=nil, &callback)
uri = uri.uri if uri.respond_to? :uri
async = true if callback
- options = {'Verbosity' => 1,
+ options = {'Verbosity' => 0,
'MaxRetries' => 10,
'PriorityClass' => 3,
'DefaultName' => 'index.html',
@@ -278,6 +280,7 @@
options['Persistence'] = 'connection' unless async
options['URI'] = uri
message = Message.new('ClientPutDiskDir', nil, options, callback)
+ log(INFO, "#{message.identifier} PUTDIR queued")
send(message, async)
if async
message
@@ -332,7 +335,6 @@
# it may block if it's in use.
# param asynchronous Whether the reader should block until a response is received
def send(message, asynchronous = false)
- log(DEBUG, "Queuing #{message.type} - #{message.identifier}")
@message_queue << message
message
end
@@ -348,8 +350,10 @@
@callback_threads.delete(thread) if thread.join(0.1)
rescue RequestFinished=>e
@messages.delete(e.message)
+ @callback_threads.delete(thread)
rescue Exception=>e
puts "Callback exception #{e}"
+ @callback_threads.delete(thread)
end
end
@@ -371,6 +375,16 @@
message = read_message
dispatch_message(message)
end
+
+ @messages.each do |id, message|
+ if message.timeout and Time.now > (message.added + message.timeout)
+ thread = Thread.new do
+ message.callback(:timeout)
+ raise RequestFinished.new(message.identifier)
+ end
+ @callback_threads << thread
+ end
+ end
end
rescue Exception => e
log(FATAL, "Exception in socket thread: #{e.class}: #{e.message}")
@@ -382,8 +396,8 @@
# Dispatch a message received from the server. Sets the reply on the original message and then
# calls any callback. See above for notes on callbacks.
def dispatch_message(message)
- log(DEBUG, "Dispatching #{message.type} - #{message.identifier}")
if message.identifier
+ log(INFO, "#{message.identifier}: recieved #{message.type}")
original_message = @messages[message.identifier]
if original_message
original_message.reply = message
@@ -460,26 +474,19 @@
# raises FCPConnectionError if socket isn't connected
def send_message(message)
raise FCPConnectionError.new('Socket does not exist') unless @socket
- log(DEBUG, "Sending #{message.type} #{message.identifier}")
@messages[message.identifier] ||= message
unless message.load_only
- log(DEBUG, "W: #{message.type}")
@socket.write(message.type+"\n")
message.items.each do |key, value|
- log(DEBUG, "W: #{key}=#{value}")
@socket.write("#{key}=#{value}\n")
end
if message.data
@socket.write("DataLength=#{message.data.length}\n")
- log(DEBUG, "W: DataLength=#{message.data.length}")
@socket.write("Data\n")
- log(DEBUG, "W: Data")
@socket.write(message.data)
- log(DEBUG, "W: #{message.data}")
else
@socket.write("EndMessage\n")
- log(DEBUG, "W: EndMessage")
end
@socket.write("\n")
end
@@ -496,7 +503,6 @@
data = nil
loop do
line = @socket.readline.strip
- log(DEBUG, "R: #{line}")
case line
when "End","EndMessage"
break
@@ -504,7 +510,6 @@
key, value = line.split('=', 2)
items[key] = value
when "Data"
- log(DEBUG, "Reading #{items['DataLength']} bytes")
data = @socket.read(items['DataLength'].to_i)
break
else
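Modified: trunk/apps/rubyFreenet/freenet/fcp/message.rb
===================================================================
--- trunk/apps/rubyFreenet/freenet/fcp/message.rb 2006-06-11 22:51:44 UTC (rev 9165)
+++ trunk/apps/rubyFreenet/freenet/fcp/message.rb 2006-06-12 10:21:47 UTC (rev 9166)
# [type] The FCP message type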
# [data] Any data to send with the message
@@ -30,10 +30,17 @@
@identifier = 'ClientHello'
@items.delete('Identifier')
end
+
+ if @items['Timeout']
+ @timeout = @items['Timeout']
+ @items.delete('Timeout')
+ end
+
@callback = callback
@mutex = Mutex.new
@load_only = false
@this_thread = nil
+ @added = Time.now
end
# Lock the object. Call before using in async situations
Modified: trunk/apps/rubyFreenet/freenet/uri.rb
===================================================================
--- trunk/apps/rubyFreenet/freenet/uri.rb 2006-06-11 22:51:44 UTC (rev 9165)
+++ trunk/apps/rubyFreenet/freenet/uri.rb 2006-06-12 10:21:47 UTC (rev 9166)
@@ -1,3 +1,5 @@
+require 'cgi'
+
module Freenet
# Represents a Freenet URI. Provides manipulation with awareness of the Freenet structure, such
# as USK versioning.
@@ -15,6 +17,7 @@
def initialize(uri = nil)
return if uri.nil?
uri = uri.respond_to?(:uri) ? uri.uri : uri.dup
+ uri = CGI::unescape(uri)
case uri
when /^\/?freenet:/
uri.sub!(/^\/?freenet:/,'')
@@ -78,6 +81,13 @@
return uri.uri
end
rescue URIError => e # We have a fragment
+ if uri =~ /([^#\?]+)/
+ uri = uri.match(/([^#\?]+)/)[1]
+ else
+ raise URIError.new
+ end
+ raise URIError.new if uri =~ /[^\/]+:/
+
case @type
when 'KSK','CHK' #No point merging paths for this type...
when 'USK'
@@ -148,6 +158,9 @@
when /^\//
return new_path
else
+ unless old_path =~ /\/$/
+ old_path = old_path.gsub(/\/[^\/]+$/, '/')
+ end
return "#{old_path}#{new_path}"
end
end
Modified: trunk/apps/rubyFreenet/indexr/index.rhtml
===================================================================
--- trunk/apps/rubyFreenet/indexr/index.rhtml 2006-06-11 22:51:44 UTC (rev 9165)
+++ trunk/apps/rubyFreenet/indexr/index.rhtml 2006-06-12 10:21:47 UTC (rev 9166)
@@ -51,12 +51,17 @@
<h1>Indexr: <%=Time.now.strftime("%d/%m/%Y")%></h1>
<h2>Pages</h2>
<div id="contents">
- <% category = ''%>
+ <% category = ''
+ site = ''%>
<% @sorted_pages.each do |page| -%>
<% if category != page.category -%>
<h3><%= page.category.to_s %></h3>
<% category = page.category %>
<% end %>
+ <% if site != page.uri.site %>
+ <h4><%= page.uri.site %></h4>
+ <% site = page.uri.site%>
+ <% end %>
<div class="page">
<a href="/<%=page.uri.uri%>"><%=page.title%></a> <%=page.uri.path%> Rank: <%=((page.rank*100).ceil).to_f/100%><br />
<% if page.meta['description'] -%>
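Modified: trunk/apps/rubyFreenet/indexr/indexr.rb
===================================================================
--- trunk/apps/rubyFreenet/indexr/indexr.rb 2006-06-11 22:51:44 UTC (rev 9165)
+++ trunk/apps/rubyFreenet/indexr/indexr.rb 2006-06-12 10:21:47 UTC (rev 9166)
# Example for FreenetR useage.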
#
@@ -29,6 +30,10 @@
@pages_semaphore = Mutex.new # Used to sync operations on @pages
@mutex = Mutex.new # Used for any other member variables
@mutex.lock
+
+ File.open('index.rhtml') do |f|
+ @template = ERB.new(f.read, nil, '-')
+ end
# Log in to the client. We specify a static client name so only one instance can run
# at a time.
@client = Freenet::FCP::Client.new('IndexR')
@@ -44,19 +49,19 @@
loop do
@mutex.lock
@process_threads.each do |thread|
- puts "Thread count: #{@process_threads.length}"
begin
if thread.join(0.5)
@process_threads.delete_if {|x| x==thread}
end
rescue Exception => e # Make sure a mistake in a thread doesn't kill everything... Could be done better
puts "Exception in processing thread: #{e.to_s}"
- puts e.backtrace.join("\n")
@process_threads.delete_if {|x| x==thread}
end
end
- puts "URIs waiting: #{@waiting_urls.length}, Total Scanned: #{@all_uris.length}"
- puts @waiting_urls.join("\n") if @waiting_urls.size < 3
+ @pages_semaphore.synchronize do
+ puts "URIs waiting: #{@waiting_urls.length}, Total Scanned: #{@all_uris.length}."
+ puts @waiting_urls.join("\n") if @waiting_urls.size < 3
+ end
@mutex.unlock
sleep(5)
# If there are no waiting URLs and no processing threads we're done.
@@ -64,6 +69,9 @@
break if @process_threads.empty? and @waiting_urls.empty?
end
puts 'All processing done'
+ START_PAGES.each do |uri|
+ @pages[uri].run_rank(20, @pages)
+ end
create_index_page # create the page
puts 'Disconnecting'
@client.disconnect # disconnect
@@ -74,13 +82,8 @@
def mine_page(uri, depth=0)
# Filter out pages we don't want (images, binaries)
case uri.path
- when /\.html$/
- when /\.htm$/
- when /\/$/
- when ''
- when nil
- else
- puts "Ignoring path '#{uri.path.to_s}'"
+ when /\.html$/, /\.htm$/, /\/$/, '', nil
+ when /\./
return
end
@@ -97,10 +100,13 @@
end
# Get the page
- @client.get(uri.uri, true, "Persistence"=>'connection','PriorityClass'=>3) do |status, message, response|
+ @client.get(uri.uri, true, "Persistence"=>'connection','PriorityClass'=>2) do |status, message, response|
@mutex.lock # Lock the object
case status
when :finished # If we have the data and it's text/html we process it.
+ @pages_semaphore.synchronize do
+ @waiting_urls.delete_if {|u| u == uri.uri}
+ end
page = response.data
if message.content_type =~ /^text\/html/
thread = Thread.new do # Do it in a new thread, just because.
@@ -114,20 +120,19 @@
else
puts "Unknown content type: #{message.content_type} URI:#{uri.uri}"
end
- when :failed, :error # If it failed then we ditch the URI
+ when :failed, :error, :timeout # If it failed then we ditch the URI
+ puts "URI failed: #{status} (#{response.items['ExtraDescription'] if response}): #{uri.uri}"
@pages_semaphore.synchronize do
@waiting_urls.delete_if {|u| u == uri.uri}
end
- when :redirect # In case of redirect we issue a new request for it.
- puts "Got redirect to #{response.items['RedirectURI']}"
+ when :redirect # In case of redirect we issue a new request for it.
+ @pages_semaphore.synchronize do
+ @waiting_urls.delete_if {|u| u == uri.uri}
+ end
@mutex.unlock
mine_page(Freenet::URI.new(response.items['RedirectURI']), depth)
@mutex.lock
- @pages_semaphore.synchronize do
- @waiting_urls.delete_if {|u| u == uri.uri}
- end
when :retrying
- puts "Got retry notification"
when :pending
end
@mutex.unlock
@@ -158,29 +163,22 @@
@pages[uri.uri] = page
end
rescue Exception => e # This is normally a bad URI or something that's not HTML
- puts "Processing Exception: #{e.to_s}\n#{e.backtrace.join("\n")}"
- ensure # Make sure we delete the URI from the waiting list no matter what
- @pages_semaphore.synchronize do
- @waiting_urls.delete_if {|u| u == uri.uri}
- end
+ puts "Processing Exception: #{e.to_s}"
end
end
# Create index page. Sort pages and send to the erb template
def create_index_page
template = nil
- File.open('index.rhtml') do |f|
- template = ERB.new(f.read, nil, '-')
- end
@pages_semaphore.synchronize do
@pages.delete_if {|uri, page| page.nil? }
- START_PAGES.each do |uri|
- @pages[uri].run_rank(5, @pages)
- end
@sorted_pages = @pages.values.sort
File.open('index.html', 'w') do |f|
- f.write(template.result(binding))
+ f.write(@template.result(binding))
end
+ File.open('pages_dump.yaml', 'w') do |f|
+ f.write(YAML.dump(@pages))
+ end
end
end
end
@@ -188,7 +186,7 @@
# A page on Freenet, uses beautifulsoup to parse non-XML-compliant HTML files,
class FreenetPage
- attr_accessor :url, :title, :meta, :uri, :links, :category
+ attr_accessor :url, :title, :meta, :uri, :links, :category, :uri
attr_reader :rank
def initialize(url, data)
@uri = Freenet::URI.new(url)
@@ -197,33 +195,40 @@
@links = []
@meta = {}
@category = :unknown
- @page = BeautifulSoup.new(data)
- raise FreenetPageError.new if @page.nil?
- @page.find_all('a').each do |link|
+ page = BeautifulSoup.new(data)
+ raise FreenetPageError.new if page.nil?
+ page.find_all('a').each do |link|
begin
- href = link['href'] or next
+ href = link['href'].to_s or next
@links << uri.merge(href)
rescue Freenet::URIError => e
- puts 'Invalid URI'
+ puts "Invalid URI #{href}"
end
end
- @title = @page.html.head.title.string || "No Title"
+ begin
+ @title = page.html.head.title.string.to_s || "No Title"
+ rescue
+ @title = "No Title"
+ end
- @page.find_all('meta').each do |meta|
+ page.find_all('meta').each do |meta|
if meta['name'] and meta['content']
- @meta[meta['name'].downcase] = meta['content']
+ @meta[meta['name'].downcase] = meta['content'].to_s
end
end
rescue Exception => e # Raise this if we can't make the page, for whatever reason.
- raise FreenetPageError.new
+ puts e
+ puts e.backtrace
+ raise FreenetPageError.new(e.message)
end
def run_rank(rank, all_pages)
@rank += rank
- pass_rank = rank.to_f/@page.find_all('a').size
+ page = BeautifulSoup.new(data)
+ pass_rank = rank.to_f/page.find_all('a').size
return if pass_rank < 0.01
- @page.find_all('a').each do |link|
+ page.find_all('a').each do |link|
begin
href = link['href'] or next
if uri = @uri.merge(href) and uri != @uri.uri and all_pages[uri]
@@ -236,10 +241,11 @@
# Magic voodoo of half-baked ideas. It's supposed to sort pages in to categories.
def categorize
- links = @page.find_all('a')
- imgs = @page.find_all('img')
+ page = BeautifulSoup.new(data)
+ links = page.find_all('a')
+ imgs = page.find_all('img')
total_count = 0
- tag_count = @page.find_all {|tag| total_count += 1}
+ tag_count = page.find_all {|tag| total_count += 1}
if imgs.size/total_count.to_f > 0.3 or (links.size/imgs.size) > 0.8
:image_gallery
elsif links.size/total_count.to_f > 0.3
@@ -262,7 +268,11 @@
def <=>(other)
return -1 if other.nil?
if @category == other.category
- @title <=> other.title
+ if @uri.site == other.uri.site
+ @title <=> other.title
+ else
+ @uri.site <=> other.uri.site
+ end
else
@category.to_s <=> other.category.to_s
end