The Original CHomP Software
mwworker.h
Go to the documentation of this file.
// Copyright (C) 1997-2020 by Pawel Pilarczyk.
//
// This file is part of my research software package. This is free software:
// you can redistribute it and/or modify it under the terms of the GNU
// General Public License as published by the Free Software Foundation,
// either version 3 of the License, or (at your option) any later version.
//
// This software is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this software; see the file "license.txt". If not,
// please, see <https://www.gnu.org/licenses/>.

// Started on August 11, 2004. Last revision: December 3, 2010.
32
33#ifndef _CHOMP_MULTIWORK_MWWORKER_H_
34#define _CHOMP_MULTIWORK_MWWORKER_H_
35
40
41namespace chomp {
42namespace multiwork {
43
44class mwCoordinator;
45
46
47// --------------------------------------------------
48// -------------------- mwWorker --------------------
49// --------------------------------------------------
50
54class mwWorker: public virtual mwTask
55{
56public:
58 mwWorker ();
59
61 virtual ~mwWorker ();
62
65 int Work ();
66
69 void KeepWorker (bool keep = true);
70
71private:
72 // --- worker's job ---
73
80 virtual int Process (mwData &data);
81
89 virtual int Initialize (mwData &data);
90
95 int WorkOne (int fd);
96
97 // --- configuration ---
98
101
102 // grant access to the protected functions and data to a coordinator
103 friend class mwCoordinator;
104
105 // --- network communiation ---
106
109 int SendMessageW (int fd, unsigned int code, const mwData &x) const;
110
113 int RecvMessageW (int fd, unsigned int &code, mwData &x) const;
114
115}; /* class mwWorker */
116
117// --------------------------------------------------
118
119inline mwWorker::mwWorker (): keepWorker (false)
120{
121 return;
122} /* mwWorker::mwWorker */
123
125{
126 return;
127} /* mwWorker::~mwWorker */
128
129// --------------------------------------------------
130
131inline void mwWorker::KeepWorker (bool keep)
132{
133 keepWorker = keep;
134 return;
135} /* mwWorker::KeepWorkers */
136
137// --------------------------------------------------
138
140{
141 return mwOk;
142} /* mwWorker::Process */
143
145{
146 return mwOk;
147} /* mwWorker::Initialize */
148
149// --------------------------------------------------
150
151inline int mwWorker::SendMessageW (int fd, unsigned int code,
152 const mwData &x) const
153{
154 // prepare the negation of the control number to send
155 unsigned int ctrl = ~(this -> ControlNumber ());
156
157 // send the message with the control number and the message code
158 return this -> SendMessage (fd, ctrl, code, x);
159} /* mwWorker::SendMessageW */
160
161inline int mwWorker::RecvMessageW (int fd, unsigned int &code,
162 mwData &x) const
163{
164 // receive the message
165 unsigned int ctrl = 0;
166 int result = this -> RecvMessage (fd, ctrl, code, x);
167
168 // if there was an error then return its code
169 if (result != mwOk)
170 return result;
171
172 // if the control number is correct then finish successfully
173 if (ctrl == this -> ControlNumber ())
174 return mwOk;
175
176 // if the control number is wrong then return an error code
177 if (logFile)
178 *logFile << "Wrong control code received "
179 "from the coordinator: " << ctrl << "." << std::endl;
180 return mwError;
181} /* mwWorker::RecvMessageW */
182
183// --------------------------------------------------
184
185inline int mwWorker::WorkOne (int fd)
186{
187 // be ready to send the port number at which the worker is listening
188 bool sendPort = (this -> Port () > 0);
189 int timelimit = this -> TimeOut ();
190
191 while (1)
192 {
193 // disconnect if the coordinator is not responding
194 if (timelimit > 0)
195 {
196 // call the function 'select' to wait
197 // until data is available for reading
198 int portArray [1];
199 portArray [0] = fd;
200 int ioFlags [2];
201 ioFlags [0] = mwCanRead;
202 ioFlags [1] = mwNone;
203 int result = mwSelect (portArray, 1, fd, ioFlags,
204 timelimit);
205
206 // disconnect in case of an I/O error
207 if (result == mwError)
208 {
209 if (logFile)
210 *logFile << "Error while "
211 "waiting for data from the "
212 "coordinator." << std::endl;
213 return mwError;
214 }
215
216 // finish if disconnected
217 if (result == mwLost)
218 {
219 if (logFile)
220 *logFile << "Disconnected while "
221 "waiting for data from the "
222 "coordinator." << std::endl;
223 return mwOk;
224 }
225
226 // finish if not responding
227 if (result == mwTimeOut)
228 {
229 if (logFile)
230 *logFile << "Time out reached while "
231 "waiting for data from the "
232 "coordinator." << std::endl;
233 return mwOk;
234 }
235 }
236
237 // receive a message
238 mwData msg;
239 unsigned int code = 0;
240 int result = RecvMessageW (fd, code, msg);
241
242 // disconnect in case of an I/O error
243 if (result == mwError)
244 {
245 if (logFile)
246 *logFile << "Error while receiving data "
247 "from the coordinator." << std::endl;
248 return mwError;
249 }
250
251 // disconnect if the connection was lost
252 if (result == mwLost)
253 {
254 if (logFile)
255 *logFile << "The connection closed "
256 "by the coordinator." << std::endl;
257 return mwOk;
258 }
259
260 // initialize the worker if requested to
261 if (code & mwInitMsg)
262 {
263 if (logFile)
264 *logFile << "Initializing the worker." <<
265 std::endl;
266
267 // update the 'keepWorker' flag if requested to
268 if (code & mwKeepMsg)
269 keepWorker = true;
270 if (code & mwDontKeepMsg)
271 keepWorker = false;
272
273 // initialize the data
274 int initResult = Initialize (msg);
275
276 // if the initialization fails then quit the work
277 if (initResult != mwOk)
278 {
279 if (logFile)
280 *logFile << "The initialization "
281 "failed." << std::endl;
282 return mwError;
283 }
284 }
285
286 // disconnect if requested to
287 if (code & mwByeMsg)
288 {
289 // update the 'keepWorker' flag if requested to
290 if (code & mwKeepMsg)
291 keepWorker = true;
292 if (code & mwDontKeepMsg)
293 keepWorker = false;
294
295 if (logFile)
296 *logFile << "Disconnecting upon "
297 "coordinator's request." <<
298 std::endl;
299 return mwOk;
300 }
301
302 // skip the rest if there is no standard message to process
303 if (!(code & mwStdMsg))
304 continue;
305
306 // prepare a basis for the returned code
307 unsigned int retcode = 0;
308
309 // process the message data
310 result = Process (msg);
311
312 // if an error occurred, disconnect and quit
313 if (result == mwError)
314 {
315 if (logFile)
316 *logFile << "Data processing failed." <<
317 std::endl;
318 return mwError;
319 }
320
321 // if rejected, reset the data and set the rejection flag
322 else if (result == mwReject)
323 {
324 if (logFile)
325 *logFile << "* Data rejected." << std::endl;
326 msg. Reset ();
327 retcode |= mwRejectedMsg;
328 }
329
330 // if processed successfully, make a note of it
331 else
332 {
333 if (logFile)
334 *logFile << "* Data processed." << std::endl;
335 }
336
337 // send port number if relevant
338 if (sendPort)
339 {
340 // send the port number at which the worker listens
341 mwData d;
342 d << this -> Port ();
343 unsigned int code = mwPortMsg;
344 int res = SendMessageW (fd, code, d);
345
346 // quit in case of failure
347 if (res < 0)
348 {
349 if (logFile)
350 *logFile << "Error while sending "
351 "the port number." <<
352 std::endl;
353 return mwError;
354 }
355
356 // make a note of having sent the port number
357 sendPort = false;
358 if (logFile)
359 *logFile << "* Port number sent." <<
360 std::endl;
361 }
362
363 // send the result of the processed data
364 result = SendMessageW (fd, retcode, msg);
365
366 // quit if the connection was lost
367 if (result == mwLost)
368 {
369 if (logFile)
370 *logFile << "Connection lost while sending "
371 "data to the coordinator." <<
372 std::endl;
373 return mwError;
374 }
375
376 // quit if an error occurred
377 if (result != mwOk)
378 {
379 if (logFile)
380 *logFile << "Error while sending data." <<
381 std::endl;
382 return mwError;
383 }
384 }
385} /* mwWorker::WorkOne */
386
387inline int mwWorker::Work ()
388{
389 if (logFile)
390 {
391 *logFile << "Running as a WORKER." << std::endl;
392
393 // indicate what kind of network connection is in use
394 #if !mwNETWORK
395 *logFile << "There is no network in use, "
396 "so exiting right now." << std::endl;
397 #elif mwWXWIN
398 *logFile << "Using the sockets interface "
399 "provided by wxWindows." << std::endl;
400 #else
401 *logFile << "Using the standard sockets "
402 "for network connections." << std::endl;
403 #endif
404 }
405
406 // connect to the first comp. on the list that is running
407 int fd = -1;
408 for (unsigned cur = 0; cur < this -> computers. size (); ++ cur)
409 {
410 // retrieve the computer name and port from the list
411 int port = this -> ports [cur];
412 const char *name = this -> computers [cur]. c_str ();
413
414 // if no valid name or no port read, skip this data
415 if (!*name || !port)
416 continue;
417
418 // try connecting to the computer
419 fd = mwConnect (name, port);
420
421 // add an appropriate message to the log file
422 if (logFile)
423 {
424 *logFile << "Connection to " << name << ":" <<
425 port << ((fd < 0) ? " refused." :
426 " established.") << std::endl;
427 }
428 }
429
430 while (1)
431 {
432 // if not connected then try listening
433 if ((fd < 0) && (this -> Port () > 0))
434 {
435 // listen at the given port (use a very short queue)
436 if (logFile)
437 *logFile << "Waiting for a coordinator at "
438 "port " << this -> Port () << "." <<
439 std::endl;
440 int fdlisten = mwListen (this -> Port (), 1);
441
442 // if the listening failed, return with an error
443 if (fdlisten < 0)
444 {
445 if (logFile)
446 *logFile << "Error: This port is "
447 "probably in use." <<
448 std::endl;
449 return mwError;
450 }
451
452 // accept a connection and stop listening
453 std::string computer;
454 int timeout = this -> TimeOut ();
455 fd = mwAccept (fdlisten, computer, timeout);
456 mwDisconnect (fdlisten);
457
458 // if too much time elapsed, quit the job
459 if (fd == mwTimeOut)
460 {
461 if (logFile)
462 *logFile << "Time out. Exiting." <<
463 std::endl;
464 return mwOk;
465 }
466
467 // if an error occurred, quit the job
468 else if (fd < 0)
469 {
470 if (logFile)
471 *logFile << "Error while connecting "
472 "to a coordinator." <<
473 std::endl;
474 return mwError;
475 }
476
477 // report the fact of connection to the log file
478 if (logFile)
479 *logFile << "Connected to a coordinator "
480 "at '" << computer << "'." <<
481 std::endl;
482 }
483
484 // receive messages and work
485 int result = 0;
486 try
487 {
488 result = WorkOne (fd);
489 mwDisconnect (fd);
490 fd = -1;
491 }
492 catch (...)
493 {
494 mwDisconnect (fd);
495 if (logFile)
496 *logFile << "An exception was thrown while "
497 "processing data." << std::endl;
498 throw;
499 }
500
501 // quit if there is no point to listen to another coordinator
502 // or if an error occurred
503 if ((this -> Port () <= 0) || !keepWorker ||
504 (result != mwOk))
505 {
506 return result;
507 }
508
509 // add a line to the log file between working sessions
510 if (logFile)
511 *logFile << "============================" <<
512 std::endl;
513 }
514} /* mwWorker::Work */
515
516
517} // namespace multiwork
518} // namespace chomp
519
520#endif // _CHOMP_MULTIWORK_MWWORKER_H_
521
523
This class defines a generic coordinator task object for the multi-work distributed computations framework.
Definition: mwcoord.h:127
This class is used to convert data structures into a single sequence of bytes and to retrieve this data.
Definition: mwdata.h:67
This class defines a generic task object (coordinator or worker) for the multi-work distributed computations framework.
Definition: mwtask.h:99
static int SendMessage(int fd, unsigned int ctrl, unsigned int code, const mwData &x)
Sends a message with data to the given socket.
Definition: mwtask.h:427
int Port() const
Returns the current port number.
Definition: mwtask.h:321
std::ofstream * logFile
The debug log file stream.
Definition: mwtask.h:177
int TimeOut() const
Returns the currently set network connection time-out interval.
Definition: mwtask.h:343
unsigned int ControlNumber() const
Returns the currently set identification control number.
Definition: mwtask.h:332
static int RecvMessage(int fd, unsigned int &ctrl, unsigned int &code, mwData &x)
Receives a message with data from the given socket.
Definition: mwtask.h:437
int timeout
The network communication time-out in seconds.
Definition: mwtask.h:171
std::vector< std::string > computers
A list of workers or coordinators to connect to at start-up.
Definition: mwtask.h:201
std::vector< int > ports
A list of port numbers of workers to connect to at start-up.
Definition: mwtask.h:204
This class defines a generic worker task object for the multi-work distributed computations framework.
Definition: mwworker.h:55
int WorkOne(int fd)
Runs one working session after having connected to the coordinator at the socket identified by 'fd'.
Definition: mwworker.h:185
int Work()
Runs the worker on this computer.
Definition: mwworker.h:387
virtual int Initialize(mwData &data)
This function is called to process initialization data, if any.
Definition: mwworker.h:144
virtual int Process(mwData &data)
This function is called to process a data portion and replace it with the result of computation.
Definition: mwworker.h:139
virtual ~mwWorker()
The destructor.
Definition: mwworker.h:124
mwWorker()
The default constructor.
Definition: mwworker.h:119
int RecvMessageW(int fd, unsigned int &code, mwData &x) const
Receives a message with data from the socket of a coordinator.
Definition: mwworker.h:161
int SendMessageW(int fd, unsigned int code, const mwData &x) const
Sends a message with data to the given socket as a worker.
Definition: mwworker.h:151
void KeepWorker(bool keep=true)
Makes the worker keep running after the coordinator has disconnected.
Definition: mwworker.h:131
bool keepWorker
Should the worker remain running after coordinator disconnects?
Definition: mwworker.h:100
This file contains the basic configuration of the MultiWork module, mainly as a definition of a serie...
This file contains the definition of the MultiWork data class.
This file contains the definition of some low-level functions for the TCP/IP streams communication an...
This file contains the definition of the MultiWork task class.
@ mwTimeOut
A connection time out has occurred.
Definition: mwlowlev.h:63
@ mwCanRead
Reading possible.
Definition: mwlowlev.h:57
@ mwNone
No flag selected.
Definition: mwlowlev.h:54
@ mwLost
The network connection has been lost.
Definition: mwlowlev.h:66
@ mwReject
The data has been rejected.
Definition: mwconfig.h:169
@ mwError
A serious error occurred.
Definition: mwconfig.h:162
@ mwOk
Everything is fine.
Definition: mwconfig.h:159
int mwListen(int port, int queuesize)
Begins listening at the given port.
int mwSelect(const int *workers, int nworkers, int listensocket, int *ioflags, int timeout)
Determines IOflags for each of the workers and additionally the listensocket (the last flag).
int mwAccept(int fd, std::string &computer, int timeout=-1)
Waits for and accepts a connection at the given socket.
void mwDisconnect(int fd)
Disconnects the given socket.
int mwConnect(const char *name, int port)
Connects to the given computer at the given port.
@ mwKeepMsg
Message to the Worker: Keep running after having disconnected.
Definition: mwtask.h:69
@ mwStdMsg
Message to the Worker: A standard piece of data to be processed.
Definition: mwtask.h:66
@ mwInitMsg
Message to the Worker: A piece of initialization data to be processed.
Definition: mwtask.h:63
@ mwRejectedMsg
Message to the Coordinator: The data has been rejected.
Definition: mwtask.h:78
@ mwByeMsg
Message to the Worker: Please, disconnect.
Definition: mwtask.h:75
@ mwPortMsg
Message to the Coordinator: I will listen at this port number.
Definition: mwtask.h:81
@ mwDontKeepMsg
Message to the Worker: Don't keep running after disconnecting.
Definition: mwtask.h:72
This namespace contains the entire CHomP library interface.
Definition: bitmaps.h:51