1
2 """
3 This file contains Simerror, FatalSimerror, Process, SimEvent,
4 the resources Resource, Level and Store
5 as well as their dependencies Buffer, Queue, FIFO and PriorityQ.
6 """
7
8
9 import inspect
10 import new
11 import sys
12 import types
13
14 from SimPy.Lister import Lister
15 from SimPy.Recording import Monitor, Tally
16
17
18 import SimPy.Globals as Globals
19
21 """ SimPy error which terminates "simulate" with an error message"""
24
27
29 """ SimPy error which terminates script execution with an exception"""
33
35 """Superclass of classes which may use generator functions"""
36 - def __init__(self, name = 'a_process', sim = None):
37 if sim is None: sim = Globals.sim
38 self.sim = sim
39
40 self._nextpoint = None
41 if type(name) == type("m"):
42 self.name = name
43 else:
44 raise FatalSimerror("Process name parameter '%s' is not a string"%name)
45 self._nextTime = None
46 self._remainService = 0
47 self._preempted = 0
48 self._priority={}
49 self._getpriority={}
50 self._putpriority={}
51 self._terminated = False
52 self._inInterrupt = False
53 self.eventsFired = []
54 if hasattr(sim, 'trace'):
55 self._doTracing = True
56 else:
57 self._doTracing = False
58
60 return self._nextTime <> None and not self._inInterrupt
61
63 return self._nextTime is None and not self._terminated
64
66 return self._terminated
67
69 return self._inInterrupt and not self._terminated
70
72 return self in resource.waitQ
73
75 """Application function to cancel all event notices for this Process
76 instance;(should be all event notices for the _generator_)."""
77 self.sim._unpost(whom = victim)
78
def start(self, pem=None, at='undefined', delay='undefined', prior=False):
    """Activates PEM of this Process.
    p.start(p.pemname([args])[,{at = t | delay = period}][, prior = False]) or
    p.start([p.ACTIONS()][,{at = t | delay = period}][, prior = False]) (ACTIONS
    parameter optional)

    pem   -- generator object to run; when None, self.ACTIONS() is called
             to obtain it.
    at    -- absolute activation time; never earlier than the current
             simulation time. Ignored when 'delay' is given.
    delay -- activation time relative to the current simulation time.
    prior -- passed through unchanged to the simulation's _post().

    Raises FatalSimerror when no generator can be obtained or when 'pem'
    is not a generator object.

    The call is a no-op when the process has terminated or already has an
    event notice (self._nextTime is not None).
    """
    if pem is None:
        try:
            pem = self.ACTIONS()
        except AttributeError:
            raise FatalSimerror('no generator function to activate')
    if not isinstance(pem, types.GeneratorType):
        raise FatalSimerror('activating function which' +
                            ' is not a generator (contains no \'yield\')')
    # "Not scheduled" is marked by None, as in the other state tests of this
    # file.  A plain truth test would treat a notice at time 0 as missing
    # and let the process be posted a second time.
    if self._terminated or self._nextTime is not None:
        return
    # keep the generator so that the process can be resumed later
    self._nextpoint = pem
    if at == 'undefined':
        at = self.sim._t
    if delay == 'undefined':
        zeit = max(self.sim._t, at)
    else:
        zeit = max(self.sim._t, self.sim._t + delay)
    if self._doTracing:
        self.sim.trace.recordActivate(who=self, when=zeit, prior=prior)
    self.sim._post(what=self, at=zeit, prior=prior)
