[Cppfcplib] r14333 - trunk/apps/CppFCPLib

Top Page
Delete this message
Reply to this message
Author: mkolar
Date:  
To: cppfcplib
Subject: [Cppfcplib] r14333 - trunk/apps/CppFCPLib
Author: mkolar
Date: 2007-07-25 15:14:22 +0000 (Wed, 25 Jul 2007)
New Revision: 14333

Modified:
trunk/apps/CppFCPLib/JobTicket.cpp
trunk/apps/CppFCPLib/JobTicket.h
trunk/apps/CppFCPLib/Node.cpp
trunk/apps/CppFCPLib/Node.h
trunk/apps/CppFCPLib/NodeThread.cpp
trunk/apps/CppFCPLib/NodeThread.h
Log:
* collection for local and global jobs
* modifyPeerNote
* listPersistentRequest



Modified: trunk/apps/CppFCPLib/JobTicket.cpp
===================================================================
--- trunk/apps/CppFCPLib/JobTicket.cpp    2007-07-25 09:06:20 UTC (rev 14332)
+++ trunk/apps/CppFCPLib/JobTicket.cpp    2007-07-25 15:14:22 UTC (rev 14333)
@@ -9,7 +9,7 @@
using namespace FCPLib;

JobTicket::Ptr
-JobTicket::factory(std::string id, Message::Ptr cmd, bool keep)
+JobTicket::factory(std::string id, Message::Ptr cmd)
{
log().log(NOISY, "Creating " + cmd->getHeader());
Ptr ret( new JobTicket() );
@@ -17,8 +17,6 @@
ret->id = id;
ret->cmd = cmd;

- ret->keep = keep;
-
ret->lock.acquire();
ret->reqSentLock.acquire();


Modified: trunk/apps/CppFCPLib/JobTicket.h
===================================================================
--- trunk/apps/CppFCPLib/JobTicket.h    2007-07-25 09:06:20 UTC (rev 14332)
+++ trunk/apps/CppFCPLib/JobTicket.h    2007-07-25 15:14:22 UTC (rev 14333)
@@ -19,14 +19,19 @@
namespace FCPLib {

class NodeThread;
+class Node;

class JobTicket {
+public:
+ typedef boost::shared_ptr<JobTicket > Ptr;
+private:
std::string id;
Message::Ptr cmd;

Response nodeResponse;

bool keep;
+ bool global;

std::string repr;
bool isReprValid;
@@ -52,22 +57,20 @@
nodeResponse.push_back(m);
}

- JobTicket() : isReprValid(false), _isFinished(false) {}
-public:
- typedef boost::shared_ptr<JobTicket > Ptr;
+ JobTicket() : keep(false), global(false), isReprValid(false), _isFinished(false) {}
+ static Ptr factory(std::string id, Message::Ptr cmd);

- static Ptr factory(std::string id, Message::Ptr cmd, bool keep);
-
+ JobTicket& setKeep( bool x ) { keep = x; return *this; };
+ JobTicket& setGlobal( bool x ) { global = x; return *this; };
void setCallback( boost::function<void (int, const ServerMessage::Ptr)> f )
{
this->f = f;
}
-
void setCallback( void (*f)(int, const ServerMessage::Ptr) )
{
this->f = f;
}
-
+public:
const std::string& getCommandName() const;
const std::string& getId() const;
const Message::Ptr getCommand() const;
@@ -82,12 +85,15 @@
}
const std::string& toString();

+ bool isGlobal() const { return global; }
+
bool isFinished()
{
ZThread::Guard<ZThread::Mutex> g(access);
return _isFinished;
}

+ friend class Node;
friend class NodeThread;
};


Modified: trunk/apps/CppFCPLib/Node.cpp
===================================================================
--- trunk/apps/CppFCPLib/Node.cpp    2007-07-25 09:06:20 UTC (rev 14332)
+++ trunk/apps/CppFCPLib/Node.cpp    2007-07-25 15:14:22 UTC (rev 14333)
@@ -59,7 +59,7 @@
m->setField("Name", name);
m->setField("ExpectedVersion", "2.0");

- JobTicket::Ptr job = JobTicket::factory("", m, false);
+ JobTicket::Ptr job = JobTicket::factory( "", m );
clientReqQueue->put(job);

log().log(DEBUG, "Node constructor: waiting for response to ClientHello");
@@ -84,7 +84,7 @@

m->setField("NodeIdentifier", identifier);

