Package Biskit :: Package PVM :: Module PVMThread
[hide private]
[frames] | no frames]

Source Code for Module Biskit.PVM.PVMThread

  1  ## 
  2  ## Biskit, a toolkit for the manipulation of macromolecular structures 
  3  ## Copyright (C) 2002-2005 Wolfgang Rieping 
  4  ## 
  5  ## This program is free software; you can redistribute it and/or 
  6  ## modify it under the terms of the GNU General Public License as 
  7  ## published by the Free Software Foundation; either version 2 of the 
  8  ## License, or any later version. 
  9  ## 
 10  ## This program is distributed in the hope that it will be useful, 
 11  ## but WITHOUT ANY WARRANTY; without even the implied warranty of 
 12  ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 
 13  ## General Public License for more details. 
 14  ## 
 15  ## You find a copy of the GNU General Public License in the file 
 16  ## license.txt along with this program; if not, write to the Free 
 17  ## Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 
 18   
 19  ## last $Author: leckner $ 
 20  ## last $Date: 2006/09/13 10:28:14 $ 
 21  ## $Revision: 2.3 $ 
 22   
 23  """ 
 24  Binding incoming pvm-messages to methods. 
 25  """ 
 26   
 27  from Biskit.PVM import pvm 
 28  from threading import Thread 
 29   
 30  MSG_PING = 999999 
 31  MSG_MESSAGELOOP_UP = 999998 
 32  MSG_INITIALIZE = 999997 
 33  MSG_INITIALIZATION_DONE = 999996 
 34  MSG_EXIT = 999995 
 35   
