Package Biskit :: Package PVM :: Module dispatcher
[hide private]
[frames] | [no frames]

Source Code for Module Biskit.PVM.dispatcher

  1  ## 
  2  ## Biskit, a toolkit for the manipulation of macromolecular structures 
  3  ## Copyright (C) 2002-2005 Wolfgang Rieping 
  4  ## 
  5  ## This program is free software; you can redistribute it and/or 
  6  ## modify it under the terms of the GNU General Public License as 
  7  ## published by the Free Software Foundation; either version 2 of the 
  8  ## License, or any later version. 
  9  ## 
 10  ## This program is distributed in the hope that it will be useful, 
 11  ## but WITHOUT ANY WARRANTY; without even the implied warranty of 
 12  ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 
 13  ## General Public License for more details. 
 14  ## 
 15  ## You find a copy of the GNU General Public License in the file 
 16  ## license.txt along with this program; if not, write to the Free 
 17  ## Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 
 18   
 19  ## Contributions: Wolfgang Rieping, Raik Gruenberg 
 20  ## $Revision: 2.4 $ 
 21  ## last $Date: 2006/12/18 15:07:22 $ 
 22  ## last $Author: graik $ 
 23   
 24  """ 
 25  Manage Master/Slave tasks. 
 26  """ 
 27   
 28   
 29  from PVMThread import PVMMasterSlave 
 30  import Biskit.settings as settings 
 31  from Status import Status 
 32  from Biskit.PVM import pvmTools 
 33  import pvm, socket 
 34   
 35  MSG_JOB_START = 1 
 36  MSG_JOB_DONE = 2 
 37   