- JobTicket::Ptr job = JobTicket::factory( "", m, false);
+ JobTicket::Ptr job = JobTicket::factory( "", m );
clientReqQueue->put(job);

log().log(DEBUG, "waiting for Peer message");
@@ -104,7 +104,7 @@
if (fields.hasField("WithMetadata")) m->setField("WithMetadata", fields.getField("WithMetadata"));
if (fields.hasField("WithVolatile")) m->setField("WithVolatile", fields.getField("WithVolatile"));

- JobTicket::Ptr job = JobTicket::factory( "", m, false);
+ JobTicket::Ptr job = JobTicket::factory( "", m );
clientReqQueue->put(job);

log().log(DEBUG, "waiting for EndListPeers message");
@@ -123,7 +123,7 @@
Message::Ptr m = Message::factory( std::string("ListPeerNotes") );
m->setField("NodeIdentifier", identifier);

- JobTicket::Ptr job = JobTicket::factory( "", m, false);
+ JobTicket::Ptr job = JobTicket::factory( "", m );
clientReqQueue->put(job);

log().log(DEBUG, "waiting for EndListPeerNotes message");
@@ -144,7 +144,7 @@
else
m->setField("URL", value);

- JobTicket::Ptr job = JobTicket::factory( "", m, false);
+ JobTicket::Ptr job = JobTicket::factory( "", m);
clientReqQueue->put(job);

log().log(DEBUG, "waiting for Peer message");
@@ -164,7 +164,7 @@

m->setFields(message);

- JobTicket::Ptr job = JobTicket::factory( "", m, false);
+ JobTicket::Ptr job = JobTicket::factory( "", m );
clientReqQueue->put(job);

log().log(DEBUG, "waiting for Peer message");
@@ -188,7 +188,7 @@
if (fields.hasField("IsDisabled")) m->setField("IsDisabled", fields.getField("IsDisabled"));
if (fields.hasField("IsListenOnly")) m->setField("IsListenOnly", fields.getField("IsListenOnly"));

- JobTicket::Ptr job = JobTicket::factory( "", m, false);
+ JobTicket::Ptr job = JobTicket::factory( "", m );
clientReqQueue->put(job);

