OpenM++ runtime library (libopenm)
runControllerImpl.h
Go to the documentation of this file.
1
5// Copyright (c) 2013-2015 OpenM++
6// This code is licensed under the MIT license (see LICENSE.txt for details)
7
8#ifndef RUN_CTRL_IMPL_H
9#define RUN_CTRL_IMPL_H
10
11#include <iterator>
12#include <unordered_set>
13#include "dbParameter.h"
14#include "dbOutputTable.h"
15#include "dbValue.h"
16
17using namespace std;
18
19namespace openm
20{
/** SingleController: controller for single process: create new model run(s), read input parameters and write output tables. */
23 {
24 public:
    /**
     * create new single process run controller.
     *
     * NOTE(review): isSubDone is sized from the inherited subValueCount as it stands at construction time;
     * confirm that value is already set here or that isSubDone is re-sized later (e.g. in init()).
     */
26 SingleController(const ArgReader & i_argStore, IDbExec * i_dbExec) :
27 RunController(i_argStore),
28 runId(0),
29 taskId(0),
30 taskRunId(0),
31 isWaitTaskRun(false),
32 dbExec(i_dbExec),
33 isSubDone(subValueCount)
34 { }
35
    /** last cleanup */
37 virtual ~SingleController(void) noexcept { }
38
    /** create new run and input parameters in database. */
40 virtual int nextRun(void) override;
41
    /** read input parameter values. */
43 virtual void readParameter(
44 const char * i_name, int i_subId, const type_info & i_type, size_t i_size, void * io_valueArr
45 ) override;
46
    /** write output table accumulators. */
48 virtual void writeAccumulators(
49 const RunOptions & i_runOpts,
50 bool i_isLastTable,
51 const char * i_name,
52 size_t i_size,
53 forward_list<unique_ptr<double[]> > & io_accValues
54 ) override;
55
    /** model run shutdown: save results and update run status (delegates to doShutdownRun with this controller's task run id). */
57 virtual void shutdownRun(int i_runId) override { doShutdownRun(i_runId, taskRunId, dbExec); }
58
    /** model process shutdown: cleanup resources (delegates to doShutdownAll). */
60 virtual void shutdownWaitAll(void) override { doShutdownAll(taskRunId, dbExec); }
61
    /** model process shutdown if exiting without completion (ie: exit on error); delegates to doShutdownOnExit. */
63 virtual void shutdownOnExit(ModelStatus i_status) override { doShutdownOnExit(i_status, runId, taskRunId, dbExec); }
64
    /** communicate between main thread and modeling threads to receive status update. */
66 virtual bool childExchange(void) override;
67
68 protected:
    /** initialize modeling process. */
70 virtual void init(void) override;
71
    /** get run id of the current model run. */
73 int currentRunId(void) const noexcept override { return runId; }
74
75 private:
76 int runId; // if > 0 then model run id
77 int taskId; // if > 0 then modeling task id
78 int taskRunId; // if > 0 then modeling task run id
79 bool isWaitTaskRun; // if true then task run under external supervision
80 IDbExec * dbExec; // db-connection
81 DoneVector isSubDone; // size of [sub-value count], if true then all sub-value accumulators saved in database
82
83 private:
    // controller is non-copyable: copy constructor and copy assignment are deleted
84 SingleController(const SingleController & i_runCtrl) = delete;
85 SingleController & operator=(const SingleController & i_runCtrl) = delete;
86 };
87
/** RootController: controller for root process: create new model run(s) and send input parameters to children. */
90 {
91 public:
    /** create new root run controller; the constructor body stores the process count and sets process rank to 0. */
93 RootController(int i_processCount, const ArgReader & i_argStore, IDbExec * i_dbExec, IMsgExec * i_msgExec) :
94 RunController(i_argStore),
95 taskId(0),
96 taskRunId(0),
97 isWaitTaskRun(false),
98 dbExec(i_dbExec),
99 msgExec(i_msgExec)
100 {
101 processCount = i_processCount;
102 processRank = 0;
103 }
104
    /** last cleanup */
106 virtual ~RootController(void) noexcept { }
107
    /** create new run and input parameters in database. */
109 virtual int nextRun(void) override;
110
    /** read input parameter values. */
112 virtual void readParameter(
113 const char * i_name, int i_subId, const type_info & i_type, size_t i_size, void * io_valueArr
114 ) override;
115
    /** write output table accumulators. */
117 virtual void writeAccumulators(
118 const RunOptions & i_runOpts,
119 bool i_isLastTable,
120 const char * i_name,
121 size_t i_size,
122 forward_list<unique_ptr<double[]> > & io_accValues
123 ) override;
124
    /** model run shutdown: save results and update run status. */
126 virtual void shutdownRun(int i_runId) override;
127
    /** model process shutdown: wait for all children to be completed and do final cleanup. */
129 virtual void shutdownWaitAll(void) override;
130
    /** model process shutdown if exiting without completion (ie: exit on error). */
132 virtual void shutdownOnExit(ModelStatus i_status) override;
133
    /** exchange between root and child processes and threads. */
138 virtual bool childExchange(void) override;
139
140 protected:
    /** initialize root modeling process. */
142 virtual void init(void) override;
143
    /**
     * return run id of root process model run, if root process is doing any model runs.
     *
     * Result is the run id of the last run group when rootGroupDef.isRootActive is true and
     * the run group list is not empty, otherwise 0.
     */
145 int currentRunId(void) const noexcept override {
146 try {
147 return (rootGroupDef.isRootActive && !runGroupLst.empty()) ? runGroupLst.back().runId : 0;
148 }
    // function is declared noexcept: any exception is swallowed here and reported as "no run" (0)
149 catch (...) {
150 return 0;
151 }
152 }
153
154 private:
155 int taskId; // if > 0 then modeling task id
156 int taskRunId; // if > 0 then modeling task run id
157 bool isWaitTaskRun; // if true then task run under external supervision
158 IDbExec * dbExec; // db-connection
159 IMsgExec * msgExec; // message passing interface
160 ProcessGroupDef rootGroupDef; // root process groups size, groups count and process rank in group
161 list<RunGroup> runGroupLst; // process groups run id and run state
162 list<AccReceive> accRecvLst; // list of accumulators to be received
163
164 // return root process run group: last run group
    // NOTE(review): calls back() without an emptiness check, so runGroupLst must be non-empty when this is used
165 RunGroup & rootRunGroup(void) { return runGroupLst.back(); }
166
168 void broadcastMetaData(void);
169
171 template <class MetaTbl>
172 void broadcastMetaTable(MsgTag i_msgTag, unique_ptr<MetaTbl> & io_tableUptr);
173
175 void broadcastRunOptions(void);
176
178 void broadcastLanguageMessages(void);
179
181 int makeNextRun(RunGroup & i_runGroup);
182
184 void readAllRunParameters(const RunGroup & i_runGroup) const;
185
187 void appendAccReceiveList(int i_runId, const RunGroup & i_runGroup);
188
190 bool receiveSubValues(void);
191
193 void updateAccReceiveList(void);
194
198 bool updateModelRunStatus(void);
199
201 bool receiveStatusUpdate(long i_waitTime = 0L);
202
203 // microdata receive status: row count and size for all entities where microdata required
204 struct EntityMdReceive
205 {
206 int childRank = 0; // child rank to receive microdata from
207 bool isLast = false; // if true then it is last entity microdata on model shutdown
208 int childRunId = 0; // child process run id
209 vector<size_t> count; // microdata row count for each entity
210 };
211
212 // microdata receive queue
213 recursive_mutex mdRcvMutex; // mutex to lock for receive operations
214 //
215 list<EntityMdReceive> entityMdRcvLst;
216 vector<bool> isMicrodataFromRank; // array [worldSize], if value true for the rank then microdata must be received
217
219 bool receiveMicrodata(long i_waitTime = 0L);
220
222 bool isAllMicrodataReceived(RunGroup & i_runGroup);
223
225 bool isAllMicrodataReceived(int i_childRank);
226
227 private:
    // controller is non-copyable: copy constructor and copy assignment are deleted
228 RootController(const RootController & i_runCtrl) = delete;
229 RootController & operator=(const RootController & i_runCtrl) = delete;
230 };
231
/** ChildController: controller for child process: receive input parameters from root and send output tables. */
234 {
235 public:
    /**
     * create new child run controller.
     *
     * The constructor body stores the process count and takes process rank from i_msgExec->rank().
     * NOTE(review): i_msgExec is dereferenced in the constructor body, so callers must pass a non-null pointer.
     */
237 ChildController(int i_processCount, const ArgReader & i_argStore, IMsgExec * i_msgExec) :
238 RunController(i_argStore),
239 runId(0),
240 msgExec(i_msgExec),
241 lastTimeStatus(chrono::system_clock::now()),
242 lastModelStatus(ModelStatus::init),
243 isFinalExchange(false)
244 {
245 processCount = i_processCount;
246 processRank = i_msgExec->rank();
247 }
248
    /** last cleanup */
250 virtual ~ChildController(void) noexcept { }
251
    /** create new run and input parameters in database. */
253 virtual int nextRun(void) override;
254
    /** read input parameter values. */
256 virtual void readParameter(
257 const char * i_name, int i_subId, const type_info & i_type, size_t i_size, void * io_valueArr
258 ) override;
259
    /** send output table accumulators to root process. */
261 virtual void writeAccumulators(
262 const RunOptions & i_runOpts,
263 bool i_isLastTable,
264 const char * i_name,
265 size_t i_size,
266 forward_list<unique_ptr<double[]> > & io_accValues
267 ) override;
268
    /** model run shutdown: save results and update run status. */
270 virtual void shutdownRun(int i_runId) override;
271
    /** model process shutdown: cleanup resources. */
273 virtual void shutdownWaitAll(void) override;
274
    /** model process shutdown if exiting without completion (ie: exit on error). */
276 virtual void shutdownOnExit(ModelStatus i_status) override;
277
    /** exchange between root and child process to send and receive status update. */
279 virtual bool childExchange(void) override;
280
281 private:
282 int runId; // if > 0 then model run id
283 ProcessGroupDef groupDef; // child process groups size, groups count and process rank in group
284 IMsgExec * msgExec; // message passing interface
285 chrono::system_clock::time_point lastTimeStatus; // last status update time sent to root
286 ModelStatus lastModelStatus; // last model status sent to root
287 bool isFinalExchange; // if true then final model status send or received from root
288
290 virtual void init(void) override;
291
    /** return the runId member (if > 0 then model run id). */
293 int currentRunId(void) const noexcept override { return runId; }
294
296 int broadcastMetaData(void);
297
299 template <class MetaTbl>
300 void broadcastMetaTable(MsgTag i_msgTag, unique_ptr<MetaTbl> & io_tableUptr);
301
303 void broadcastRunOptions(void);
304
306 void broadcastLanguageMessages(void);
307
309 void sendStatusUpdate(void);
310
312 size_t sendMicrodata(bool i_isLast);
313
314 private:
    // controller is non-copyable: copy constructor and copy assignment are deleted
315 ChildController(const ChildController & i_runCtrl) = delete;
316 ChildController & operator=(const ChildController & i_runCtrl) = delete;
317 };
318
/** RestartController: controller for "restart run": calculate outstanding sub-values for existing run. */
321 {
322 public:
    /**
     * create new "restart run" controller.
     *
     * NOTE(review): isSubDone is sized from the inherited subValueCount as it stands at construction time;
     * confirm that value is already set here or that isSubDone is re-sized later (e.g. in init()).
     */
324 RestartController(const ArgReader & i_argStore, IDbExec * i_dbExec) : RunController(i_argStore),
325 runId(0),
326 setId(0),
327 dbExec(i_dbExec),
328 isSubDone(subValueCount)
329 { }
330
    /** last cleanup */
332 virtual ~RestartController(void) noexcept { }
333
    /** create new run and input parameters in database. */
335 virtual int nextRun(void) override;
336
    /** read input parameter values. */
338 virtual void readParameter(
339 const char * i_name, int i_subId, const type_info & i_type, size_t i_size, void * io_valueArr
340 ) override;
341
    /** write output table accumulators. */
343 virtual void writeAccumulators(
344 const RunOptions & i_runOpts,
345 bool i_isLastTable,
346 const char * i_name,
347 size_t i_size,
348 forward_list<unique_ptr<double[]> > & io_accValues
349 ) override;
350
    /** model run shutdown: save results and update run status (delegates to doShutdownRun, passing 0 as task run id). */
352 virtual void shutdownRun(int i_runId) override { doShutdownRun(i_runId, 0, dbExec); }
353
    /** model process shutdown: cleanup resources (delegates to doShutdownAll, passing 0 as task run id). */
355 virtual void shutdownWaitAll(void) override { doShutdownAll(0, dbExec); }
356
    /** model process shutdown if exiting without completion (ie: exit on error); passes 0 as task run id. */
358 virtual void shutdownOnExit(ModelStatus i_status) override { doShutdownOnExit(i_status, runId, 0, dbExec); }
359
    /** communicate between main thread and modeling threads to receive status update. */
361 virtual bool childExchange(void) override;
362
363 private:
364 int runId; // if > 0 then model run id
365 int setId; // if > 0 then set id of source input parameters
366 IDbExec * dbExec; // db-connection
367 DoneVector isSubDone; // size of [sub-value count], if true then all sub-value accumulators saved in database
368
370 virtual void init(void) override;
371
    /** return the runId member (if > 0 then model run id). */
373 int currentRunId(void) const noexcept override { return runId; }
374
375 // find sub-value to restart the run and update run status
376 bool cleanupRestartSubValue(void);
377
378 private:
    // controller is non-copyable: copy constructor and copy assignment are deleted
379 RestartController(const RestartController & i_runCtrl) = delete;
380 RestartController & operator=(const RestartController & i_runCtrl) = delete;
381 };
382}
383
384#endif // RUN_CTRL_IMPL_H
controller for child process: receive input parameters from root and send output tables.
Definition: runControllerImpl.h:234
virtual int nextRun(void) override
create new run and input parameters in database.
Definition: childController.cpp:192
ChildController(int i_processCount, const ArgReader &i_argStore, IMsgExec *i_msgExec)
create new child run controller
Definition: runControllerImpl.h:237
virtual void shutdownRun(int i_runId) override
model run shutdown: save results and update run status.
Definition: childController.cpp:243
virtual void shutdownOnExit(ModelStatus i_status) override
model process shutdown if exiting without completion (ie: exit on error).
Definition: childController.cpp:222
virtual void readParameter(const char *i_name, int i_subId, const type_info &i_type, size_t i_size, void *io_valueArr) override
read input parameter values.
Definition: childController.cpp:259
virtual ~ChildController(void) noexcept
last cleanup
Definition: runControllerImpl.h:250
virtual void shutdownWaitAll(void) override
model process shutdown: cleanup resources.
Definition: childController.cpp:228
virtual bool childExchange(void) override
exchange between root and child process to send and receive status update.
Definition: childController.cpp:311
virtual void writeAccumulators(const RunOptions &i_runOpts, bool i_isLastTable, const char *i_name, size_t i_size, forward_list< unique_ptr< double[]> > &io_accValues) override
send output table accumulators to root process.
Definition: childController.cpp:280
Definition: modelHelper.h:17
database connection wrapper to execute sql commands.
Definition: dbExec.h:21
public interface for message passing
Definition: msg.h:138
virtual int rank(void) const =0
return current process rank.
int subValueCount
total number of sub-values
Definition: metaLoader.h:257
controller for "restart run": calculate outstanding sub-values for existing run
Definition: runControllerImpl.h:321
virtual bool childExchange(void) override
communicate between main thread and modeling threads to receive status update.
Definition: restartController.cpp:293
RestartController(const ArgReader &i_argStore, IDbExec *i_dbExec)
create new "restart run" controller
Definition: runControllerImpl.h:324
virtual void shutdownWaitAll(void) override
model process shutdown: cleanup resources.
Definition: runControllerImpl.h:355
virtual void readParameter(const char *i_name, int i_subId, const type_info &i_type, size_t i_size, void *io_valueArr) override
read input parameter values.
Definition: restartController.cpp:250
virtual void writeAccumulators(const RunOptions &i_runOpts, bool i_isLastTable, const char *i_name, size_t i_size, forward_list< unique_ptr< double[]> > &io_accValues) override
write output table accumulators.
Definition: restartController.cpp:274
virtual ~RestartController(void) noexcept
last cleanup
Definition: runControllerImpl.h:332
virtual int nextRun(void) override
create new run and input parameters in database.
Definition: restartController.cpp:126
virtual void shutdownRun(int i_runId) override
model run shutdown: save results and update run status.
Definition: runControllerImpl.h:352
virtual void shutdownOnExit(ModelStatus i_status) override
model process shutdown if exiting without completion (ie: exit on error).
Definition: runControllerImpl.h:358
controller for root process: create new model run(s), send input parameters to children and receive ...
Definition: runControllerImpl.h:90
virtual bool childExchange(void) override
exchange between root and child processes and threads.
Definition: rootController.cpp:354
virtual void writeAccumulators(const RunOptions &i_runOpts, bool i_isLastTable, const char *i_name, size_t i_size, forward_list< unique_ptr< double[]> > &io_accValues) override
write output table accumulators.
Definition: rootController.cpp:628
virtual void shutdownRun(int i_runId) override
model run shutdown: save results and update run status.
Definition: rootController.cpp:521
virtual void shutdownWaitAll(void) override
model process shutdown: wait for all children to be completed and do final cleanup.
Definition: rootController.cpp:448
virtual int nextRun(void) override
create new run and input parameters in database.
Definition: rootController.cpp:294
RootController(int i_processCount, const ArgReader &i_argStore, IDbExec *i_dbExec, IMsgExec *i_msgExec)
create new root run controller
Definition: runControllerImpl.h:93
virtual void shutdownOnExit(ModelStatus i_status) override
model process shutdown if exiting without completion (ie: exit on error).
Definition: rootController.cpp:440
int currentRunId(void) const noexcept override
return run id of root process model run, if root process is doing any model runs
Definition: runControllerImpl.h:145
virtual void readParameter(const char *i_name, int i_subId, const type_info &i_type, size_t i_size, void *io_valueArr) override
read input parameter values.
Definition: rootController.cpp:573
virtual void init(void) override
initialize root modeling process.
Definition: rootController.cpp:92
virtual ~RootController(void) noexcept
last cleanup
Definition: runControllerImpl.h:106
run controller: create new model run(s) and support data exchange.
Definition: runController.h:24
int processCount
number of modeling processes: MPI world size
Definition: runController.h:33
void doShutdownAll(int i_taskRunId, IDbExec *i_dbExec)
implementation of model process shutdown.
Definition: runController.cpp:127
void doShutdownOnExit(ModelStatus i_status, int i_runId, int i_taskRunId, IDbExec *i_dbExec)
implementation of model process shutdown if exiting without completion.
Definition: runController.cpp:66
void doShutdownRun(int i_runId, int i_taskRunId, IDbExec *i_dbExec)
implementation of model run shutdown.
Definition: runController.cpp:157
int processRank
modeling processes rank: MPI rank
Definition: runController.h:36
controller for single process: create new model run(s), read input parameters and write output tables...
Definition: runControllerImpl.h:23
virtual void init(void) override
initialize root modeling process.
Definition: singleController.cpp:76
virtual void readParameter(const char *i_name, int i_subId, const type_info &i_type, size_t i_size, void *io_valueArr) override
read input parameter values.
Definition: singleController.cpp:184
SingleController(const ArgReader &i_argStore, IDbExec *i_dbExec)
create new single process run controller
Definition: runControllerImpl.h:26
int currentRunId(void) const noexcept override
get run id of the current model run.
Definition: runControllerImpl.h:73
virtual void shutdownRun(int i_runId) override
model run shutdown: save results and update run status.
Definition: runControllerImpl.h:57
virtual void shutdownWaitAll(void) override
model process shutdown: cleanup resources.
Definition: runControllerImpl.h:60
virtual int nextRun(void) override
create new run and input parameters in database.
Definition: singleController.cpp:149
virtual ~SingleController(void) noexcept
last cleanup
Definition: runControllerImpl.h:37
virtual bool childExchange(void) override
communicate between main thread and modeling threads to receive status update.
Definition: singleController.cpp:227
virtual void writeAccumulators(const RunOptions &i_runOpts, bool i_isLastTable, const char *i_name, size_t i_size, forward_list< unique_ptr< double[]> > &io_accValues) override
write output table accumulators.
Definition: singleController.cpp:208
virtual void shutdownOnExit(ModelStatus i_status) override
model process shutdown if exiting without completion (ie: exit on error).
Definition: runControllerImpl.h:63
OpenM++ data library: output table interfaces.
OpenM++ data library: input parameter interfaces.
OpenM++ data library: db value classes access value rows: input parameter, accumulator or expression ...
openM++ namespace
Definition: log.h:32
MsgTag
tag to identify message content
Definition: msg.h:29
ModelStatus
modeling job status
Definition: omModelRunState.h:43
arguments reader to get runtime arguments from command line and ini-file.
Definition: argReader.h:56
Definition: modelHelper.h:47
Definition: modelHelper.h:67
basic model run options
Definition: omModel.h:26