class JobMaster(PVMMasterSlave):
    """
    Master side of a PVM master/slave job farm.

    One slave task is spawned per entry in C{hosts}. Each slave is handed
    a chunk of not-yet-processed items (message MSG_JOB_START); when it
    reports back (message MSG_JOB_DONE) its results are merged into
    C{self.result} and it receives the next chunk, until L{Status} reports
    that all items are done. L{finish} is then called exactly once.

    Subclasses customize behaviour through the hook methods
    L{getInitParameters}, L{start_job}, L{get_slave_chunk},
    L{is_valid_slave}, L{job_done}, L{finish} and L{done}.
    """

    def __init__(self, data, chunk_size, hosts, niceness, slave_script,
                 show_output=0, result=None, redistribute=1, verbose=1):
        """
        @param data: dict of items to be processed {id:object}.
        @type data: dict
        @param chunk_size: number of items that are processed by a job
        @type chunk_size: int
        @param hosts: list of host-names; a host may be listed several
                      times (e.g. once per CPU) and then gets numbered
                      nicknames (host_0, host_1, ...)
        @type hosts: [str]
        @param niceness: nice dictionary {host-name: niceness}; the key
                         'default' is used for hosts not listed
        @type niceness: dict
        @param slave_script: absolute path to slave-script
        @type slave_script: str
        @param show_output: run each slave inside an xterm (default: 0)
        @type show_output: 1|0
        @param result: items which have already been processed
                       (ie. they are contained in result) are not
                       processed again. NOTE: a dict passed in here is
                       updated in place with the new results.
        @type result: dict
        @param redistribute: at the end, send same job out several times
                             (default: 1)
        @type redistribute: 1|0
        @param verbose: verbosity level (default: 1)
        @type verbose: 1|0
        """
        PVMMasterSlave.__init__(self, verbose=verbose)

        ## [{'host': name, 'nickname': unique name}, ...]
        self.hosts = self._unique_hosts(hosts)
        self.niceness = niceness
        self.data = data
        self.slave_script = slave_script
        self.chunk_size = chunk_size

        self.current_pos = 0
        self.show_output = show_output

        self.verbose = verbose

        if result is None:
            result = {}

        self.result = result

        ## set-up status for results: only schedule items without a result
        items = [ key for key in data.keys() if key not in self.result ]

        self.status = Status(items, redistribute=redistribute)

        self.__finished = 0

        if verbose:
            print('Processing %d items ...' % len(items))


    @staticmethod
    def _unique_hosts(hosts):
        """
        Give every host entry a unique nickname.

        A host listed once keeps its own name as nickname; a host listed
        n > 1 times gets the nicknames host_0 ... host_(n-1). The order of
        C{hosts} is preserved.

        @param hosts: list of host-names, possibly with repeats
        @type hosts: [str]

        @return: one dict per entry of hosts: {'host': str, 'nickname': str}
        @rtype: [dict]
        """
        counts = {}
        for host in hosts:
            counts[host] = counts.get(host, 0) + 1

        next_index = {}
        entries = []
        for host in hosts:
            if counts[host] > 1:
                i = next_index.get(host, 0)
                next_index[host] = i + 1
                nickname = host + '_%d' % i
            else:
                nickname = host
            entries.append({'host': host, 'nickname': nickname})

        return entries


    def start(self):
        """
        Start the master: launch the message loop, then spawn all slaves.
        """
        ## __finish() tests the name-mangled flag; reset it so that a
        ## restarted master calls finish() again.
        self.__finished = 0
        self.finished = 0   # public attribute kept for backward compatibility

        PVMMasterSlave.start(self)

        self.startMessageLoop()
        self.spawnAll(self.niceness, self.show_output)


    def getInitParameters(self, slave_tid):
        """
        Override to collect slave initiation parameters.

        @param slave_tid: slave task tid
        @type slave_tid: int

        @return: None by default
        """
        return None


    def done(self):
        """
        Override to do something after last job result has been received.
        Default: print 'Done'.
        """
        print("Done")


    def finish(self):
        """
        This method is called one time, after the master has
        received the last missing result. Default: call L{done}.
        """
        self.done()


    def __finish(self):
        """
        Call finish(); but only call it once.

        Runs under the Status lock. The flag is set before finish() is
        called so that finish() runs at most once even if it raises; the
        lock is always released so other slaves cannot deadlock.
        """
        self.status.lock.acquire()
        try:
            if not self.__finished:
                self.__finished = 1
                self.finish()
        finally:
            self.status.lock.release()


    def bindMessages(self, slave_tid):
        """
        Route MSG_JOB_DONE messages from this slave to L{__job_done}.

        @param slave_tid: slave task tid
        @type slave_tid: int
        """
        self.bind(MSG_JOB_DONE, slave_tid, self.__job_done)


    def spawn(self, host, nickname, niceness, show_output=0):
        """
        Spawn a job.

        @param host: host name
        @type host: str
        @param nickname: host nickname (for uniqueness, i.e. more than
                         one job will be run on a multiple cpu machine)
        @type nickname: str
        @param niceness: niceness passed to the slave script as its
                         command-line argument
        @type niceness: int
        @param show_output: run the slave inside an xterm shown on this
                            machine's display (socket.gethostname()+':0.0')
        @type show_output: 1|0

        @return: slave task tid (a value <= 0 signals failure,
                 see L{spawnAll})
        @rtype: int
        """
        if show_output:
            display = socket.gethostname() + ':0.0'

            command = settings.xterm_bin
            argv = ['-title', str(host), '-geometry', '30x10',
                    '-display', display, '-e',
                    settings.python_bin, '-i', self.slave_script,
                    str(niceness)]
        else:
            command = settings.python_bin
            argv = ['-i', self.slave_script, str(niceness)]

        args = (command, argv, pvm.spawnOpts['TaskHost'], host, 1)

        return PVMMasterSlave.spawn(self, args, nickname)


    def spawnAll(self, niceness, show_output=0):
        """
        Spawn one slave per entry in self.hosts.

        Hosts that fail to spawn are reported on stdout and skipped;
        successfully spawned slaves are recorded in self.slaves
        {tid: host entry} and their messages are bound.

        @param niceness: nice dictionary {host-name: niceness}; the key
                         'default' (else 0) is used for unlisted hosts
        @type niceness: dict
        @param show_output: run each slave inside an xterm (default: 0)
        @type show_output: 1|0
        """
        self.slaves = {}

        for d in self.hosts:
            host = d['host']
            nickname = d['nickname']

            nice = niceness.get(host, niceness.get('default', 0))

            slave_tid = self.spawn(host, nickname, nice, show_output)

            if slave_tid <= 0:
                ## non-positive tid signals failure; presumably a PVM error
                ## code that pvmTools.pvmerrors can translate
                print('error spawning %s' % host)
                try:
                    reason = pvmTools.pvmerrors[slave_tid]
                except Exception:
                    ## best-effort diagnostic only; never abort spawning
                    reason = 'unknown error %s' % slave_tid
                print('\t%s' % reason)

            else:
                self.bindMessages(slave_tid)
                self.slaves[slave_tid] = d
                if self.verbose:
                    print('%s %s spawned.' % (slave_tid, nickname))


    def initializationDone(self, slave_tid):
        """
        Is called by a slave that has been initialized and
        is now ready for start-up; hands it its first job.

        @param slave_tid: slave task tid
        @type slave_tid: int
        """
        ## start processing
        self.__start_job(slave_tid)


    def start_job(self, slave_tid):
        """
        Called when a new job is about to be started. Override to add
        other startup tasks than the default, see L{__start_job}

        @param slave_tid: slave task tid
        @type slave_tid: int
        """
        pass


    def get_slave_chunk(self, data_keys):
        """
        Assemble task dictionary that is sent to the slave for a single job.
        Override this, if the values of self.data are to be changed/created
        on the fly.

        @param data_keys: subset of keys to self.data
        @type data_keys: [any]

        @return: dict mapping the data keys to the actual data values
        @rtype: {any:any}
        """
        chunk = {}

        for key in data_keys:
            chunk[key] = self.data[key]

        return chunk


    def __start_job(self, slave_tid):
        """
        Hand the next chunk of unprocessed items to a slave.

        Calls the L{start_job} hook first; if Status has no items left to
        distribute, nothing is sent.

        @param slave_tid: slave task tid
        @type slave_tid: int
        """
        self.start_job(slave_tid)

        ## get items that have not been processed
        queue, n_left = self.status.next_chunk(self.chunk_size)

        if not queue:
            return

        nickname = self.slaves[slave_tid]['nickname']
        if self.verbose:
            print('%d (%s) %d items left' % (slave_tid, nickname, n_left))

        chunk = self.get_slave_chunk(queue)

        self.send(slave_tid, MSG_JOB_START, (chunk,))


    def is_valid_slave(self, slave_tid):
        """
        Checked each time a slave has finished a job, before it gets a new
        one. If 0, this slave receives no further jobs; the remaining
        items are handed out to the other slaves. Override.

        @param slave_tid: slave task tid
        @type slave_tid: int

        @return: 1 (default)
        @rtype: 1|0
        """
        return 1


    def job_done(self, slave_tid, result):
        """
        Override to add tasks to be performed when the job is done
        (other than the default, see L{__job_done}). Called with the
        Status lock held, before result is merged into self.result.

        @param slave_tid: slave task tid
        @type slave_tid: int
        @param result: slave result dictionary
        @type result: dict
        """
        pass


    def __job_done(self, slave_tid, result):
        """
        Tasks that are performed when the job is done.

        @param slave_tid: slave task tid
        @type slave_tid: int
        @param result: slave result dictionary
        @type result: dict
        """
        ## synchronize on internal lock of Status to avoid the distribution
        ## of new items while processed ones are not yet marked "finished";
        ## try/finally so a failing job_done() hook cannot leave it locked
        self.status.lock.acquire()
        try:
            self.job_done(slave_tid, result)

            self.result.update(result)

            ## mark result as finished.
            for item in result.keys():
                self.status.deactivate(item)
        finally:
            self.status.lock.release()

        ## retired slaves get no further work
        if not self.is_valid_slave(slave_tid):
            return

        if not self.status.done():
            self.__start_job(slave_tid)
        else:
            self.__finish()
