// ************************************************************************** // // // // eses eses // // eses eses // // eses eseses esesese eses Embedded Systems Group // // ese ese ese ese ese // // ese eseseses eseseses ese Department of Computer Science // // eses eses ese eses // // eses eseses eseseses eses University of Kaiserslautern // // eses eses // // // // ************************************************************************** // // The following module implements a FIFO with interruptible-safe enqueue and // // dequeue operations (see [ScHL99,SSSS00]). Even though these operations are // // not atomic, they are implemented in a way that allows interruptions by // // other enqueue and dequeue operations with higher priorities without loosing// // the consistency of the data structure. // // The FIFO buffer consists of objects that contain a data value "val" of // // type nat{V} and a pointer "nxt" to the next value. Thus it is essentially // // a single-linked list. The objects are taken from a heap that additionally // // maintains a boolean flag "bsy" in these objects that is true iff the object// // is currently used in the FIFO. Since the heap has "size" many elements, all// // pointers have type nat{size}. If a new element is required, we can simply // // choose one having a bsy-flag that is false. // // In addition to the single-linked list, the FIFO buffer also maintains a // // variable "tl" that holds the address of the last element in the FIFO, so // // that enqueue operations can be implemented efficiently. In general, there // // is the problem to decide where this variable points to for empty FIFOs. // // There are several ways to handle this (see the other FIFO example where a // // boolean variable isEmpty is used). In the implementation below, the heap // // element with address 0 is not used for the FIFO, i.e., its "val" field is // // unused, and its nxt-pointer points to the first element of the FIFO if // // there is one, otherwise it may have any value. If the FIFO is empty, then // // tl=0 holds, and 0 is not a valid object. Otherwise, tl>0 points to the last// // element in the FIFO. While this implementation wastes an object, it has the// // advantage that there is no need to distinguish between empty and non-empty // // FIFOs in the enqueue operation. // // // // // // ---------------------------------------- // // Interruptible-safe Enqueue Operation // // ---------------------------------------- // // // // Enqueuing element e atomically requires the following sequential code: // // // // 1: heap[tl].nxt = e // // 2: tl = e // // // // However, if this sequential code would be interrupted between instruction // // (1) and (2), the entire queue data structure would become inconsistent. We // // therefore implement it by the following sequential code: // // // // 1: last = tl; // // 2: tl = e; // // 3: while(heap[last].nxt!=nil) { // // 4: last = heap[last].nxt; // // 5: } // // 6: heap[last].nxt = e; // // // // The main difference is that we exchanged the previous assignments. To solve// // the thereby introduced write-after-read conflict on variable tl, we store // // the given tl pointer in an additional variable last. This implementation is// // interrupt-safe: If enqueue operations interrupt after line 1, then "last" // // no longer points to the tail of the queue. Instead, it then points to an // // element inside the queue, and this is sufficient to restore it before using// // it: If line 2 is executed, e becomes the tail of the queue even though the // // previous tail element does not yet point to e. However, all further enqueue// // operations that interrupt from now on, will only extend the new tail e, and// // will not interfere with the part between the element where last points to // // and e. Thus, we can now restore "last" by the loop in lines 3-5 so that we // // find the last element enqueued between the execution of line 1 and 2. Then,// // we can safely execute line 6 to make the FIFO consistent. // // ************************************************************************** // // size parameters of the benchmark macro size = 6; // size of the heap, i.e. maximal size of queue macro enqP = 4; // maximal number of interleaved enqueue processes macro deqP = 4; // maximal number of interleaved dequeue processes macro V = 10; // values in the queue have type nat{V} // macros for elements in the FIFO and the heap macro val = 0; // value of a heap element macro nxt = 1; // next pointer of a heap element macro bsy = 2; // next pointer of a heap element macro null(x) = x==0; // test whether a next pointer is null macro malloc(i) = (i==0 ? 0 : (heap[i].bsy ? malloc(i-1) : i)); macro new = malloc(size-1); macro hd = 0; module InterruptibleFIFO(nat{enqP+1} ?runP,nat{V} ?x,event !rdy) { [size](nat{V} * nat{size+1} * bool) heap; nat{size} tl; // initialize heap tl = 0; for(i=0..size-1) { heap[i].val = 0; heap[i].nxt = 0; heap[i].bsy = false; } // invoke the interleaved enqueue processes for(i=0..enqP-1) do || { w: await(runP==i); suspend { nat{size} e,last; // local pointers to elements e = new; // get a free heap element next(heap[e].bsy) = true; // make this element unavailable next(heap[e].val) = x; // store the value in occupied element e next(heap[e].nxt) = 0; // set the nxt pointer to nil e1: pause; // here comes the code of interruptible enqueue last = tl; // safe the tl current pointer e2: pause; tl = e; // redirect tl to the new element e e3: pause; while(heap[last].nxt!=0) { // restore variable last e4: pause; next(last) = heap[last].nxt; // follow the further enqueued elements e5: pause; } e6: pause; heap[last].nxt = e; // redirect pointer of last element to e e7: pause; } when(runP>i); emit(rdy); } } drivenby { pause; // for initializing the heap // start all enqueue processes with variable delays for(i=0..enqP-1) { x = i+2; runP = i; for(j=0..abs(enqP-i-1)) d1: pause; } // wait until a process terminates, for(i=1..enqP-1) { d2: await(rdy); next(runP) = runP-1; } d3: await(rdy); }