109
111 if len(a[0]) == 3:
112 delay = a[0][2]
113 if delay < 0:
114 raise FatalSimerror('hold: delay time negative: %s, in %s' % (
115 delay, str(a[0][1])))
116 else:
117 delay = 0
118 who = a[1]
119 self.interruptLeft = delay
120 self._inInterrupt = False
121 self.interruptCause = None
122 self.sim._post(what = who, at = self.sim._t + delay)
123
125 a[0][1]._nextTime = None
126
128 """Application function to interrupt active processes"""
129
130 if victim.active():
131 if self._doTracing:
132 save = self.sim.trace._comment
133 self.sim.trace._comment = None
134 victim.interruptCause = self
135 left = victim._nextTime - self.sim._t
136 victim.interruptLeft = left
137 victim._inInterrupt = True
138 self.sim.reactivate(victim)
139 if self._doTracing:
140 self.sim.trace._comment = save
141 self.sim.trace.recordInterrupt(self, victim)
142 return left
143 else:
144 return None
145
147 """
148 Application function for an interrupt victim to get out of
149 'interrupted' state.
150 """
151 self._inInterrupt = False
152
154 """Multi - functional test for reneging for 'request' and 'get':
155 (1)If res of type Resource:
156 Tests whether resource res was acquired when the process was reactivated.
157 If yes, the parallel wakeup process is killed.
158 If not, process is removed from res.waitQ (reneging).
159 (2)If res of type Store:
160 Tests whether item(s) gotten from Store res.
161 If yes, the parallel wakeup process is killed.
162 If no, process is removed from res.getQ
163 (3)If res of type Level:
164 Tests whether units gotten from Level res.
165 If yes, the parallel wakeup process is killed.
166 If no, process is removed from res.getQ.
167 """
168 if isinstance(res, Resource):
169 test = self in res.activeQ
170 if test:
171 self.cancel(self._holder)
172 else:
173 res.waitQ.remove(self)
174 if res.monitored:
175 res.waitMon.observe(len(res.waitQ),t = self.sim.now())
176 return test
177 elif isinstance(res, Store):
178 test = len(self.got)
179 if test:
180 self.cancel(self._holder)
181 else:
182 res.getQ.remove(self)
183 if res.monitored:
184 res.getQMon.observe(len(res.getQ),t = self.sim.now())
185 return test
186 elif isinstance(res, Level):
187 test = not (self.got is None)
188 if test:
189 self.cancel(self._holder)
190 else:
191 res.getQ.remove(self)
192 if res.monitored:
193 res.getQMon.observe(len(res.getQ),t = self.sim.now())
194 return test
195
197 """Test for reneging for 'yield put . . .' compound statement (Level and
198 Store). Returns True if not reneged.
199 If self not in buffer.putQ, kill wakeup process, else take self out of
200 buffer.putQ (reneged)"""
201 test = self in buffer.putQ
202 if test:
203 buffer.putQ.remove(self)
204 if buffer.monitored:
205 buffer.putQMon.observe(len(buffer.putQ),t = self.sim.now())
206 else:
207 self.cancel(self._holder)
208 return not test
209
210
212 """Supports one - shot signalling between processes. All processes waiting for an event to occur
213 get activated when its occurrence is signalled. From the processes queuing for an event, only
214 the first gets activated.
215 """
216 - def __init__(self, name = 'a_SimEvent', sim = None):
217 if sim is None: sim = Globals.sim
218 self.sim = sim
219 self.name = name
220 self.waits = []
221 self.queues = []
222 self.occurred = False
223 self.signalparam = None
224 if hasattr(sim, 'trace'):
225 self._doTracing = True
226 else:
227 self._doTracing = False
228
def signal(self, param=None):
    """Produces a signal to self;
    Fires this event (makes it occur).
    Reactivates ALL processes waiting for this event. (Cleanup waits lists
    of other events if wait was for an event - group (OR).)
    Reactivates the first process for which event(s) it is queuing for
    have fired. (Cleanup queues of other events if wait was for an event - group (OR).)

    param -- stored in self.signalparam.

    When nobody waits or queues, the event is only marked as occurred.
    """
    self.signalparam = param
    if self._doTracing:
        self.sim.trace.recordSignal(self)
    if not self.waits and not self.queues:
        self.occurred = True
        return
    # wake every waiting process
    for proc, group in self.waits:
        proc.eventsFired.append(self)
        self.sim.reactivate(proc, prior=True)
        # the process no longer waits for the other events of its OR group
        for ev in group:
            if ev != self:
                if ev.occurred:
                    proc.eventsFired.append(ev)
                for entry in ev.waits:
                    if entry[0] == proc:
                        ev.waits.remove(entry)
                        break
    self.waits = []
    # wake only the first queuing process
    if self.queues:
        proc, group = self.queues.pop(0)
        proc.eventsFired.append(self)
        # Take the process out of the queues of the other events of its OR
        # group; a stale entry would reactivate it again when one of those
        # events is signalled later.
        for ev in group:
            if ev != self:
                for entry in ev.queues:
                    if entry[0] == proc:
                        ev.queues.remove(entry)
                        break
        self.sim.reactivate(proc)
