The Original CHomP Software
chomp::multiwork::mwWorker Class Reference

This class defines a generic worker task object for the multi-work distributed computations framework.

#include <mwworker.h>

Inheritance: chomp::multiwork::mwWorker derives from chomp::multiwork::mwTask and is inherited by chomp::multiwork::mwSubWorker< dim, coord >.

Public Member Functions

 mwWorker ()
 The default constructor.
 
virtual ~mwWorker ()
 The destructor.
 
int Work ()
 Runs the worker on this computer.
 
void KeepWorker (bool keep=true)
 Makes the worker keep running after the coordinator has disconnected.
 
Public Member Functions inherited from chomp::multiwork::mwTask
 mwTask ()
 The default constructor.
 
virtual ~mwTask ()
 The destructor.
 
void Port (int number)
 Sets the port number for the communication or 0 to use none.
 
int Port () const
 Returns the current port number.
 
void ControlNumber (unsigned int number)
 Sets the control number for identification.
 
unsigned int ControlNumber () const
 Returns the currently set identification control number.
 
void TimeOut (int seconds)
 Sets the network connection time-out interval in seconds.
 
int TimeOut () const
 Returns the currently set network connection time-out interval.
 
int LogFile (const char *filename)
 Begins logging detailed communication debug information to the given file.
 
void LogFile (const mwTask &other)
 Uses another task's log file to log this task's information.
 
void LogClose ()
 Closes the log file and adds a line with the time information unless this log file was borrowed from another task.
 
int Add (const char *name, int port=-1)
 Adds an address to the list of computers to connect to at the beginning of working or coordinating.
 
int Load (const char *filename)
 Loads computer addresses from the given file.
 
int QuitWorkers ()
 Quits all the workers whose addresses were added with the 'Add' and 'Load' functions.
 

Private Member Functions

virtual int Process (mwData &data)
 This function is called to process a data portion and replace it with the result of computation.
 
virtual int Initialize (mwData &data)
 This function is called to process initialization data, if any.
 
int WorkOne (int fd)
 Runs one working session after having connected to the coordinator at the socket identified by 'fd'.
 
int SendMessageW (int fd, unsigned int code, const mwData &x) const
 Sends a message with data to the given socket as a worker.
 
int RecvMessageW (int fd, unsigned int &code, mwData &x) const
 Receives a message with data from the socket of a coordinator.
 

Private Attributes

bool keepWorker
 Should the worker remain running after the coordinator disconnects?
 

Friends

class mwCoordinator
 

Additional Inherited Members

Static Protected Member Functions inherited from chomp::multiwork::mwTask
static int SendMessage (int fd, unsigned int ctrl, unsigned int code, const mwData &x)
 Sends a message with data to the given socket.
 
static int RecvMessage (int fd, unsigned int &ctrl, unsigned int &code, mwData &x)
 Receives a message with data from the given socket.
 
Protected Attributes inherited from chomp::multiwork::mwTask
std::ofstream * logFile
 The debug log file stream.
 
std::vector< std::string > computers
 A list of workers or coordinators to connect to at start-up.
 
std::vector< int > ports
 A list of port numbers, one for each entry in 'computers', to connect to at start-up.
 

Detailed Description

This class defines a generic worker task object for the multi-work distributed computations framework.

Each worker class defined by the user must inherit from this class and override Process() and, if initialization data is used, Initialize().

Definition at line 54 of file mwworker.h.
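To illustrate what deriving from mwWorker involves, here is a minimal self-contained sketch. It does not use the CHomP headers: WorkerBase, ScaleWorker, RunSession, the Status values, and the use of std::vector<int> in place of mwData are all hypothetical stand-ins. The sketch only models the pattern this page describes: a public driver (RunSession, standing in for Work() and WorkOne()) calls private virtual hooks named Initialize() and Process(), which the user's class overrides.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-ins (not the CHomP API): status codes modeled on
// mwOk / mwReject / mwError, and a vector in place of mwData.
enum Status { Ok = 0, Reject = 1, Error = -1 };
using Data = std::vector<int>;

// Simplified model of mwWorker: the driver is public and non-virtual,
// while the hooks a user customizes are private virtual functions.
class WorkerBase {
public:
    virtual ~WorkerBase() = default;

    // Plays the role of WorkOne(): run the initialization hook,
    // then process one data portion in place.
    int RunSession(Data &init, Data &portion) {
        if (Initialize(init) != Ok)
            return Error;
        return Process(portion);
    }

private:
    // Defaults mirror mwWorker: do nothing and report success.
    virtual int Initialize(Data &) { return Ok; }
    virtual int Process(Data &) { return Ok; }
};

// A user-defined worker: the initialization data carries a scaling
// factor, and each portion is replaced by its scaled values.
class ScaleWorker : public WorkerBase {
    int factor = 1;

    int Initialize(Data &data) override {
        if (data.size() != 1)
            return Error;   // malformed initialization data
        factor = data[0];
        return Ok;
    }

    int Process(Data &data) override {
        if (data.empty())
            return Reject;  // nothing to compute: hand the portion back
        for (int &x : data)
            x *= factor;    // overwrite the input with the result
        return Ok;
    }
};
```

In a real worker, the derived class would inherit from chomp::multiwork::mwWorker and operate on mwData; the framework, not the user, calls the hooks when messages from the coordinator arrive.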

Constructor & Destructor Documentation

◆ mwWorker()

chomp::multiwork::mwWorker::mwWorker ( )
inline

The default constructor.

Definition at line 119 of file mwworker.h.

: keepWorker (false)
{
    return;
} /* mwWorker::mwWorker */

◆ ~mwWorker()

chomp::multiwork::mwWorker::~mwWorker ( )
inline, virtual

The destructor.

Definition at line 124 of file mwworker.h.

{
    return;
} /* mwWorker::~mwWorker */

Member Function Documentation

◆ Initialize()

int chomp::multiwork::mwWorker::Initialize ( mwData & data)
inline, private, virtual

This function is called to process initialization data, if any.

This data is sent to a new worker immediately after the connection with the coordinator has been established. The function should return mwOk on success; any other value is treated as a failure, and the worker then ends the session with mwError and quits. Override this function in the class you derive from 'mwWorker' and implement the actual initialization there; the default implementation does nothing and returns mwOk.

Definition at line 144 of file mwworker.h.

{
    return mwOk;
} /* mwWorker::Initialize */

mwOk (mwconfig.h:159): Everything is fine.

References chomp::multiwork::mwOk.

Referenced by WorkOne().

◆ KeepWorker()

void chomp::multiwork::mwWorker::KeepWorker ( bool  keep = true)
inline

Makes the worker keep running after the coordinator has disconnected.

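Otherwise (the default), the worker quits in that situation. The coordinator can also change this setting by sending mwKeepMsg or mwDontKeepMsg, and the worker only keeps running if it has a positive port number at which to wait for the next coordinator.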

Definition at line 131 of file mwworker.h.

{
    keepWorker = keep;
    return;
} /* mwWorker::KeepWorker */

References keepWorker.
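KeepWorker() only sets a flag; the decision is made in Work() after each working session. A minimal sketch of that test follows. ShouldQuitAfterSession is a hypothetical helper, not part of CHomP, and the numeric values of Ok and Error are made up.

```cpp
#include <cassert>

// Hypothetical stand-ins for mwOk and mwError.
enum Status { Ok = 0, Error = -1 };

// Mirrors the condition at the end of each iteration of Work(): the
// worker goes back to listening only if it has a port to listen on,
// the keep flag is set, and the last session ended without an error.
bool ShouldQuitAfterSession(int port, bool keepWorker, int result) {
    return (port <= 0) || !keepWorker || (result != Ok);
}
```

So calling KeepWorker() has no visible effect unless a positive port number has also been set with Port().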

◆ Process()

int chomp::multiwork::mwWorker::Process ( mwData & data)
inline, private, virtual

This function is called to process a data portion and replace it with the result of computation.

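It should return mwOk on success, mwReject to reject the data (the data is then cleared and sent back to the coordinator marked as rejected), or mwError to make the worker quit. Any other value is treated as success. Override this function in the class you derive from 'mwWorker' and implement the actual computation there; the default implementation leaves the data unchanged and returns mwOk.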

Reimplemented in chomp::multiwork::mwSubWorker< dim, coord >.

Definition at line 139 of file mwworker.h.

{
    return mwOk;
} /* mwWorker::Process */

References chomp::multiwork::mwOk.

Referenced by WorkOne().
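The way WorkOne() reacts to the value returned by Process() can be sketched as follows. Reply, HandleProcessResult, RejectedFlag, and the numeric status values are hypothetical; std::vector<int> stands in for mwData, and clear() for mwData::Reset().

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-ins: the real constants are mwOk, mwReject and
// mwError (mwconfig.h) and mwRejectedMsg (mwtask.h); the numeric
// values here are made up for illustration.
enum Status { Ok = 0, Reject = 1, Error = -1 };
const unsigned RejectedFlag = 0x10;

struct Reply {
    bool quit;          // end the session with an error
    unsigned retcode;   // message code sent back with the data
};

// Mirrors the branch in WorkOne() that follows the call to Process().
Reply HandleProcessResult(int result, std::vector<int> &data) {
    if (result == Error)
        return {true, 0u};
    if (result == Reject) {
        data.clear();   // stands in for msg.Reset()
        return {false, RejectedFlag};
    }
    // any other value, normally Ok, counts as success
    return {false, 0u};
}
```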

◆ RecvMessageW()

int chomp::multiwork::mwWorker::RecvMessageW ( int fd,
unsigned int & code,
mwData & x
) const
inline, private

Receives a message with data from the socket of a coordinator.

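Returns mwOk on success and mwError if the received control number differs from ControlNumber(). If RecvMessage() itself fails, its result code (for example mwLost, which WorkOne() checks for) is passed through unchanged.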

Definition at line 161 of file mwworker.h.

{
    // receive the message
    unsigned int ctrl = 0;
    int result = this -> RecvMessage (fd, ctrl, code, x);

    // if there was an error then return its code
    if (result != mwOk)
        return result;

    // if the control number is correct then finish successfully
    if (ctrl == this -> ControlNumber ())
        return mwOk;

    // if the control number is wrong then return an error code
    if (logFile)
        *logFile << "Wrong control code received "
            "from the coordinator: " << ctrl << "." << std::endl;
    return mwError;
} /* mwWorker::RecvMessageW */

mwError (mwconfig.h:162): A serious error occurred.

References chomp::multiwork::mwTask::ControlNumber(), chomp::multiwork::mwTask::logFile, chomp::multiwork::mwError, chomp::multiwork::mwOk, and chomp::multiwork::mwTask::RecvMessage().

Referenced by WorkOne().
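The checks RecvMessageW() performs after calling RecvMessage() reduce to a small pure function. In this sketch the names ValidateIncoming, Ok, Error and Lost, and their numeric values, are assumptions; only the control flow follows the listing above.

```cpp
#include <cassert>

// Hypothetical status codes; the real ones are mwOk, mwError
// (mwconfig.h) and mwLost (mwlowlev.h), with values not shown here.
enum Status { Ok = 0, Error = -1, Lost = -2 };

// Models RecvMessageW(): a transport failure is passed through
// unchanged, and a message whose control number differs from the
// expected one is reported as an error.
int ValidateIncoming(int recvResult, unsigned ctrl, unsigned expected) {
    if (recvResult != Ok)
        return recvResult;   // e.g. Lost, which the caller handles separately
    return (ctrl == expected) ? Ok : Error;
}
```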

◆ SendMessageW()

int chomp::multiwork::mwWorker::SendMessageW ( int fd,
unsigned int code,
const mwData & x
) const
inline, private

Sends a message with data to the given socket as a worker.

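The message is tagged with the bitwise complement of ControlNumber(). Returns the result of SendMessage(): mwOk on success, or an error code (such as mwError or mwLost) on failure.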

Definition at line 151 of file mwworker.h.

{
    // prepare the negation of the control number to send
    unsigned int ctrl = ~(this -> ControlNumber ());

    // send the message with the control number and the message code
    return this -> SendMessage (fd, ctrl, code, x);
} /* mwWorker::SendMessageW */

References chomp::multiwork::mwTask::ControlNumber(), and chomp::multiwork::mwTask::SendMessage().

Referenced by WorkOne().
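The control-number convention can be sketched in a few lines: replies from the worker carry ~ControlNumber(), whereas RecvMessageW() expects incoming messages to carry ControlNumber() itself. WorkerTag and IsReplyFromWorker are hypothetical helpers, and the coordinator-side check is only an assumption about how such a tag could be verified; the mwCoordinator code is not shown on this page.

```cpp
#include <cassert>

// Tag attached by the worker to its replies, as in SendMessageW().
unsigned WorkerTag(unsigned control) { return ~control; }

// Hypothetical coordinator-side check: a reply is accepted if
// complementing its tag gives back the control number.
bool IsReplyFromWorker(unsigned tag, unsigned control) {
    return ~tag == control;
}
```

Since ~c never equals c, a worker reply can never be mistaken for a coordinator message carrying the same control number, and vice versa.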

◆ Work()

int chomp::multiwork::mwWorker::Work ( )
inline

Runs the worker on this computer.

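Returns mwOk if the worker finished normally (including a time-out while waiting for a coordinator), or mwError if listening, accepting a connection, or a working session failed. An exception thrown during a session is rethrown after the connection has been closed.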

Definition at line 387 of file mwworker.h.

{
    if (logFile)
    {
        *logFile << "Running as a WORKER." << std::endl;

        // indicate what kind of network connection is in use
        #if !mwNETWORK
        *logFile << "There is no network in use, "
            "so exiting right now." << std::endl;
        #elif mwWXWIN
        *logFile << "Using the sockets interface "
            "provided by wxWindows." << std::endl;
        #else
        *logFile << "Using the standard sockets "
            "for network connections." << std::endl;
        #endif
    }

    // connect to the first comp. on the list that is running
    int fd = -1;
    for (unsigned cur = 0; cur < this -> computers. size (); ++ cur)
    {
        // retrieve the computer name and port from the list
        int port = this -> ports [cur];
        const char *name = this -> computers [cur]. c_str ();

        // if no valid name or no port read, skip this data
        if (!*name || !port)
            continue;

        // try connecting to the computer
        fd = mwConnect (name, port);

        // add an appropriate message to the log file
        if (logFile)
        {
            *logFile << "Connection to " << name << ":" <<
                port << ((fd < 0) ? " refused." :
                " established.") << std::endl;
        }

        // stop at the first computer that accepted the connection
        if (fd >= 0)
            break;
    }

    while (1)
    {
        // if not connected then try listening
        if ((fd < 0) && (this -> Port () > 0))
        {
            // listen at the given port (use a very short queue)
            if (logFile)
                *logFile << "Waiting for a coordinator at "
                    "port " << this -> Port () << "." <<
                    std::endl;
            int fdlisten = mwListen (this -> Port (), 1);

            // if the listening failed, return with an error
            if (fdlisten < 0)
            {
                if (logFile)
                    *logFile << "Error: This port is "
                        "probably in use." <<
                        std::endl;
                return mwError;
            }

            // accept a connection and stop listening
            std::string computer;
            int timeout = this -> TimeOut ();
            fd = mwAccept (fdlisten, computer, timeout);
            mwDisconnect (fdlisten);

            // if too much time elapsed, quit the job
            if (fd == mwTimeOut)
            {
                if (logFile)
                    *logFile << "Time out. Exiting." <<
                        std::endl;
                return mwOk;
            }

            // if an error occurred, quit the job
            else if (fd < 0)
            {
                if (logFile)
                    *logFile << "Error while connecting "
                        "to a coordinator." <<
                        std::endl;
                return mwError;
            }

            // report the fact of connection to the log file
            if (logFile)
                *logFile << "Connected to a coordinator "
                    "at '" << computer << "'." <<
                    std::endl;
        }

        // receive messages and work
        int result = 0;
        try
        {
            result = WorkOne (fd);
            mwDisconnect (fd);
            fd = -1;
        }
        catch (...)
        {
            mwDisconnect (fd);
            if (logFile)
                *logFile << "An exception was thrown while "
                    "processing data." << std::endl;
            throw;
        }

        // quit if there is no point to listen to another coordinator
        // or if an error occurred
        if ((this -> Port () <= 0) || !keepWorker ||
            (result != mwOk))
        {
            return result;
        }

        // add a line to the log file between working sessions
        if (logFile)
            *logFile << "============================" <<
                std::endl;
    }
} /* mwWorker::Work */

mwTimeOut (mwlowlev.h:63): A connection time out has occurred.
int mwListen (int port, int queuesize): Begins listening at the given port.
int mwAccept (int fd, std::string &computer, int timeout=-1): Waits for and accepts a connection at the given socket.
void mwDisconnect (int fd): Disconnects the given socket.
int mwConnect (const char *name, int port): Connects to the given computer at the given port.

References chomp::multiwork::mwTask::computers, keepWorker, chomp::multiwork::mwTask::logFile, chomp::multiwork::mwAccept(), chomp::multiwork::mwConnect(), chomp::multiwork::mwDisconnect(), chomp::multiwork::mwError, chomp::multiwork::mwListen(), chomp::multiwork::mwOk, chomp::multiwork::mwTimeOut, chomp::multiwork::mwTask::Port(), chomp::multiwork::mwTask::ports, chomp::multiwork::mwTask::TimeOut(), chomp::multiwork::mwTask::timeout, and WorkOne().
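The connection step at the top of Work() is meant to connect to the first listed computer that is running, skipping incomplete entries, and otherwise to fall back to listening at Port(). A sketch of that step follows. The ConnectFn callback stands in for mwConnect() and, like ConnectToFirstAvailable, is hypothetical; passing the connect function in as a parameter makes the loop testable without a network.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for mwConnect(): returns a socket descriptor
// >= 0 on success or a negative value on failure.
using ConnectFn = std::function<int(const std::string &, int)>;

// Tries the listed computers in order and stops at the first one that
// accepts the connection, skipping entries with an empty name or a
// zero port.
int ConnectToFirstAvailable(const std::vector<std::string> &computers,
                            const std::vector<int> &ports,
                            const ConnectFn &connect) {
    for (std::size_t i = 0; i < computers.size() && i < ports.size(); ++i) {
        if (computers[i].empty() || ports[i] == 0)
            continue;
        int fd = connect(computers[i], ports[i]);
        if (fd >= 0)
            return fd;   // connected: the caller does not need to listen
    }
    return -1;           // nobody answered: the caller falls back to listening
}
```

Returning at the first success ensures that at most one connection is opened, so no socket is left dangling when several listed computers are running.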

◆ WorkOne()

int chomp::multiwork::mwWorker::WorkOne ( int  fd)
inline, private

Runs one working session after having connected to the coordinator at the socket identified by 'fd'.

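Returns mwOk if the session ended normally (the coordinator requested disconnection, closed the connection, or stopped responding within the time-out), or mwError if an error occurred and the worker should exit.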

Definition at line 185 of file mwworker.h.

{
    // be ready to send the port number at which the worker is listening
    bool sendPort = (this -> Port () > 0);
    int timelimit = this -> TimeOut ();

    while (1)
    {
        // disconnect if the coordinator is not responding
        if (timelimit > 0)
        {
            // call the function 'select' to wait
            // until data is available for reading
            int portArray [1];
            portArray [0] = fd;
            int ioFlags [2];
            ioFlags [0] = mwCanRead;
            ioFlags [1] = mwNone;
            int result = mwSelect (portArray, 1, fd, ioFlags,
                timelimit);

            // disconnect in case of an I/O error
            if (result == mwError)
            {
                if (logFile)
                    *logFile << "Error while "
                        "waiting for data from the "
                        "coordinator." << std::endl;
                return mwError;
            }

            // finish if disconnected
            if (result == mwLost)
            {
                if (logFile)
                    *logFile << "Disconnected while "
                        "waiting for data from the "
                        "coordinator." << std::endl;
                return mwOk;
            }

            // finish if not responding
            if (result == mwTimeOut)
            {
                if (logFile)
                    *logFile << "Time out reached while "
                        "waiting for data from the "
                        "coordinator." << std::endl;
                return mwOk;
            }
        }

        // receive a message
        mwData msg;
        unsigned int code = 0;
        int result = RecvMessageW (fd, code, msg);

        // disconnect in case of an I/O error
        if (result == mwError)
        {
            if (logFile)
                *logFile << "Error while receiving data "
                    "from the coordinator." << std::endl;
            return mwError;
        }

        // disconnect if the connection was lost
        if (result == mwLost)
        {
            if (logFile)
                *logFile << "The connection closed "
                    "by the coordinator." << std::endl;
            return mwOk;
        }

        // initialize the worker if requested to
        if (code & mwInitMsg)
        {
            if (logFile)
                *logFile << "Initializing the worker." <<
                    std::endl;

            // update the 'keepWorker' flag if requested to
            if (code & mwKeepMsg)
                keepWorker = true;
            if (code & mwDontKeepMsg)
                keepWorker = false;

            // initialize the data
            int initResult = Initialize (msg);

            // if the initialization fails then quit the work
            if (initResult != mwOk)
            {
                if (logFile)
                    *logFile << "The initialization "
                        "failed." << std::endl;
                return mwError;
            }
        }

        // disconnect if requested to
        if (code & mwByeMsg)
        {
            // update the 'keepWorker' flag if requested to
            if (code & mwKeepMsg)
                keepWorker = true;
            if (code & mwDontKeepMsg)
                keepWorker = false;

            if (logFile)
                *logFile << "Disconnecting upon "
                    "coordinator's request." <<
                    std::endl;
            return mwOk;
        }

        // skip the rest if there is no standard message to process
        if (!(code & mwStdMsg))
            continue;

        // prepare a basis for the returned code
        unsigned int retcode = 0;

        // process the message data
        result = Process (msg);

        // if an error occurred, disconnect and quit
        if (result == mwError)
        {
            if (logFile)
                *logFile << "Data processing failed." <<
                    std::endl;
            return mwError;
        }

        // if rejected, reset the data and set the rejection flag
        else if (result == mwReject)
        {
            if (logFile)
                *logFile << "* Data rejected." << std::endl;
            msg. Reset ();
            retcode |= mwRejectedMsg;
        }

        // if processed successfully, make a note of it
        else
        {
            if (logFile)
                *logFile << "* Data processed." << std::endl;
        }

        // send port number if relevant
        if (sendPort)
        {
            // send the port number at which the worker listens
            mwData d;
            d << this -> Port ();
            unsigned int code = mwPortMsg;
            int res = SendMessageW (fd, code, d);

            // quit in case of failure
            if (res < 0)
            {
                if (logFile)
                    *logFile << "Error while sending "
                        "the port number." <<
                        std::endl;
                return mwError;
            }

            // make a note of having sent the port number
            sendPort = false;
            if (logFile)
                *logFile << "* Port number sent." <<
                    std::endl;
        }

        // send the result of the processed data
        result = SendMessageW (fd, retcode, msg);

        // quit if the connection was lost
        if (result == mwLost)
        {
            if (logFile)
                *logFile << "Connection lost while sending "
                    "data to the coordinator." <<
                    std::endl;
            return mwError;
        }

        // quit if an error occurred
        if (result != mwOk)
        {
            if (logFile)
                *logFile << "Error while sending data." <<
                    std::endl;
            return mwError;
        }
    }
} /* mwWorker::WorkOne */

mwCanRead (mwlowlev.h:57): Reading possible.
mwNone (mwlowlev.h:54): No flag selected.
mwLost (mwlowlev.h:66): The network connection has been lost.
mwReject (mwconfig.h:169): The data has been rejected.
int mwSelect (const int *workers, int nworkers, int listensocket, int *ioflags, int timeout): Determines IOflags for each of the workers and additionally the listensocket (the last flag).
mwInitMsg (mwtask.h:63): Message to the Worker: A piece of initialization data to be processed.
mwStdMsg (mwtask.h:66): Message to the Worker: A standard piece of data to be processed.
mwKeepMsg (mwtask.h:69): Message to the Worker: Keep running after having disconnected.
mwDontKeepMsg (mwtask.h:72): Message to the Worker: Don't keep running after disconnecting.
mwByeMsg (mwtask.h:75): Message to the Worker: Please disconnect.
mwRejectedMsg (mwtask.h:78): Message to the Coordinator: The data has been rejected.
mwPortMsg (mwtask.h:81): Message to the Coordinator: I will listen at this port number.

References Initialize(), keepWorker, chomp::multiwork::mwTask::logFile, chomp::multiwork::mwByeMsg, chomp::multiwork::mwCanRead, chomp::multiwork::mwDontKeepMsg, chomp::multiwork::mwError, chomp::multiwork::mwInitMsg, chomp::multiwork::mwKeepMsg, chomp::multiwork::mwLost, chomp::multiwork::mwNone, chomp::multiwork::mwOk, chomp::multiwork::mwPortMsg, chomp::multiwork::mwReject, chomp::multiwork::mwRejectedMsg, chomp::multiwork::mwSelect(), chomp::multiwork::mwStdMsg, chomp::multiwork::mwTimeOut, chomp::multiwork::mwTask::Port(), Process(), RecvMessageW(), SendMessageW(), and chomp::multiwork::mwTask::TimeOut().

Referenced by Work().
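The order in which WorkOne() interprets the bits of an incoming message code can be summarized as a decoding function. The bit values, Actions, and Decode are hypothetical (the real constants are defined in mwtask.h); only the order and combination of the tests follow the listing above.

```cpp
#include <cassert>

// Hypothetical bit values standing in for mwInitMsg, mwStdMsg,
// mwKeepMsg, mwDontKeepMsg and mwByeMsg.
const unsigned InitMsg = 0x01, StdMsg = 0x02, KeepMsg = 0x04,
               DontKeepMsg = 0x08, ByeMsg = 0x10;

struct Actions {
    bool initialize = false;  // call Initialize() on the data
    bool disconnect = false;  // end the session
    bool process = false;     // call Process() and send a reply
};

// The keep/don't-keep bits are honored only together with an init or
// a bye message; if both are present, "don't keep" wins because it is
// applied last. A bye message ends the session before any standard
// data in the same message is processed.
Actions Decode(unsigned code, bool &keepWorker) {
    Actions a;
    if (code & (InitMsg | ByeMsg)) {
        if (code & KeepMsg)
            keepWorker = true;
        if (code & DontKeepMsg)
            keepWorker = false;
    }
    a.initialize = (code & InitMsg) != 0;
    a.disconnect = (code & ByeMsg) != 0;
    a.process = !a.disconnect && (code & StdMsg) != 0;
    return a;
}
```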

Friends And Related Function Documentation

◆ mwCoordinator

friend class mwCoordinator
friend

Definition at line 103 of file mwworker.h.

Member Data Documentation

◆ keepWorker

bool chomp::multiwork::mwWorker::keepWorker
private

Should the worker remain running after the coordinator disconnects?

Definition at line 100 of file mwworker.h.

Referenced by KeepWorker(), Work(), and WorkOne().


The documentation for this class was generated from the following file: