The Original CHomP Software
mwtask.h
Go to the documentation of this file.
1
3
13
14// Copyright (C) 1997-2020 by Pawel Pilarczyk.
15//
16// This file is part of my research software package. This is free software:
17// you can redistribute it and/or modify it under the terms of the GNU
18// General Public License as published by the Free Software Foundation,
19// either version 3 of the License, or (at your option) any later version.
20//
21// This software is distributed in the hope that it will be useful,
22// but WITHOUT ANY WARRANTY; without even the implied warranty of
23// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
24// GNU General Public License for more details.
25//
26// You should have received a copy of the GNU General Public License
27// along with this software; see the file "license.txt". If not,
28// please, see <https://www.gnu.org/licenses/>.
29
30// Started on August 11, 2004. Last revision: November 29, 2007.
31
32
33#ifndef _CHOMP_MULTIWORK_MWTASK_H_
34#define _CHOMP_MULTIWORK_MWTASK_H_
35
36#include <fstream>
37#include <string>
38#include <vector>
39#include <cstdlib> // for "atoi"
40#include <ctime> // for log start/stop time
41
45
46
47namespace chomp {
48namespace multiwork {
49
50// --------------------------------------------------
51// ----------------- message codes ------------------
52// --------------------------------------------------
53
/// Various message codes used for the communication between the
/// coordinator and workers.
/// The codes addressed to the Worker occupy distinct bits and may be
/// combined with bitwise OR (e.g. mwByeMsg | mwDontKeepMsg). The codes
/// addressed to the Coordinator reuse the values 0x0001 and 0x0002, so
/// they are distinguishable from mwInitMsg / mwStdMsg only by the
/// direction in which the message travels.
enum mwCodes
{
	/// Message: No special code information included.
	mwNoMsg = 0x0000,

	/// Message to the Worker: A piece of initialization data
	/// to be processed.
	mwInitMsg = 0x0001,

	/// Message to the Worker: A standard piece of data to be processed.
	mwStdMsg = 0x0002,

	/// Message to the Worker: Keep running after having disconnected.
	mwKeepMsg = 0x0004,

	/// Message to the Worker: Don't keep running after disconnecting.
	mwDontKeepMsg = 0x0008,

	/// Message to the Worker: Please, disconnect.
	mwByeMsg = 0x0010,

	/// Message to the Coordinator: The data has been rejected.
	mwRejectedMsg = 0x0001,

	/// Message to the Coordinator: I will listen at this port number.
	mwPortMsg = 0x0002

}; /* enum mwCodes */
84
85
86// --------------------------------------------------
87// --------------------- mwTask ---------------------
88// --------------------------------------------------
89
98class mwTask
99{
100public:
102 mwTask ();
103
105 virtual ~mwTask ();
106
107 // --- configuration ---
108
110 void Port (int number);
111
113 int Port () const;
114
116 void ControlNumber (unsigned int number);
117
119 unsigned int ControlNumber () const;
120
122 void TimeOut (int seconds);
123
125 int TimeOut () const;
126
127 // --- log file ---
128
132 int LogFile (const char *filename);
133
135 void LogFile (const mwTask &other);
136
139 void LogClose ();
140
141 // --- computer addresses ---
142
147 int Add (const char *name, int port = -1);
148
151 int Load (const char *filename);
152
153 // --- quit waiting workers ---
154
158 int QuitWorkers ();
159
160private:
161 // --- configuration data ---
162
165
168 unsigned int ctrlnum;
169
172
173 // --- log file ---
174
175protected:
177 std::ofstream *logFile;
178
179private:
182
183protected:
184 // --- network communication ---
185
189 static int SendMessage (int fd, unsigned int ctrl,
190 unsigned int code, const mwData &x);
191
195 static int RecvMessage (int fd, unsigned int &ctrl,
196 unsigned int &code, mwData &x);
197
198 // --- computer addresses ---
199
201 std::vector<std::string> computers;
202
204 std::vector<int> ports;
205
206private:
207 // --- other stuff ---
208
210 mwTask (const mwTask &) {}
211
213 mwTask &operator = (const mwTask &) {return *this;}
214
215}; /* class mwTask */
216
217// --------------------------------------------------
218
220 portnum (mwPORT),
221 ctrlnum (mwCTRLNUM),
222 timeout (mwTIMEOUT),
223 logFile (0),
224 logBorrowed (false)
225{
226 return;
227} /* mwTask::mwTask */
228
229// --------------------------------------------------
230
231inline void mwTask::LogClose ()
232{
233 if (!logFile)
234 return;
235 if (logBorrowed)
236 {
237 logFile = 0;
238 logBorrowed = false;
239 return;
240 }
241 std::time_t stop_time;
242 std::time (&stop_time);
243 *logFile << "\nMultiWork log file closed on " <<
244 std::asctime (std::localtime (&stop_time)) << "\n"
245 "-----------------------------------------------------\n" <<
246 std::endl;
247 delete logFile;
248 logFile = 0;
249 return;
250} /* mwTask::LogClose */
251
252inline int mwTask::LogFile (const char *filename)
253{
254 // close the current log if in use
255 if (logFile)
256 LogClose ();
257
258 // if no file name supplied, return now
259 if (!filename || !*filename)
260 return mwOk;
261
262 // create a file stream variable
263 logFile = new std::ofstream;
264 if (!logFile)
265 return mwError;
266
267 // open the log file for appending
268 logFile -> open (filename, std::ios::app);
269
270 // write the current time to the log
271 std::time_t start_time;
272 std::time (&start_time);
273 *logFile << "MultiWork log file opened on " <<
274 std::asctime (std::localtime (&start_time)) << std::endl;
275
276 // if unable to open the file or an error occurred, return mwError
277 if (!*logFile)
278 {
279 delete logFile;
280 return mwError;
281 }
282
283 return mwOk;
284} /* mwTask::LogFile */
285
286inline void mwTask::LogFile (const mwTask &other)
287{
288 // close the current log if in use
289 LogClose ();
290
291 // if there is no other log file, do nothing
292 if (!other. logFile)
293 return;
294
295 // borrow the log file pointer
296 logBorrowed = true;
297 logFile = other. logFile;
298
299 return;
300} /* mwTask::LogFile */
301
302// --------------------------------------------------
303
305{
306 // close the log file if it was in use
307 LogClose ();
308
309 return;
310} /* mwTask::~mwTask */
311
312// --------------------------------------------------
313
314inline void mwTask::Port (int number)
315{
316 if (number >= 0)
317 portnum = static_cast<short int> (number);
318 return;
319} /* mwTask::Port */
320
321inline int mwTask::Port () const
322{
323 return portnum;
324} /* mwTask::Port */
325
326inline void mwTask::ControlNumber (unsigned int number)
327{
328 ctrlnum = number;
329 return;
330} /* mwTask::ControlNumber */
331
332inline unsigned int mwTask::ControlNumber () const
333{
334 return ctrlnum;
335} /* mwTask::ControlNumber */
336
337inline void mwTask::TimeOut (int seconds)
338{
339 timeout = seconds;
340 return;
341} /* mwTask::TimeOut */
342
343inline int mwTask::TimeOut () const
344{
345 return timeout;
346} /* mwTask::TimeOut */
347
348// --------------------------------------------------
349
350inline int mwTask::Add (const char *name, int port)
351{
352 // if the name is empty, ignore it
353 if (!name || !*name)
354 return mwError;
355
356 // determine whether the name contains a colon and port number
357 int pos = 1;
358 while (name [pos])
359 ++ pos;
360 -- pos;
361 while (pos && (name [pos] != ':') &&
362 (name [pos] >= '0') && (name [pos] <= '9'))
363 {
364 -- pos;
365 }
366
367 // if the name contains colon and some digits after the colon...
368 if (pos && (name [pos] == ':') && name [pos + 1])
369 {
370 // append the computer name and the chosen port number
371 char *compname = new char [pos + 1];
372 for (int i = 0; i < pos; ++ i)
373 compname [i] = name [i];
374 compname [pos] = '\0';
375 port = std::atoi (name + pos + 1);
376 if (port <= 0)
377 return mwError;
378 computers. push_back (std::string (compname));
379 ports. push_back (port);
380 delete [] compname;
381 }
382 else
383 {
384 // if the port number is not reasonable, use the default one
385 if (port <= 0)
386 port = portnum;
387 if (port <= 0)
388 return mwError;
389 computers. push_back (std::string (name));
390 ports. push_back (port);
391 }
392 return mwOk;
393} /* mwTask::Add */
394
395inline int mwTask::Load (const char *filename)
396{
397 std::ifstream f (filename);
398 if (!f)
399 return mwError;
400
401 char buf [512];
402 int counter = 0;
403 while (1)
404 {
405 *buf = '\0';
406 f. getline (buf, 512, '\n');
407 if ((*buf == ';') || (*buf == '#') || (*buf == '/'))
408 continue;
409 if (*buf)
410 {
411 int result = this -> Add (buf, portnum);
412 if (result == mwOk)
413 ++ counter;
414 }
415 if (!f)
416 return counter;
417 }
418} /* mwTask::Load */
419
420
421// --------------------------------------------------
422// -------------------- messages --------------------
423// --------------------------------------------------
424
427inline int mwTask::SendMessage (int fd, unsigned int ctrl,
428 unsigned int code, const mwData &x)
429{
430 mwData sending;
431 sending << ctrl << code << x. Length () << x;
432 return mwSendBytes (fd, sending. Buffer (), sending. Length ());
433} /* SendMessage */
434
437inline int mwTask::RecvMessage (int fd, unsigned int &ctrl,
438 unsigned int &code, mwData &x)
439{
440 // read the code and length of the message
441 char buf00 [12];
442 int result = mwRecvBytes (fd, buf00, 12);
443 if (result < 0)
444 return result;
445 unsigned char *buf0 = reinterpret_cast<unsigned char *> (buf00);
446
447 // extract the length of the message
448 int len = (int) (buf0 [8]) << 24;
449 len |= (int) (buf0 [9]) << 16;
450 len |= (int) (buf0 [10]) << 8;
451 len |= (int) (buf0 [11]);
452 if (len < 0)
453 return mwError;
454
455 // extract the control code of the message
456 ctrl = (int) (buf0 [0]) << 24;
457 ctrl |= (int) (buf0 [1]) << 16;
458 ctrl |= (int) (buf0 [2]) << 8;
459 ctrl |= (int) (buf0 [3]);
460
461 // extract the code of the message
462 code = (int) (buf0 [4]) << 24;
463 code |= (int) (buf0 [5]) << 16;
464 code |= (int) (buf0 [6]) << 8;
465 code |= (int) (buf0 [7]);
466
467 // if the message length is zero, no more reading is necessary
468 if (!len)
469 {
470 x. Reset ();
471 return mwOk;
472 }
473
474 // prepare a memory buffer for the message
475 char *buf = new char [len];
476 if (!buf)
477 return mwError;
478
479 // read the message
480 result = mwRecvBytes (fd, buf, len);
481 if (result < 0)
482 {
483 delete [] buf;
484 return result;
485 }
486
487 // transform the message to mw data
488 x. Take (buf, len);
489 return mwOk;
490} /* RecvMessage */
491
493{
494 // write to the log file what you are doing
495 if (logFile)
496 *logFile << "Turning workers off..." << std::endl;
497
498 // try connecting to each worker and ask them to exit
499 int counter = 0;
500 for (unsigned int n = 0; n < computers. size (); ++ n)
501 {
502 // retrieve the computer name and port from the list
503 const char *name = computers [n]. c_str ();
504 int port = ports [n];
505
506 // if no valid name or port number read, skip this item
507 if (!*name || !port)
508 continue;
509
510 // try connecting to the computer
511 int fd = mwConnect (name, port);
512
513 // if unsuccessful, make a note and take another one
514 if (fd < 0)
515 {
516 if (logFile)
517 {
518 *logFile << "Worker " << name << ":" <<
519 port << " could not be contacted." <<
520 std::endl;
521 }
522 continue;
523 }
524
525 // prepare the control code to send to the worker
526 unsigned int code = mwByeMsg | mwDontKeepMsg;
527
528 // send the 'Bye!' message to the worker and disconnect it
529 mwData empty;
530 int result = this -> SendMessage (fd, ctrlnum, code, empty);
531 mwDisconnect (fd);
532
533 // add an appropriate message to the log file
534 if (result == mwOk)
535 {
536 ++ counter;
537 if (logFile)
538 {
539 *logFile << "Worker " << name << ":" <<
540 port << " exited successfully." <<
541 std::endl;
542 }
543 }
544 else if (logFile)
545 {
546 *logFile << "Error while sending the disconnect "
547 "message to " << name << ":" << port <<
548 "." << std::endl;
549 }
550 }
551
552 // write to the log file how many workers were turned off
553 if (logFile)
554 *logFile << counter << " worker(s) have been shut down." <<
555 std::endl;
556
557 return mwOk;
558} /* mwTask::QuitWorkers */
559
560
561} // namespace multiwork
562} // namespace chomp
563
564#endif // _CHOMP_MULTIWORK_MWTASK_H_
565
567
This class is used to convert data structures into a single sequence of bytes and to retrieve this data from such a sequence.
Definition: mwdata.h:67
This class defines a generic task object (coordinator or worker) for the MultiWork distributed computations.
Definition: mwtask.h:99
int Load(const char *filename)
Loads computer addresses from the given file.
Definition: mwtask.h:395
static int SendMessage(int fd, unsigned int ctrl, unsigned int code, const mwData &x)
Sends a message with data to the given socket.
Definition: mwtask.h:427
int Port() const
Returns the current port number.
Definition: mwtask.h:321
std::ofstream * logFile
The debug log file stream.
Definition: mwtask.h:177
int TimeOut() const
Returns the currently set network connection time-out interval.
Definition: mwtask.h:343
int QuitWorkers()
Quits all the workers whose addresses were added with the 'Add' and 'Load' functions.
Definition: mwtask.h:492
void LogClose()
Closes the log file and adds a line with the time information unless this log file was borrowed from another task.
Definition: mwtask.h:231
unsigned int ctrlnum
The control number that is used to recognize a compatible worker or a compatible coordinator.
Definition: mwtask.h:168
mwTask()
The default constructor.
Definition: mwtask.h:219
unsigned int ControlNumber() const
Returns the currently set identification control number.
Definition: mwtask.h:332
static int RecvMessage(int fd, unsigned int &ctrl, unsigned int &code, mwData &x)
Receives a message with data from the given socket.
Definition: mwtask.h:437
virtual ~mwTask()
The destructor.
Definition: mwtask.h:304
bool logBorrowed
Is this log file pointer borrowed from another task?
Definition: mwtask.h:181
int timeout
The network communication time-out in seconds.
Definition: mwtask.h:171
mwTask & operator=(const mwTask &)
The assignment operator is forbidden.
Definition: mwtask.h:213
int LogFile(const char *filename)
Begins logging detailed communication debug information to the given file.
Definition: mwtask.h:252
std::vector< std::string > computers
A list of workers or coordinators to connect to at start-up.
Definition: mwtask.h:201
std::vector< int > ports
A list of port numbers of workers to connect to at start-up.
Definition: mwtask.h:204
mwTask(const mwTask &)
The copy constructor is forbidden.
Definition: mwtask.h:210
int Add(const char *name, int port=-1)
Adds an address to the list of computers to connect to at the beginning of working or coordinating.
Definition: mwtask.h:350
int portnum
The network communication port number.
Definition: mwtask.h:164
#define mwPORT
The default port number used for the communication between the coordinator and workers.
Definition: mwconfig.h:120
#define mwTIMEOUT
The default time-out in seconds (8 hours).
Definition: mwconfig.h:130
#define mwCTRLNUM
The default control number used for data identification.
Definition: mwconfig.h:125
This file contains the basic configuration of the MultiWork module, mainly as a definition of a serie...
This file contains the definition of the MultiWork data class.
This file contains the definition of some low-level functions for the TCP/IP streams communication an...
int mwSendBytes(int fd, const char *buf, int len)
Sends the given buffer to the given socket.
@ mwError
A serious error occurred.
Definition: mwconfig.h:162
@ mwOk
Everything is fine.
Definition: mwconfig.h:159
void mwDisconnect(int fd)
Disconnects the given socket.
int mwRecvBytes(int fd, char *buf, int len)
Receives the given amount of data from the given socket.
int mwConnect(const char *name, int port)
Connects to the given computer at the given port.
mwCodes
Various message codes used for the communication between the coordinator and workers.
Definition: mwtask.h:57
@ mwKeepMsg
Message to the Worker: Keep running after having disconnected.
Definition: mwtask.h:69
@ mwStdMsg
Message to the Worker: A standard piece of data to be processed.
Definition: mwtask.h:66
@ mwInitMsg
Message to the Worker: A piece of initialization data to be processed.
Definition: mwtask.h:63
@ mwRejectedMsg
Message to the Coordinator: The data has been rejected.
Definition: mwtask.h:78
@ mwByeMsg
Message to the Worker: Please, disconnect.
Definition: mwtask.h:75
@ mwPortMsg
Message to the Coordinator: I will listen at this port number.
Definition: mwtask.h:81
@ mwNoMsg
Message: No special code information included.
Definition: mwtask.h:59
@ mwDontKeepMsg
Message to the Worker: Don't keep running after disconnecting.
Definition: mwtask.h:72
This namespace contains the entire CHomP library interface.
Definition: bitmaps.h:51