Package Biskit :: Package PVM :: Module Status
[hide private]
[frames] | no frames]

Source Code for Module Biskit.PVM.Status

  1  ## 
  2  ## Biskit, a toolkit for the manipulation of macromolecular structures 
  3  ## Copyright (C) 2004-2005 Raik Gruenberg & Johan Leckner 
  4  ## 
  5  ## This program is free software; you can redistribute it and/or 
  6  ## modify it under the terms of the GNU General Public License as 
  7  ## published by the Free Software Foundation; either version 2 of the 
  8  ## License, or any later version. 
  9  ## 
 10  ## This program is distributed in the hope that it will be useful, 
 11  ## but WITHOUT ANY WARRANTY; without even the implied warranty of 
 12  ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 
 13  ## General Public License for more details. 
 14  ## 
 15  ## You find a copy of the GNU General Public License in the file 
 16  ## license.txt along with this program; if not, write to the Free 
 17  ## Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 
 18  ## 
 19  ## 
 20  ## Contributions: Raik Gruenberg 
 21  ## last $Author: leckner $ 
 22  ## last $Date: 2006/02/01 14:36:38 $ 
 23  ## $Revision: 2.2 $ 
 24   
 25  """ 
 26  Thread - save job handling for JobMaster (see L{dispatcher}). 
 27  """ 
 28   
 29  from threading import Thread, RLock, Condition 
 30   
31 -class Status:
32 """ 33 Keep track of objects that are processed by JobMaster. 34 35 Thread-savety: 36 - next_chunk() is synchronized. 37 - deactivate() must be synchronized to the same Status.lock 38 - activate() is not any longer supposed to be called from outside. 39 """ 40
41 - def __init__(self, objects, redistribute=1):
42 """ 43 @param objects: e.g. job IDs 44 @type objects: any 45 @param redistribute: send out started but not yet finished jobs 46 to idle slaves (to not wait for slow slaves) 47 (default: 1) 48 @type redistribute: 1|0 49 """ 50 self.objects = {} 51 map(lambda k, o = self.objects: o.update({k: None}), objects) 52 53 self.redistribute = redistribute 54 55 self.lock = RLock()
56 57
58 - def __activate(self, item):
59 """ 60 not thread-save, synchronize on self.lock! 61 """ 62 if self.objects[item] is None: 63 self.objects[item] = 1 64 else: 65 self.objects[item] += 1
66 67
68 - def deactivate(self, item):
69 """ 70 mark item as processed. 71 not thread-save, synchronize on self.lock! 72 """ 73 if self.objects[item] is None: 74 raise '%s has never been activated.' % item 75 76 self.objects[item] = 0
77 78
79 - def not_done(self):
80 """ 81 Not yet finished items. The not-yet-started come first, then 82 come the active items ordered by how often they have already been 83 distributed. 84 85 @return: list of not yet finished items (started or not) 86 @rtype: [object] 87 """ 88 o = self.objects 89 r = [ k for k in o.keys() if (o[k] != 0) ] 90 91 ## sort by number of times that the item has already been distributed 92 pairs = [(o[k], k) for k in r ] 93 pairs.sort() 94 r = [ x[1] for x in pairs ] 95 96 return r
97 98
99 - def not_started(self):
100 """ 101 @return: list of not yet started items 102 @rtype: [object] 103 """ 104 r = filter(lambda key, s = self: s.objects[key] is None, 105 self.objects.keys()) 106 107 return r
108 109
110 - def activ(self):
111 """ 112 @return: list of currently active (started) items 113 @rtype: [object] 114 """ 115 o = self.objects 116 r = [ k for k in o.keys() if (o[k] != None) and (o[k] > 0) ] 117 118 return r
119 120
121 - def done(self):
122 """ 123 @return: 1 if all items have been processed and finished 124 @rtype: 1|0 125 """ 126 r = not len(filter(lambda v: v <> 0, self.objects.values())) 127 128 return r
129 130
131 - def next_chunk(self, nmax ):
132 """ 133 Get next chunk of at most nmax items that need to be processed. 134 Thread-save. 135 136 @param nmax: size of chunk 137 @type nmax: int 138 139 @return: chunk of items to be processed, number of unproc OR 140 None if all items have been processed 141 @rtype: ([object], int) OR None 142 """ 143 self.lock.acquire() 144 145 queue = self.not_started() 146 147 ## at the end, re-distribute running jobs 148 if not queue and self.redistribute: 149 queue = self.not_done() 150 151 n_left = len(queue) 152 153 if n_left > nmax: 154 queue = queue[:nmax] 155 156 for item in queue: 157 self.__activate(item) 158 159 self.lock.release() 160 161 return queue, n_left
162 163
164 - def __str__(self):
165 names = { None: 'not_started', 166 0: 'finished', 167 1: 'active'} 168 169 l = [] 170 171 for key, value in self.objects.items(): 172 173 key = str(key) 174 175 l.append(key + ': ' + names.get(value, 'active+%s'%str(value))) 176 177 l.append('Status: ' + 'not ' * (not self.done()) + 'done') 178 179 return '\n'.join(l)
180 181 __repr__ = __str__
182