The Original CHomP Software
chomp::multiwork::mwCoordinator Class Reference

This class defines a generic coordinator task object for the multi-work distributed computations framework. More...

#include <mwcoord.h>

Inheritance diagram for chomp::multiwork::mwCoordinator:
chomp::multiwork::mwTask chomp::multiwork::mwSubCoordinator< dim, coord >

Public Member Functions

 mwCoordinator ()
 The default constructor. More...
 
virtual ~mwCoordinator ()
 The virtual destructor. More...
 
void KeepWorkers (bool keep=true)
 Makes workers keep running after the coordinator's completion. More...
 
int SaveWorkers (const char *filename)
 Saves addresses of workers to the given file in the form of address:port. More...
 
void Init (mwData &data)
 Defines a portion of initialization data which will be sent to every newly connected worker. More...
 
int Coordinate (mwWorker *w=NULL)
 Run the coordinator until all the work has been completed. More...
 
- Public Member Functions inherited from chomp::multiwork::mwTask
 mwTask ()
 The default constructor. More...
 
virtual ~mwTask ()
 The destructor. More...
 
void Port (int number)
 Sets the port number for the communication or 0 to use none. More...
 
int Port () const
 Returns the current port number. More...
 
void ControlNumber (unsigned int number)
 Sets the control number for identification. More...
 
unsigned int ControlNumber () const
 Returns the currently set identification control number. More...
 
void TimeOut (int seconds)
 Sets the network connection time-out interval in seconds. More...
 
int TimeOut () const
 Returns the currently set network connection time-out interval. More...
 
int LogFile (const char *filename)
 Begins logging detailed communication debug information to the given file. More...
 
void LogFile (const mwTask &other)
 Uses another task's log file to log this task's information. More...
 
void LogClose ()
 Closes the log file and adds a line with the time information unless this log file was borrowed from another task. More...
 
int Add (const char *name, int port=-1)
 Adds an address to the list of computers to connect to at the beginning of working or coordinating. More...
 
int Load (const char *filename)
 Loads computer addresses from the given file. More...
 
int QuitWorkers ()
 Quits all the workers whose addresses were added with the 'Add' and 'Load' functions. More...
 

Private Member Functions

virtual int Prepare (mwData &data)
 Prepares a piece of data to be sent to a worker. More...
 
virtual int Accept (mwData &data)
 Accepts a result received from a worker. More...
 
virtual int Reject (mwData &data)
 Acknowledges data that was rejected by a worker. More...
 
int RunLoop (bool no_more_data)
 Runs the main communication loop: Sends the data to workers and receives the results of their computations. More...
 
int RunLoopLocally ()
 Runs the main communication loop using the local worker. More...
 
void ConnectWorkers ()
 Connects to all workers in the list. More...
 
void BeginListening ()
 Begins listening at the given port. More...
 
void DisconnectAll ()
 Disconnects all the workers (normally called in the destructor). More...
 
int SendMessageC (int fd, unsigned int code, const mwData &x) const
 Sends a message with data to the given socket as a coordinator. More...
 
int RecvMessageC (int fd, unsigned int &code, mwData &x) const
 Receives a message with data from the socket of a worker. More...
 

Static Private Member Functions

static void mwTableDel (int *tab, int len, int pos)
 A helper function for deleting a table entry and shifting the remainder of the table backwards. More...
 
static void mwTableDel (mwData *tab, int len, int pos)
 A helper function for deleting a table entry and shifting the remainder of the table backwards using the method "Take" of the mwData class. More...
 

Private Attributes

bool singleWork
 Should data be processed locally only? More...
 
mwWorker * localWorker
 The address of a local worker or 0 if none is available. More...
 
bool keepWorkers
 Should the workers be kept running after coordinator is done? More...
 
mwData initData
 The initialization data that has to be sent to workers. More...
 
int nWaiting
 The number of workers waiting for their data. More...
 
mwWorkerData xWaiting [mwMAXWORK]
 The workers waiting for their tasks. More...
 
int nWorking
 The number of workers processing their data. More...
 
mwWorkerData xWorking [mwMAXWORK]
 The workers processing their data. More...
 
int nToDo
 The number of data pieces to be sent to working tasks. More...
 
mwData xToDo [mwMAXWORK]
 The data pieces waiting to be sent to working tasks. More...
 
int nRejected
 The number of recently rejected pieces of data. More...
 
mwData xRejected [mwMAXWORK]
 The recently rejected pieces of data. More...
 
int nDone
 The number of recently finished pieces of data. More...
 
mwData xDone [mwMAXWORK]
 The recently finished pieces of data. More...
 
int listensocket
 The socket number at which new workers are listened to. More...
 

Additional Inherited Members

- Static Protected Member Functions inherited from chomp::multiwork::mwTask
static int SendMessage (int fd, unsigned int ctrl, unsigned int code, const mwData &x)
 Sends a message with data to the given socket. More...
 
static int RecvMessage (int fd, unsigned int &ctrl, unsigned int &code, mwData &x)
 Receives a message with data from the given socket. More...
 
- Protected Attributes inherited from chomp::multiwork::mwTask
std::ofstream * logFile
 The debug log file stream. More...
 
std::vector< std::string > computers
 A list of workers or coordinators to connect to at start-up. More...
 
std::vector< int > ports
 A list of port numbers of workers to connect to at start-up. More...
 

Detailed Description

This class defines a generic coordinator task object for the multi-work distributed computations framework.

Each coordinator class defined by the user must inherit from this class.

Definition at line 126 of file mwcoord.h.
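
The inheritance pattern described above can be sketched without the library as follows. This is only an illustration under stated assumptions: `Status`, `Data`, `CoordinatorSketch` and `SumCoordinator` are hypothetical stand-ins (the real status codes, mwData and mwCoordinator come from the library headers), and the "worker" step is reduced to passing the data piece straight back.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-ins for the library's status codes and for mwData.
enum Status { Ok = 0, Error = -1, NoData = 1 };
using Data = std::vector<int>;

// Stand-in for the coordinator base class: it asks the derived class for
// data pieces one by one and hands each piece back as if it were a result.
class CoordinatorSketch
{
public:
    virtual ~CoordinatorSketch () {}

    int Coordinate ()
    {
        while (true)
        {
            Data piece;
            int result = Prepare (piece);
            if (result == NoData)
                return Ok;
            if (result != Ok)
                return Error;
            // a real worker would process the piece at this point
            if (Accept (piece) != Ok)
                return Error;
        }
    }

private:
    // default behavior: no data to send, accept everything
    virtual int Prepare (Data &) { return NoData; }
    virtual int Accept (Data &) { return Ok; }
};

// A user-defined coordinator: hands out the numbers 1..n one at a time
// and adds up whatever comes back.
class SumCoordinator: public CoordinatorSketch
{
public:
    explicit SumCoordinator (int n): next (1), last (n), sum (0) {}
    int Sum () const { return sum; }

private:
    int Prepare (Data &data) override
    {
        if (next > last)
            return NoData;
        data. push_back (next ++);
        return Ok;
    }

    int Accept (Data &data) override
    {
        assert (!data. empty ());
        sum += data. front ();
        return Ok;
    }

    int next, last, sum;
};
```

With these stand-ins, constructing `SumCoordinator c (4)` and calling `c. Coordinate ()` drives Prepare and Accept four times, after which `c. Sum ()` holds the total of the pieces handed out.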

Constructor & Destructor Documentation

◆ mwCoordinator()

chomp::multiwork::mwCoordinator::mwCoordinator ( )
inline

The default constructor.

Definition at line 288 of file mwcoord.h.

288 :
289#if mwNETWORK
290 singleWork (false),
291#else
292 singleWork (true),
293#endif
294 localWorker (0),
295 keepWorkers (false),
296 nWaiting (0),
297 nWorking (0),
298 nToDo (0),
299 nRejected (0),
300 nDone (0),
301 listensocket (-1)
302{
303 return;
304} /* mwCoordinator::mwCoordinator */

◆ ~mwCoordinator()

chomp::multiwork::mwCoordinator::~mwCoordinator ( )
inline virtual

The virtual destructor.

Definition at line 342 of file mwcoord.h.

343{
344 // disconnect all the workers
345 DisconnectAll ();
346
347 return;
348} /* mwCoordinator::~mwCoordinator */

References DisconnectAll().

Member Function Documentation

◆ Accept()

int chomp::multiwork::mwCoordinator::Accept ( mwData & data)
inline private virtual

Accepts a result received from a worker.

Should return mwOk on success or mwError to stop the computations. Override this function in the class you derive from mwCoordinator.

Reimplemented in chomp::multiwork::mwSubCoordinator< dim, coord >.

Definition at line 452 of file mwcoord.h.

453{
454 return mwOk;
455} /* mwCoordinator::Accept */
@ mwOk
Everything is fine.
Definition: mwconfig.h:159

References chomp::multiwork::mwOk.

Referenced by Coordinate().

◆ BeginListening()

void chomp::multiwork::mwCoordinator::BeginListening ( )
inline private

Begins listening at the given port.

Definition at line 603 of file mwcoord.h.

604{
605 // if there is no valid port number then cancel this operation
606 if (this -> Port () <= 0)
607 return;
608
609 // open a listening socket at the given port
610 listensocket = mwListen (this -> Port (), 15);
611
612 // add a message to the log file whether it was successful or not
613 if (logFile)
614 {
615 if (listensocket < 0)
616 *logFile << "Listening attempt at port " <<
617 this -> Port () << " failed." << std::endl;
618 else
619 *logFile << "Waiting for workers at port " <<
620 this -> Port () << "." << std::endl;
621 }
622
623 return;
624} /* mwCoordinator::BeginListening */
int mwListen(int port, int queuesize)
Begins listening at the given port.

References listensocket, chomp::multiwork::mwTask::logFile, chomp::multiwork::mwListen(), and chomp::multiwork::mwTask::Port().

Referenced by Coordinate().

◆ ConnectWorkers()

void chomp::multiwork::mwCoordinator::ConnectWorkers ( )
inline private

Connects to all workers in the list.

Definition at line 554 of file mwcoord.h.

555{
556 int nComputers = computers. size ();
557 for (int i = 0; (nWaiting < mwMAXWORK - 1) && (i < nComputers); ++ i)
558 {
559 // determine the computer name and port number
560 const std::string &name = computers [i];
561 int port = ports [i];
562
563 // open the connection
564 int fd = mwConnect (name. c_str (), port);
565
566 // if the connection attempt was successful, add this worker
567 if (fd >= 0)
568 {
569 if (logFile)
570 *logFile << "Connected to " << name << ":" <<
571 port << "." << std::endl;
572 mwWorkerData &w = xWaiting [nWaiting];
573 w. fd = fd;
574 w. name = name;
575 w. port = port;
576 ++ nWaiting;
577
578 // prepare the initialization code
579 int code = mwInitMsg |
580 (keepWorkers ? mwKeepMsg : mwDontKeepMsg);
581
582 // send the initialization data to the worker
583 int initResult = SendMessageC (w. fd, code,
584 initData);
585 if (initResult != mwOk)
586 {
587 if (logFile)
588 *logFile << "Error while sending "
589 "the initialization data." <<
590 std::endl;
591 -- nWaiting;
592 }
593 }
594
595 // if unable to connect, make only a note in the log
596 else if (logFile)
597 *logFile << "Connection attempt to " << name <<
598 ":" << port << " failed." << std::endl;
599 }
600 return;
601} /* mwCoordinator::ConnectWorkers */
#define mwMAXWORK
The maximal number of simultaneously connected workers.
Definition: mwconfig.h:143
int mwConnect(const char *name, int port)
Connects to the given computer at the given port.
@ mwKeepMsg
Message to the Worker: Keep running after having disconnected.
Definition: mwtask.h:69
@ mwInitMsg
Message to the Worker: A piece of initialization data to be processed.
Definition: mwtask.h:63
@ mwDontKeepMsg
Message to the Worker: Don't keep running after disconnecting.
Definition: mwtask.h:72

References chomp::multiwork::mwTask::computers, initData, keepWorkers, chomp::multiwork::mwTask::logFile, chomp::multiwork::mwConnect(), chomp::multiwork::mwDontKeepMsg, chomp::multiwork::mwInitMsg, chomp::multiwork::mwKeepMsg, mwMAXWORK, chomp::multiwork::mwOk, nWaiting, chomp::multiwork::mwTask::ports, SendMessageC(), and xWaiting.

Referenced by Coordinate().

◆ Coordinate()

int chomp::multiwork::mwCoordinator::Coordinate ( mwWorker * w = NULL)
inline

Run the coordinator until all the work has been completed.

If a worker object is supplied, the work will be done locally unless a remote worker is connected to the coordinator. Returns mwOk on success or mwError on failure.

Definition at line 1061 of file mwcoord.h.

1062{
1063 // remember the local worker's address
1064 localWorker = w;
1065
1066 // initialize the local worker if any
1067 if (localWorker)
1068 localWorker -> Initialize (initData);
1069
1070 if (logFile)
1071 {
1072 *logFile << "Running as a COORDINATOR." << std::endl;
1073
1074 // indicate what kind of network connection is in use
1075 #if !mwNETWORK
1076 *logFile << "There is no network in use." << std::endl;
1077 #elif mwWXWIN
1078 *logFile << "Using the sockets interface "
1079 "provided by wxWindows." << std::endl;
1080 #else
1081 *logFile << "Using the standard sockets "
1082 "for network connections." << std::endl;
1083 #endif
1084
1085 // say if running in the single-work mode only
1086 if (singleWork && localWorker)
1087 *logFile << "Running in the single-work mode." <<
1088 std::endl;
1089 }
1090
1091 // connect to workers on the list and begin listening if necessary
1092 if (!singleWork)
1093 {
1094 this -> ConnectWorkers ();
1095 this -> BeginListening ();
1096 }
1097
1098 // if not listening and there are no workers then try switching
1099 // to the single-work mode, unless there is no local worker
1100 if (!singleWork && (listensocket < 0) && !nWaiting)
1101 {
1102 // switch to the single-work mode if allowed to
1103 if (localWorker)
1104 {
1105 singleWork = true;
1106 if (logFile)
1107 *logFile << "No remote workers. Switching "
1108 "to the single-work mode." <<
1109 std::endl;
1110 }
1111
1112 // if data cannot be processed locally, report a failure
1113 else
1114 {
1115 if (logFile)
1116 *logFile << "Failure: No workers." <<
1117 std::endl;
1118 return mwError;
1119 }
1120 }
1121
1122 // is there no more data to be sent?
1123 bool no_more_data = false;
1124
1125 while (1)
1126 {
1127 // run the communications loop
1128 int loopresult = singleWork ? this -> RunLoopLocally () :
1129 this -> RunLoop (no_more_data);
1130
1131 // stop if the coordinator failed badly
1132 if (loopresult == mwError)
1133 return mwError;
1134
1135 // if some data was rejected, process all this data
1136 while (nRejected > 0)
1137 {
1138 // run the user's procedure to acquire rejected data
1139 -- nRejected;
1140 xRejected [nRejected]. Rewind ();
1141 int result = this -> Reject (xRejected [nRejected]);
1142
1143 // interrupt if the user says that an error occurred
1144 if (result != mwOk)
1145 return mwError;
1146
1147 // reset the data acquired by the user
1148 xRejected [nRejected]. Reset ();
1149 no_more_data = false;
1150 }
1151
1152 // if some new data arrived, process all this data
1153 while (nDone > 0)
1154 {
1155 // call the user's procedure to accept the data piece
1156 -- nDone;
1157 xDone [nDone]. Rewind ();
1158 int result = this -> Accept (xDone [nDone]);
1159
1160 // interrupt if the user says that an error occurred
1161 if (result != mwOk)
1162 return mwError;
1163
1164 // reset the data acquired by the user
1165 xDone [nDone]. Reset ();
1166 no_more_data = false;
1167 }
1168
1169 // determine whether a new data item must be prepared:
1170 // if there are workers waiting and there is no data then YES
1171 bool hungry = (nWaiting > 0) && !nToDo;
1172 // if there is no worker and no data then YES
1173 if ((localWorker || (listensocket >= 0)) &&
1174 !nWorking && !nWaiting && !nToDo)
1175 {
1176 hungry = true;
1177 }
1178 // if no more data is needed then definitely NO
1179 if (no_more_data)
1180 hungry = false;
1181
1182 // prepare a new data item if necessary
1183 if (hungry)
1184 {
1185 // run the user's procedure for preparing data
1186 int result = this -> Prepare (xToDo [nToDo]);
1187
1188 // break if the user says that an error occurred
1189 if (result == mwError)
1190 return mwError;
1191
1192 // make a note if there is no more data
1193 else if (result == mwNoData)
1194 no_more_data = true;
1195
1196 // add the data piece to the work queue otherwise
1197 else
1198 ++ nToDo;
1199 }
1200
1201 // stop if the tasks are completed and there is no more data
1202 if (no_more_data && !nWorking && !nToDo)
1203 return mwOk;
1204 }
1205} /* mwCoordinator::Coordinate */
@ mwNoData
There is no data to be sent to workers, for example, because everything has been already sent.
Definition: mwconfig.h:166
@ mwError
A serious error occurred.
Definition: mwconfig.h:162

References Accept(), BeginListening(), ConnectWorkers(), initData, listensocket, localWorker, chomp::multiwork::mwTask::logFile, chomp::multiwork::mwError, chomp::multiwork::mwNoData, chomp::multiwork::mwOk, nDone, nRejected, nToDo, nWaiting, nWorking, Prepare(), Reject(), RunLoop(), RunLoopLocally(), singleWork, xDone, xRejected, and xToDo.
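
The decision made in Coordinate() on whether to call Prepare() for a new data piece (listing lines 1169 to 1180) can be restated as a pure function. The sketch below is a reading of that listing, not library code; `QueueState` and `needsNewData` are illustrative names that do not exist in mwcoord.h.

```cpp
#include <cassert>

// Plain-value snapshot of the coordinator state that the decision reads.
// The struct and the function are hypothetical helpers for illustration.
struct QueueState
{
    bool hasLocalWorker;  // corresponds to localWorker != 0
    bool listening;       // corresponds to listensocket >= 0
    int nWaiting;         // workers waiting for data
    int nWorking;         // workers processing data
    int nToDo;            // prepared data pieces not yet sent
};

inline bool needsNewData (const QueueState &s, bool no_more_data)
{
    assert (s. nWaiting >= 0 && s. nWorking >= 0 && s. nToDo >= 0);

    // if Prepare already reported mwNoData, never ask again
    if (no_more_data)
        return false;

    // workers are waiting and nothing is queued for them
    if ((s. nWaiting > 0) && !s. nToDo)
        return true;

    // nobody is connected yet, but data could still be consumed
    // by the local worker or by a worker that connects later
    return (s. hasLocalWorker || s. listening) &&
        !s. nWorking && !s. nWaiting && !s. nToDo;
}
```

Writing the condition this way makes the precedence explicit: the no_more_data flag overrides both reasons for preparing data, just as the last assignment to `hungry` does in the listing.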

◆ DisconnectAll()

void chomp::multiwork::mwCoordinator::DisconnectAll ( )
inline private

Disconnects all the workers (normally called in the destructor).

Definition at line 306 of file mwcoord.h.

307{
308 // release all the workers if they are still connected
309 for (int i = 0; i < nWaiting + nWorking; ++ i)
310 {
311 // determine the particular worker
312 mwWorkerData &w = (i < nWaiting) ? xWaiting [i] :
313 xWorking [i - nWaiting];
314
315 // if this worker is already disconnected, take the next one
316 if (w. fd < 0)
317 continue;
318
319 // prepare the code to send to the worker
320 unsigned int code = mwByeMsg;
321 code |= keepWorkers ? mwKeepMsg : mwDontKeepMsg;
322
323 // send the 'Bye!' message to the worker
324 mwData empty;
325 SendMessageC (w. fd, code, empty);
326
327 // disconnect the worker
328 mwDisconnect (w. fd);
329 w. fd = -1;
330
331 // make a note of what happened in the log file
332 if (logFile)
333 *logFile << "Worker " << i << " (" << w. name <<
334 ":" << w. port << ") disconnected and " <<
335 (keepWorkers ? "waiting." : "exited.") <<
336 std::endl;
337 }
338
339 return;
340} /* mwCoordinator::DisconnectAll */
void mwDisconnect(int fd)
Disconnects the given socket.
@ mwByeMsg
Message to the Worker: Please, disconnect.
Definition: mwtask.h:75

References keepWorkers, chomp::multiwork::mwTask::logFile, chomp::multiwork::mwByeMsg, chomp::multiwork::mwDisconnect(), chomp::multiwork::mwDontKeepMsg, chomp::multiwork::mwKeepMsg, nWaiting, nWorking, SendMessageC(), xWaiting, and xWorking.

Referenced by ~mwCoordinator().

◆ Init()

void chomp::multiwork::mwCoordinator::Init ( mwData & data)
inline

Defines a portion of initialization data which will be sent to every newly connected worker.

The argument variable is reset and the data is moved to the internal buffer.

Definition at line 464 of file mwcoord.h.

465{
466 initData. Take (data);
467 return;
468} /* mwCoordinator::Init */

References initData.
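
The "take" behavior that Init relies on (the argument is emptied and its contents end up in the internal buffer) can be pictured with a small stand-in class. `BufferSketch` is hypothetical and models only this one aspect; it assumes that Take replaces the previous contents of the destination, which the text above does not state explicitly.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Illustrative stand-in for mwData; only the "take" behavior is modelled.
class BufferSketch
{
public:
    void Append (int value) { items. push_back (value); }
    std::size_t Size () const { return items. size (); }

    // Moves the contents of 'other' into this buffer
    // and leaves 'other' empty (reset).
    void Take (BufferSketch &other)
    {
        assert (this != &other);
        items = std::move (other. items);
        other. items. clear ();
    }

private:
    std::vector<int> items;
};
```

The practical consequence for callers of Init is that the object passed in should not be expected to hold the initialization data afterwards.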

◆ KeepWorkers()

void chomp::multiwork::mwCoordinator::KeepWorkers ( bool  keep = true)
inline

Makes workers keep running after the coordinator's completion.

Definition at line 350 of file mwcoord.h.

351{
352 keepWorkers = keep;
353 return;
354} /* mwCoordinator::KeepWorkers */

References keepWorkers.

◆ mwTableDel() [1/2]

void chomp::multiwork::mwCoordinator::mwTableDel ( int *  tab,
int  len,
int  pos 
)
inline static private

A helper function for deleting a table entry and shifting the remainder of the table backwards.

Definition at line 1209 of file mwcoord.h.

1212{
1213 for (int i = pos + 1; i < len; ++ i)
1214 tab [i - 1] = tab [i];
1215 return;
1216} /* mwTableDel */
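
As a minimal sketch of the same technique, the loop above can be written as a template and tried on a small array. `tableDelSketch` is an illustrative name, not a library function; the bounds check is an added assumption about valid input.

```cpp
#include <cassert>

// Removes tab[pos] by shifting the entries tab[pos+1 .. len-1]
// one slot towards the front. The last slot keeps its old value,
// so the caller is expected to decrease its own element count.
template <class T>
void tableDelSketch (T *tab, int len, int pos)
{
    assert (pos >= 0 && pos < len);
    for (int i = pos + 1; i < len; ++ i)
        tab [i - 1] = tab [i];
}
```

For example, deleting position 1 from a table holding 10, 20, 30, 40 leaves 10, 30, 40 in the first three slots. The mwData overload follows the same pattern but moves each entry with Take instead of copying it.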

◆ mwTableDel() [2/2]

void chomp::multiwork::mwCoordinator::mwTableDel ( mwData * tab,
int  len,
int  pos 
)
inline static private

A helper function for deleting a table entry and shifting the remainder of the table backwards using the method "Take" of the mwData class.

Definition at line 1218 of file mwcoord.h.

1221{
1222 for (int i = pos + 1; i < len; ++ i)
1223 tab [i - 1]. Take (tab [i]);
1224 return;
1225} /* mwTableDel */

◆ Prepare()

int chomp::multiwork::mwCoordinator::Prepare ( mwData & data)
inline private virtual

Prepares a piece of data to be sent to a worker.

Should return mwOk on success, mwNoData if there is no data to send based on the computations completed so far, or mwError to cancel the computations. Override this function in the class you derive from mwCoordinator.

Reimplemented in chomp::multiwork::mwSubCoordinator< dim, coord >.

Definition at line 447 of file mwcoord.h.

448{
449 return mwNoData;
450} /* mwCoordinator::Prepare */

References chomp::multiwork::mwNoData.

Referenced by Coordinate().

◆ RecvMessageC()

int chomp::multiwork::mwCoordinator::RecvMessageC ( int  fd,
unsigned int &  code,
mwData & x 
) const
inline private

Receives a message with data from the socket of a worker.

Returns mwOk on success or mwError in the case of failure.

Definition at line 483 of file mwcoord.h.

485{
486 // receive the message
487 unsigned int ctrl = 0;
488 int result = this -> RecvMessage (fd, ctrl, code, x);
489
490 // if there was an error then return its code
491 if (result != mwOk)
492 return result;
493
494 // if the control number is correct then finish successfully
495 if (ctrl == ~(this -> ControlNumber ()))
496 return mwOk;
497
498 // if the control number is wrong then return an error code
499 if (logFile)
500 *logFile << "Wrong control code received "
501 "from the worker: " << ~ctrl << "." << std::endl;
502 return mwError;
503} /* mwCoordinator::RecvMessageC */

References chomp::multiwork::mwTask::ControlNumber(), chomp::multiwork::mwTask::logFile, chomp::multiwork::mwError, chomp::multiwork::mwOk, and chomp::multiwork::mwTask::RecvMessage().

Referenced by RunLoop().
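
The control-number test in the listing accepts a message only if its control field equals the bitwise complement of the coordinator's own control number. A minimal sketch of that check, with the hypothetical helper name `replyMatches` (the library performs the comparison inline):

```cpp
#include <cassert>

// Returns true if the control field received from a worker is the
// bitwise complement of the coordinator's control number, which is
// the condition tested in RecvMessageC.
inline bool replyMatches (unsigned int receivedCtrl,
    unsigned int controlNumber)
{
    return receivedCtrl == ~controlNumber;
}
```

Under this check, a message that merely echoes the coordinator's control number unchanged is rejected, so a coordinator cannot mistake another coordinator's message for a worker's reply.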

◆ Reject()

int chomp::multiwork::mwCoordinator::Reject ( mwData & data)
inline private virtual

Acknowledges data that was rejected by a worker.

Should return mwOk if this is fine, or mwError to cancel the computations. Override this function in the class you derive from mwCoordinator unless the default behavior (returning mwOk and ignoring the data) is sufficient.

Reimplemented in chomp::multiwork::mwSubCoordinator< dim, coord >.

Definition at line 457 of file mwcoord.h.

458{
459 return mwOk;
460} /* mwCoordinator::Reject */

References chomp::multiwork::mwOk.

Referenced by Coordinate().

◆ RunLoop()

int chomp::multiwork::mwCoordinator::RunLoop ( bool  no_more_data)
inline private

Runs the main communication loop: Sends the data to workers and receives the results of their computations.

If no_more_data is set, waits until a running task finishes and its result becomes available. Otherwise, also returns as soon as a worker is ready to accept a portion of data for processing. Returns mwOk or mwError.
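
How long RunLoop is willing to block follows from three conditions in its listing (lines 696 to 702). The sketch below restates them as a pure function; `selectTimeLimit` and its parameter list are illustrative and not part of the library, and `maxWork` stands for mwMAXWORK.

```cpp
#include <cassert>

// Returns the time limit to use for the 'select' call: either the
// configured time-out or 0 (poll without waiting).
inline int selectTimeLimit (int timeOut, bool hasLocalWorker,
    bool listening, bool no_more_data,
    int nToDo, int nWorking, int nWaiting, int maxWork)
{
    assert (nToDo >= 0 && nWorking >= 0 && nWaiting >= 0);

    // no remote workers at all, but the local worker can take over
    if (hasLocalWorker && !nWorking && !nWaiting)
        return 0;

    // only listening, and the user may still have data to prepare
    if (listening && !no_more_data && !nToDo && !nWorking && !nWaiting)
        return 0;

    // idle workers are waiting for data that has not been prepared yet
    if (!no_more_data && !nToDo && nWaiting && (nWorking < maxWork))
        return 0;

    return timeOut;
}
```

In each of the three zero cases the caller has something useful to do right away (process locally or prepare more data), so blocking in 'select' would only delay the computation.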

Definition at line 626 of file mwcoord.h.

627{
628 if (false && logFile)
629 *logFile << "\nDebug0: " << nWaiting << " waiting, " <<
630 nWorking << " working, " << nToDo <<
631 " data pieces." << std::endl;
632
633 // if not listening and there are no workers then try switching
634 // to the single-work mode, unless there is no local worker
635 if ((listensocket < 0) && !nWorking && !nWaiting)
636 {
637 // switch to the single-work mode if allowed to
638 if (localWorker)
639 {
640 singleWork = true;
641 if (logFile)
642 *logFile << "All workers disconnected. "
643 "Switching to the single-work "
644 "mode." << std::endl;
645 return mwOk;
646 }
647
648 // if data cannot be processed locally, report a failure
649 else
650 {
651 if (logFile)
652 *logFile << "Failure: All workers "
653 "disconnected. The work cannot be "
654 "continued." << std::endl;
655 return mwError;
656 }
657 }
658
659 // TO DO: If there is no more data (options & mwNoMoreData)
660 // and some workers are idle, and some others are working
661 // for a long time, send their data also to a few idle
662 // workers... This requires some A.I., of course. ;)
663
664 // the i/o flags and sockets of the workers + one for 'listensocket'
665 int ioflags [mwMAXWORK];
666 int sockets [mwMAXWORK];
667 int nWorkers = nToDo ? (nWorking + nWaiting) : nWorking;
668
669 // prepare flags for the working and waiting workers
670 for (int i = 0; i < nWorking; ++ i)
671 {
672 ioflags [i] = mwCanRead;
673 sockets [i] = xWorking [i]. fd;
674 }
675 if (nWorkers > nWorking)
676 {
677 for (int i = 0; i < nWaiting; ++ i)
678 {
679 ioflags [nWorking + i] = mwCanWrite;
680 sockets [nWorking + i] = xWaiting [i]. fd;
681 }
682 }
683
684 // prepare the listen socket flag
685 bool listening = false;
686 int listenflag = nWorkers;
687 if ((listensocket >= 0) && (nWorking + nWaiting < mwMAXWORK - 1))
688 {
689 ioflags [listenflag] = mwCanRead;
690 listening = true;
691 }
692 else
693 ioflags [listenflag] = mwNone;
694
695 // determine whether it is necessary to wait or not
696 int timelimit = this -> TimeOut ();
697 if (localWorker && !nWorking && !nWaiting)
698 timelimit = 0;
699 if (listening && !no_more_data && !nToDo && !nWorking && !nWaiting)
700 timelimit = 0;
701 if (!no_more_data && !nToDo && nWaiting && (nWorking < mwMAXWORK))
702 timelimit = 0;
703// if (no_more_data && !nToDo && !nWorking)
704// timelimit = 0;
705
706 // report the select's parameters to the log file
707 if (logFile)
708 {
709 *logFile << ">>> Select, t = " << timelimit << ", flags =";
710 for (int i = 0; i <= nWorkers; ++ i)
711 *logFile << " " << ioflags [i];
712 *logFile << "." << std::endl;
713 }
714
715 // wait until data can be received from or sent to any socket
716 int result = mwSelect (sockets, nWorkers,
717 listening ? listensocket : -1, ioflags, timelimit);
718
719 // report the returned parameters to the log file
720 if (logFile)
721 {
722 *logFile << ">>> Returned flags =";
723 for (int i = 0; i <= nWorkers; ++ i)
724 *logFile << " " << ioflags [i];
725 *logFile << "." << std::endl;
726 }
727
728 // in case of select's failure, exit the loop with a failure message
729 if (result == mwError)
730 {
731 if (logFile)
732 *logFile << "Error: The 'select' function failed." <<
733 std::endl;
734 return mwError;
735 }
736
737 // report a time-out if necessary
738 if ((timelimit > 0) && (result == mwTimeOut))
739 {
740 if (logFile)
741 *logFile << "Time-out occurred at 'select'." <<
742 std::endl;
743 }
744
745 // receive data from all the workers who are ready
746 for (int i = 0; (i < nWorking) && (nDone < mwMAXWORK) &&
747 (nRejected < mwMAXWORK) && (nToDo < mwMAXWORK); ++ i)
748 {
749 // if this worker is not ready, yet, then skip it
750 if (!(ioflags [i] & mwCanRead))
751 continue;
752
753 // receive the entire data chunk from the worker
754 unsigned int code = 0;
755 int result = RecvMessageC (sockets [i], code, xDone [nDone]);
756
757 // remember which worker this is
758 mwWorkerData &w = xWorking [i];
759
760 // reject the worker in case of error
761 if (result < 0)
762 {
763 // log the details of what happened
764 if (logFile)
765 {
766 *logFile << "Worker " << i;
767 if (!w. name. empty ())
768 *logFile << " (" << w. name << ")";
769 *logFile << " disconnected: ";
770 if (result == mwLost)
771 *logFile << "Connection lost.";
772 else
773 *logFile << "An error occurred.";
774 *logFile << std::endl;
775 }
776
777 // disconnect the worker
778 mwDisconnect (sockets [i]);
779
780 // move the worker's data back to the "to-do" list
781 xToDo [nToDo]. Take (w. data);
782 ++ nToDo;
783
784 // make a note of this in the worker's data
785 w. fd = -1;
786 w. status = -1;
787 }
788
789 // if transmitting the port number only, take it
790 else if (code & mwPortMsg)
791 {
792 // retrieve the port number
793 xDone [nDone] >> w. port;
794
795 // report this fact to the log file
796 if (logFile)
797 *logFile << "Port number " << w. port <<
798 " received from worker " << i <<
799 "." << std::endl;
800 }
801
802 // if the data was rejected, move it to 'xRejected'
803 else if (code & mwRejectedMsg)
804 {
805 // report what happened
806 if (logFile)
807 *logFile << "Data was rejected by worker " <<
808 i << "." << std::endl;
809
810 // move the data to the rejected table
811 xRejected [nRejected]. Take (w. data);
812 ++ nRejected;
813
814 // indicate the status change of this worker
815 w. status = 1;
816 }
817
		// if the data was Ok, the result is in 'xDone'
		else
		{
			// report this to the log file
			if (logFile)
				*logFile << "Processed data received from "
					"worker " << i << "." << std::endl;

			// accept this piece of data
			++ nDone;

			// indicate the status change of this worker
			w. status = 1;
		}
	}

	// send data to those workers who are ready
	for (int i = 0; (i < nWaiting) && nToDo; ++ i)
	{
		// remember the worker and its offset
		mwWorkerData &w = xWaiting [i];
		int offset = nWorking + i;

		// if this worker is not ready to get data then skip it
		if (!(ioflags [offset] & mwCanWrite))
			continue;

		// prepare a message code to send
		unsigned int code = mwStdMsg;

		// send a data chunk for processing
		int result = SendMessageC (sockets [offset], code,
			xToDo [nToDo - 1]);

		// if the data was sent successfully
		if (result == mwOk)
		{
			// take the data chunk to the worker
			-- nToDo;
			w. data. Take (xToDo [nToDo]);

			// indicate the status of the worker
			w. status = 1;

			// report this to the log file
			if (logFile)
				*logFile << "Data " << nToDo << " sent to "
					"worker " << i << "." << std::endl;
		}

		// if an error occurred, reject the worker
		else
		{
			// report this situation to the log file
			if (logFile)
				*logFile << "Worker " << i << " disconnected"
					": " << ((result == mwLost) ?
					"Connection lost." :
					"An error occurred.") << std::endl;

			// disconnect the worker
			mwDisconnect (sockets [offset]);

			// modify the status of the worker accordingly
			w. fd = -1;
			w. status = -1;
		}
	}

	if (false && logFile)
	{
		*logFile << "Debug1: " << nWaiting << " waiting:";
		for (int i = 0; i < nWaiting; ++ i)
			*logFile << " " << xWaiting [i]. status;
		*logFile << "; " << nWorking << " working:";
		for (int i = 0; i < nWorking; ++ i)
			*logFile << " " << xWorking [i]. status;
		*logFile << std::endl;
	}

	// purge workers who have been disconnected and move workers
	// whose data has been acquired to the 'waiting' queue
	for (int i = 0; i < nWorking; ++ i)
	{
		// skip this worker if it is fine
		if (xWorking [i]. status == 0)
			continue;

		// swap this worker with the last one if necessary
		if (i < nWorking - 1)
			swap (xWorking [i], xWorking [nWorking - 1]);

		// move the worker to the waiting queue if necessary
		if (xWorking [nWorking - 1]. status > 0)
		{
			xWorking [nWorking - 1]. status = 0;
			swap (xWaiting [nWaiting], xWorking [nWorking - 1]);
			++ nWaiting;
		}

		// remove this worker from the working queue
		-- nWorking;

		// consider the same entry in the table again
		-- i;
	}

	// purge workers who have been disconnected and move workers
	// who received data to the 'working' queue
	for (int i = 0; i < nWaiting; ++ i)
	{
		// skip this worker if it is fine
		if (xWaiting [i]. status == 0)
			continue;

		// swap this worker with the last one if necessary
		if (i < nWaiting - 1)
			swap (xWaiting [i], xWaiting [nWaiting - 1]);

		// move the worker to the working queue if necessary
		if (xWaiting [nWaiting - 1]. status > 0)
		{
			xWaiting [nWaiting - 1]. status = 0;
			swap (xWorking [nWorking], xWaiting [nWaiting - 1]);
			++ nWorking;
		}

		// remove this worker from the waiting queue
		-- nWaiting;

		// consider the same entry in the table again
		-- i;
	}

	if (false && logFile)
		*logFile << "Debug2: " << nWaiting << " waiting, " <<
			nWorking << " working, " << nToDo <<
			" data pieces." << std::endl;

	// accept connections from new workers if any
	if (listening && (ioflags [listenflag] & mwCanRead) &&
		(nWaiting + nWorking < mwMAXWORK))
	{
		// accept the connection
		mwWorkerData &w = xWaiting [nWaiting];
		w. name = std::string ("");
		w. port = 0;
		w. status = 0;
		w. fd = mwAccept (listensocket, w. name);

		// if the new worker has been accepted successfully
		if (w. fd >= 0)
		{
			// report the worker's acceptance
			if (logFile)
				*logFile << "A worker from '" << w. name <<
					"' accepted." << std::endl;

			// add the worker to the waiting queue
			++ nWaiting;

			// prepare the initialization code
			int code = mwInitMsg |
				(keepWorkers ? mwKeepMsg : mwDontKeepMsg);

			// send the initialization data to the worker
			int initResult = SendMessageC (w. fd, code,
				initData);
			if (initResult != mwOk)
			{
				if (logFile)
					*logFile << "Error while sending "
						"the initialization data." <<
						std::endl;
				-- nWaiting;
			}
		}

		// report the error to the log file if not successful
		else if (logFile)
			*logFile << "Unsuccessful connection of a worker "
				"from '" << w. name << "'." << std::endl;
	}

	if (false && logFile)
		*logFile << "Debug3: " << nWaiting << " waiting, " <<
			nWorking << " working, " << nToDo <<
			" data pieces." << std::endl;

	// ask for some data to process if none is waiting
	// or too much data acquired from workers is accumulated
	if (localWorker && !nWorking && !nWaiting &&
		!no_more_data && (result == mwTimeOut) && !nToDo)
	{
		if (logFile)
			*logFile << "Asking for some data to be "
				"processed locally." << std::endl;
		return mwOk;
	}

	// run the work locally as a last resort
	if (localWorker && !nWorking && !nWaiting && (result == mwTimeOut) &&
		nToDo && (nDone < mwMAXWORK) && (nRejected < mwMAXWORK))
	{
		// make a note in the log file of what is going on
		if (logFile)
			*logFile << "Processing data locally." << std::endl;

		// process one piece of data
		-- nToDo;
		xToDo [nToDo]. Rewind ();
		int result = localWorker -> Process (xToDo [nToDo]);
		xToDo [nToDo]. Rewind ();

		if (result == mwReject)
		{
			xRejected [nRejected]. Take (xToDo [nToDo]);
			++ nRejected;
		}
		else if (result == mwError)
		{
			if (logFile)
				*logFile << "Data processing failed." <<
					std::endl;
			return mwError;
		}
		else
		{
			xDone [nDone]. Take (xToDo [nToDo]);
			++ nDone;
		}
	}

	if (false && logFile)
		*logFile << "Debug4: " << nWaiting << " waiting, " <<
			nWorking << " working, " << nToDo <<
			" data pieces." << std::endl;

	return mwOk;
} /* mwCoordinator::RunLoop */

References initData, keepWorkers, listensocket, localWorker, chomp::multiwork::mwTask::logFile, chomp::multiwork::mwAccept(), chomp::multiwork::mwCanRead, chomp::multiwork::mwCanWrite, chomp::multiwork::mwDisconnect(), chomp::multiwork::mwDontKeepMsg, chomp::multiwork::mwError, chomp::multiwork::mwInitMsg, chomp::multiwork::mwKeepMsg, chomp::multiwork::mwLost, mwMAXWORK, chomp::multiwork::mwNone, chomp::multiwork::mwOk, chomp::multiwork::mwPortMsg, chomp::multiwork::mwReject, chomp::multiwork::mwRejectedMsg, chomp::multiwork::mwSelect(), chomp::multiwork::mwStdMsg, chomp::multiwork::mwTimeOut, nDone, nRejected, nToDo, nWaiting, nWorking, RecvMessageC(), SendMessageC(), singleWork, chomp::multiwork::swap(), chomp::multiwork::mwTask::TimeOut(), xDone, xRejected, xToDo, xWaiting, and xWorking.

Referenced by Coordinate().
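
The two purge loops in RunLoop() share one idiom: an entry with a nonzero status is swapped with the last entry, handed to the other queue if its status is positive, and then dropped by decrementing the count, after which the same index is examined again. The following is a minimal sketch of that idiom only. The Worker struct and the purge() function are hypothetical stand-ins rather than part of the CHomP interface, and the hand-over is written as a plain assignment for simplicity.

```cpp
#include <utility>

// Hypothetical stand-in for mwWorkerData: only the fields the sketch needs.
struct Worker
{
    int id;
    int status; // 0 = unchanged, > 0 = move to the other queue, < 0 = drop
};

// Removes every entry with a nonzero status from 'from' (nFrom entries)
// and appends those with a positive status to 'to' (nTo entries).
// The caller must guarantee that 'to' has room for the moved entries.
void purge(Worker *from, int &nFrom, Worker *to, int &nTo)
{
    for (int i = 0; i < nFrom; ++i)
    {
        // entries with status 0 stay where they are
        if (from[i].status == 0)
            continue;

        // bring the entry to the end of the table if it is not there yet
        if (i < nFrom - 1)
            std::swap(from[i], from[nFrom - 1]);

        // hand the entry over to the other queue if requested
        if (from[nFrom - 1].status > 0)
        {
            from[nFrom - 1].status = 0;
            to[nTo] = from[nFrom - 1];
            ++nTo;
        }

        // shrink the table and look at the swapped-in entry next
        --nFrom;
        --i;
    }
}
```

Because the last entry fills the hole, removal takes constant time per entry, at the cost of not preserving the order of the remaining workers.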

◆ RunLoopLocally()

int chomp::multiwork::mwCoordinator::RunLoopLocally ( )
inline private

Runs the main communication loop using the local worker.

Returns mwOk or mwError.

Definition at line 507 of file mwcoord.h.

{
	if (!localWorker)
		return mwError;

	// indicate that some worker is continuously waiting for data
	if (!nWaiting)
		++ nWaiting;

	// process the data
	while ((nDone < mwMAXWORK) && (nRejected < mwMAXWORK) && (nToDo > 0))
	{
		// process the data and replace it with the result
		-- nToDo;
		xToDo [nToDo]. Rewind ();
		int result = localWorker -> Process (xToDo [nToDo]);
		xToDo [nToDo]. Rewind ();

		// if the data caused an error then abort the loop
		if (result == mwError)
		{
			if (logFile)
				*logFile << "Data processing failed." <<
					std::endl;
			return mwError;
		}

		// if the data was rejected
		// then move it to the right place
		else if (result == mwReject)
		{
			xRejected [nRejected]. Take (xToDo [nToDo]);
			++ nRejected;
		}

		// if the data was accepted
		// then move it to the right place
		else
		{
			xDone [nDone]. Take (xToDo [nToDo]);
			++ nDone;
		}
	}

	return mwOk;
} /* mwCoordinator::RunLoopLocally */

References localWorker, chomp::multiwork::mwTask::logFile, chomp::multiwork::mwError, mwMAXWORK, chomp::multiwork::mwOk, chomp::multiwork::mwReject, nDone, nRejected, nToDo, nWaiting, xDone, xRejected, and xToDo.

Referenced by Coordinate().
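
The loop in RunLoopLocally() treats xToDo as a stack: it takes the last piece, processes it in place, and files it under xDone or xRejected depending on the result. The sketch below illustrates that control flow under simplifying assumptions: the Outcome enum stands in for the mwOk, mwReject and mwError codes, plain integers stand in for mwData, and std::vector replaces the fixed-size arrays, so the mwMAXWORK capacity checks are omitted. All names in it are hypothetical.

```cpp
#include <functional>
#include <vector>

// Hypothetical stand-ins for the mwOk / mwReject / mwError result codes.
enum class Outcome { Ok, Reject, Error };

// Takes items off the back of 'todo', lets 'process' modify each one in
// place, and files it under 'done' or 'rejected'.
// Returns false as soon as the processing of an item reports an error.
bool runLocally(std::vector<int> &todo, std::vector<int> &done,
    std::vector<int> &rejected,
    const std::function<Outcome(int &)> &process)
{
    while (!todo.empty())
    {
        // take the most recently added item
        int item = todo.back();
        todo.pop_back();

        switch (process(item))
        {
        case Outcome::Error:
            return false;
        case Outcome::Reject:
            rejected.push_back(item);
            break;
        case Outcome::Ok:
            done.push_back(item);
            break;
        }
    }
    return true;
}
```

As in the listing, an error aborts the loop immediately and leaves the remaining items in the to-do stack.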

◆ SaveWorkers()

int chomp::multiwork::mwCoordinator::SaveWorkers ( const char *  filename)
inline

Saves addresses of workers to the given file in the form of address:port.

Repeated combinations are saved once only.

Definition at line 356 of file mwcoord.h.

{
	// prepare to create a file to list the workers
	std::ofstream f;
	bool first = true;

	// go through all the connected workers
	int counter = 0;
	for (int i = 0; i < nWaiting + nWorking; ++ i)
	{
		// determine the particular worker
		mwWorkerData &w = (i < nWaiting) ? xWaiting [i] :
			xWorking [i - nWaiting];

		// determine the host name
		const char *host = w. name. c_str ();

		// if no host name or port number is known, skip it
		if (!host || !*host || (w. port <= 0))
			continue;

		// check if the same worker was already listed
		int j;
		for (j = 0; j < i; ++ j)
		{
			// determine the other worker
			mwWorkerData &v = (j < nWaiting) ? xWaiting [j] :
				xWorking [j - nWaiting];

			// determine the name of the other host
			const char *host_j = v. name. c_str ();

			// if no host name or port number is known, skip it
			if (!host_j || !*host_j || (v. port <= 0))
				continue;

			// if the ports are different, skip the other worker
			if (w. port != v. port)
				continue;

			// if the names are different, skip the other worker
			if (std::strcmp (host, host_j))
				continue;

			// make a note that this is the same worker
			j = i + 1;
			break;
		}
		if (j > i)
			continue;

		// create the file if this is the first time to write data
		if (first)
		{
			f. open (filename, std::ios::out | std::ios::trunc);
			if (!f)
				return mwError;
			f << "; A list of currently running workers:\n";
			first = false;
		}

		// write the address
		f << host << ":" << w. port << "\n";
		++ counter;
	}

	// exit if no workers have been listed
	if (!counter)
		return mwOk;

	// add a summary if any workers have been listed
	f << "; A total of " << counter << " workers";
	if (counter != nWaiting + nWorking)
		f << " out of " << (nWaiting + nWorking);
	f << " saved.\n";

	if (!keepWorkers)
		f << "; The workers will exit upon disconnection.\n";
	else
		f << "; The workers will remain running "
			"after disconnection.\n";
	f. close ();
	if (!f)
		return mwError;
	else
		return mwOk;
} /* mwCoordinator::SaveWorkers */

References keepWorkers, chomp::multiwork::mwError, chomp::multiwork::mwOk, nWaiting, nWorking, xWaiting, and xWorking.
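
SaveWorkers() finds repeated address:port combinations by comparing each worker with all the earlier ones. For illustration, the same "save once only" behavior can be sketched with a std::set of (host, port) pairs. This is an alternative technique, not the one used in the listing, and the Address struct and the listWorkers() function are hypothetical names introduced only for this example.

```cpp
#include <set>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the name and port fields of mwWorkerData.
struct Address
{
    std::string name;
    int port;
};

// Returns one "address:port" line per distinct worker, in first-seen
// order, skipping workers whose host name or port number is unknown.
std::string listWorkers(const std::vector<Address> &workers)
{
    std::set<std::pair<std::string, int> > seen;
    std::ostringstream out;
    for (const Address &w : workers)
    {
        // skip workers without a usable address
        if (w.name.empty() || (w.port <= 0))
            continue;

        // insert() reports in 'second' whether the pair was new
        if (!seen.insert(std::make_pair(w.name, w.port)).second)
            continue;

        out << w.name << ":" << w.port << "\n";
    }
    return out.str();
}
```

With at most mwMAXWORK workers the quadratic scan in the listing is unlikely to matter in practice; the set-based variant mainly shortens the code.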

◆ SendMessageC()

int chomp::multiwork::mwCoordinator::SendMessageC ( int  fd,
unsigned int  code,
const mwData & x 
) const
inline private

Sends a message with data to the given socket as a coordinator.

Returns mwOk on success and mwError in the case of failure.

Definition at line 473 of file mwcoord.h.

{
	// prepare the control number to send
	unsigned int ctrl = this -> ControlNumber ();

	// send the message with the control number and the message code
	return this -> SendMessage (fd, ctrl, code, x);
} /* mwCoordinator::SendMessageC */

References chomp::multiwork::mwTask::ControlNumber(), and chomp::multiwork::mwTask::SendMessage().

Referenced by ConnectWorkers(), DisconnectAll(), and RunLoop().
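
SendMessageC() is a thin wrapper: it looks up the object's own control number and forwards everything to the static SendMessage() helper, so that callers never pass the control number themselves. The pattern can be sketched as follows. The Task class is hypothetical, a std::vector stands in for the socket, and the two-word layout written to it is an assumption made for the example, not the actual wire format, which this section does not show.

```cpp
#include <vector>

// Hypothetical task class; only the forwarding pattern mirrors the listing.
class Task
{
public:
    explicit Task(unsigned int ctrl) : ctrl_(ctrl) {}

    // Adds this task's control number and forwards to the static helper.
    int sendAs(std::vector<unsigned int> &wire, unsigned int code) const
    {
        return send(wire, ctrl_, code);
    }

private:
    // Stateless helper: appends the control number followed by the code
    // (an assumed layout) and returns 0 to indicate success.
    static int send(std::vector<unsigned int> &wire, unsigned int ctrl,
        unsigned int code)
    {
        wire.push_back(ctrl);
        wire.push_back(code);
        return 0;
    }

    unsigned int ctrl_;
};
```

Keeping the helper static means it depends only on its arguments, while the member wrapper is the single place that decides which control number identifies the sender.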

Member Data Documentation

◆ initData

mwData chomp::multiwork::mwCoordinator::initData
private

The initialization data that has to be sent to workers.

Definition at line 224 of file mwcoord.h.

Referenced by ConnectWorkers(), Coordinate(), Init(), and RunLoop().

◆ keepWorkers

bool chomp::multiwork::mwCoordinator::keepWorkers
private

Should the workers be kept running after the coordinator is done?

Definition at line 219 of file mwcoord.h.

Referenced by ConnectWorkers(), DisconnectAll(), KeepWorkers(), RunLoop(), and SaveWorkers().

◆ listensocket

int chomp::multiwork::mwCoordinator::listensocket
private

The socket number used to listen for new workers.

Definition at line 261 of file mwcoord.h.

Referenced by BeginListening(), Coordinate(), and RunLoop().

◆ localWorker

mwWorker* chomp::multiwork::mwCoordinator::localWorker
private

The address of a local worker or 0 if none is available.

Definition at line 216 of file mwcoord.h.

Referenced by Coordinate(), RunLoop(), and RunLoopLocally().

◆ nDone

int chomp::multiwork::mwCoordinator::nDone
private

The number of recently finished pieces of data.

Definition at line 253 of file mwcoord.h.

Referenced by Coordinate(), RunLoop(), and RunLoopLocally().

◆ nRejected

int chomp::multiwork::mwCoordinator::nRejected
private

The number of recently rejected pieces of data.

Definition at line 247 of file mwcoord.h.

Referenced by Coordinate(), RunLoop(), and RunLoopLocally().

◆ nToDo

int chomp::multiwork::mwCoordinator::nToDo
private

The number of data pieces to be sent to working tasks.

Definition at line 241 of file mwcoord.h.

Referenced by Coordinate(), RunLoop(), and RunLoopLocally().

◆ nWaiting

int chomp::multiwork::mwCoordinator::nWaiting
private

The number of workers waiting for their data.

Definition at line 229 of file mwcoord.h.

Referenced by ConnectWorkers(), Coordinate(), DisconnectAll(), RunLoop(), RunLoopLocally(), and SaveWorkers().

◆ nWorking

int chomp::multiwork::mwCoordinator::nWorking
private

The number of workers processing their data.

Definition at line 235 of file mwcoord.h.

Referenced by Coordinate(), DisconnectAll(), RunLoop(), and SaveWorkers().

◆ singleWork

bool chomp::multiwork::mwCoordinator::singleWork
private

Should data be processed locally only?

Definition at line 213 of file mwcoord.h.

Referenced by Coordinate(), and RunLoop().

◆ xDone

mwData chomp::multiwork::mwCoordinator::xDone[mwMAXWORK]
private

The recently finished pieces of data.

Definition at line 256 of file mwcoord.h.

Referenced by Coordinate(), RunLoop(), and RunLoopLocally().

◆ xRejected

mwData chomp::multiwork::mwCoordinator::xRejected[mwMAXWORK]
private

The recently rejected pieces of data.

Definition at line 250 of file mwcoord.h.

Referenced by Coordinate(), RunLoop(), and RunLoopLocally().

◆ xToDo

mwData chomp::multiwork::mwCoordinator::xToDo[mwMAXWORK]
private

The data pieces waiting to be sent to working tasks.

Definition at line 244 of file mwcoord.h.

Referenced by Coordinate(), RunLoop(), and RunLoopLocally().

◆ xWaiting

mwWorkerData chomp::multiwork::mwCoordinator::xWaiting[mwMAXWORK]
private

The workers waiting for their tasks.

Definition at line 232 of file mwcoord.h.

Referenced by ConnectWorkers(), DisconnectAll(), RunLoop(), and SaveWorkers().

◆ xWorking

mwWorkerData chomp::multiwork::mwCoordinator::xWorking[mwMAXWORK]
private

The workers processing their data.

Definition at line 238 of file mwcoord.h.

Referenced by DisconnectAll(), RunLoop(), and SaveWorkers().


The documentation for this class was generated from the following file: