Commit fbe00e73 authored by Mirko Stoffers's avatar Mirko Stoffers

Moved cAsyncModule functionality to cSimpleModule, dropped cAsyncModule

We eliminated the need for special cAsyncModule objects
since it is more convenient to just stick to cSimpleModule in the
model implementation and active Horizon through omnetpp.ini switches.

This means, that you need to change all inheritances of cAsyncModule
to cSimpleModule for your Horizon-enabled models. For new models, you
can use Horizon w/o changing cSimpleModule to cAsyncModule.
parent ffa9279f
......@@ -35,3 +35,5 @@
/ide/org.omnetpp.sequencechart/.project
/ide/org.omnetpp.sequencechart/.settings/
/ide/org.omnetpp.sequencechart/src/sequencechart/
*.swp
Porting an OMNeT++ model to Horizon
===================================
===================================
Porting a given model to run on top of Horizon is relatively straightforward.
* New base class for modules
In order to enable parallel execution in Horizon, modules need to inherit
from cAsyncModule instead of cSimpleModule. Simply replace the base class
in all modules of the model. Additionally, remove the Module_Class_Members()
macro if it is used in the module. This macro is not compatible with Horizon
and also not needed.
* New base class for modules (OBSOLETE!!!)
We used to have a separate base class for asynchronous modules. If you are
still using this class (cAsyncModule), please go back to cSimpleModule.
cAsyncModule is no longer required nor included.
* No Module_Class_Members() macro
Remove the Module_Class_Members() macro if it is used in your modules.
This macro is not compatible with Horizon and also not needed.
* No findObject
In Horizon, the ownership handling of OMNeT++ is partly disabled to allow for
an efficient parallel execution: Child modules still know their parent, but the
......@@ -20,17 +22,16 @@
- findObjectByName(const char* name)
- removeByName(const char* name)
- containsByName(const char* name)
* Random number generation
Since the global random number generator of OMNeT++ is not thread-safe,
every cAsyncModule contains a local random number generator. To use this,
simply prefix every call to a random number generation method with the local
object numGen:
- uniform(a,b) ==> numGen->uniform(a,b)
Seeding of these local random number generators is done by means of a global
seed generator RNG. During the sequential initialization phase of the model,
every model requests a seed for its local RNGs from the central seed generator.
* Random number generation (OBSOLETE!!!)
We used to require the use of numGen->uniform(a,b) instead of uniform(a,b)
for using local RNGs instead of a global, not-thread-safe RNG. We changed
the default behavior. The numGen object no longer exists, you automatically
use local RNGs when you run uniform(a,b).
If you are still using numGen->uniform(a,b), please go back to uniform(a,b).
Seeding of the local random number generators is done by means of a global
seed generator RNG. During the sequential initialization phase of the model,
every model requests a seed for its local RNGs from the central seed generator.
Performance optimization
========================
......
//==========================================================================
// CASYNCMODULE.H - header for
// Horizon/OMNeT++/OMNEST
// Discrete System Simulation in C++
//
//
// Declaration of the following classes:
// cAsyncModule : base for asynchronous event handing in simple module objects
//
//==========================================================================
/*--------------------------------------------------------------*
Copyright (C) 2009 Georg Kunz
This file is distributed WITHOUT ANY WARRANTY. See the file
`license' for details on this and other legal matters.
*--------------------------------------------------------------*/
#ifndef __CASYNCMODULE_H
#define __CASYNCMODULE_H
#include <pthread.h>
#include <semaphore.h>
#include "csimplemodule.h"
#include "cmessage.h"
#ifdef ATOMIC_OPS_DEBUG
#include "catomicopsdebug.h"
#else
#include <atomic_ops.h>
#endif
/**
* New base class for all modules that handle expanded events for asynchronous
* (parallel) execution. It provides new methods for parallel event handling
* and wrappers of the default API.
*
* @see cSimpleModule
*
* @ingroup Horizon
*/
class cAsyncMessage;
class SIM_API cAsyncModule : public cSimpleModule
{
private:
// the two different execution states: either handleMessage is called
// synchronously by the scheduler or asynchronously (to the scheduler)
// by a worker thread
enum ExecutionStates
{
synchronous,
asynchronous
};
// current state of execution
ExecutionStates executionState;
// new messages may only be scheduled with timestamps >= this value
simtime_t t_end;
// priority of the current event
short currentEventPriority;
// shall zero duration events be executed in parallel
bool parZeroDur;
// no worker is active inside this module
static const AO_t NOT_BUSY = 0;
// a worker is active inside this module
static const AO_t BUSY = 1;
// flag indicating that a thread is processing an event within this module
AO_t busy;
// how many messages did a given event send?
unsigned int scheduledMessageCount;
//
unsigned int executionOrderId;
// current simulation time within this module
simtime_t now;
/**
* update meta data needed for event execution
*/
void prepareHandleMessage(cMessage* msg);
/**
* set extended meta data of a message upon sending.
*/
void setMessageMetaData(cMessage* msg);
protected:
public:
/** @name Constructors and Destructors */
//@{
/**
* Constructor
*/
cAsyncModule(const char *name=NULL, cModule *parent=NULL, unsigned stacksize=0);
/**
* Destructor
*/
virtual ~cAsyncModule();
//}@
/** @name User-implemented methods for asynchronous event handling. */
//@{
/**
* Returns the duration of the given expanded event. Called by the event
* scheduler to determine the overlapping of expanded events. The user may
* perform any computation on the expanded event (except for deleting it)
* to calculate the event duration.
*/
virtual simtime_t getProcessingDelay(cMessage* msg)
{
return 0.0;
}
/*
* By default non-expanded events are not executed in parallel. The
* option "parallelize-zero-duration-events" globally enables or disables
* parallel execution. This method allows users to overwrite either behavior
* on a per module (and per event) basis. Should return true if parallel
* execution is allowed, false otherwise.
*/
virtual bool mayParallelize(cMessage* msg, simtime_t duration) const
{
return duration == SimTime::simTimeZero ? parZeroDur : true;
}
/*
* Initializes the local RNGs according to config options read from ini
*/
void initLocalRNGs();
//}@
/** @name Support methods for asynchronous event handling. */
//@{
/**
* Returns the duration of the currently processed event.
*/
simtime_t getCurrentProcessingDelay() const {
return t_end - simTime();
}
/*
* Returns true if the module supports parallel event execution.
*/
virtual bool isAsyncModule() const {
return true;
}
//}@
/** @name Wrapper functions for state keeping inside a module. */
//@{
/**
* INTERNAL: Wrapper for asynchronous message handling.
*/
void callHandleAsyncMessage(cMessage* msg);
/**
* INTERNAL: Wrapper for synchronous message handling.
*/
void callHandleMessage(cMessage* msg);
/*
* INTERNAL: Check if a worker thread is already busy inside this module
* and if so, wait until the worker is done.
*/
inline void waitIfBusy() {
while (AO_load_full(&busy) == BUSY) {
__asm__("pause");
}
}
/*
* INTERNAL: Indicate that a worker is busy inside this module.
*/
inline void setBusy() {
AO_store_full(&busy, BUSY);
}
/**
* INTERNAL: Unset flag to indicate that a worker has finished.
*/
inline void unsetBusy() {
AO_store_full(&busy, NOT_BUSY);
}
//}@r
/** @name Wrapper functions for the OMNeT++ API */
//@{
/**
* @see cSimpleModule
*/
virtual int scheduleAt(simtime_t t, cMessage *msg);
/**
* @see cSimpleModule
*/
virtual int send(cMessage *msg, int gateid);
/**
* @see cSimpleModule
*/
virtual int send(cMessage *msg, const char *gatename, int sn=-1);
/**
* @see cSimpleModule
*/
virtual int send(cMessage *msg, cGate *outputgate);
/**
* @see cSimpleModule
*/
virtual int sendDelayed(cMessage *msg, simtime_t delay, int gateid);
/**
* @see cSimpleModule
*/
virtual int sendDelayed(cMessage *msg, simtime_t delay, const char *gatename, int sn=-1);
/**
* @see cSimpleModule
*/
virtual int sendDelayed(cMessage *msg, simtime_t delay, cGate *outputgate);
/**
* @see cSimpleModule
*/
virtual int sendDirect(cMessage *msg, simtime_t delay, simtime_t duration, cModule *mod, int inputgateid);
/**
* @see cSimpleModule
*/
virtual int sendDirect(cMessage *msg, simtime_t delay, simtime_t duration, cModule *mod, const char *inputgatename, int sn=-1);
/**
* @see cSimpleModule
*/
virtual int sendDirect(cMessage *msg, simtime_t delay, simtime_t duration, cGate *inputgate);
//}@
};
#endif /* __CASYNCMODULE_H */
......@@ -22,7 +22,6 @@
#include <semaphore.h>
#include "cmessage.h"
#include "casyncmodule.h"
#ifdef ATOMIC_OPS_DEBUG
#include "catomicopsdebug.h"
......
......@@ -12,6 +12,7 @@
/*--------------------------------------------------------------*
Copyright (C) 1992-2008 Andras Varga
Copyright (C) 2006-2008 OpenSim Ltd.
Copyright (C) 2009 Georg Kunz
This file is distributed WITHOUT ANY WARRANTY. See the file
`license' for details on this and other legal matters.
......@@ -20,9 +21,18 @@
#ifndef __CSIMPLEMODULE_H
#define __CSIMPLEMODULE_H
#include <pthread.h>
#include <semaphore.h>
#include "cmodule.h"
#include "cnumgen.h"
#ifdef ATOMIC_OPS_DEBUG
#include "catomicopsdebug.h"
#else
#include <atomic_ops.h>
#endif
NAMESPACE_BEGIN
class cQueue;
......@@ -79,6 +89,56 @@ class SIM_API cSimpleModule : public cModule, public cNumberGenerator //implies
static bool stack_cleanup_requested; // 'true' value asks activity() to throw a cStackCleanupException
static cSimpleModule *after_cleanup_transfer_to; // transfer back to this module (or to main)
private: // Horizon
// the two different execution states: either handleMessage is called
// synchronously by the scheduler or asynchronously (to the scheduler)
// by a worker thread
enum ExecutionStates
{
synchronous,
asynchronous
};
// current state of execution
ExecutionStates executionState;
// new messages may only be scheduled with timestamps >= this value
simtime_t t_end;
// priority of the current event
short currentEventPriority;
// shall zero duration events be executed in parallel
bool parZeroDur;
// no worker is active inside this module
static const AO_t NOT_BUSY = 0;
// a worker is active inside this module
static const AO_t BUSY = 1;
// flag indicating that a thread is processing an event within this module
AO_t busy;
// how many messages did a given event send?
unsigned int scheduledMessageCount;
//
unsigned int executionOrderId;
// current simulation time within this module
simtime_t now;
/**
* update meta data needed for event execution
*/
void prepareHandleMessage(cMessage* msg);
/**
* set extended meta data of a message upon sending.
*/
void setMessageMetaData(cMessage* msg);
private:
// internal use
static void activate(void *p);
......@@ -141,15 +201,6 @@ class SIM_API cSimpleModule : public cModule, public cNumberGenerator //implies
virtual ~cSimpleModule();
//@}
/*
* Returns true if Module is capable of Horizon specific parallel execution
* Called in doOneEvent
* Overridden in cAsyncmodule
*/
virtual bool isAsyncModule() const {
return false;
}
/** @name Redefined cObject member functions. */
//@{
/**
......@@ -232,18 +283,18 @@ class SIM_API cSimpleModule : public cModule, public cNumberGenerator //implies
/**
* Sends a message through the gate given with its ID.
*/
int send(cMessage *msg, int gateid) {return sendDelayed(msg, SIMTIME_ZERO, gateid);}
int send(cMessage *msg, int gateid);
/**
* Sends a message through the gate given with its name and index
* (if multiple gate).
*/
int send(cMessage *msg, const char *gatename, int gateindex=-1) {return sendDelayed(msg, SIMTIME_ZERO, gatename, gateindex);}
int send(cMessage *msg, const char *gatename, int gateindex=-1);
/**
* Sends a message through the gate given with its pointer.
*/
int send(cMessage *msg, cGate *outputgate) {return sendDelayed(msg, SIMTIME_ZERO, outputgate);}
int send(cMessage *msg, cGate *outputgate);
/**
* Delayed sending. Sends a message through the gate given with
......@@ -487,7 +538,86 @@ class SIM_API cSimpleModule : public cModule, public cNumberGenerator //implies
* @see cCoroutine
*/
virtual unsigned getStackUsage() const;
//@}
public: // Horizon
/** @name User-implemented methods for asynchronous event handling. */
//@{
/**
* Returns the duration of the given expanded event. Called by the event
* scheduler to determine the overlapping of expanded events. The user may
* perform any computation on the expanded event (except for deleting it)
* to calculate the event duration.
*/
virtual simtime_t getProcessingDelay(cMessage* msg)
{
return 0.0;
}
/*
* By default non-expanded events are not executed in parallel. The
* option "parallelize-zero-duration-events" globally enables or disables
* parallel execution. This method allows users to overwrite either behavior
* on a per module (and per event) basis. Should return true if parallel
* execution is allowed, false otherwise.
*/
virtual bool mayParallelize(cMessage* msg, simtime_t duration) const
{
return duration == SimTime::simTimeZero ? parZeroDur : true;
}
/*
* Initializes the local RNGs according to config options read from ini
*/
void initLocalRNGs();
//}@
/** @name Support methods for asynchronous event handling. */
//@{
/**
* Returns the duration of the currently processed event.
*/
simtime_t getCurrentProcessingDelay() const {
return t_end - simTime();
}
/** @name Wrapper functions for state keeping inside a module. */
//@{
/**
* INTERNAL: Wrapper for asynchronous message handling.
*/
void callHandleAsyncMessage(cMessage* msg);
/**
* INTERNAL: Wrapper for synchronous message handling.
*/
void callHandleMessage(cMessage* msg);
/*
* INTERNAL: Check if a worker thread is already busy inside this module
* and if so, wait until the worker is done.
*/
inline void waitIfBusy() {
while (AO_load_full(&busy) == BUSY) {
__asm__("pause");
}
}
/*
* INTERNAL: Indicate that a worker is busy inside this module.
*/
inline void setBusy() {
AO_store_full(&busy, BUSY);
}
/**
* INTERNAL: Unset flag to indicate that a worker has finished.
*/
inline void unsetBusy() {
AO_store_full(&busy, NOT_BUSY);
}
//}@r
//@}
};
NAMESPACE_END
......
......@@ -53,7 +53,6 @@ class cModuleType;
class cEnvir;
class cDefaultList;
class cThreadPool;
class cAsyncModule;
class cStopWatch;
SIM_API extern cDefaultList defaultList; // also in globals.h
......@@ -163,7 +162,7 @@ class SIM_API cSimulation : public cNamedObject, noncopyable
void setupThreadPool();
/*
* setup the local Random Number Generators for each cAsyncmodule
* setup the local Random Number Generators for each module
*/
void setupLocalRNGs();
......
//==========================================================================
// CTHREADPOOL.H - header for
// CSPINNINGTHREADPOOL.H - header for
// OMNeT++/OMNEST
// Discrete System Simulation in C++
//
//
// Declaration of the following classes:
// cMessage : thread pool for asynchronous event handling
// cSpinningThreadPool : thread pool for asynchronous event handling
//
//==========================================================================
......@@ -20,6 +20,7 @@
#define __CSPINNINGTHREADPOOL_H
#include "cthreadpool.h"
#include "csimplemodule.h"
#ifdef ATOMIC_OPS_DEBUG
#include "catomicopsdebug.h"
......
......@@ -76,7 +76,6 @@
#include "crng.h"
#include "cscheduler.h"
#include "csimplemodule.h"
#include "casyncmodule.h"
#include "csimulation.h"
#include "cstatistic.h"
#include "cstddev.h"
......
......@@ -73,54 +73,31 @@ clean:
# DO NOT DELETE THIS LINE -- make depend depends on it.
$O/cmdenv.o: cmdenv.cc \
../../include/regmacros.h \
../../include/cstopwatch.h \
../envir/args.h \
cmdenv.h \
../../include/platdep/platdefs.h \
../envir/envirdefs.h \
../../include/cmathfunction.h \
../envir/resultlistener.h \
../../include/catomicopsdebug.h \
../../include/cnamedobject.h \
../../include/clockedmsgheap.h \
../../include/cexpression.h \
../envir/eventlogfilemgr.h \
cmddefs.h \
../../include/cgate.h \
../../include/simtime.h \
../envir/objectprinter.h \
../../include/carray.h \
../envir/speedometer.h \
../../include/simtime_t.h \
../../include/cenvir.h \
../../include/cmsgpar.h \
../../include/ccomponent.h \
../../include/cconfigoption.h \
../../include/cproperties.h \
../../include/cregistrationlist.h \
../../include/platdep/timeutil.h \
../../include/cconfiguration.h \
../../include/opp_string.h \
../envir/intervals.h \
../../include/simkerneldefs.h \
../../include/cthreadpool.h \
../../include/csimulation.h \
../../include/errmsg.h \
../../include/cownedobject.h \
../../include/cmessage.h \
../../include/cobject.h \
../../include/onstartup.h \
../envir/envirbase.h \
../../include/clistener.h \
../../include/envirext.h \
../../include/cdummystringpool.h \
../../include/cstringpool.h \
../../include/cscheduler.h \
../../include/cpar.h \
../../include/globals.h \
../../include/ctaskheap.h \
../envir/appreg.h \
../../include/cdefaultlist.h \
../../include/cvisitor.h \
../../include/cpthreadlock.h \
../../include/cttaslock.h \
......@@ -129,9 +106,35 @@ $O/cmdenv.o: cmdenv.cc \
../../include/cnolock.h \
../../include/cmodule.h \
../../include/platdep/platmisc.h \
../../include/cmessageheap.h \
../../include/cexception.h \
../../include/simutil.h \
../../include/cproperty.h \
../../include/platdep/intxtypes.h \
../../include/ccomponenttype.h
../../include/ccomponenttype.h \
../../include/cstopwatch.h \
../../include/platdep/platdefs.h \
../../include/cmathfunction.h \
../envir/resultlistener.h \
../../include/clockedmsgheap.h \
../../include/catomicopsdebug.h \
../envir/eventlogfilemgr.h \
../../include/simtime.h \
../../include/cgate.h \
../../include/carray.h \
../../include/cenvir.h \
../../include/simtime_t.h \
../../include/cmsgpar.h \
../../include/cconfigoption.h \