log().log(DEBUG, "waiting for Peer message");
@@ -203,15 +203,15 @@
PeerNote
Node::modifyPeerNote(const std::string & nodeIdentifier,
const std::string & noteText,
- int peerNoteType = 1)
+ int peerNoteType)
{
Message::Ptr m = Message::factory( std::string("ModifyPeerNote") );

m->setField("NodeIdentifier", nodeIdentifier);
m->setField("NoteText", Base64::base64Encode((const unsigned char*)noteText.c_str(), noteText.size()));
- m->setField("PeerNoteType", "1"); // TODO: change to peerNoteType once it is used
+ m->setField("PeerNoteType", peerNoteType);

- JobTicket::Ptr job = JobTicket::factory( "", m, false);
+ JobTicket::Ptr job = JobTicket::factory( "", m );
clientReqQueue->put(job);

log().log(DEBUG, "waiting for PeerNote message");
@@ -231,7 +231,7 @@

m->setField("NodeIdentifier", identifier);

- JobTicket::Ptr job = JobTicket::factory( "", m, false);
+ JobTicket::Ptr job = JobTicket::factory( "", m );
clientReqQueue->put(job);

log().log(DEBUG, "waiting for PeerRemoved message");
@@ -253,7 +253,7 @@
if (fields.hasField("WithPrivate")) m->setField("WithPrivate", fields.getField("WithPrivate"));
if (fields.hasField("WithVolatile")) m->setField("WithVolatile", fields.getField("WithVolatile"));

- JobTicket::Ptr job = JobTicket::factory( "", m, false);
+ JobTicket::Ptr job = JobTicket::factory( "", m );
clientReqQueue->put(job);

log().log(DEBUG, "waiting for NodeData message");
@@ -279,7 +279,7 @@
if (fields.hasField("WithShortDescription")) m->setField("WithShortDescription", fields.getField("WithShortDescription"));
if (fields.hasField("WithLongDescription")) m->setField("WithLongDescription", fields.getField("WithLongDescription"));

- JobTicket::Ptr job = JobTicket::factory( "", m, false);
+ JobTicket::Ptr job = JobTicket::factory( "", m );
clientReqQueue->put(job);

log().log(DEBUG, "waiting for ConfigData message");
@@ -298,7 +298,7 @@
if (m->getHeader() != "ModifyConfig")
throw std::logic_error("ModifyConfig message expected, " + m->getHeader() + " received");

- JobTicket::Ptr job = JobTicket::factory( "", m, false);
+ JobTicket::Ptr job = JobTicket::factory( "", m );
clientReqQueue->put(job);

log().log(DEBUG, "waiting for ConfigData message");
@@ -322,7 +322,7 @@
if (write)
m->setField("WantWriteDirectory", "true");

- JobTicket::Ptr job = JobTicket::factory( "", m, false);
+ JobTicket::Ptr job = JobTicket::factory( "", m );
clientReqQueue->put(job);

log().log(DEBUG, "waiting for TestDDAReply");
@@ -344,7 +344,7 @@
if (readContent != "")
m->setField("ReadContent", readContent);

- JobTicket::Ptr job = JobTicket::factory( "", m, false);
+ JobTicket::Ptr job = JobTicket::factory( "", m );
clientReqQueue->put(job);

log().log(DEBUG, "waiting for TestDDAComplete");
@@ -426,7 +426,7 @@
Message::Ptr m = Message::factory( std::string("GenerateSSK") );
m->setField("Identifier", identifier);

- JobTicket::Ptr job = JobTicket::factory( identifier, m, false);
+ JobTicket::Ptr job = JobTicket::factory( identifier, m );
clientReqQueue->put(job);

log().log(DEBUG, "waiting for SSKKeypair message");
@@ -461,7 +461,7 @@

m->setStream(s, dataLength);

- JobTicket::Ptr job = JobTicket::factory( m->getField("Identifier"), m, false);
+ JobTicket::Ptr job = JobTicket::factory( m->getField("Identifier"), m );
clientReqQueue->put(job);

job->waitTillReqSent(globalCommandsTimeout); // assure that there is a response
@@ -491,7 +491,7 @@
m->setField("UploadFrom", "redirect");
m->setField("TargetURI", target);

- JobTicket::Ptr job = JobTicket::factory( m->getField("Identifier"), m, false);
+ JobTicket::Ptr job = JobTicket::factory( m->getField("Identifier"), m );
log().log(DEBUG, job->toString());
clientReqQueue->put(job);

@@ -567,7 +567,7 @@
if (!r.readDirectory)
m->setField("FileHash", fields.getField("FileHash"));

- JobTicket::Ptr job = JobTicket::factory( m->getField("Identifier"), m, false);
+ JobTicket::Ptr job = JobTicket::factory( m->getField("Identifier"), m );
log().log(DEBUG, job->toString());
clientReqQueue->put(job);

@@ -604,7 +604,7 @@
m->setField("Identifier", id);
m->setField("DontPoll", Converter::toString( dontPoll ));

- JobTicket::Ptr job = JobTicket::factory( id, m, false );
+ JobTicket::Ptr job = JobTicket::factory( id, m );
clientReqQueue->put(job);

return job;
@@ -617,23 +617,19 @@
m->setField( "Enabled", Converter::toString( enabled ) );
m->setField( "VerbosityMask", boost::lexical_cast<std::string>(verbosity) );

- JobTicket::Ptr job = JobTicket::factory( "", m, false );
+ JobTicket::Ptr job = JobTicket::factory( "", m );
clientReqQueue->put(job);
}

-MessagePtrContainer
-Node::listPersistentRequest()
+void
+Node::refreshPersistentRequest()
{
Message::Ptr m = Message::factory( std::string("ListPersistentRequest") );
- JobTicket::Ptr job = JobTicket::factory( "", m, false);
+ JobTicket::Ptr job = JobTicket::factory( "", m );
clientReqQueue->put(job);

- log().log(DEBUG, "waiting for SSKKeypair message");
- job->wait(globalCommandsTimeout);
+ // persistent jobs will be updated

- Response resp = job->getResponse();
- checkProtocolError(resp); // throws
-
- // hmmm... this does not work probably as messages will contain Identifiers and will be assigned to other jobs...
- return createResult<MessagePtrContainer, VectorWithoutLastConverter>( resp );
+ log().log(DEBUG, "waiting for EndListPersistentRequests message");
+ job->wait(globalCommandsTimeout);
}

