33#ifndef _CHOMP_MULTIWORK_MWCOORD_H_
34#define _CHOMP_MULTIWORK_MWCOORD_H_
98 fd (-1), name (
""), port (0), status (0)
110 swap (data1. data, data2. data);
114 std::swap (data1. status, data2. status);
195 int RunLoop (
bool no_more_data);
267 static void mwTableDel (
int *tab,
int len,
int pos);
333 *
logFile <<
"Worker " << i <<
" (" << w. name <<
334 ":" << w. port <<
") disconnected and " <<
372 const char *host = w. name. c_str ();
375 if (!host || !*host || (w. port <= 0))
380 for (j = 0; j < i; ++ j)
387 const char *host_j = v. name. c_str ();
390 if (!host_j || !*host_j || (v. port <= 0))
394 if (w. port != v. port)
398 if (std::strcmp (host, host_j))
411 f. open (filename, std::ios::out | std::ios::trunc);
414 f <<
"; A list of currently running workers:\n";
419 f << host <<
":" << w. port <<
"\n";
428 f <<
"; A total of " << counter <<
" workers";
434 f <<
"; The workers will exit upon disconnection.\n";
436 f <<
"; The workers will remain running "
437 "after disconnection.\n";
487 unsigned int ctrl = 0;
488 int result =
this ->
RecvMessage (fd, ctrl, code, x);
500 *
logFile <<
"Wrong control code received "
501 "from the worker: " << ~ctrl <<
"." << std::endl;
529 *
logFile <<
"Data processing failed." <<
561 int port =
ports [i];
564 int fd =
mwConnect (name. c_str (), port);
570 *
logFile <<
"Connected to " << name <<
":" <<
571 port <<
"." << std::endl;
585 if (initResult !=
mwOk)
588 *
logFile <<
"Error while sending "
589 "the initialization data." <<
597 *
logFile <<
"Connection attempt to " << name <<
598 ":" << port <<
" failed." << std::endl;
606 if (
this ->
Port () <= 0)
616 *
logFile <<
"Listening attempt at port " <<
617 this ->
Port () <<
" failed." << std::endl;
619 *
logFile <<
"Waiting for workers at port " <<
620 this ->
Port () <<
"." << std::endl;
631 " data pieces." << std::endl;
642 *
logFile <<
"All workers disconnected. "
643 "Switching to the single-work "
644 "mode." << std::endl;
652 *
logFile <<
"Failure: All workers "
653 "disconnected. The work cannot be "
654 "continued." << std::endl;
685 bool listening =
false;
686 int listenflag = nWorkers;
693 ioflags [listenflag] =
mwNone;
696 int timelimit =
this ->
TimeOut ();
709 *
logFile <<
">>> Select, t = " << timelimit <<
", flags =";
710 for (
int i = 0; i <= nWorkers; ++ i)
711 *
logFile <<
" " << ioflags [i];
716 int result =
mwSelect (sockets, nWorkers,
722 *
logFile <<
">>> Returned flags =";
723 for (
int i = 0; i <= nWorkers; ++ i)
724 *
logFile <<
" " << ioflags [i];
732 *
logFile <<
"Error: The 'select' function failed." <<
738 if ((timelimit > 0) && (result ==
mwTimeOut))
741 *
logFile <<
"Time-out occurred at 'select'." <<
754 unsigned int code = 0;
767 if (!w. name. empty ())
768 *
logFile <<
" (" << w. name <<
")";
771 *
logFile <<
"Connection lost.";
773 *
logFile <<
"An error occurred.";
797 *
logFile <<
"Port number " << w. port <<
798 " received from worker " << i <<
807 *
logFile <<
"Data was rejected by worker " <<
808 i <<
"." << std::endl;
823 *
logFile <<
"Processed data received from "
824 "worker " << i <<
"." << std::endl;
865 "worker " << i <<
"." << std::endl;
873 *
logFile <<
"Worker " << i <<
" disconnected"
874 ": " << ((result ==
mwLost) ?
876 "An error occurred.") << std::endl;
955 " data pieces." << std::endl;
958 if (listening && (ioflags [listenflag] &
mwCanRead) &&
963 w. name = std::string (
"");
973 *
logFile <<
"A worker from '" << w. name <<
974 "' accepted." << std::endl;
986 if (initResult !=
mwOk)
989 *
logFile <<
"Error while sending "
990 "the initialization data." <<
998 *
logFile <<
"Unsuccessful connection of a worker "
999 "from '" << w. name <<
"'." << std::endl;
1005 " data pieces." << std::endl;
1013 *
logFile <<
"Asking for some data to be "
1014 "processed locally." << std::endl;
1024 *
logFile <<
"Processing data locally." << std::endl;
1040 *
logFile <<
"Data processing failed." <<
1054 " data pieces." << std::endl;
1072 *
logFile <<
"Running as a COORDINATOR." << std::endl;
1076 *
logFile <<
"There is no network in use." << std::endl;
1078 *
logFile <<
"Using the sockets interface "
1079 "provided by wxWindows." << std::endl;
1081 *
logFile <<
"Using the standard sockets "
1082 "for network connections." << std::endl;
1087 *
logFile <<
"Running in the single-work mode." <<
1107 *
logFile <<
"No remote workers. Switching "
1108 "to the single-work mode." <<
1116 *
logFile <<
"Failure: No workers." <<
1123 bool no_more_data =
false;
1129 this ->
RunLoop (no_more_data);
1149 no_more_data =
false;
1166 no_more_data =
false;
1194 no_more_data =
true;
1213 for (
int i = pos + 1; i < len; ++ i)
1214 tab [i - 1] = tab [i];
1222 for (
int i = pos + 1; i < len; ++ i)
1223 tab [i - 1]. Take (tab [i]);
This class defines a generic coordinator task object for the multi-work distributed computations fram...
int nToDo
The number of data pieces to be sent to working tasks.
mwData xDone[mwMAXWORK]
The recently finished pieces of data.
mwData xToDo[mwMAXWORK]
The data pieces waiting to be sent to working tasks.
int RunLoopLocally()
Runs the main communication loop using the local worker.
int SaveWorkers(const char *filename)
Saves addresses of workers to the given file in the form of address:port.
mwWorker * localWorker
The address of a local worker or 0 if none is available.
int listensocket
The socket number at which new workers are listened to.
int RecvMessageC(int fd, unsigned int &code, mwData &x) const
Receives a message with data from the socket of a worker.
mwWorkerData xWaiting[mwMAXWORK]
The workers waiting for their tasks.
void BeginListening()
Begins listening at the given port.
virtual int Accept(mwData &data)
Accepts a result received from a worker.
mwWorkerData xWorking[mwMAXWORK]
The workers processing their data.
virtual ~mwCoordinator()
The virtual destructor.
void KeepWorkers(bool keep=true)
Makes workers keep running after the coordinator's completion.
int SendMessageC(int fd, unsigned int code, const mwData &x) const
Sends a message with data to the given socket as a coordinator.
int nRejected
The number of recently rejected pieces of data.
mwCoordinator()
The default constructor.
int nWaiting
The number of workers waiting for their data.
static void mwTableDel(int *tab, int len, int pos)
A helper function for deleting a table entry and shifting the remainder of the table backwards.
void Init(mwData &data)
Defines a portion of initialization data which will be sent to every newly connected worker.
int RunLoop(bool no_more_data)
Runs the main communication loop: Sends the data to workers and receives the results of their computa...
virtual int Prepare(mwData &data)
Prepares a piece of data to be sent to a worker.
bool singleWork
Should data be processed locally only?
int nWorking
The number of workers processing their data.
mwData initData
The initialization data that has to be sent to workers.
void DisconnectAll()
Disconnects all the workers (normally called in the destructor).
void ConnectWorkers()
Connects to all workers in the list.
mwData xRejected[mwMAXWORK]
The recently rejected pieces of data.
virtual int Reject(mwData &data)
Acknowledges data that was rejected by a worker.
int Coordinate(mwWorker *w=NULL)
Runs the coordinator until all the work has been completed.
int nDone
The number of recently finished pieces of data.
bool keepWorkers
Should the workers be kept running after the coordinator is done?
This class is used to convert data structures into a single sequence of bytes and to retrieve this da...
This class defines a generic task object (coordinator or worker) for the multi-work distributed compu...
static int SendMessage(int fd, unsigned int ctrl, unsigned int code, const mwData &x)
Sends a message with data to the given socket.
int Port() const
Returns the current port number.
std::ofstream * logFile
The debug log file stream.
int TimeOut() const
Returns the currently set network connection time-out interval.
unsigned int ControlNumber() const
Returns the currently set identification control number.
static int RecvMessage(int fd, unsigned int &ctrl, unsigned int &code, mwData &x)
Receives a message with data from the given socket.
std::vector< std::string > computers
A list of workers or coordinators to connect to at start-up.
std::vector< int > ports
A list of port numbers of workers to connect to at start-up.
A helper class for storing data on a single worker.
std::string name
The computer name of the worker.
~mwWorkerData()
The destructor.
int status
The status of the worker: 0 = ok, -1 = failed, 1 = data acquired from a working one or data sent to a...
friend void swap(mwWorkerData &data1, mwWorkerData &data2)
Swaps data between two objects of this type.
mwWorkerData()
The default constructor.
mwWorkerData & operator=(const mwWorkerData &)
The assignment operator is not allowed.
mwWorkerData(const mwWorkerData &)
The copy constructor is not allowed.
int fd
The file descriptor number of the socket for the communication with the given worker.
int port
The port number at which the worker is going to listen next time.
mwData data
The data which was sent to the worker and is being processed.
This class defines a generic worker task object for the multi-work distributed computations framework...
#define mwNETWORK
This constant is defined as 1 if the network environment is present and the network communication rou...
#define mwMAXWORK
The maximal number of simultaneously connected workers.
This file contains the basic configuration of the MultiWork module, mainly as a definition of a serie...
This file contains the definition of the MultiWork data class.
This file contains the definition of some low-level functions for the TCP/IP streams communication an...
This file contains the definition of the MultiWork task class.
void swap(mwWorkerData &data1, mwWorkerData &data2)
@ mwTimeOut
A connection time-out has occurred.
@ mwCanWrite
Writing possible.
@ mwCanRead
Reading possible.
@ mwNone
No flag selected.
@ mwLost
The network connection has been lost.
@ mwNoData
There is no data to be sent to workers, for example, because everything has been already sent.
@ mwReject
The data has been rejected.
@ mwError
A serious error occurred.
@ mwOk
Everything is fine.
int mwListen(int port, int queuesize)
Begins listening at the given port.
int mwSelect(const int *workers, int nworkers, int listensocket, int *ioflags, int timeout)
Determines IOflags for each of the workers and additionally the listensocket (the last flag).
int mwAccept(int fd, std::string &computer, int timeout=-1)
Waits for and accepts a connection at the given socket.
void mwDisconnect(int fd)
Disconnects the given socket.
int mwConnect(const char *name, int port)
Connects to the given computer at the given port.
@ mwKeepMsg
Message to the Worker: Keep running after having disconnected.
@ mwStdMsg
Message to the Worker: A standard piece of data to be processed.
@ mwInitMsg
Message to the Worker: A piece of initialization data to be processed.
@ mwRejectedMsg
Message to the Coordinator: The data has been rejected.
@ mwByeMsg
Message to the Worker: Please, disconnect.
@ mwPortMsg
Message to the Coordinator: I will listen at this port number.
@ mwDontKeepMsg
Message to the Worker: Don't keep running after disconnecting.
This namespace contains the entire CHomP library interface.