The Original CHomP Software
mwcoord.h
Go to the documentation of this file.
1
3
13
14// Copyright (C) 1997-2020 by Pawel Pilarczyk.
15//
16// This file is part of my research software package. This is free software:
17// you can redistribute it and/or modify it under the terms of the GNU
18// General Public License as published by the Free Software Foundation,
19// either version 3 of the License, or (at your option) any later version.
20//
21// This software is distributed in the hope that it will be useful,
22// but WITHOUT ANY WARRANTY; without even the implied warranty of
23// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
24// GNU General Public License for more details.
25//
26// You should have received a copy of the GNU General Public License
27// along with this software; see the file "license.txt". If not,
28// please, see <https://www.gnu.org/licenses/>.
29
30// Started on August 11, 2004. Last revision: December 11, 2007.
31
32
33#ifndef _CHOMP_MULTIWORK_MWCOORD_H_
34#define _CHOMP_MULTIWORK_MWCOORD_H_
35
36#include <algorithm>
37#include <string>
38#include <cstring>
39
44
45
46namespace chomp {
47namespace multiwork {
48
49class mwWorker;
50
51
52// --------------------------------------------------
53// ------------------ mwWorkerData ------------------
54// --------------------------------------------------
55
59{
60public:
62 mwWorkerData ();
63
66
69
73 int fd;
74
76 std::string name;
77
79 int port;
80
83 int status;
84
86 friend void swap (mwWorkerData &data1, mwWorkerData &data2);
87
88private:
90 mwWorkerData (const mwWorkerData &) {return;};
91
93 mwWorkerData &operator = (const mwWorkerData &) {return *this;};
94
95}; /* class mwWorkerData */
96
98 fd (-1), name (""), port (0), status (0)
99{
100 return;
101} /* mwWorkerData::mwWorkerData */
102
104{
105 return;
106} /* mwWorkerData::~mwWorkerData */
107
108inline void swap (mwWorkerData &data1, mwWorkerData &data2)
109{
110 swap (data1. data, data2. data);
111 std::swap (data1. fd, data2. fd);
112 std::swap (data1. name, data2. name);
113 std::swap (data1. port, data2. port);
114 std::swap (data1. status, data2. status);
115 return;
116} /* swap */
117
118
119// --------------------------------------------------
120// ----------------- mwCoordinator ------------------
121// --------------------------------------------------
122
126class mwCoordinator: public virtual mwTask
127{
128public:
129 // --- constructor/destructor ---
130
132 mwCoordinator ();
133
135 virtual ~mwCoordinator ();
136
137 // --- configuration ---
138
140 void KeepWorkers (bool keep = true);
141
144 int SaveWorkers (const char *filename);
145
146 // --- initialization data ---
147
151 void Init (mwData &data);
152
153 // --- run the coordinator ---
154
159 int Coordinate (mwWorker *w = NULL);
160
161private:
162 // --- coordinator's job ---
163
170 virtual int Prepare (mwData &data);
171
176 virtual int Accept (mwData &data);
177
184 virtual int Reject (mwData &data);
185
186 // --- the coordinator's functions ---
187
195 int RunLoop (bool no_more_data);
196
199 int RunLoopLocally ();
200
202 void ConnectWorkers ();
203
205 void BeginListening ();
206
208 void DisconnectAll ();
209
210 // --- configuration ---
211
214
217
220
221 // --- initialization data ---
222
225
226 // --- data processing ---
227
230
233
236
239
241 int nToDo;
242
245
248
251
253 int nDone;
254
257
258 // --- data used for the communication with workers ---
259
262
263 // --- operations on arrays (tables) ---
264
267 static void mwTableDel (int *tab, int len, int pos);
268
272 static void mwTableDel (mwData *tab, int len, int pos);
273
274 // --- network communiation ---
275
278 int SendMessageC (int fd, unsigned int code, const mwData &x) const;
279
282 int RecvMessageC (int fd, unsigned int &code, mwData &x) const;
283
284}; /* class mwCoordinator */
285
286// --------------------------------------------------
287
289#if mwNETWORK
290 singleWork (false),
291#else
292 singleWork (true),
293#endif
294 localWorker (0),
295 keepWorkers (false),
296 nWaiting (0),
297 nWorking (0),
298 nToDo (0),
299 nRejected (0),
300 nDone (0),
301 listensocket (-1)
302{
303 return;
304} /* mwCoordinator::mwCoordinator */
305
// NOTE(review): the header line of this definition (listing line 306)
// is missing from this extract; according to the symbol index it is
// 'void mwCoordinator::DisconnectAll ()' -- presumably declared inline.
// Disconnects every worker that is still connected (waiting or working):
// sends it a 'Bye!' message, closes its socket, marks it with fd = -1
// and logs the event. The result of sending the message is ignored, and
// the counters nWaiting and nWorking are left unchanged.
307{
308 // release all the workers if they are still connected
309 for (int i = 0; i < nWaiting + nWorking; ++ i)
310 {
311 // determine the particular worker
312 mwWorkerData &w = (i < nWaiting) ? xWaiting [i] :
313 xWorking [i - nWaiting];
314
315 // if this worker is already disconnected, take the next one
316 if (w. fd < 0)
317 continue;
318
319 // prepare the code to send to the worker
320 unsigned int code = mwByeMsg;
// NOTE(review): listing line 321 is missing here; judging by the log
// message below, it presumably adds a keep/exit flag to 'code' that
// depends on 'keepWorkers' -- confirm against the original source.
322
323 // send the 'Bye!' message to the worker
324 mwData empty;
325 SendMessageC (w. fd, code, empty);
326
327 // disconnect the worker
328 mwDisconnect (w. fd);
329 w. fd = -1;
330
331 // make a note of what happened in the log file
// ('i' counts the waiting workers first, then the working ones)
332 if (logFile)
333 *logFile << "Worker " << i << " (" << w. name <<
334 ":" << w. port << ") disconnected and " <<
335 (keepWorkers ? "waiting." : "exited.") <<
336 std::endl;
337 }
338
339 return;
340} /* mwCoordinator::DisconnectAll */
341
343{
344 // disconnect all the workers
345 DisconnectAll ();
346
347 return;
348} /* mwCoordinator::~mwCoordinator */
349
350inline void mwCoordinator::KeepWorkers (bool keep)
351{
352 keepWorkers = keep;
353 return;
354} /* mwCoordinator::KeepWorkers */
355
356inline int mwCoordinator::SaveWorkers (const char *filename)
357{
358 // create a file for the list of workers
359 // prepare to create a file to list the workers
360 std::ofstream f;
361 bool first = false;
362
363 // go through all the connected workers
364 int counter = 0;
365 for (int i = 0; i < nWaiting + nWorking; ++ i)
366 {
367 // determine the particular worker
368 mwWorkerData &w = (i < nWaiting) ? xWaiting [i] :
369 xWorking [i - nWaiting];
370
371 // determine the host name
372 const char *host = w. name. c_str ();
373
374 // if no host name or port number is known, skip it
375 if (!host || !*host || (w. port <= 0))
376 continue;
377
378 // check if the same worker was already listed
379 int j;
380 for (j = 0; j < i; ++ j)
381 {
382 // determine the other worker
383 mwWorkerData &v = (j < nWaiting) ? xWaiting [j] :
384 xWorking [j - nWaiting];
385
386 // determine the name of the other host
387 const char *host_j = v. name. c_str ();
388
389 // if no host name or port number is known, skip it
390 if (!host_j || !*host_j || (v. port <= 0))
391 continue;
392
393 // if the ports are different, skip the other worker
394 if (w. port != v. port)
395 continue;
396
397 // if the names are different, skip the other worker
398 if (std::strcmp (host, host_j))
399 continue;
400
401 // make a note that this is the same worker
402 j = i + 1;
403 break;
404 }
405 if (j > i)
406 continue;
407
408 // create the file if this is the first time to write data
409 if (first)
410 {
411 f. open (filename, std::ios::out | std::ios::trunc);
412 if (!f)
413 return mwError;
414 f << "; A list of currently running workers:\n";
415 first = false;
416 }
417
418 // write the address
419 f << host << ":" << w. port << "\n";
420 ++ counter;
421 }
422
423 // exit if no workers have been listed
424 if (!counter)
425 return mwOk;
426
427 // add a summary if any workers have been listed
428 f << "; A total of " << counter << " workers";
429 if (counter != nWaiting + nWorking)
430 f << " out of " << (nWaiting + nWorking);
431 f << " saved.\n";
432
433 if (!keepWorkers)
434 f << "; The workers will exit upon disconnection.\n";
435 else
436 f << "; The workers will remain running "
437 "after disconnection.\n";
438 f. close ();
439 if (!f)
440 return mwError;
441 else
442 return mwOk;
443} /* mwCoordinator::SaveWorkers */
444
445// --------------------------------------------------
446
448{
449 return mwNoData;
450} /* mwCoordinator::Prepare */
451
453{
454 return mwOk;
455} /* mwCoordinator::Accept */
456
458{
459 return mwOk;
460} /* mwCoordinator::Reject */
461
462// --------------------------------------------------
463
464inline void mwCoordinator::Init (mwData &data)
465{
466 initData. Take (data);
467 return;
468} /* mwCoordinator::Init */
469
470
471// --------------------------------------------------
472
473inline int mwCoordinator::SendMessageC (int fd, unsigned int code,
474 const mwData &x) const
475{
476 // prepare the control number to send
477 unsigned int ctrl = this -> ControlNumber ();
478
479 // send the message with the control number and the message code
480 return this -> SendMessage (fd, ctrl, code, x);
481} /* mwCoordinator::SendMessageC */
482
483inline int mwCoordinator::RecvMessageC (int fd, unsigned int &code,
484 mwData &x) const
485{
486 // receive the message
487 unsigned int ctrl = 0;
488 int result = this -> RecvMessage (fd, ctrl, code, x);
489
490 // if there was an error then return its code
491 if (result != mwOk)
492 return result;
493
494 // if the control number is correct then finish successfully
495 if (ctrl == ~(this -> ControlNumber ()))
496 return mwOk;
497
498 // if the control number is wrong then return an error code
499 if (logFile)
500 *logFile << "Wrong control code received "
501 "from the worker: " << ~ctrl << "." << std::endl;
502 return mwError;
503} /* mwCoordinator::SendMessageC */
504
505// --------------------------------------------------
506
508{
509 if (!localWorker)
510 return mwError;
511
512 // indicate that some worker is continuously waiting for data
513 if (!nWaiting)
514 ++ nWaiting;
515
516 // process the data
517 while ((nDone < mwMAXWORK) && (nRejected < mwMAXWORK) && (nToDo > 0))
518 {
519 // process the data and replace it with the result
520 -- nToDo;
521 xToDo [nToDo]. Rewind ();
522 int result = localWorker -> Process (xToDo [nToDo]);
523 xToDo [nToDo]. Rewind ();
524
525 // if the data caused an error then abort the loop
526 if (result == mwError)
527 {
528 if (logFile)
529 *logFile << "Data processing failed." <<
530 std::endl;
531 return mwError;
532 }
533
534 // if the data was rejected
535 // then move it to the right place
536 else if (result == mwReject)
537 {
538 xRejected [nRejected]. Take (xToDo [nToDo]);
539 ++ nRejected;
540 }
541
542 // if the data was accepted
543 // then move it to the right place
544 else
545 {
546 xDone [nDone]. Take (xToDo [nToDo]);
547 ++ nDone;
548 }
549 }
550
551 return mwOk;
552} /* mwCoordinator::RunLoopLocally */
553
// NOTE(review): the header line of this definition (listing line 554)
// is missing from this extract; according to the symbol index it is
// 'void mwCoordinator::ConnectWorkers ()' -- presumably declared inline.
// Tries to connect to every computer listed in 'computers' at the
// corresponding port in 'ports' (both inherited from mwTask). It stops
// early once nWaiting reaches mwMAXWORK - 1. Each successful connection
// is appended to the waiting queue and sent the initialization data;
// every attempt is logged.
// NOTE(review): assumes 'ports' has at least as many entries as
// 'computers' -- confirm.
555{
556 int nComputers = computers. size ();
557 for (int i = 0; (nWaiting < mwMAXWORK - 1) && (i < nComputers); ++ i)
558 {
559 // determine the computer name and port number
560 const std::string &name = computers [i];
561 int port = ports [i];
562
563 // open the connection
564 int fd = mwConnect (name. c_str (), port);
565
566 // if the connection attempt was successful, add this worker
567 if (fd >= 0)
568 {
569 if (logFile)
570 *logFile << "Connected to " << name << ":" <<
571 port << "." << std::endl;
// NOTE(review): listing line 572 is missing here; since 'w' is used
// below and nWaiting is then incremented, it presumably declares
// 'w' as a reference to xWaiting [nWaiting] -- confirm.
573 w. fd = fd;
574 w. name = name;
575 w. port = port;
576 ++ nWaiting;
577
// NOTE(review): listing line 580 (the right-hand operand of '|' in the
// statement below) is missing from this extract; it presumably adds
// a flag depending on 'keepWorkers' -- confirm against the source.
578 // prepare the initialization code
579 int code = mwInitMsg |
581
582 // send the initialization data to the worker
583 int initResult = SendMessageC (w. fd, code,
584 initData);
// NOTE(review): on failure the worker is dropped from the waiting queue,
// but its socket 'w. fd' is neither disconnected nor reset to -1, so the
// connection appears to leak; consider calling mwDisconnect here.
585 if (initResult != mwOk)
586 {
587 if (logFile)
588 *logFile << "Error while sending "
589 "the initialization data." <<
590 std::endl;
591 -- nWaiting;
592 }
593 }
594
595 // if unable to connect, make only a note in the log
596 else if (logFile)
597 *logFile << "Connection attempt to " << name <<
598 ":" << port << " failed." << std::endl;
599 }
600 return;
601} /* mwCoordinator::ConnectWorkers */
602
604{
605 // if there is no valid port number then cancel this operation
606 if (this -> Port () <= 0)
607 return;
608
609 // open a listening socket at the given port
610 listensocket = mwListen (this -> Port (), 15);
611
612 // add a message to the log file whether it was successful or not
613 if (logFile)
614 {
615 if (listensocket < 0)
616 *logFile << "Listening attempt at port " <<
617 this -> Port () << " failed." << std::endl;
618 else
619 *logFile << "Waiting for workers at port " <<
620 this -> Port () << "." << std::endl;
621 }
622
623 return;
624} /* mwCoordinator::BeginListening */
625
// Runs one pass of the main communication loop with remote workers:
// - waits in mwSelect (time limit TimeOut(), or 0 in the cases below)
//   until some worker has a result or can accept data;
// - receives results into 'xDone' or 'xRejected';
// - sends pending 'xToDo' pieces to waiting workers;
// - moves workers between the waiting and working queues;
// - accepts new workers at 'listensocket';
// - as a last resort, processes one data piece with the local worker.
// 'no_more_data' tells that the caller will not prepare more data.
// If no workers are left and nobody is listening, it switches to the
// single-work mode (when a local worker exists) or fails.
// Returns mwError on a fatal failure and mwOk otherwise.
626inline int mwCoordinator::RunLoop (bool no_more_data)
627{
// disabled debugging output (the condition is always false)
628 if (false && logFile)
629 *logFile << "\nDebug0: " << nWaiting << " waiting, " <<
630 nWorking << " working, " << nToDo <<
631 " data pieces." << std::endl;
632
633 // if not listening and there are no workers then try switching
634 // to the single-work mode, unless there is no local worker
635 if ((listensocket < 0) && !nWorking && !nWaiting)
636 {
637 // switch to the single-work mode if allowed to
638 if (localWorker)
639 {
640 singleWork = true;
641 if (logFile)
642 *logFile << "All workers disconnected. "
643 "Switching to the single-work "
644 "mode." << std::endl;
645 return mwOk;
646 }
647
648 // if data cannot be processed locally, report a failure
649 else
650 {
651 if (logFile)
652 *logFile << "Failure: All workers "
653 "disconnected. The work cannot be "
654 "continued." << std::endl;
655 return mwError;
656 }
657 }
658
659 // TO DO: If there is no more data (options & mwNoMoreData)
660 // and some workers are idle, and some others are working
661 // for a long time, send their data also to a few idle
662 // workers... This requires some A.I., of course. ;)
663
664 // the i/o flags and sockets of the workers + one for 'listensocket'
// (the extra slot is index nWorkers; it stays within the arrays only as
// long as fewer than mwMAXWORK workers are connected in total)
665 int ioflags [mwMAXWORK];
666 int sockets [mwMAXWORK];
// waiting workers are polled for writing only if there is data for them
667 int nWorkers = nToDo ? (nWorking + nWaiting) : nWorking;
668
669 // prepare flags for the working and waiting workers
670 for (int i = 0; i < nWorking; ++ i)
671 {
672 ioflags [i] = mwCanRead;
673 sockets [i] = xWorking [i]. fd;
674 }
675 if (nWorkers > nWorking)
676 {
677 for (int i = 0; i < nWaiting; ++ i)
678 {
679 ioflags [nWorking + i] = mwCanWrite;
680 sockets [nWorking + i] = xWaiting [i]. fd;
681 }
682 }
683
684 // prepare the listen socket flag
685 bool listening = false;
686 int listenflag = nWorkers;
687 if ((listensocket >= 0) && (nWorking + nWaiting < mwMAXWORK - 1))
688 {
689 ioflags [listenflag] = mwCanRead;
690 listening = true;
691 }
692 else
693 ioflags [listenflag] = mwNone;
694
695 // determine whether it is necessary to wait or not
// (no waiting if there is nobody to wait for, or if waiting workers
// need new data from the caller first)
696 int timelimit = this -> TimeOut ();
697 if (localWorker && !nWorking && !nWaiting)
698 timelimit = 0;
699 if (listening && !no_more_data && !nToDo && !nWorking && !nWaiting)
700 timelimit = 0;
701 if (!no_more_data && !nToDo && nWaiting && (nWorking < mwMAXWORK))
702 timelimit = 0;
703// if (no_more_data && !nToDo && !nWorking)
704// timelimit = 0;
705
706 // report the select's parameters to the log file
707 if (logFile)
708 {
709 *logFile << ">>> Select, t = " << timelimit << ", flags =";
710 for (int i = 0; i <= nWorkers; ++ i)
711 *logFile << " " << ioflags [i];
712 *logFile << "." << std::endl;
713 }
714
715 // wait until data can be received from or sent to any socket
716 int result = mwSelect (sockets, nWorkers,
717 listening ? listensocket : -1, ioflags, timelimit);
718
719 // report the returned parameters to the log file
720 if (logFile)
721 {
722 *logFile << ">>> Returned flags =";
723 for (int i = 0; i <= nWorkers; ++ i)
724 *logFile << " " << ioflags [i];
725 *logFile << "." << std::endl;
726 }
727
728 // in case of select's failure, exit the loop with a failure message
729 if (result == mwError)
730 {
731 if (logFile)
732 *logFile << "Error: The 'select' function failed." <<
733 std::endl;
734 return mwError;
735 }
736
737 // report a time-out if necessary
738 if ((timelimit > 0) && (result == mwTimeOut))
739 {
740 if (logFile)
741 *logFile << "Time-out occurred at 'select'." <<
742 std::endl;
743 }
744
745 // receive data from all the workers who are ready
746 for (int i = 0; (i < nWorking) && (nDone < mwMAXWORK) &&
747 (nRejected < mwMAXWORK) && (nToDo < mwMAXWORK); ++ i)
748 {
749 // if this worker is not ready, yet, then skip it
750 if (!(ioflags [i] & mwCanRead))
751 continue;
752
753 // receive the entire data chunk from the worker
// (note: this 'result' shadows the one returned by mwSelect above)
754 unsigned int code = 0;
755 int result = RecvMessageC (sockets [i], code, xDone [nDone]);
756
757 // remember which worker this is
758 mwWorkerData &w = xWorking [i];
759
760 // reject the worker in case of error
761 if (result < 0)
762 {
763 // log the details of what happened
764 if (logFile)
765 {
766 *logFile << "Worker " << i;
767 if (!w. name. empty ())
768 *logFile << " (" << w. name << ")";
769 *logFile << " disconnected: ";
770 if (result == mwLost)
771 *logFile << "Connection lost.";
772 else
773 *logFile << "An error occurred.";
774 *logFile << std::endl;
775 }
776
777 // disconnect the worker
778 mwDisconnect (sockets [i]);
779
780 // move the worker's data back to the "to-do" list
781 xToDo [nToDo]. Take (w. data);
782 ++ nToDo;
783
784 // make a note of this in the worker's data
785 w. fd = -1;
786 w. status = -1;
787 }
788
789 // if transmitting the port number only, take it
// (the worker's status is not changed: it is still working)
790 else if (code & mwPortMsg)
791 {
792 // retrieve the port number
793 xDone [nDone] >> w. port;
794
795 // report this fact to the log file
796 if (logFile)
797 *logFile << "Port number " << w. port <<
798 " received from worker " << i <<
799 "." << std::endl;
800 }
801
802 // if the data was rejected, move it to 'xRejected'
803 else if (code & mwRejectedMsg)
804 {
805 // report what happened
806 if (logFile)
807 *logFile << "Data was rejected by worker " <<
808 i << "." << std::endl;
809
810 // move the data to the rejected table
811 xRejected [nRejected]. Take (w. data);
812 ++ nRejected;
813
814 // indicate the status change of this worker
815 w. status = 1;
816 }
817
818 // if the data was Ok, the result is in 'xDone'
819 else
820 {
821 // report this to the log file
822 if (logFile)
823 *logFile << "Processed data received from "
824 "worker " << i << "." << std::endl;
825
826 // accept this piece of data
827 ++ nDone;
828
829 // indicate the status change of this worker
830 w. status = 1;
831 }
832 }
833
// NOTE(review): possible bug. If nToDo was 0 before mwSelect, the waiting
// workers' entries of 'ioflags' and 'sockets' were never initialized
// (nWorkers == nWorking). The receive loop above may then raise nToDo by
// putting back a lost worker's data, and this loop would read
// indeterminate flags and socket numbers at 'offset'. Consider running
// this loop only when nWorkers > nWorking.
834 // send data to those workers who are ready
835 for (int i = 0; (i < nWaiting) && nToDo; ++ i)
836 {
837 // remember the worker and its offset
838 mwWorkerData &w = xWaiting [i];
839 int offset = nWorking + i;
840
841 // if this worker is not ready to get data then skip it
842 if (!(ioflags [offset] & mwCanWrite))
843 continue;
844
845 // prepare a message code to send
846 unsigned int code = mwStdMsg;
847
848 // send a data chunk for processing
849 int result = SendMessageC (sockets [offset], code,
850 xToDo [nToDo - 1]);
851
852 // if the data was sent successfully
853 if (result == mwOk)
854 {
855 // take the data chunk to the worker
856 -- nToDo;
857 w. data. Take (xToDo [nToDo]);
858
859 // indicate the status of the worker
860 w. status = 1;
861
862 // report this to the log file
863 if (logFile)
864 *logFile << "Data " << nToDo << " sent to "
865 "worker " << i << "." << std::endl;
866 }
867
868 // if an error occurred, reject the worker
// (the data piece stays in the to-do queue)
869 else
870 {
871 // report this situation to the log file
872 if (logFile)
873 *logFile << "Worker " << i << " disconnected"
874 ": " << ((result == mwLost) ?
875 "Connection lost." :
876 "An error occurred.") << std::endl;
877
878 // disconnect the worker
879 mwDisconnect (sockets [offset]);
880
881 // modify the status of the worker accordingly
882 w. fd = -1;
883 w. status = -1;
884 }
885 }
886
// disabled debugging output (the condition is always false)
887 if (false && logFile)
888 {
889 *logFile << "Debug1: " << nWaiting << " waiting:";
890 for (int i = 0; i < nWaiting; ++ i)
891 *logFile << " " << xWaiting [i]. status;
892 *logFile << "; " << nWorking << " working:";
893 for (int i = 0; i < nWorking; ++ i)
894 *logFile << " " << xWorking [i]. status;
895 *logFile << std::endl;
896 }
897
898 // purge workers who have been disconnected and move workers
899 // whose data has been acquired to the 'waiting' queue
900 for (int i = 0; i < nWorking; ++ i)
901 {
902 // skip this worker if it is fine
903 if (xWorking [i]. status == 0)
904 continue;
905
906 // swap this worker with the last one if necessary
907 if (i < nWorking - 1)
908 swap (xWorking [i], xWorking [nWorking - 1]);
909
910 // move the worker to the waiting queue if necessary
911 if (xWorking [nWorking - 1]. status > 0)
912 {
913 xWorking [nWorking - 1]. status = 0;
// NOTE(review): listing line 914 is missing here; given the increment
// below, it presumably swaps xWorking [nWorking - 1] with
// xWaiting [nWaiting] -- confirm against the original source.
915 ++ nWaiting;
916 }
917
918 // remove this worker from the working queue
919 -- nWorking;
920
921 // consider the same entry in the table again
922 -- i;
923 }
924
925 // purge workers who have been disconnected and move workers
926 // who received data to the 'working' queue
927 for (int i = 0; i < nWaiting; ++ i)
928 {
929 // skip this worker if it is fine
930 if (xWaiting [i]. status == 0)
931 continue;
932
933 // swap this worker with the last one if necessary
934 if (i < nWaiting - 1)
935 swap (xWaiting [i], xWaiting [nWaiting - 1]);
936
937 // move the worker to the working queue if necessary
938 if (xWaiting [nWaiting - 1]. status > 0)
939 {
940 xWaiting [nWaiting - 1]. status = 0;
// NOTE(review): listing line 941 is missing here; given the increment
// below, it presumably swaps xWaiting [nWaiting - 1] with
// xWorking [nWorking] -- confirm against the original source.
942 ++ nWorking;
943 }
944
945 // remove this worker from the waiting queue
946 -- nWaiting;
947
948 // consider the same entry in the table again
949 -- i;
950 }
951
// disabled debugging output (the condition is always false)
952 if (false && logFile)
953 *logFile << "Debug2: " << nWaiting << " waiting, " <<
954 nWorking << " working, " << nToDo <<
955 " data pieces." << std::endl;
956
// NOTE(review): listing line 959 (the end of the condition below) is
// missing from this extract; it presumably limits the number of workers
// to stay below mwMAXWORK -- confirm against the original source.
957 // accept connections from new workers if any
958 if (listening && (ioflags [listenflag] & mwCanRead) &&
960 {
// NOTE(review): listing line 962 is missing here; since 'w' is used
// below and nWaiting is then incremented, it presumably declares 'w'
// as a reference to xWaiting [nWaiting] -- confirm.
961 // accept the connection
963 w. name = std::string ("");
964 w. port = 0;
965 w. status = 0;
966 w. fd = mwAccept (listensocket, w. name);
967
968 // if the new worker has been accepted successfully
969 if (w. fd >= 0)
970 {
971 // report the worker's acceptance
972 if (logFile)
973 *logFile << "A worker from '" << w. name <<
974 "' accepted." << std::endl;
975
976 // add the worker to the waiting queue
977 ++ nWaiting;
978
// NOTE(review): listing line 981 (the right-hand operand of '|' in the
// statement below) is missing from this extract; it presumably adds a
// flag depending on 'keepWorkers' -- confirm against the source.
979 // prepare the initialization code
980 int code = mwInitMsg |
982
983 // send the initialization data to the worker
984 int initResult = SendMessageC (w. fd, code,
985 initData);
// NOTE(review): on failure the worker is dropped from the waiting queue,
// but its socket 'w. fd' is neither disconnected nor reset to -1, so the
// connection appears to leak; consider calling mwDisconnect here.
986 if (initResult != mwOk)
987 {
988 if (logFile)
989 *logFile << "Error while sending "
990 "the initialization data." <<
991 std::endl;
992 -- nWaiting;
993 }
994 }
995
996 // report the error to the log file if not successful
997 else if (logFile)
998 *logFile << "Unsuccessful connection of a worker "
999 "from '" << w. name << "'." << std::endl;
1000 }
1001
// disabled debugging output (the condition is always false)
1002 if (false && logFile)
1003 *logFile << "Debug3: " << nWaiting << " waiting, " <<
1004 nWorking << " working, " << nToDo <<
1005 " data pieces." << std::endl;
1006
1007 // ask the caller for some data to process locally if there are
1008 // no workers, no pending data and the select timed out
1009 if (localWorker && !nWorking && !nWaiting &&
1010 !no_more_data && (result == mwTimeOut) && !nToDo)
1011 {
1012 if (logFile)
1013 *logFile << "Asking for some data to be "
1014 "processed locally." << std::endl;
1015 return mwOk;
1016 }
1017
1018 // run the work locally as a last resort
1019 if (localWorker && !nWorking && !nWaiting && (result == mwTimeOut) &&
1020 nToDo && (nDone < mwMAXWORK) && (nRejected < mwMAXWORK))
1021 {
1022 // make a note in the log file of what is going on
1023 if (logFile)
1024 *logFile << "Processing data locally." << std::endl;
1025
1026 // process one piece of data
// (note: this 'result' shadows the one returned by mwSelect above)
1027 -- nToDo;
1028 xToDo [nToDo]. Rewind ();
1029 int result = localWorker -> Process (xToDo [nToDo]);
1030 xToDo [nToDo]. Rewind ();
1031
1032 if (result == mwReject)
1033 {
1034 xRejected [nRejected]. Take (xToDo [nToDo]);
1035 ++ nRejected;
1036 }
1037 else if (result == mwError)
1038 {
1039 if (logFile)
1040 *logFile << "Data processing failed." <<
1041 std::endl;
1042 return mwError;
1043 }
1044 else
1045 {
1046 xDone [nDone]. Take (xToDo [nToDo]);
1047 ++ nDone;
1048 }
1049 }
1050
// disabled debugging output (the condition is always false)
1051 if (false && logFile)
1052 *logFile << "Debug4: " << nWaiting << " waiting, " <<
1053 nWorking << " working, " << nToDo <<
1054 " data pieces." << std::endl;
1055
1056 return mwOk;
1057} /* mwCoordinator::RunLoop */
1058
1059// --------------------------------------------------
1060
1062{
1063 // remember the local worker's address
1064 localWorker = w;
1065
1066 // initialize the local worker if any
1067 if (localWorker)
1068 localWorker -> Initialize (initData);
1069
1070 if (logFile)
1071 {
1072 *logFile << "Running as a COORDINATOR." << std::endl;
1073
1074 // indicate what kind of network connection is in use
1075 #if !mwNETWORK
1076 *logFile << "There is no network in use." << std::endl;
1077 #elif mwWXWIN
1078 *logFile << "Using the sockets interface "
1079 "provided by wxWindows." << std::endl;
1080 #else
1081 *logFile << "Using the standard sockets "
1082 "for network connections." << std::endl;
1083 #endif
1084
1085 // say if running in the single-work mode only
1086 if (singleWork && localWorker)
1087 *logFile << "Running in the single-work mode." <<
1088 std::endl;
1089 }
1090
1091 // connect to workers on the list and begin listening if necessary
1092 if (!singleWork)
1093 {
1094 this -> ConnectWorkers ();
1095 this -> BeginListening ();
1096 }
1097
1098 // if not listening and there are no workers then try switching
1099 // to the single-work mode, unless there is no local worker
1100 if (!singleWork && (listensocket < 0) && !nWaiting)
1101 {
1102 // switch to the single-work mode if allowed to
1103 if (localWorker)
1104 {
1105 singleWork = true;
1106 if (logFile)
1107 *logFile << "No remote workers. Switching "
1108 "to the single-work mode." <<
1109 std::endl;
1110 }
1111
1112 // if data cannot be processed locally, report a failure
1113 else
1114 {
1115 if (logFile)
1116 *logFile << "Failure: No workers." <<
1117 std::endl;
1118 return mwError;
1119 }
1120 }
1121
1122 // is there no more data to be sent?
1123 bool no_more_data = false;
1124
1125 while (1)
1126 {
1127 // run the communications loop
1128 int loopresult = singleWork ? this -> RunLoopLocally () :
1129 this -> RunLoop (no_more_data);
1130
1131 // stop if the coordinator failed badly
1132 if (loopresult == mwError)
1133 return mwError;
1134
1135 // if some data was rejected, process all this data
1136 while (nRejected > 0)
1137 {
1138 // run the user's procedure to acquire rejected data
1139 -- nRejected;
1140 xRejected [nRejected]. Rewind ();
1141 int result = this -> Reject (xRejected [nRejected]);
1142
1143 // interrupt if the user says that an error occurred
1144 if (result != mwOk)
1145 return mwError;
1146
1147 // reset the data acquired by the user
1148 xRejected [nRejected]. Reset ();
1149 no_more_data = false;
1150 }
1151
1152 // if some new data arrived, process all this data
1153 while (nDone > 0)
1154 {
1155 // call the user's procedure to accept the data piece
1156 -- nDone;
1157 xDone [nDone]. Rewind ();
1158 int result = this -> Accept (xDone [nDone]);
1159
1160 // interrupt if the user says that an error occurred
1161 if (result != mwOk)
1162 return mwError;
1163
1164 // reset the data acquired by the user
1165 xDone [nDone]. Reset ();
1166 no_more_data = false;
1167 }
1168
1169 // determine whether a new data item must be prepared:
1170 // if there are workers waiting and there is no data then YES
1171 bool hungry = (nWaiting > 0) && !nToDo;
1172 // if there is no worker and no data then YES
1173 if ((localWorker || (listensocket >= 0)) &&
1174 !nWorking && !nWaiting && !nToDo)
1175 {
1176 hungry = true;
1177 }
1178 // if no more data is needed then definitely NO
1179 if (no_more_data)
1180 hungry = false;
1181
1182 // prepare a new data item if necessary
1183 if (hungry)
1184 {
1185 // run the user's procedure for preparing data
1186 int result = this -> Prepare (xToDo [nToDo]);
1187
1188 // break if the user says that an error occurred
1189 if (result == mwError)
1190 return mwError;
1191
1192 // make a note if there is no more data
1193 else if (result == mwNoData)
1194 no_more_data = true;
1195
1196 // add the data piece to the work queue otherwise
1197 else
1198 ++ nToDo;
1199 }
1200
1201 // stop if the tasks are completed and there is no more data
1202 if (no_more_data && !nWorking && !nToDo)
1203 return mwOk;
1204 }
1205} /* mwCoordinator::Coordinate */
1206
1207// --------------------------------------------------
1208
1209inline void mwCoordinator::mwTableDel (int *tab, int len, int pos)
1210// Delete the entry at position 'pos' from the table of 'len' positions.
1211// Shift all the positions after 'pos' to the back.
1212{
1213 for (int i = pos + 1; i < len; ++ i)
1214 tab [i - 1] = tab [i];
1215 return;
1216} /* mwTableDel */
1217
1218inline void mwCoordinator::mwTableDel (mwData *tab, int len, int pos)
1219// Delete the entry at position 'pos' from the table of 'len' positions.
1220// Shift all the positions after 'pos' to the back.
1221{
1222 for (int i = pos + 1; i < len; ++ i)
1223 tab [i - 1]. Take (tab [i]);
1224 return;
1225} /* mwTableDel */
1226
1227
1228} // namespace multiwork
1229} // namespace chomp
1230
1231#endif // _CHOMP_MULTIWORK_MWCOORD_H_
1232
1234
This class defines a generic coordinator task object for the multi-work distributed computations framework.
Definition: mwcoord.h:127
int nToDo
The number of data pieces to be sent to working tasks.
Definition: mwcoord.h:241
mwData xDone[mwMAXWORK]
The recently finished pieces of data.
Definition: mwcoord.h:256
mwData xToDo[mwMAXWORK]
The data pieces waiting to be sent to working tasks.
Definition: mwcoord.h:244
int RunLoopLocally()
Runs the main communication loop using the local worker.
Definition: mwcoord.h:507
int SaveWorkers(const char *filename)
Saves addresses of workers to the given file in the form of address:port.
Definition: mwcoord.h:356
mwWorker * localWorker
The address of a local worker or 0 if none is available.
Definition: mwcoord.h:216
int listensocket
The socket number at which new workers are listened to.
Definition: mwcoord.h:261
int RecvMessageC(int fd, unsigned int &code, mwData &x) const
Receives a message with data from the socket of a worker.
Definition: mwcoord.h:483
mwWorkerData xWaiting[mwMAXWORK]
The workers waiting for their tasks.
Definition: mwcoord.h:232
void BeginListening()
Begins listening at the given port.
Definition: mwcoord.h:603
virtual int Accept(mwData &data)
Accepts a result received from a worker.
Definition: mwcoord.h:452
mwWorkerData xWorking[mwMAXWORK]
The workers processing their data.
Definition: mwcoord.h:238
virtual ~mwCoordinator()
The virtual destructor.
Definition: mwcoord.h:342
void KeepWorkers(bool keep=true)
Makes workers keep running after the coordinator's completion.
Definition: mwcoord.h:350
int SendMessageC(int fd, unsigned int code, const mwData &x) const
Sends a message with data to the given socket as a coordinator.
Definition: mwcoord.h:473
int nRejected
The number of recently rejected pieces of data.
Definition: mwcoord.h:247
mwCoordinator()
The default constructor.
Definition: mwcoord.h:288
int nWaiting
The number of workers waiting for their data.
Definition: mwcoord.h:229
static void mwTableDel(int *tab, int len, int pos)
A helper function for deleting a table entry and shifting the remainder of the table backwards.
Definition: mwcoord.h:1209
void Init(mwData &data)
Defines a portion of initialization data which will be sent to every newly connected worker.
Definition: mwcoord.h:464
int RunLoop(bool no_more_data)
Runs the main communication loop: Sends the data to workers and receives the results of their computations.
Definition: mwcoord.h:626
virtual int Prepare(mwData &data)
Prepares a piece of data to be sent to a worker.
Definition: mwcoord.h:447
bool singleWork
Should data be processed locally only?
Definition: mwcoord.h:213
int nWorking
The number of workers processing their data.
Definition: mwcoord.h:235
mwData initData
The initialization data that has to be sent to workers.
Definition: mwcoord.h:224
void DisconnectAll()
Disconnects all the workers (normally called in the destructor).
Definition: mwcoord.h:306
void ConnectWorkers()
Connects to all workers in the list.
Definition: mwcoord.h:554
mwData xRejected[mwMAXWORK]
The recently rejected pieces of data.
Definition: mwcoord.h:250
virtual int Reject(mwData &data)
Acknowledges data that was rejected by a worker.
Definition: mwcoord.h:457
int Coordinate(mwWorker *w=NULL)
Run the coordinator until all the work has been completed.
Definition: mwcoord.h:1061
int nDone
The number of recently finished pieces of data.
Definition: mwcoord.h:253
bool keepWorkers
Should the workers be kept running after the coordinator is done?
Definition: mwcoord.h:219
This class is used to convert data structures into a single sequence of bytes and to retrieve this da...
Definition: mwdata.h:67
This class defines a generic task object (coordinator or worker) for the multi-work distributed compu...
Definition: mwtask.h:99
static int SendMessage(int fd, unsigned int ctrl, unsigned int code, const mwData &x)
Sends a message with data to the given socket.
Definition: mwtask.h:427
int Port() const
Returns the current port number.
Definition: mwtask.h:321
std::ofstream * logFile
The debug log file stream.
Definition: mwtask.h:177
int TimeOut() const
Returns the currently set network connection time-out interval.
Definition: mwtask.h:343
unsigned int ControlNumber() const
Returns the currently set identification control number.
Definition: mwtask.h:332
static int RecvMessage(int fd, unsigned int &ctrl, unsigned int &code, mwData &x)
Receives a message with data from the given socket.
Definition: mwtask.h:437
std::vector< std::string > computers
A list of workers or coordinators to connect to at start-up.
Definition: mwtask.h:201
std::vector< int > ports
A list of port numbers of workers to connect to at start-up.
Definition: mwtask.h:204
A helper class for storing data on a single worker.
Definition: mwcoord.h:59
std::string name
The computer name of the worker.
Definition: mwcoord.h:76
~mwWorkerData()
The destructor.
Definition: mwcoord.h:103
int status
The status of the worker: 0 = ok, -1 = failed, 1 = data acquired from a working one or data sent to a...
Definition: mwcoord.h:83
friend void swap(mwWorkerData &data1, mwWorkerData &data2)
Swaps data between two objects of this type.
Definition: mwcoord.h:108
mwWorkerData()
The default constructor.
Definition: mwcoord.h:97
mwWorkerData & operator=(const mwWorkerData &)
The assignment operator is not allowed.
Definition: mwcoord.h:93
mwWorkerData(const mwWorkerData &)
The copy constructor is not allowed.
Definition: mwcoord.h:90
int fd
The file descriptor number of the socket for the communication with the given worker.
Definition: mwcoord.h:73
int port
The port number at which the worker is going to listen next time.
Definition: mwcoord.h:79
mwData data
The data which was sent to the worker and is being processed.
Definition: mwcoord.h:68
This class defines a generic worker task object for the multi-work distributed computations framework...
Definition: mwworker.h:55
#define mwNETWORK
This constant is defined as 1 if the network environment is present and the network communication rou...
Definition: mwconfig.h:92
#define mwMAXWORK
The maximal number of simultaneously connected workers.
Definition: mwconfig.h:143
This file contains the basic configuration of the MultiWork module, mainly as a definition of a serie...
This file contains the definition of the MultiWork data class.
This file contains the definition of some low-level functions for the TCP/IP streams communication an...
This file contains the definition of the MultiWork task class.
void swap(mwWorkerData &data1, mwWorkerData &data2)
Definition: mwcoord.h:108
@ mwTimeOut
A connection time out has occurred.
Definition: mwlowlev.h:63
@ mwCanWrite
Writing possible.
Definition: mwlowlev.h:60
@ mwCanRead
Reading possible.
Definition: mwlowlev.h:57
@ mwNone
No flag selected.
Definition: mwlowlev.h:54
@ mwLost
The network connection has been lost.
Definition: mwlowlev.h:66
@ mwNoData
There is no data to be sent to workers, for example, because everything has already been sent.
Definition: mwconfig.h:166
@ mwReject
The data has been rejected.
Definition: mwconfig.h:169
@ mwError
A serious error occurred.
Definition: mwconfig.h:162
@ mwOk
Everything is fine.
Definition: mwconfig.h:159
int mwListen(int port, int queuesize)
Begins listening at the given port.
int mwSelect(const int *workers, int nworkers, int listensocket, int *ioflags, int timeout)
Determines IOflags for each of the workers and additionally the listensocket (the last flag).
int mwAccept(int fd, std::string &computer, int timeout=-1)
Waits for and accepts a connection at the given socket.
void mwDisconnect(int fd)
Disconnects the given socket.
int mwConnect(const char *name, int port)
Connects to the given computer at the given port.
@ mwKeepMsg
Message to the Worker: Keep running after having disconnected.
Definition: mwtask.h:69
@ mwStdMsg
Message to the Worker: A standard piece of data to be processed.
Definition: mwtask.h:66
@ mwInitMsg
Message to the Worker: A piece of initialization data to be processed.
Definition: mwtask.h:63
@ mwRejectedMsg
Message to the Coordinator: The data has been rejected.
Definition: mwtask.h:78
@ mwByeMsg
Message to the Worker: Please, disconnect.
Definition: mwtask.h:75
@ mwPortMsg
Message to the Coordinator: I will listen at this port number.
Definition: mwtask.h:81
@ mwDontKeepMsg
Message to the Worker: Don't keep running after disconnecting.
Definition: mwtask.h:72
This namespace contains the entire CHomP library interface.
Definition: bitmaps.h:51