Source Code for Module Biskit.PVM.TrackingJobMaster

##
## Biskit, a toolkit for the manipulation of macromolecular structures
## Copyright (C) 2004-2005 Raik Gruenberg & Johan Leckner
##
## This program is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as
## published by the Free Software Foundation; either version 2 of the
## License, or any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
## General Public License for more details.
##
## You find a copy of the GNU General Public License in the file
## license.txt along with this program; if not, write to the Free
## Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
##
##
## $Revision: 2.3 $
## last $Author: leckner $
## last $Date: 2006/09/13 10:28:14 $

"""
Extend JobMaster with progress tracking, completion notification and
restart support.
"""

from Biskit.PVM.dispatcher import JobMaster
import pvmTools as pvm
from Biskit.PVM.Status import Status
import Biskit.tools as T

## RLock() and Condition() are factory functions in Python 2; _RLock and
## _Condition are the classes they return, needed by isinstance() in getRst()
from threading import Thread, RLock, _RLock, Condition, _Condition
import time
import copy

class TrackingJobMaster( JobMaster ):
    """
    TrackingJobMaster

    This class extends JobMaster with the following extras:
      - reporting of the average time each slave spends on a job
      - automatic adding of slave computers to PVM
      - different ways to be notified of a completed calculation
      - restarting of interrupted calculations

    The calculation runs non-blocking in a separate thread after a call
    to master.start(). The end of the calculation is signalled on
    master.lock / master.lockMsg, and the result can then be obtained
    with getResult().

    Alternatively, a callback method can be registered that is called
    after the calculation has finished (master.setCallback()).

    Perhaps the easiest (but also least flexible) way is to use the
    calculateResult() method instead. It starts the calculation and
    blocks execution until the result is returned.

    Consider overriding cleanup(), done() and getResult().

    An interrupted calculation can be restarted from a restart file:
      - during calculation, pickle the result of getRst() to a file
        (saveRst() does this)
      - call the script Biskit/restartPVM -i |file_name|

    Manual restart is possible as follows:
      1. pickle master.data, master.result, master.status.objects
      2. master.exit() / Exception / kill, etc.
      3. initialize master with same parameters as before
      4. unpickle and re-assign master.data, master.result,
         master.status.objects
      5. master.start()

    @note: The master sends out an exit signal to all slaves but doesn't
    wait for a response (there isn't any) and continues in the finish()
    method. Since, at the end, the same job is distributed to several
    slaves, some of them might still be running when cleanup() or done()
    are executed. The slave script must tolerate errors that occur, e.g.,
    if cleanup() is called while it is running.

    @todo: find a solution to the problem that the master sends out an
    exit signal to all slaves but doesn't wait for a response (see note)
    @todo: test restart function
    @todo: restart data are not automatically saved (e.g. in intervals)
    """

    def __init__(self, data=None, chunk_size=5, hosts=None, niceness=None,
                 slave_script='', verbose=1, show_output=0, add_hosts=1,
                 redistribute=1 ):
        """
        @param data: dict of items to be processed (default: {})
        @type data: {str_id:any}
        @param chunk_size: number of items that are processed per job
        @type chunk_size: int
        @param hosts: list of host names (default: [])
        @type hosts: [str]
        @param niceness: host niceness dictionary {str_host-name: int_niceness}
                         (default: {'default':20})
        @type niceness: {str:int}
        @param slave_script: absolute path to slave script
        @type slave_script: str
        @param verbose: verbosity level (default: 1)
        @type verbose: 1|0
        @param show_output: display one xterm per slave (default: 0)
        @type show_output: 1|0
        @param add_hosts: add hosts to PVM before starting (default: 1)
        @type add_hosts: 1|0
        @param redistribute: at the end, send the same job out several
                             times (default: 1)
        @type redistribute: 1|0
        """
        ## avoid mutable default arguments shared between instances
        if data is None: data = {}
        if hosts is None: hosts = []
        if niceness is None: niceness = {'default':20}

        if add_hosts:
            if verbose: T.errWrite('adding %i hosts to pvm...' % len(hosts) )
            pvm.addHosts( hosts=hosts )
            if verbose: T.errWriteln('done')

        JobMaster.__init__( self, data, chunk_size, hosts, niceness,
                            slave_script, show_output=show_output,
                            redistribute=redistribute, verbose=verbose )

        self.progress = {}

        self.disabled_hosts = []
        self.slow_hosts = {}

        self.verbose = verbose

        ## end of calculation is signalled on lockMsg
        self.lock = RLock()
        self.lockMsg = Condition( self.lock )

        ## this method is called when everything is calculated
        self.call_done = None


    def hostnameFromTID( self, slave_tid ):
        """
        Get the name of the host a slave task runs on, i.e. the part of
        the slave's nickname before the first '_'.

        @param slave_tid: slave task tid
        @type slave_tid: int

        @return: host name
        @rtype: str
        """
        nickname = self.nicknameFromTID( slave_tid )
        return nickname.split('_')[0]


    def is_valid_slave( self, slave_tid ):
        """
        Override JobMaster method to disable slow nodes on the fly:
        slaves running on a host listed in self.disabled_hosts are
        considered invalid.

        @param slave_tid: slave task tid
        @type slave_tid: int

        @return: True, if the slave's host has not been disabled
        @rtype: bool
        """
        return self.hostnameFromTID( slave_tid ) not in self.disabled_hosts


    def mark_slow_slaves( self, host_list, slow_factor ):
        """
        Record a speed factor for each of the given hosts in
        self.slow_hosts.

        @param host_list: list of hosts
        @type host_list: [str]
        @param slow_factor: factor describing the calculation speed of a node
        @type slow_factor: float
        """
        for h in host_list:
            self.slow_hosts[h] = slow_factor


    def start_job( self, slave_tid ):
        """
        Overrides JobMaster method: count the job as given to this slave
        and record its start time.

        @param slave_tid: slave task tid
        @type slave_tid: int
        """
        host = self.nicknameFromTID( slave_tid )

        d = self.progress.get( host, {'given':0, 'done':0, 'time':0} )

        d['given'] += 1
        d['timeStart'] = time.time()

        self.progress[ host ] = d


    def job_done( self, slave_tid, result ):
        """
        Overrides JobMaster method: count the job as done and add its
        run time to the slave's total.

        @param slave_tid: slave task tid
        @type slave_tid: int
        @param result: slave result dictionary
        @type result: dict
        """
        host = self.nicknameFromTID( slave_tid )

        d = self.progress[host]
        d['done'] += 1
        ## accumulate (rather than overwrite), so that the average time
        ## per job can be reported
        d['time'] += time.time() - d['timeStart']


    def reportProgress( self ):
        """
        Report how many jobs were processed per slave, with the total
        and the average time per job.
        """
        if self.verbose:
            print '%-25s\tgiven\tdone\t  total\t    avg' % 'host'
            for host in self.progress:

                d = self.progress[host]
                avg = d['time'] / max( d['done'], 1 )
                print '%-25s\t%i\t%i\t%6.2f s\t%6.2f s' %\
                      (host, d['given'], d['done'], d['time'], avg)


    def setCallback( self, funct ):
        """
        Register function to be called after calculation is finished.

        @param funct: will be called with an instance of the master
                      as single argument
        @type funct: function
        """
        self.call_done = funct


    def cleanup( self ):
        """
        Called by finish() right after exit(). Override.
        """
        pass


    def done( self ):
        """
        Called by finish() after exit(), cleanup(), and reportProgress(), but
        before thread notification (notifyAll()) and before executing
        the callback method. Override.
        """
        pass


    def notifyAll( self ):
        """
        Notify threads waiting on self.lockMsg that the master has finished.
        """
        self.lock.acquire()
        try:
            self.lockMsg.notifyAll()
        finally:
            self.lock.release()


    def finish(self):
        """
        Called once, after the last job result has been received. It should
        not be necessary to override this further. Override done() instead.
        """
        self.exit()
        self.cleanup()

        self.reportProgress()

        self.done()

        self.notifyAll()

        if self.call_done:
            self.call_done( self )


    def getResult( self, **arg ):
        """
        Return result dict, if it is available.
        Override to return something else - which will also be the return value
        of calculateResult().

        @param arg: keyword-value pairs, for subclass implementations
        @type arg: {key:value}

        @return: {any:any}
        @rtype: {any:any}
        """
        return self.result


    def calculateResult( self, **arg ):
        """
        Convenience function that starts the parallel calculation and
        blocks execution until it is finished.

        @param arg: keyword-value pairs, for subclass implementations
        @type arg: {key:value}

        @return: result of getResult( **arg )
        @rtype: any
        """
        ## acquire the lock *before* starting, so that the notification
        ## sent by finish() cannot arrive before wait() is reached
        self.lock.acquire()
        try:
            self.start()
            self.lockMsg.wait()
        finally:
            self.lock.release()

        return self.getResult( **arg )


    def getRst( self ):
        """
        Get data necessary for a restart of the running calculation.
        Threads, locks, conditions, file handles, the Status object
        (its objects are saved as 'status_objects') and private data
        (attributes starting with '_') are *NOT* saved.
        Override if necessary but call this method in child method.

        @return: {..}, dict with 'pickleable' fields of master
        @rtype: dict
        """
        self.status.lock.acquire()
        try:
            ## collect master parameters that can be pickled
            rst = {}
            for k,v in self.__dict__.items():

                skip = k.startswith('_') or \
                       isinstance( v, (Thread, _RLock, _Condition, Status, file) )

                if not skip:
                    rst[k] = copy.copy( v )

            rst['status_objects'] = copy.deepcopy( self.status.objects )
            rst['master_class'] = self.__class__

        finally:
            self.status.lock.release()

        return rst


    def saveRst( self, fname ):
        """
        Pickle data necessary for a restart of the running calculation.

        @param fname: file name
        @type fname: str
        """
        T.Dump( self.getRst(), fname )


    def setRst( self, rst_data ):
        """
        Prepare this master for restart, called by restart().
        Override if necessary but call in child.

        @param rst_data: {..}, parameters for master.__dict__ + the special
                         fields 'master_class' and 'status_objects', which
                         are removed from rst_data in place
        @type rst_data: dict

        @return: {..}, parameters for master.__dict__ without special fields
        @rtype: dict
        """
        self.__class__ = rst_data['master_class']
        self.status.objects = rst_data['status_objects']

        del rst_data['master_class']
        del rst_data['status_objects']

        return rst_data


def restart( rst_data, **params ):
    """
    Re-create a master from restart data (e.g. as returned by getRst()).

    @param rst_data: restart data
    @type rst_data: dict
    @param params: key-value pairs, for subclass implementations
    @type params: {key:value}

    @return: master of the original class, ready to be re-started
             with start()
    @rtype: TrackingJobMaster
    """
    ## create empty master
    master = TrackingJobMaster( **params )

    ## switch to required subclass and handle special information
    rst_data = master.setRst( rst_data )

    ## set all remaining fields of master
    master.__dict__.update( rst_data )

    return master
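
The end of the calculation is signalled through master.lock / master.lockMsg, a threading condition. The following is a minimal, standalone Python 3 sketch of that wait/notify pattern; it does not use Biskit or PVM, and the class name, the `finished` flag and the timeout are assumptions made for illustration. Unlike the module's calculateResult(), which waits without a predicate, the sketch waits on a flag guarded by the lock, so a notification sent before the caller starts waiting is not lost.

```python
import threading
import time


class BlockingMasterSketch:
    """Hypothetical stand-in for a master whose work runs in a thread."""

    def __init__(self, work_seconds=0.01):
        self.work_seconds = work_seconds
        self.result = None
        self.finished = False                 # predicate, guarded by self.lock
        self.lock = threading.RLock()
        self.lockMsg = threading.Condition(self.lock)

    def _run(self):
        time.sleep(self.work_seconds)         # stands in for the PVM jobs
        with self.lock:
            self.result = {'job_1': 42}
            self.finished = True
            self.lockMsg.notify_all()         # wake every waiting thread

    def start(self):
        threading.Thread(target=self._run, daemon=True).start()

    def calculateResult(self, timeout=5.0):
        self.start()
        with self.lock:
            # wait_for() checks the predicate before waiting, so a
            # notification sent before this point is not lost
            if not self.lockMsg.wait_for(lambda: self.finished, timeout):
                raise TimeoutError('calculation did not finish in time')
        return self.result


print(BlockingMasterSketch().calculateResult())   # {'job_1': 42}
```

With `work_seconds=0` the worker may finish before the caller reaches the lock; the predicate check still returns immediately instead of blocking forever.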
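
finish() runs its hooks in a fixed order: exit(), cleanup(), reportProgress(), done(), notifyAll() and finally the function registered with setCallback(). The standalone Python 3 sketch below is a hypothetical stand-in that only records the hook names; the subclass `MeanMaster` and its `mean` attribute are illustrative assumptions showing where a done() override fits relative to the callback.

```python
class FinishOrderSketch:
    """Hypothetical stand-in that mirrors the call order of finish()."""

    def __init__(self):
        self.calls = []
        self.call_done = None
        self.result = {'a': 1.0, 'b': 3.0}

    def setCallback(self, funct):
        self.call_done = funct

    def exit(self):
        self.calls.append('exit')

    def cleanup(self):
        self.calls.append('cleanup')

    def reportProgress(self):
        self.calls.append('reportProgress')

    def done(self):
        self.calls.append('done')

    def notifyAll(self):
        self.calls.append('notifyAll')

    def finish(self):
        self.exit()
        self.cleanup()
        self.reportProgress()
        self.done()
        self.notifyAll()
        if self.call_done:
            self.call_done(self)          # callback gets the master itself


class MeanMaster(FinishOrderSketch):
    """Hypothetical subclass that post-processes the result in done()."""

    def done(self):
        super().done()
        self.mean = sum(self.result.values()) / len(self.result)


seen = {}
m = MeanMaster()
m.setCallback(lambda master: seen.update(mean=master.mean))
m.finish()
print(m.calls)  # ['exit', 'cleanup', 'reportProgress', 'done', 'notifyAll']
print(seen)     # {'mean': 2.0}
```

Because done() runs before notifyAll() and the callback, anything it stores on the master is already visible to waiting threads and to the callback.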
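
start_job() and job_done() keep per-slave counters in self.progress, keyed by the slave nickname. A standalone Python 3 sketch of that bookkeeping follows, assuming the run time of every job is accumulated so that an average per job can be derived; the class name, the `average()` helper and the injectable `clock` (used only to make the example deterministic) are hypothetical.

```python
import time


class ProgressSketch:
    """Hypothetical per-slave bookkeeping keyed by slave nickname."""

    def __init__(self, clock=time.time):
        self.clock = clock            # injectable for deterministic tests
        self.progress = {}

    def start_job(self, nickname):
        d = self.progress.setdefault(
            nickname, {'given': 0, 'done': 0, 'time': 0.0})
        d['given'] += 1
        d['timeStart'] = self.clock()

    def job_done(self, nickname):
        d = self.progress[nickname]
        d['done'] += 1
        d['time'] += self.clock() - d['timeStart']   # total time on all jobs

    def average(self, nickname):
        d = self.progress[nickname]
        return d['time'] / d['done'] if d['done'] else 0.0


fake_clock = iter([0.0, 2.0, 2.0, 6.0]).__next__
p = ProgressSketch(clock=fake_clock)
for _ in range(2):
    p.start_job('node1_0')
    p.job_done('node1_0')
print(p.progress['node1_0']['done'], p.average('node1_0'))  # 2 3.0
```

The two jobs take 2 s and 4 s, giving 6 s in total and 3 s on average; overwriting `time` instead of adding to it would keep only the 4 s of the last job.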
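
getRst() builds a restart snapshot from the master's `__dict__`, skipping private attributes and objects such as threads, locks and conditions that cannot be pickled. The standalone Python 3 sketch below reproduces that filtering with the standard library only; the class, its attributes and the reduced list of skipped types are assumptions (the module also skips file handles and its Status object and adds status and class information separately).

```python
import copy
import pickle
import threading


class SnapshotSketch:
    """Hypothetical master with picklable and unpicklable attributes."""

    def __init__(self):
        self.data = {'a': 1, 'b': 2}
        self.result = {'a': 10}
        self.chunk_size = 5
        self._private = 'not saved'
        self.lock = threading.RLock()
        self.lockMsg = threading.Condition(self.lock)
        self.worker = threading.Thread(target=lambda: None)

    def getRst(self):
        # instances of these types are skipped (they cannot be pickled)
        skip_types = (threading.Thread, type(threading.RLock()),
                      threading.Condition)
        rst = {}
        with self.lock:                   # keep fields consistent while copying
            for k, v in self.__dict__.items():
                if k.startswith('_') or isinstance(v, skip_types):
                    continue
                rst[k] = copy.copy(v)     # shallow copy, as in getRst()
        return rst


rst = SnapshotSketch().getRst()
restored = pickle.loads(pickle.dumps(rst))
print(sorted(restored))   # ['chunk_size', 'data', 'result']
```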
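
restart() creates a plain master, lets setRst() switch the instance to the class that was saved in the snapshot, and then copies the remaining fields into `__dict__`. A standalone Python 3 sketch of that technique follows; `BaseMaster`, `RmsMaster` and the snapshot contents are hypothetical. As a design choice, unlike the module's setRst(), the sketch copies `rst_data` before removing the special field, so the caller's dictionary is left unchanged.

```python
class BaseMaster:
    """Hypothetical stand-in for TrackingJobMaster."""

    def __init__(self, **params):
        self.data = params.get('data', {})
        self.result = {}

    def setRst(self, rst_data):
        rst_data = dict(rst_data)                   # leave caller's dict intact
        self.__class__ = rst_data.pop('master_class')   # switch to subclass
        return rst_data


class RmsMaster(BaseMaster):
    """Hypothetical subclass that was running when the snapshot was taken."""

    def getResult(self):
        return sorted(self.result.items())


def restart(rst_data, **params):
    master = BaseMaster(**params)                   # empty base-class master
    rst_data = master.setRst(rst_data)              # handle special fields
    master.__dict__.update(rst_data)                # restore remaining fields
    return master


snapshot = {'master_class': RmsMaster,
            'data': {'x': 1, 'y': 2},
            'result': {'x': 0.5}}
m = restart(snapshot)
print(type(m).__name__, m.getResult())   # RmsMaster [('x', 0.5)]
```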
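
is_valid_slave() rejects every slave whose host appears in self.disabled_hosts, using the host part of the slave nickname returned by hostnameFromTID(). The standalone Python 3 sketch below assumes nicknames of the form host_suffix, which is what the `split('_')` in hostnameFromTID() implies; both functions are hypothetical. Note that a host name which itself contains '_' would be truncated by this split.

```python
def hostname_from_nickname(nickname):
    """Host part of a slave nickname such as 'node1_3'."""
    return nickname.split('_')[0]


def valid_slaves(nicknames, disabled_hosts):
    """Keep only slaves whose host has not been disabled."""
    return [n for n in nicknames
            if hostname_from_nickname(n) not in disabled_hosts]


slaves = ['node1_0', 'node1_1', 'node2_0']
disabled = []
print(valid_slaves(slaves, disabled))   # ['node1_0', 'node1_1', 'node2_0']
disabled.append('node1')                # disable a slow host on the fly
print(valid_slaves(slaves, disabled))   # ['node2_0']
```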