261
263 """Consumes a signal if it has occurred, otherwise process 'proc'
264 waits for this event.
265 """
266 proc = par[0][1]
267
268 if __debug__:
269 if not (proc.sim == self.sim):
270 raise FatalSimerror,\
271 "waitevent: Process %s, SimEvent %s not in "\
272 "same Simulation instance"%(proc.name,self.name)
273 proc.eventsFired = []
274 if not self.occurred:
275 self.waits.append([proc, [self]])
276 proc._nextTime = None
277 else:
278 proc.eventsFired.append(self)
279 self.occurred = False
280 self.sim._post(proc, at = self.sim._t, prior = 1)
281
283 """Handles waiting for an OR of events in a tuple / list.
284 """
285 proc = par[0][1]
286 evlist = par[0][2]
287 proc.eventsFired = []
288 anyoccur = False
289 for ev in evlist:
290
291 if __debug__:
292 if not (proc.sim == ev.sim):
293 raise FatalSimerror,\
294 "waitevent: Process %s, SimEvent %s not in "\
295 "same Simulation instance"%(proc.name,ev.name)
296 if ev.occurred:
297 anyoccur = True
298 proc.eventsFired.append(ev)
299 ev.occurred = False
300 if anyoccur:
301 self.sim._post(proc, at = self.sim._t, prior = 1)
302
303 else:
304 proc.eventsFired = []
305 proc._nextTime = None
306 for ev in evlist:
307 ev.waits.append([proc, evlist])
308
310 """Consumes a signal if it has occurred, otherwise process 'proc'
311 queues for this event.
312 """
313 proc = par[0][1]
314 proc.eventsFired = []
315
316 if __debug__:
317 if not (proc.sim == self.sim):
318 raise FatalSimerror,\
319 "queueevent: Process %s, SimEvent %s not in "\
320 "same Simulation instance"%(proc.name,self.name)
321 if not self.occurred:
322 self.queues.append([proc, [self]])
323 proc._nextTime = None
324 else:
325 proc.eventsFired.append(self)
326 self.occurred = False
327 self.sim._post(proc, at = self.sim._t, prior = 1)
328
330 """Handles queueing for an OR of events in a tuple / list.
331 """
332 proc = par[0][1]
333 evlist = par[0][2]
334 proc.eventsFired = []
335 anyoccur = False
336 for ev in evlist:
337
338 if __debug__:
339 if not (proc.sim == ev.sim):
340 raise FatalSimerror,\
341 "yield queueevent: Process %s, SimEvent %s not in "\
342 "same Simulation instance"%(proc.name,ev.name)
343 if ev.occurred:
344 anyoccur = True
345 proc.eventsFired.append(ev)
346 ev.occurred = False
347 if anyoccur:
348 self.sim._post(proc, at = self.sim._t, prior = 1)
349
350 else:
351 proc.eventsFired = []
352 proc._nextTime = None
353 for ev in evlist:
354 ev.queues.append([proc, evlist])
355
356
359 if not moni is None:
360 self.monit = True
361 else:
362 self.monit = False
363 self.moni = moni
364 self.resource = res
365
368
371
373 self.remove(obj)
374 if self.monit:
375 self.moni.observe(len(self), t = self.moni.sim.now())
376
380
382 self.append(obj)
383 if self.monit:
384 self.moni.observe(len(self),t = self.moni.sim.now())
385
388
391
393 a = self.pop(0)
394 if self.monit:
395 self.moni.observe(len(self),t = self.moni.sim.now())
396 return a
397
399 """Queue is always ordered according to priority.
400 Higher value of priority attribute == higher priority.
401 """
404
406 """Handles request queue for Resource"""
407 if len(self):
408 ix = self.resource
409 if self[-1]._priority[ix] >= obj._priority[ix]:
410 self.append(obj)
411 else:
412 z = 0
413 while self[z]._priority[ix] >= obj._priority[ix]:
414 z += 1
415 self.insert(z, obj)
416 else:
417 self.append(obj)
418 if self.monit:
419 self.moni.observe(len(self),t = self.moni.sim.now())
420
422 """Handles getQ in Buffer"""
423 if len(self):
424 ix = self.resource
425
426 if self[-1]._getpriority[ix] >= obj._getpriority[ix]:
427 self.append(obj)
428 else:
429 z = 0
430 while self[z]._getpriority[ix] >= obj._getpriority[ix]:
431 z += 1
432 self.insert(z, obj)
433 else:
434 self.append(obj)
435 if self.monit:
436 self.moni.observe(len(self),t = self.moni.sim.now())
437
439 """Handles putQ in Buffer"""
440 if len(self):
441 ix = self.resource
442
443 if self[-1]._putpriority[ix] >= obj._putpriority[ix]:
444 self.append(obj)
445 else:
446 z = 0
447 while self[z]._putpriority[ix] >= obj._putpriority[ix]:
448 z += 1
449 self.insert(z, obj)
450 else:
451 self.append(obj)
452 if self.monit:
453 self.moni.observe(len(self),t = self.moni.sim.now())
454
456 """Models shared, limited capacity resources with queuing;
457 FIFO is default queuing discipline.
458 """
459
def __init__(self, capacity=1, name='a_resource', unitName='units',
             qType=FIFO, preemptable=0, monitored=False,
             monitorType=Monitor, sim=None):
    """
    monitorType={Monitor(default) | Tally}

    Creates the wait queue and the active queue from qType.  When
    'monitored' is true, each queue gets its own monitorType instance
    (stored as waitMon / actMon); otherwise the queues get no monitor and
    those two attributes are not created.  'sim' defaults to Globals.sim.
    """
    if sim is None:
        sim = Globals.sim
    self.sim = sim
    self.name = name
    self.capacity = capacity
    self.unitName = unitName
    # number of free units; starts out at full capacity
    self.n = capacity
    self.monitored = monitored

    activeMonitor = None
    waitMonitor = None
    if monitored:
        activeMonitor = self.actMon = monitorType(
            name='Active Queue Monitor %s' % self.name,
            ylab='nr in queue', tlab='time', sim=self.sim)
        waitMonitor = self.waitMon = monitorType(
            name='Wait Queue Monitor %s' % self.name,
            ylab='nr in queue', tlab='time', sim=self.sim)
    self.waitQ = qType(self, waitMonitor)
    self.preemptable = preemptable
    self.activeQ = qType(self, activeMonitor)
    self.priority_default = 0