36 -class Logfile:
37
38 - def __init__(self, filename = None):
39 40 if filename is None: 41 42 import os 43 44 filename = '/tmp/pvm_thread.log.' + str(os.getpid()) 45 46 self.filename = filename 47 self.lines = []
48 49
50 - def post(self, message):
51 52 import time 53 54 self.lines.append('\n' + str(time.time()) + ':') 55 self.lines.append(message) 56 57 file = open(self.filename, 'w') 58 file.write('\n'.join(self.lines)) 59 file.close()
60 61
62 - def write(self):
63 print '\n'.join(self.lines)
64 65
66 - def rm(self):
67 import os 68 if os.path.exists(self.filename): 69 os.unlink(self.filename)
70 71 72 ## This is the old implementation of the PVMThread class 73
74 -class PVMThread(Thread):
75 """ 76 a simple class binding incoming pvm-messages to methods. 77 """ 78 79
80 - def __init__(self, log = None):
81 from threading import Event 82 83 Thread.__init__(self) 84 85 self.__mytid = pvm.mytid() 86 87 try: 88 ## get process ID of parent 89 self.__parent = pvm.parent() 90 except: 91 self.__parent = None 92 93 self.__bindings = {} 94 self.__stop = 0 95 self.__tasks = {} 96 97 self.setMessageLoopDelay(0.1) 98 self.__loopEvent = Event() 99 100 if log: self.__log = Logfile() 101 else: self.__log = None 102 103 ## add ping-mechanism 104 105 self.setPingTimeout(5.) 106 self.bind(MSG_PING, -1, self._ping)
107 108
109 - def getTID(self):
110 return self.__mytid
111 112
113 - def getParent(self):
114 """ 115 @return: process ID of parent 116 @rtype: int 117 """ 118 return self.__parent
119 120
121 - def getBindings(self):
122 return self.__bindings
123 124
125 - def bind(self, message_tag, tid, method):
126 self.getBindings()[(tid, message_tag)] = method
127 128
129 - def unbind(self, tid_message_tag):
130 del self.getBindings()[tid_message_tag]
131 132
133 - def setMessageLoopDelay(self, delay):
134 """ 135 time is in seconds 136 """ 137 self.__delay = delay
138 139
140 - def getMessageLoopDelay(self):
141 return self.__delay
142 143
144 - def stopMessageLoop(self):
145 self.__loopEvent.clear()
146 147
148 - def startMessageLoop(self):
149 self.__loopEvent.set()
150 151
152 - def messageLoopIsStopped(self):
153 return not self.__loopEvent.isSet()
154 155
156 - def setPingTimeout(self, t):
157 self.__pingTimeout = t
158 159
160 - def getPingTimeout(self):
161 return self.__pingTimeout
162 163
164 - def stop(self):
165 self.__stop = 1 166 ## otherwise this would pause the message-loop 167 self.startMessageLoop()
168 169
170 - def isStopped(self):
171 return self.__stop
172 173
174 - def getTasks(self):
175 return self.__tasks
176 177
178 - def nicknameFromTID(self, tid):
179 for nickname, TID in self.getTasks().items(): 180 if TID == tid: 181 return nickname 182 183 raise NameError, 'tid not known'
184 185
186 - def spawn(self, pvm_task, nickname = None):
187 child_tid = pvm.spawn(*pvm_task)[0] 188 189 if child_tid > 0: 190 191 if nickname is None: 192 nickname = child_tid 193 194 self.__tasks[nickname] = child_tid 195 196 return child_tid
197 198
199 - def send_primitive(self, tid, msg_tag, value):
200 result = pvm.pack_and_send(tid, msg_tag, value) 201 202 self.post_message_sent(msg_tag, tid, value)
203 204
205 - def send(self, task, msg_tag, value = None):
206 """ 207 if 'task' is a tuple, msg_tag is send to all the tids 208 given in that list. 209 """ 210 from time import time 211 212 if not type(task) == type(()): 213 task = (task,) 214 215 hash = self.getTasks() 216 217 time_stamp = time() 218 219 for t in task: 220 ## if 'task' is a nickname, try to convert it into 221 ## a valid tid 222 223 try: 224 tid = hash[t] 225 except: 226 tid = t 227 228 self.send_primitive(tid, msg_tag, (time_stamp, value))
229 230
231 - def sendToAll(self, msg_tag, value):
232 from time import time 233 234 tids = tuple(self.getTasks().values()) 235 236 time_stamp = time() 237 238 for tid in tids: 239 self.send_primitive(tid, msg_tag, (time_stamp, value))
240
241 - def run(self):
242 import time 243 from Numeric import argsort 244 245 while not self.isStopped(): 246 247 bindings = self.getBindings() 248 249 ## loop through all messages and check for 250 ## incoming events 251 252 incoming = {} 253 254 for tid, message in bindings.keys(): 255 256 if pvm.probe(tid, message): 257 258 pvm.recv(tid, message) 259 260 ## parameters must be tuple 261 262 parameters = pvm.unpack() 263 264 ## self.post_message_received(message, tid, parameters) 265 266 value = (message, parameters[0], parameters[1]) 267 268 try: 269 incoming[tid].append(value) 270 except: 271 incoming[tid] = [value] 272 273 ## for every incoming message call 274 ## bound method 275 276 ## TODO: do this in a specific order! 277 ## probably pvm supports a sort of time-stamp or so. 278 279 for tid, values in incoming.items(): 280 281 time_stamps = map(lambda v: v[1], values) 282 indices = argsort(time_stamps) 283 284 for i in indices: 285 286 message = values[i][0] 287 parameters = values[i][2] 288 289 ## self.post_execute_method(message, tid, parameters) 290 291 if parameters is None: 292 bindings[(tid, message)]() 293 else: 294 bindings[(tid, message)](*parameters) 295 296 ## wait some time 297 298 time.sleep(self.getMessageLoopDelay()) 299 300 ## if message-loop is stopped, wait until 301 ## it is continued 302 303 self.__loopEvent.wait()
304 305
306 - def ping(self, nickname):
307 self.send(nickname, MSG_PING, (self.getTID(),)) 308 309 tid = self.getTasks()[nickname] 310 311 result = pvm.trecv(self.getPingTimeout(), MSG_PING, tid) 312 313 try: 314 pvm.unpack() 315 except: 316 pass 317 318 if result <= 0: 319 return 0 320 else: 321 return 1
322 323
324 - def _ping(self, sender_tid):
325 self.send(sender_tid, MSG_PING, None)
326 327 328 ## additional functionality for better debuging 329
330 - def log(self):
331 if self.__log: 332 self.__log.write()
333 334
335 - def post(self, message):
336 if self.__log: 337 self.__log.post(message)
338 339
340 - def post_message_received(self, msg_tag, tid, params):
341 log_msg = 'PVMThread: received and unpacked: ' + \ 342 'msg_tag = %d, tid = %d\n' %(msg_tag, tid) 343 log_msg += '[params = %s]' %str(params) 344 self.post(log_msg)
345 346
347 - def post_message_sent(self, msg_tag, tid, params):
348 log_msg = 'PVMThread: message sent: ' + \ 349 'msg_tag = %d, tid = %d\n' %(msg_tag, tid) 350 log_msg += '[params = %s]' %str(params) 351 self.post(log_msg)
352 353
354 - def post_execute_method(self, msg_tag, tid, params):
355 log_msg = 'PVMThread: execute method bound to: ' + \ 356 'msg_tag = %d, tid = %d\n' %(msg_tag, tid) 357 log_msg += '[params = %s]' %str(params) 358 self.post(log_msg)
359
360 - def rm_log(self):
361 if self.__log: 362 self.__log.rm()
363 364
365 -class PVMMasterSlave(PVMThread):
366
367 - def __init__(self, verbose=1, *arg, **kw):
368 PVMThread.__init__(self, *arg, **kw) 369 370 self.verbose= verbose 371 372 ## bind messages 373 374 parent = self.getParent() 375 376 if parent is not None: 377 378 self.bind(MSG_INITIALIZE, parent, self.__initialize) 379 self.bind(MSG_EXIT, parent, self.exit)
380 381
382 - def spawn(self, pvm_task, nickname = None):
383 child_tid = PVMThread.spawn(self, pvm_task, nickname) 384 385 ## bind messages being nessecary for automatic 386 ## slave start-up determination 387 388 self.bind(MSG_MESSAGELOOP_UP, child_tid, self.messageLoopIsUp) 389 390 ## this message could also be bound later, before we send 391 ## the MSG_INITIALIZE message. 392 393 self.bind(MSG_INITIALIZATION_DONE, child_tid, self.initializationDone) 394 395 return child_tid
396 397
398 - def startMessageLoop(self):
399 PVMThread.startMessageLoop(self) 400 401 ## send message to parent to indicate that 402 ## we are now ready for message transfer. 403 404 parent = self.getParent() 405 406 if parent is not None: 407 self.send(parent, MSG_MESSAGELOOP_UP, (self.getTID(),))
408 409
410 - def messageLoopIsUp(self, slave):
411 """ 412 called by slave when its messge-loop has been started 413 """ 414 ## initialize slave 415 ## get init parameters for slave 416 417 init_params = self.getInitParameters(slave) 418 419 if init_params is not None: 420 init_params = (init_params,) 421 422 self.send(slave, MSG_INITIALIZE, init_params)
423 424
425 - def getInitParameters(self, slave_tid):
426 return None
427 428
429 - def initialize(self, parameters):
430 pass
431 432
433 - def __initialize(self, parameters):
434 435 self.initialize(parameters) 436 437 ## send message to parent, that are initialized properly 438 parent = self.getParent() 439 440 if parent is not None: 441 self.send(parent, MSG_INITIALIZATION_DONE, (self.getTID(),))
442 443
444 - def initializationDone(self, slave_tid):
445 pass
446 447
448 - def exit(self):
449 parent = self.getParent() 450 451 ## if we are the master, kill slaves (but don't send signal to self) 452 if parent is None: 453 454 for nickname, tid in self.getTasks().items(): 455 456 if tid != self.getTID(): 457 458 self.send(nickname, MSG_EXIT, None) 459 if self.verbose: print nickname, 'shutting down...' 460 else: 461 pvm.kill(self.getTID()) 462 463 ## stop message-loop 464 465 self.stop()
466