Chapter 2: The Coroutine Library
Tasks do not actually execute concurrently. A single task continues to execute until it suspends or terminates itself; usually, another task then resumes execution. You can use the coroutine library to simulate concurrent execution, and use simulated time to make the execution of the coroutines actually appear parallel.
Note - If your application is compiled with -O4 or -O5, you cannot use the coroutine library. You must compile at -O3 or a lower level of optimization in order to use the coroutine library.
Note - As of the time of this printing, the coroutine library will not be supported beyond the current version.
In addition, two important base classes are described in Table 2-2.
Objects
The coroutine library defines class object as a base class for other classes in the library. For example, messages passed between tasks are instances of classes derived from class object. You can derive your own special-purpose classes from class object.
Tasks
Tasks are the basic features of coroutine-style programming. A task runs until it explicitly allows another task to run. When one task suspends or terminates itself, the task system chooses the next task on the list of ready-to-run tasks and runs it.
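The run-until-suspend behavior described here can be sketched in standard C++ without the coroutine library. This is only an illustration under stated assumptions: MiniTask, run_round_robin, and make_counting_task are hypothetical names, and each "task" is modeled as a function that performs one step and reports whether it has more work to do.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for a task: step() runs until the task "suspends"
// (returns true, more work remains) or "terminates" (returns false).
struct MiniTask {
    std::string name;
    std::function<bool()> step;
};

// Run tasks from a ready list, one at a time, and return the order in
// which the steps executed. Nothing here runs concurrently.
std::vector<std::string> run_round_robin(std::deque<MiniTask> ready) {
    std::vector<std::string> trace;
    while (!ready.empty()) {
        MiniTask current = ready.front();
        ready.pop_front();
        trace.push_back(current.name);
        bool more = current.step();           // runs until it gives up control
        if (more) ready.push_back(current);   // suspended: back of the ready list
    }
    return trace;
}

// Build a task that needs `steps` turns before it terminates.
MiniTask make_counting_task(const std::string& name, int steps) {
    assert(steps > 0);
    int remaining = steps;
    return MiniTask{name, [remaining]() mutable { return --remaining > 0; }};
}
```

A ready list holding a two-step task A followed by a one-step task B runs in the order A, B, A: each task keeps control for its whole step, and the next one runs only after the current one yields.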
Figure 2-1 Coroutine Library Structure.
Class task
A task is an object of a class derived from class task. The action of a task is contained in the constructor of the task's class. Before returning, the constructor of a task terminates it by a call to resultis. A task is always in one of three states, as shown in Table 2-4.
Parts of a Task
Table 2-5 describes the public part of class task:
A Simple Task Example
A simple example of a task is one where the function main creates two tasks, one of which needs to get information from the other. Appendix A, "Coroutine Examples" shows this example.
In the following example, one task gets a string from the user while the second counts the number of '0' characters in the string.
Code Example 2-1 Classes
class getLine : public task {
public:
    getLine();
};
class countZero : public task {
public:
    countZero(getLine*);
};
The implementation of the constructor for getLine is simple. Code Example 2-2 assumes type int and char* are the same size and can be freely cast back and forth. This may not be the case with other C++ implementations:
Code Example 2-2 getLine Constructor
getLine::getLine()
{
    char* tmpbuf = new char[512];
    cout << "Enter string: ";
    cin >> tmpbuf;
    resultis((int)tmpbuf);   // pass the buffer address back as the task result
}
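Whether a char* really fits in an int depends on the platform, so the assumption is worth checking before relying on a cast like the one in getLine. The following is a minimal sketch in standard C++; pointer_fits_in_int and round_trip are hypothetical helper names, and std::uintptr_t is assumed to be available, as it is on common implementations.

```cpp
#include <cassert>
#include <cstdint>

// True only when a char* can be stored in an int without losing bits.
constexpr bool pointer_fits_in_int() {
    return sizeof(char*) <= sizeof(int);
}

// Round trip through an integer type that is wide enough for a pointer.
char* round_trip(char* p) {
    std::uintptr_t bits = reinterpret_cast<std::uintptr_t>(p);
    char* back = reinterpret_cast<char*>(bits);
    assert(back == p);   // the address must survive unchanged
    return back;
}
```

On a typical 64-bit platform pointer_fits_in_int() is false, which is the case the text warns about.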
Code Example 2-3 shows the constructor for countZero.
Code Example 2-3 countZero Constructor
countZero::countZero(getLine *g)
{
    char *s, c;
    int i = 0;
    s = (char*)g->result();   // waits until g has terminated
    while( c = *s++)
        if( c == '0' )
            i++;
    resultis(i);
}
The main program looks like this:
Code Example 2-4 Zero-char Counter Main Program
// Simple zero-char counter program
int main()
{
    getLine g;
    countZero c(&g);
    cout << "Count result = "
         << c.result() << "\n";
    thistask->resultis(0);
    return 0;
}
Waiting States for Tasks
When a task waits for some other task to take some action or produce some information, it becomes IDLE. Later, when the condition that led to its suspension becomes satisfied, the task again becomes RUNNING.
Pending Objects
An object is said to be pending if it is waiting for some event. For example, an empty queue head is pending, since nothing can be removed until an item is appended.
Calling result to Wait for Information
result is a task member function that a task can call on another task. It returns a single int value. For example:
// within someTask()
secondClass secondObject;   // no parentheses: "secondObject()" would declare a function
int i = secondObject.result();
If secondObject has not terminated when someTask calls member function result, someTask is suspended (becomes IDLE) until secondObject does terminate. At that point, someTask resumes (becomes RUNNING) with the result from secondObject available.
Suspending when Dealing with Queues
When you try to get a message from an empty queue (see "FIFO Queues" on page 25) or try to put a message in a full queue, the queue function suspends your task if the mode of the queue is WMODE. When the condition passes, your task becomes RUNNING again.
Putting Your Task to Sleep
You can put a task to sleep until a pending task is no longer pending. If the task you want to wait for is not a pending task and you use sleep, the calling task suspends itself indefinitely. If you want to wait for a task that may be nonpending, and have your task continue execution, use wait. You can put a task to sleep by calling:
void sleep(object* t = 0);
The calling task goes to sleep until the object pointed to by the parameter is no longer pending. If the object is not pending when you execute this call, the calling task goes to sleep indefinitely. If you give a null pointer, as in sleep(0), your task goes to sleep indefinitely.
Waiting for an Object
You can make a task wait for another task to become ready (nonpending) by using the wait task member function. Make a task wait by calling:
void wait(object* ob);
The calling task waits until the object pointed to by the parameter is no longer pending. If the object is not pending when you execute this call, or the object pointer is null, the calling task is not suspended.
Waiting for a List of Tasks
Tasks have two member functions that make them wait for any one of a list of pending objects to become no longer pending. The two functions are:
int waitlist(object*, ...);
int waitvec(object**);
The argument list of waitlist ends with a null pointer. For example:
qhead* firstQ;
qtail* secondQ;
taskType* aTask;
. . .
int which = waitlist(firstQ, secondQ, aTask, (object*)0);
If all of the items are pending, the calling task is suspended (becomes IDLE). When any one of the items in the list becomes ready (no longer pending), waitlist returns and the calling task resumes its RUNNING state. If one of the items is ready when it is called, waitlist returns immediately. The return value of waitlist is the position in the list of a ready task, counting from 0. There may be more than one ready task, in which case one is arbitrarily identified as the task that caused waitlist to return.
The function waitvec takes the same list as a null-terminated array of object pointers:
object* vec[] = {firstQ, secondQ, aTask, 0};
int which = waitvec(vec);
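The return-value convention of waitlist and waitvec can be sketched with a small stand-in. This is not the library's code: Waitable and first_ready are hypothetical names, the sketch picks the first ready item where the library may pick arbitrarily, and it returns -1 at the point where the real functions would suspend the caller.

```cpp
#include <cassert>

// Hypothetical stand-in for a library object: only the pending flag matters.
struct Waitable {
    bool pending;
};

// Return the zero-based position of the first non-pending item in a
// null-terminated array, or -1 if every item is still pending.
int first_ready(Waitable* const* items) {
    assert(items != nullptr);
    for (int i = 0; items[i] != nullptr; ++i) {
        if (!items[i]->pending) return i;
    }
    return -1;
}
```

With three items of which only the first is pending, first_ready reports position 1, counting from 0 as the text describes.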
Waiting for a Predetermined Time
You can set a specific timed delay. With this kind of delay, the task remains in a RUNNING state, thus simulating the passage of time. (See "System Time" on page 22.) For example:
// ... do something
delay(6); // wait
// ... do some more
In this example, after the call to delay has returned, six units of simulated time will have passed. Other tasks may or may not have run in the meantime, depending on their own scheduling requests.
System Time
The task system maintains a simulated time, which need not be (and usually is not) related to real time. The static member function sched::get_clock returns the current simulated time, which is by default initialized to zero. The static member function sched::setclock can be used to initialize the system clock to a starting time, but cannot be called once the time has advanced.
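One way to picture simulated time is as a clock that jumps straight to the next requested wake-up instead of waiting. The sketch below is an assumption-laden model, not the library's scheduler: SimClock and its members are hypothetical names, and wake-ups requested for the same instant are released in request order.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <string>
#include <vector>

// Hypothetical simulated clock: time advances only when the earliest
// pending wake-up is taken, never by waiting in real time.
class SimClock {
public:
    long now() const { return now_; }
    bool idle() const { return wakeups_.empty(); }

    // Request a wake-up `units` of simulated time from now.
    void delay(const std::string& who, long units) {
        assert(units >= 0);
        wakeups_.push(Wakeup{now_ + units, next_seq_++, who});
    }

    // Advance the clock to the earliest wake-up and return who wakes.
    std::string advance() {
        assert(!wakeups_.empty());
        Wakeup w = wakeups_.top();
        wakeups_.pop();
        now_ = w.when;
        return w.who;
    }

private:
    struct Wakeup {
        long when;
        long seq;          // keeps equal-time wake-ups in request order
        std::string who;
        bool operator>(const Wakeup& other) const {
            return when != other.when ? when > other.when : seq > other.seq;
        }
    };
    long now_ = 0;         // starts at zero, like the default system clock
    long next_seq_ = 0;
    std::priority_queue<Wakeup, std::vector<Wakeup>, std::greater<Wakeup>> wakeups_;
};
```

If one caller asks for a delay of 6 and another for a delay of 2, the second wakes first at time 2 and the first wakes at time 6, regardless of how long the program actually runs.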
Timers
A timer behaves like a mini-task whose only function is to execute a delay; it has no result. Like any object, it can be waited on. One difference from a task is that a timer can be reset; it does not have to be destroyed first and reconstructed. Table 2-6 describes the public parts of class timer:
Queues
In "A Simple Task Example" on page 17, the two tasks act like ordinary functions: the first one completes its action before the second one begins execution. This is because information is passed using the resultis and result functions; the information is not passed until the task has terminated.
countZero::countZero(qhead *lineQ, qtail *countQ)
{
    char c;
    lineHolder *inmessage;
    while( 1 ) {
        inmessage = (lineHolder*)lineQ->get();
        char *s = inmessage->line;
        int i = 0;
        while( c = *s++ )
            if( c == '0' )
                i++;
        numZero *num = new numZero(i);
        countQ->put(num);
    }
    resultis(1); // never gets here
}
Appendix A, "Coroutine Examples", Code Example A-2 gives the full text of a program written this way.
FIFO Queues
A FIFO queue is made of two objects: a qhead and a qtail. You create a queue by creating a qhead object for it. You then create a tail by calling the member function of qhead:
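qtail* qhead::tail();
You put objects into the queue with the member function of qtail:
int qtail::put(object*)
You then take objects from the queue with the member function of qhead:
object* qhead::get()
You can also return an object to the head of the queue with the member function of qhead:
int qhead::putback(object*)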
A problem with the putback function is that if you try to use it on a full queue, you produce a runtime error in queue mode WMODE as well as EMODE. See "Queue Modes" on page 28 for an explanation of these modes.
Code Example 2-6 Zero-counter Program Using FIFO Queue
FIFO.h
#include <task.h>
#include <iostream.h>
class getLine : public task {
public:
    getLine(qhead*, qtail*);
};
class countZero : public task {
public:
    countZero(qhead*, qtail*);
};
class lineHolder : public object {
public:
    char *line;
    lineHolder(char* s) : line(s) { }
};
class numZero : public object {
public:
    int zero;
    numZero(int count) : zero(count) { }
};
Now, you can rewrite the main function as shown below:
Code Example 2-7 Zero-counter Main Program
main.cc
// Zero-counter program using queues
#include <task.h>
#include "FIFO.h"
#include "countZero.h"
#include "getLine.h"
int main()
{
    qhead *stringQhead = new qhead;
    qtail *stringQtail = stringQhead->tail();
    qhead *countQhead = new qhead;
    qtail *countQtail = countQhead->tail();
    countZero counter(stringQhead, countQtail);
    getLine g(countQhead, stringQtail);
    thistask->resultis(0);
    return 0;
}
Code Example 2-8 countZero Constructor
countZero.h
countZero::countZero(qhead *lineQ, qtail *countQ)
{
    char c;
    lineHolder *inmessage;
    while( 1 ) {
        inmessage = (lineHolder*)lineQ->get();
        char *s = inmessage->line;
        int i = 0;
        while( c = *s++ )
            if( c == '0' )
                i++;
        numZero *num = new numZero(i);
        countQ->put(num);
    }
    resultis(1); // never gets here
}
In this version, the main program creates countZero first, after establishing the queues for communication. When countZero tries to get a message from the queue, there is none, so countZero suspends, because this is the default waiting-type queue. At that point, the main program creates the line getter. Code Example 2-9 is the implementation of getLine:
Code Example 2-9 getLine Constructor
getLine.h
getLine::getLine(qhead* countQ, qtail* lineQ)
{
    numZero *qdata;
    while( 1 ) {
        cout << "Enter a string, ^C to end session: ";
        char tmpbuf[512];
        cin >> tmpbuf;
        lineQ->put(new lineHolder(tmpbuf));
        qdata = (numZero*) countQ->get();
        cout << "Count of zeroes = " << qdata->zero << "\n";
    }
    resultis(1); // never gets here
}
When this routine begins execution, it first gets a line from standard input, places that on the line queue, and then asks the count queue for the count. That action makes it suspend itself until the zero counter places its message on the queue.
Queue Modes
Three queue modes govern what happens when a task asks for a message from an empty queue or tries to put a message into a full queue:
You can find out the current mode using the head and tail member function:
qmodetype rdmode();
and set the mode using the head and tail member function:
void setmode(qmodetype m);
Queue Size
By default, a queue is limited to 10,000 objects, although space for that number of objects is not actually allocated. Table 2-7 describes queue functions related to queue size.
Function | Description
int rdmax() | Maximum number of objects allowed in queue.
void setmax(int) | Sets new maximum number of objects allowed. You can set the maximum to a number less than the number currently in the queue. In that case, the queue is considered full until the number falls to the new maximum.
int rdcount() | Number of objects in queue.
int rdspace() | Number of additional objects which can be inserted in queue.
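The interaction between setmax and a queue that already holds more than the new maximum can be sketched as follows. SizedQueue is a hypothetical stand-in that borrows the function names from the table; it is not the library's qhead or qtail, it stores plain ints, and it returns false where the real queue would suspend the task or report an error. Reporting zero free space while the queue is over its limit is also an assumption.

```cpp
#include <cassert>
#include <deque>

// Hypothetical model of a queue's size bookkeeping only.
class SizedQueue {
public:
    explicit SizedQueue(int max = 10000) : max_(max) { assert(max >= 0); }

    int rdmax() const { return max_; }
    void setmax(int m) { assert(m >= 0); max_ = m; }   // may be below rdcount()
    int rdcount() const { return static_cast<int>(items_.size()); }

    // Assumption: free space is reported as 0, never negative.
    int rdspace() const { return rdcount() >= max_ ? 0 : max_ - rdcount(); }
    bool full() const { return rdcount() >= max_; }

    // Returns false when the queue is full instead of suspending the caller.
    bool put(int value) {
        if (full()) return false;
        items_.push_back(value);
        return true;
    }

    // Returns false when empty; otherwise stores the oldest item in `out`.
    bool get(int& out) {
        if (items_.empty()) return false;
        out = items_.front();
        items_.pop_front();
        return true;
    }

private:
    int max_;
    std::deque<int> items_;
};
```

Lowering the maximum of a three-item queue to 2 leaves it full until two items have been removed, at which point one slot becomes available again.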
Cutting and Splicing
Since a queue is made up of a separate head and tail, you can cut and splice queues. The main use for this feature is to insert a filter, a special task which outputs a transformed version of its input. By cutting an existing queue and splicing in a filter, you can perform transformations without changing or affecting any existing code using the original queue.
Code Example 2-10 Buffer Class
#include <task.h>
class Generator : public task {
public:
    Generator(qtail *target);
    ...
};
class Printer : public task {
public:
    Printer(qhead *source);
    ...
};
int main() {
    ...
    // buffer up to 100 lines, using Wait mode
    qhead *Buffer = new qhead(WMODE, 100);
    // generator writes to the tail of the buffer
    Generator *gen = new Generator(Buffer->tail());
    // printer reads from the head of the buffer
    Printer *prt = new Printer(Buffer);
    ...
}
Code Example 2-11 Cutting and Splicing a Queue
#include <task.h>
class Format : public task {
public:
    Format(qhead *source, qtail *target);
    ...
};
Format::Format(qhead *source, qtail *target)
{
    ...
}
int main ()
{
    qhead *Buffer = new qhead(WMODE, 100);
    ...
    // insert formatter into Buffer
    qhead *formhead = Buffer->cut();
    qtail *formtail = Buffer->tail();
    Format form(formhead, formtail);
    ...
    // finished with formatting, restore original Buffer
    formhead->splice(formtail);
    return 1;
}
You can do this cutting and splicing anytime, inserting and removing filters as needed. As explained in the manual page queue(3C++), Generator continues to write to the same qtail as before, but there is a new qhead associated with it, formhead. Similarly, Printer continues to extract from the same qhead as before, but it is attached to a new qtail, formtail. The formatter, form, reads from the old qhead, and writes to the qtail of the queue that Printer reads.
Scheduling
Scheduling is a cooperative effort among all tasks. Although you don't work directly with scheduling, you may need to know what it can and cannot do. Scheduling does the following:
Random Numbers
Three classes of random-number generators are provided, as shown in Table 2-8.
Class | Description
randint | Uniformly-distributed random numbers, int or floating-point.
urand | Uniformly-distributed random ints in a given range.
erand | Exponentially-distributed ints about a given mean.
Class randint has the members described in Table 2-9.
Member | Description
randint(long seed=0) | Constructor, providing initial seed for function rand.
int draw() | Returns uniformly-distributed ints in the range 0 to INT_MAX.
float fdraw() | Returns uniformly-distributed floats in the range 0.0 to 1.0.
double ddraw() | Returns uniformly-distributed doubles in the range 0.0 to 1.0.
void seed(long) | Sets a new seed and reinitializes the generator.
Class urand has the members described in Table 2-10.
Member | Description
urand(int low, int high) | Constructor, providing lower and upper bounds of the range.
int draw() | Returns uniformly-distributed ints in the range low through high.
Class erand has the members described in Table 2-11.
Member | Description
erand(int mean) | Constructor, providing the mean value.
int draw() | Returns exponentially-distributed ints about the mean.
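For readers without access to these classes, comparable draws can be sketched with the standard <random> header. This uses a different generator from the library's, so the sequences will not match; draw_uniform and draw_exponential are hypothetical names, and truncating the exponential value to int is an assumption, since the library's rounding is not stated here.

```cpp
#include <cassert>
#include <random>

// Uniform int in [low, high], the range described for urand::draw.
int draw_uniform(std::mt19937& gen, int low, int high) {
    assert(low <= high);
    std::uniform_int_distribution<int> dist(low, high);
    return dist(gen);
}

// Exponentially distributed value with the given mean, truncated to int.
int draw_exponential(std::mt19937& gen, int mean) {
    assert(mean > 0);
    std::exponential_distribution<double> dist(1.0 / mean);   // rate = 1 / mean
    return static_cast<int>(dist(gen));
}
```

Seeding the std::mt19937 engine with a fixed value gives a repeatable sequence, which is usually what a simulation run needs.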
Histograms
The coroutine library provides class histogram for data gathering. A histogram consists of a set of bins, each containing a count of the number of items within the range of the bin. When you construct an object of class histogram, you specify the initial range and number of bins. If values outside the current range must be counted, the range is automatically extended by doubling the range of each individual bin. The number of bins cannot be changed. The add function increments the count of the bin associated with the given value. The print function displays the contents of the histogram in the form of a table. Other data is maintained, as described in Table 2-12.
Member | Description
histogram(int nbins=16, int l=0, int r=16) | Constructor; sets the number of bins and initial range.
void add(int value) | Increment count of the histogram bin for value.
void print() | Prints the current histogram.
int l, r | Denotes the left and right boundaries of the current range.
int binsize | Denotes the current size of each bin.
int nbin | Denotes the number of bins (fixed).
int* h | Denotes the pointer to storage for bins.
long sum | Denotes the sum of all bins.
long sqsum | Denotes the sum of squares of all bins.
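The range-doubling rule can be illustrated with a small re-implementation. This is a sketch, not the library's histogram: MiniHistogram is a hypothetical name, the range is treated as half-open [l, r), the initial range is assumed to divide evenly among the bins, and the choice to extend toward whichever side the new value falls on is an assumption.

```cpp
#include <cassert>
#include <vector>

// Hypothetical histogram whose bin count is fixed and whose range grows.
class MiniHistogram {
public:
    int l, r;            // current range is [l, r)
    int binsize;         // width of each bin
    std::vector<int> h;  // bin counts; the size never changes

    MiniHistogram(int nbins = 16, int left = 0, int right = 16)
        : l(left), r(right), binsize(0), h(nbins, 0) {
        assert(nbins > 0 && right > left && (right - left) % nbins == 0);
        binsize = (right - left) / nbins;
    }

    // Count one value, widening the bins first if it is out of range.
    void add(int value) {
        while (value < l || value >= r) grow(value < l);
        ++h[(value - l) / binsize];
    }

private:
    // Double every bin's width, extending the range left or right, and
    // fold the old counts into the bins that now cover them.
    void grow(bool to_left) {
        const int n = static_cast<int>(h.size());
        std::vector<int> merged(n, 0);
        const int shift = to_left ? n : 0;   // old bins move up when l moves down
        for (int i = 0; i < n; ++i) merged[(shift + i) / 2] += h[i];
        if (to_left) l -= n * binsize; else r += n * binsize;
        binsize *= 2;
        h.swap(merged);
    }
};
```

Starting from four bins over [0, 8), adding the value 9 widens each bin from 2 to 4 and the range to [0, 16); the counts already recorded are merged pairwise rather than lost.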
Real-Time and Interrupts
As noted in "System Time" on page 22, the coroutine library normally runs independently of real time, and uses only a simulated passage of time in arbitrary units. A class which handles interrupts is available to allow real-time response to external events. You can define an interrupt handler for any UNIX signal using class Interrupt_handler:
class Interrupt_handler : public object {
public:
    virtual int pending();       // False once after each interrupt
    Interrupt_handler(int sig);  // Create handler for signal sig
    ~Interrupt_handler();

private:
    virtual void interrupt();    // the interrupt handler function
    int signo;                   // signal number
    int gotint;                  // got an interrupt but alert not done
    Interrupt_handler *prev;     // previous handler for this signal
};
When the signal occurs, the virtual member function interrupt gets control, interrupting whatever task is currently running. When interrupt returns, the original task resumes where it left off. This seems to violate the non-preemptive nature of the task system, but function interrupt is not a task. For this reason, function interrupt should just establish whatever data is necessary for a normal task to process. The base-class version of interrupt does nothing but return. You derive your own handler class from Interrupt_handler to do whatever you need.
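The advice to keep the handler small, recording only what ordinary code needs, can be shown with the standard signal facilities alone. This sketch does not use Interrupt_handler: note_signal and take_pending_signals are hypothetical names, and the read-then-clear in take_pending_signals is not atomic, so a signal arriving between the two statements could be missed in a real program.

```cpp
#include <cassert>
#include <csignal>

// Written by the handler, read and cleared by ordinary code.
static volatile std::sig_atomic_t pending_signals = 0;

// Keep the handler minimal: record that the event happened and return.
extern "C" void note_signal(int) {
    pending_signals = pending_signals + 1;
}

// Called from normal (non-handler) code: report and clear the count.
int take_pending_signals() {
    int n = pending_signals;
    pending_signals = 0;
    return n;
}
```

A program would install the handler with std::signal(SIGINT, note_signal) and call take_pending_signals from its normal processing loop, doing the real work there rather than inside the handler.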
Code Example 2-12 Handling Interrupts
#include <task.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
static char **gargv;     // next command-line argument
static char **oargv;     // first command-line argument
static int gcount = 0;   // number of args gotten
int get_data() {         // return the next command-line argument
    ++gcount;
    if( *gargv == 0 )    // recycle if not enough
        gargv = oargv;
    return atoi(*(gargv++));
}
// KB interrupt handler
class KBhandler : public Interrupt_handler {
    void interrupt();
    int *simQ, *simQ_end, *simQ_h, *simQ_t; // simulated queue
public:
    int getNext(int&);   // get the next item from the queue
    KBhandler(int size = 5);
    ~KBhandler() { delete [] simQ; }
};
KBhandler::KBhandler(int size) : Interrupt_handler(SIGINT) {
    // set up simulated queue
    simQ_t = simQ_h = simQ = new int[size];
    simQ_end = &simQ[size];
}
void KBhandler::interrupt() {
    // put the next command-line arg into the simulated queue
    int *p = simQ_t;
    *p = get_data();
    if( ++p == simQ_end) p = simQ;
    if( p != simQ_h)
        simQ_t = p;
    else {
        puts("interrupt queue overflow");
        task_error(0, 0);
    }
}
int KBhandler::getNext(int& val) {
    int *p = simQ_h;
    if( p == simQ_t )
        return 0;        // queue empty
    val = *p;
    if( ++p == simQ_end ) p = simQ;
    simQ_h = p;
    return 1;            // data available
}
// our user task which will wait for interrupts
class KBprinter : public task {
    KBhandler *handler;
public:
    KBprinter();
};
KBprinter::KBprinter() :
    task("KBprinter"),
    handler(new KBhandler) {
    while( gcount < 5 ) {                // first 5 values only
        wait(handler);                   // wait for KB interrupt
        int i;
        while( handler->getNext(i) )     // get any available values
            printf("found %d\n", i);
    }
    resultis(0);                         // task is finished
}
int main(int argc, char **argv) {
    if( argc < 2 ) {
        printf("Usage: %s <list of integers>\n", argv[0]);
        return 1;                        // error exit
    }
    oargv = gargv = &argv[1];            // make command-line data available
    KBprinter theTask;                   // print data on each KB interrupt
    theTask.result();                    // wait for theTask to finish
    thistask->resultis(0);               // terminate main task
    return 0;
}
The previous sample program is a simulation of a simulation. Imagine that you have a queue of data to be processed, and that an external interrupt (UNIX signal) should trigger a round of processing. In the example, you expect a list of integer values on the program command line, and you use these to simulate a source of integer data. Function get_data returns the next command-line integer, cycling back to the beginning if there are not enough of them.
Note - The main program is an anonymous task, and should terminate by calling resultis on itself.
Coroutine Library Limitations
The coroutine library is flat because a class derived from task may not have derived classes. Only one level of derivation is allowed. This is the way the library was designed and reflects the way the tasks are manipulated on the stack. The enhancement of allowing multiple levels would require a rewrite of the design. For example, the following is not allowed:
class base : public task { ... };
class task1 : public base { ... }; // compiles, but will not work
class task2 : public base { ... }; // compiles, but will not work
Instead, you can place the shared portion in an ordinary class and combine it with task through multiple inheritance:
class base { ... }; // shared portion
class task1 : public task, public base { ... }; // OK
class task2 : public task, public base { ... }; // OK