Modified: trunk/apps/CppFCPLib/Node.h
===================================================================
--- trunk/apps/CppFCPLib/Node.h    2007-07-25 09:06:20 UTC (rev 14332)
+++ trunk/apps/CppFCPLib/Node.h    2007-07-25 15:14:22 UTC (rev 14333)
@@ -63,7 +63,7 @@
Message::Ptr addPeer(const std::string &, bool isURL);
Message::Ptr addPeer(const std::map<std::string, std::string> &message);
Message::Ptr modifyPeer(const std::string &, const AdditionalFields& = AdditionalFields());
- PeerNote modifyPeerNote(const std::string &, const std::string &, int);
+ PeerNote modifyPeerNote(const std::string &, const std::string &, int = 1);
Message::Ptr removePeer(const std::string &);

Message::Ptr getNode(const AdditionalFields& = AdditionalFields());
@@ -95,7 +95,7 @@
JobTicket::Ptr subscribeUSK(const std::string, const std::string, bool);

void watchGlobal( bool enabled, int verbosity );
- MessagePtrContainer listPersistentRequest();
+ void refreshPersistentRequest();
};
}


Modified: trunk/apps/CppFCPLib/NodeThread.cpp
===================================================================
--- trunk/apps/CppFCPLib/NodeThread.cpp    2007-07-25 09:06:20 UTC (rev 14332)
+++ trunk/apps/CppFCPLib/NodeThread.cpp    2007-07-25 15:14:22 UTC (rev 14333)
@@ -1,9 +1,12 @@


+#include <ctime>
+#include <boost/lexical_cast.hpp>
+
#include "NodeThread.h"
#include "Log.h"
-#include <ctime>

+
using namespace FCPLib;
using namespace ZThread;

@@ -85,7 +88,7 @@
log().log(NOISY, "sendClientReq : top");
if (job->getCommandName() != "WatchGlobal") {
log().log(NOISY, "sendClientReq : about to add the job to the map");
- jobs[job->getId()] = job;
+ jobs[job->isGlobal() ? 1 : 0][job->getId()] = job;
log().log(NOISY, "sendClientReq : added the job to the map");
}

@@ -99,19 +102,28 @@
JobTicket::Ptr job;
std::map<std::string, JobTicket::Ptr>::iterator it;

- it = jobs.find(message->getIdOfJob());
- if (it == jobs.end()) {
+ std::string tmp = message->getMessage()->getField("Global");
+ tmp = tmp == "" ? "false" : tmp;
+ int isGlobal = boost::lexical_cast<int>(tmp);
+
+ it = jobs[isGlobal].find(message->getIdOfJob());
+ if (it == jobs[isGlobal].end()) {
log().log(DETAIL, "doMessage : received " + message->getMessage()->getHeader() + ", cannot find " + message->getIdOfJob() + " in started jobs");
/// message from global queue or error
Message::Ptr m = message->getMessage();
- if ( m->getField("Identifier") == "" ) { // error
+ if (!isGlobal) { // error
log().log(DEBUG, "doMessage : received error message");
// TODO: create a mean of passing error messages to client programme
return;
} else { // global queue, create a job
log().log(DEBUG, "doMessage : received message from a global queue");
- JobTicket::Ptr job = JobTicket::factory(m->getField("Identifier"), m, false);
- jobs[m->getField("Identifier")] = job;
+ if ( m->getField("Identifier") == "" ) {
+ // should never happen
+ log().log(ERROR, "doMessage : global message does not contain identifier !???");
+ return;
+ }
+ JobTicket::Ptr job = JobTicket::factory( m->getField("Identifier"), m );
+ jobs[1][m->getField("Identifier")] = job;
return;
}
}
@@ -125,7 +137,7 @@

if (!job->keep) {
log().log(NOISY, "doMessage : job should not be kept, erasing");
- jobs.erase( it );
+ jobs[isGlobal].erase( it );
}
}
else {

Modified: trunk/apps/CppFCPLib/NodeThread.h
===================================================================
--- trunk/apps/CppFCPLib/NodeThread.h    2007-07-25 09:06:20 UTC (rev 14332)
+++ trunk/apps/CppFCPLib/NodeThread.h    2007-07-25 15:14:22 UTC (rev 14333)
@@ -30,7 +30,7 @@
bool isAlive_;
ZThread::CountedPtr<std::exception> exception;

- std::map<std::string, JobTicket::Ptr > jobs;
+ std::map<std::string, JobTicket::Ptr > jobs[2]; // 0 -- local jobs, 1 -- global jobs

friend class Node;
NodeThread(std::string &host, int port, JobTicketQueuePtr clientReqQueue_) throw();