491
493 """Process request event for this resource"""
494 obj = arg[1]
495
496 if __debug__:
497 if not (obj.sim == self.sim):
498 raise FatalSimerror,\
499 "yield request: Process %s, Resource %s not in "\
500 "same Simulation instance"%(obj.name,self.name)
501 if len(arg[0]) == 4:
502 obj._priority[self] = arg[0][3]
503 else:
504 obj._priority[self] = self.priority_default
505 if self.preemptable and self.n == 0:
506
507 preempt = obj._priority[self] > self.activeQ[-1]._priority[self]
508
509 if preempt:
510 z = self.activeQ[-1]
511
512 z._preempted += 1
513
514
515 if z._preempted == 1:
516 z._remainService = z._nextTime - self.sim._t
517
518 Process(sim=self.sim).cancel(z)
519
520 self.activeQ.remove(z)
521
522 self.waitQ.insert(0, z)
523
524 if self.monitored:
525 self.waitMon.observe(len(self.waitQ), self.sim.now())
526
527 z._nextTime = None
528
529 self.activeQ.enter(obj)
530
531 self.sim._post(obj, at = self.sim._t, prior = 1)
532 else:
533 self.waitQ.enter(obj)
534
535 obj._nextTime = None
536 else:
537 if self.n == 0:
538 self.waitQ.enter(obj)
539
540 obj._nextTime = None
541 else:
542 self.n -= 1
543 self.activeQ.enter(obj)
544 self.sim._post(obj, at = self.sim._t, prior = 1)
545
547 """Process release request for this resource"""
548 actor = arg[1]
549 self.n += 1
550 self.activeQ.remove(arg[1])
551 if self.monitored:
552 self.actMon.observe(len(self.activeQ),t = self.sim.now())
553
554 if self.waitQ:
555 obj = self.waitQ.leave()
556 self.n -= 1
557 self.activeQ.enter(obj)
558
559 if self.preemptable:
560
561 if obj._preempted:
562
563 obj._preempted -= 1
564
565
566 if obj._preempted == 0:
567 self.sim.reactivate(obj, delay = obj._remainService,
568 prior = 1)
569
570 else:
571 self.sim.reactivate(obj, delay = 0, prior = 1)
572
573 else:
574 self.sim.reactivate(obj, delay = 0, prior = 1)
575 self.sim._post(arg[1], at = self.sim._t, prior = 1)
576
578 """Abstract class for buffers
579 Blocks a process when a put would cause buffer overflow or a get would cause
580 buffer underflow.
581 Default queuing discipline for blocked processes is FIFO."""
582
583 priorityDefault = 0
def __init__(self, name=None, capacity='unbounded', unitName='units',
             putQType=FIFO, getQType=FIFO,
             monitored=False, monitorType=Monitor, initialBuffered=None,
             sim=None):
    """Store the buffer parameters and build the put and get queues.

    capacity 'unbounded' is replaced by sys.maxint.  When 'monitored' is
    true, three monitorType instances are created (putQMon, getQMon,
    bufferMon) and the two queue monitors record the initial queue
    lengths; otherwise the three attributes are None.  'sim' defaults to
    Globals.sim.
    """
    if sim is None:
        sim = Globals.sim
    self.sim = sim
    if capacity == 'unbounded':
        capacity = sys.maxint
    self.capacity = capacity
    self.name = name
    self.putQType = putQType
    self.getQType = getQType
    self.monitored = monitored
    self.initialBuffered = initialBuffered
    self.unitName = unitName

    # (attribute, monitor name template, y axis label), in creation order
    monitorSpecs = (
        ('putQMon', 'Producer Queue Monitor %s', 'nr in queue'),
        ('getQMon', 'Consumer Queue Monitor %s', 'nr in queue'),
        ('bufferMon', 'Buffer Monitor %s', 'nr in buffer'),
    )
    for attrName, label, ylab in monitorSpecs:
        mon = None
        if monitored:
            mon = monitorType(name=label % self.name,
                              ylab=ylab, tlab='time',
                              sim=self.sim)
        setattr(self, attrName, mon)

    self.putQ = self.putQType(res=self, moni=self.putQMon)
    self.getQ = self.getQType(res=self, moni=self.getQMon)
    if monitored:
        self.putQMon.observe(y=len(self.putQ), t=self.sim.now())
        self.getQMon.observe(y=len(self.getQ), t=self.sim.now())
    self._putpriority = {}
    self._getpriority = {}