class JobSlave(PVMMasterSlave):
    """
    Slave side of the master/slave job farm.

    Waits for MSG_JOB_START from its parent, runs L{go} on the received
    arguments and returns the result to the parent with MSG_JOB_DONE.
    Subclasses must override L{go}.
    """

    def __init__(self):
        """
        Initialize the slave and set its message-loop delay to 0.5
        (presumably seconds; see PVMMasterSlave.setMessageLoopDelay).
        """
        PVMMasterSlave.__init__(self)
        self.setMessageLoopDelay(0.5)


    def start(self):
        """
        Start PVM, register the job handler, then enter the message loop.
        """
        PVMMasterSlave.start(self)

        self.bindMessages()
        self.startMessageLoop()


    def bindMessages(self):
        """
        Route MSG_JOB_START messages from the parent task to L{__go}.
        """
        self.bind(MSG_JOB_START, self.getParent(), self.__go)


    def initialize(self, params):
        """
        Automatically invoked by parent after slave's
        message-loop is up. Override to use; default does nothing.

        @param params: initialization parameters from the master
        @type params: any
        """
        pass


    def go(self, *args, **kw):
        """
        Must be overridden in order to do the actual work.
        Result should be returned (the master merges it as a dict).
        Default tasks are defined in L{__go}.

        @param args: arguments
        @type args: (any)
        @param kw: dictionary with key=value pairs
        @type kw: {key:value}

        @return: None in this base implementation
        """
        return None


    def __go(self, *args, **kw):
        """
        Run L{go} and send its result, tagged with this slave's tid,
        back to the parent as MSG_JOB_DONE.

        @param args: arguments
        @type args: (any)
        @param kw: dictionary with key=value pairs
        @type kw: {key:value}
        """
        outcome = self.go(*args, **kw)

        ## payload expected by the master: (slave_tid, result)
        message = (self.getTID(), outcome)

        self.send(self.getParent(), MSG_JOB_DONE, message)


if __name__ == '__main__':

    import os
    import sys

    ## JobMaster.spawn() passes the niceness as the script's only argument
    if len(sys.argv) == 2:
        os.nice(int(sys.argv[1]))

    slave = JobSlave()
    slave.start()