622
623 def _put(self):
624 pass
625 def _get(self):
626 pass
627
629 """Models buffers for processes putting / getting un - distinguishable items.
630 """
633
636
637 theBuffer = property(gettheBuffer)
638
640 Buffer.__init__(self,**pars)
641 if self.name is None:
642 self.name = 'a_level'
643
644 if (type(self.capacity) != type(1.0) and\
645 type(self.capacity) != type(1)) or\
646 self.capacity < 0:
647 raise FatalSimerror\
648 ('Level: capacity parameter not a positive number: %s'\
649 %self.initialBuffered)
650
651 if type(self.initialBuffered) == type(1.0) or\
652 type(self.initialBuffered) == type(1):
653 if self.initialBuffered > self.capacity:
654 raise FatalSimerror('initialBuffered exceeds capacity')
655 if self.initialBuffered >= 0:
656 self.nrBuffered = self.initialBuffered
657
658 else:
659 raise FatalSimerror\
660 ('initialBuffered param of Level negative: %s'\
661 %self.initialBuffered)
662 elif self.initialBuffered is None:
663 self.initialBuffered = 0
664 self.nrBuffered = 0
665 else:
666 raise FatalSimerror\
667 ('Level: wrong type of initialBuffered (parameter=%s)'\
668 %self.initialBuffered)
669 if self.monitored:
670 self.bufferMon.observe(y = self.amount, t = self.sim.now())
671 amount = property(getamount)
672
673 - def _put(self, arg):
674 """Handles put requests for Level instances"""
675 obj = arg[1]
676 whichSim=self.sim
677
678 if __debug__:
679 if not (obj.sim == self.sim):
680 raise FatalSimerror,\
681 "put: Process %s, Level %s not in "\
682 "same Simulation instance"%(obj.name,self.name)
683 if len(arg[0]) == 5:
684 obj._putpriority[self] = arg[0][4]
685 whatToPut = arg[0][3]
686 elif len(arg[0]) == 4:
687 obj._putpriority[self] = Buffer.priorityDefault
688 whatToPut = arg[0][3]
689 else:
690 obj._putpriority[self] = Buffer.priorityDefault
691 whatToPut = 1
692 if type(whatToPut) != type(1) and type(whatToPut) != type(1.0):
693 raise FatalSimerror('Level: put parameter not a number')
694 if not whatToPut >= 0.0:
695 raise FatalSimerror('Level: put parameter not positive number')
696 whatToPutNr = whatToPut
697 if whatToPutNr + self.amount > self.capacity:
698 obj._nextTime = None
699 obj._whatToPut = whatToPutNr
700 self.putQ.enterPut(obj)
701 else:
702 self.nrBuffered += whatToPutNr
703 if self.monitored:
704 self.bufferMon.observe(y = self.amount, t = self.sim.now())
705
706
707
708 while len(self.getQ) and self.amount > 0:
709 proc = self.getQ[0]
710 if proc._nrToGet <= self.amount:
711 proc.got = proc._nrToGet
712 self.nrBuffered -= proc.got
713 if self.monitored:
714 self.bufferMon.observe(y = self.amount, t = self.sim.now())
715 self.getQ.takeout(proc)
716 whichSim._post(proc, at = whichSim._t)
717 else:
718 break
719 whichSim._post(obj, at = whichSim._t, prior = 1)
720
721 - def _get(self, arg):
722 """Handles get requests for Level instances"""
723 obj = arg[1]
724
725 if __debug__:
726 if not (obj.sim == self.sim):
727 raise FatalSimerror,\
728 "get: Process %s, Level %s not in "\
729 "same Simulation instance"%(obj.name,self.name)
730 obj.got = None
731 if len(arg[0]) == 5:
732 obj._getpriority[self] = arg[0][4]
733 nrToGet = arg[0][3]
734 elif len(arg[0]) == 4:
735 obj._getpriority[self] = Buffer.priorityDefault
736 nrToGet = arg[0][3]
737 else:
738 obj._getpriority[self] = Buffer.priorityDefault
739 nrToGet = 1
740 if type(nrToGet) != type(1.0) and type(nrToGet) != type(1):
741 raise FatalSimerror\
742 ('Level: get parameter not a number: %s'%nrToGet)
743 if nrToGet < 0:
744 raise FatalSimerror\
745 ('Level: get parameter not positive number: %s'%nrToGet)
746 if self.amount < nrToGet:
747 obj._nrToGet = nrToGet
748 self.getQ.enterGet(obj)
749
750 obj._nextTime = None
751 else:
752 obj.got = nrToGet
753 self.nrBuffered -= nrToGet
754 if self.monitored:
755 self.bufferMon.observe(y = self.amount, t = self.sim.now())
756 self.sim._post(obj, at = self.sim._t, prior = 1)
757
758
759
760 while len(self.putQ):
761 proc = self.putQ[0]
762 if proc._whatToPut + self.amount <= self.capacity:
763 self.nrBuffered += proc._whatToPut
764 if self.monitored:
765 self.bufferMon.observe(y = self.amount, t = self.sim.now())
766 self.putQ.takeout(proc)
767 self.sim._post(proc, at = self.sim._t)
768 else:
769 break
770
772 """Models buffers for processes coupled by putting / getting distinguishable
773 items.
774 Blocks a process when a put would cause buffer overflow or a get would cause
775 buffer underflow.
776 Default queuing discipline for blocked processes is priority FIFO.
777 """
780 nrBuffered = property(getnrBuffered)
781
784 buffered = property(getbuffered)
785
787 Buffer.__init__(self,**pars)
788 self.theBuffer = []
789 if self.name is None:
790 self.name = 'a_store'
791 if type(self.capacity) != type(1) or self.capacity <= 0:
792 raise FatalSimerror\
793 ('Store: capacity parameter not a positive integer: %s'\
794 %self.capacity)
795 if type(self.initialBuffered) == type([]):
796 if len(self.initialBuffered) > self.capacity:
797 raise FatalSimerror\
798 ('Store: number initialBuffered exceeds capacity')
799 else:
800
801 self.theBuffer[:] = self.initialBuffered
802 elif self.initialBuffered is None:
803 self.theBuffer = []
804 else:
805 raise FatalSimerror\
806 ('Store: initialBuffered not a list')
807 if self.monitored:
808 self.bufferMon.observe(y = self.nrBuffered, t = self.sim.now())
809 self._sort = None
810
811
812
814 """Adds buffer sorting to this instance of Store. It maintains
815 theBuffer sorted by the sortAttr attribute of the objects in the
816 buffer.
817 The user - provided 'sortFunc' must look like this:
818
819 def mySort(self, par):
820 tmplist = [(x.sortAttr, x) for x in par]
821 tmplist.sort()
822 return [x for (key, x) in tmplist]
823
824 """
825
826 self._sort = new.instancemethod(sortFunc, self, self.__class__)
827 self.theBuffer = self._sort(self.theBuffer)
828
829 - def _put(self, arg):
830 """Handles put requests for Store instances"""
831 obj = arg[1]
832
833 if __debug__:
834 if not (obj.sim == self.sim):
835 raise FatalSimerror,\
836 "put: Process %s, Store %s not in "\
837 "same Simulation instance"%(obj.name,self.name)
838 whichSim=self.sim
839 if len(arg[0]) == 5:
840 obj._putpriority[self] = arg[0][4]
841 whatToPut = arg[0][3]
842 elif len(arg[0]) == 4:
843 obj._putpriority[self] = Buffer.priorityDefault
844 whatToPut = arg[0][3]
845 else:
846 raise FatalSimerror('Item to put missing in yield put stmt')
847 if type(whatToPut) != type([]):
848 raise FatalSimerror('put parameter is not a list')
849 whatToPutNr = len(whatToPut)
850 if whatToPutNr + self.nrBuffered > self.capacity:
851 obj._nextTime = None
852 obj._whatToPut = whatToPut
853 self.putQ.enterPut(obj)
854 else:
855 self.theBuffer.extend(whatToPut)
856 if not(self._sort is None):
857 self.theBuffer = self._sort(self.theBuffer)
858 if self.monitored:
859 self.bufferMon.observe(y = self.nrBuffered, t = whichSim.now())
860
861
862
863
864 while self.nrBuffered > 0 and len(self.getQ):
865 proc = self.getQ[0]
866 if inspect.isfunction(proc._nrToGet):
867 movCand = proc._nrToGet(self.theBuffer)
868 if movCand:
869 proc.got = movCand[:]
870 for i in movCand:
871 self.theBuffer.remove(i)
872 self.getQ.takeout(proc)
873 if self.monitored:
874 self.bufferMon.observe(
875 y = self.nrBuffered, t = whichSim._t)
876 whichSim._post(what = proc, at = whichSim._t)
877 else:
878 break
879 else:
880 if proc._nrToGet <= self.nrBuffered:
881 nrToGet = proc._nrToGet
882 proc.got = []
883 proc.got[:] = self.theBuffer[0:nrToGet]
884 self.theBuffer[:] = self.theBuffer[nrToGet:]
885 if self.monitored:
886 self.bufferMon.observe(
887 y = self.nrBuffered, t = whichSim._t)
888
889 self.getQ.takeout(proc)
890 whichSim._post(what = proc, at = whichSim._t)
891 else:
892 break
893
894 whichSim._post(what = obj, at = whichSim._t, prior = 1)
895
896 - def _get(self, arg):
897 """Handles get requests"""
898 filtfunc = None
899 obj = arg[1]
900
901 if __debug__:
902 if not (obj.sim == self.sim):
903 raise FatalSimerror,\
904 "get: Process %s, Store %s not in "\
905 "same Simulation instance"%(obj.name,self.name)
906 whichSim=obj.sim
907 obj.got = []
908 if len(arg[0]) == 5:
909 obj._getpriority[self] = arg[0][4]
910 if inspect.isfunction(arg[0][3]):
911 filtfunc = arg[0][3]
912 else:
913 nrToGet = arg[0][3]
914 elif len(arg[0]) == 4:
915 obj._getpriority[self] = Buffer.priorityDefault
916 if inspect.isfunction(arg[0][3]):
917 filtfunc = arg[0][3]
918 else:
919 nrToGet = arg[0][3]
920 else:
921 obj._getpriority[self] = Buffer.priorityDefault
922 nrToGet = 1
923 if not filtfunc:
924 if nrToGet < 0:
925 raise FatalSimerror\
926 ('Store: get parameter not positive number: %s'%nrToGet)
927 if self.nrBuffered < nrToGet:
928 obj._nrToGet = nrToGet
929 self.getQ.enterGet(obj)
930
931 obj._nextTime = None
932 else:
933 for i in range(nrToGet):
934 obj.got.append(self.theBuffer.pop(0))
935
936 if self.monitored:
937 self.bufferMon.observe(y = self.nrBuffered, t = whichSim.now())
938 whichSim._post(obj, at = whichSim._t, prior = 1)
939
940
941
942 while len(self.putQ):
943 proc = self.putQ[0]
944 if len(proc._whatToPut) + self.nrBuffered <= self.capacity:
945 for i in proc._whatToPut:
946 self.theBuffer.append(i)
947 if not(self._sort is None):
948 self.theBuffer = self._sort(self.theBuffer)
949 if self.monitored:
950 self.bufferMon.observe(
951 y = self.nrBuffered, t = whichSim.now())
952 self.putQ.takeout(proc)
953 whichSim._post(proc, at = whichSim._t)
954 else:
955 break
956 else:
957 movCand = filtfunc(self.theBuffer)
958 if movCand:
959 whichSim._post(obj, at = whichSim._t, prior = 1)
960 obj.got = movCand[:]
961 for item in movCand:
962 self.theBuffer.remove(item)
963 if self.monitored:
964 self.bufferMon.observe(y = self.nrBuffered, t = whichSim.now())
965
966
967
968 while len(self.putQ):
969 proc = self.putQ[0]
970 if len(proc._whatToPut) + self.nrBuffered <= self.capacity:
971 for i in proc._whatToPut:
972 self.theBuffer.append(i)
973 if not(self._sort is None):
974 self.theBuffer = self._sort(self.theBuffer)
975 if self.monitored:
976 self.bufferMon.observe(
977 y = self.nrBuffered, t = whichSim.now())
978 self.putQ.takeout(proc)
979 whichSim._post(proc, at = whichSim._t)
980 else:
981 break
982 else:
983 obj._nrToGet = filtfunc
984 self.getQ.enterGet(obj)
985
986 obj._nextTime = None
987