Package Biskit :: Module TrajFlexMaster
[hide private]
[frames] | no frames]

Source Code for Module Biskit.TrajFlexMaster

  1  ## 
  2  ## Biskit, a toolkit for the manipulation of macromolecular structures 
  3  ## Copyright (C) 2004-2005 Raik Gruenberg & Johan Leckner 
  4  ## 
  5  ## This program is free software; you can redistribute it and/or 
  6  ## modify it under the terms of the GNU General Public License as 
  7  ## published by the Free Software Foundation; either version 2 of the 
  8  ## License, or any later version. 
  9  ## 
 10  ## This program is distributed in the hope that it will be useful, 
 11  ## but WITHOUT ANY WARRANTY; without even the implied warranty of 
 12  ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 
 13  ## General Public License for more details. 
 14  ## 
 15  ## You find a copy of the GNU General Public License in the file 
 16  ## license.txt along with this program; if not, write to the Free 
 17  ## Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 
 18  ## 
 19  ## 
 20  ## 
 21  ## last $Author: graik $ 
 22  ## last $Date: 2006/11/12 20:24:12 $ 
 23  ## $Revision: 2.8 $ 
 24   
 25  """ 
 26  Parallize calculation of pairwise rmsd between the frames of a trajectory 
 27  """ 
 28  import Biskit.hosts as hosts 
 29   
 30  import Biskit.tools as T 
 31  import Biskit.settings as settings 
 32  import Biskit.mathUtils as mathUtils 
 33  from Biskit.Errors import BiskitError 
 34  from Biskit.LogFile import ErrLog, LogFile 
 35  from Biskit.EnsembleTraj import EnsembleTraj, traj2ensemble 
 36   
 37  import tempfile 
 38  import Numeric as N 
 39  import os, time 
 40   
 41  ## PVM imports 
 42  ## from Biskit.PVM.TrackingJobMaster import TrackingJobMaster 
 43  from Biskit.PVM import TrackingJobMaster 
 44   
 45   
46 -class FlexError( BiskitError ):
47 pass
48 49
50 -class TrajFlexMaster(TrackingJobMaster):
51 """ 52 Parallize calculation of pairwise rmsd between the frames of one or two 53 trajectories. 54 """ 55 56 slave_script = T.projectRoot() + '/Biskit/TrajFlexSlave.py' 57
58 - def __init__(self, traj1, traj2=None, aMask=None, hosts=hosts.cpus_all, 59 niceness=hosts.nice_dic, show_output=0, add_hosts=0, 60 log=None, slaveLog=None, verbose=1, 61 only_off_diagonal=1, only_cross_member=0):
62 """ 63 @param traj1: Trajectory or EnsembleTraj, traj1 and 2 must have the 64 same atom content. If only traj1 is given, the pairwise 65 rms is calculated between its frames. 66 @type traj1: Trajectory OR EnsembleTraj 67 @param traj2: see traj1 68 @type traj2: Trajectory OR EnsembleTraj 69 @param aMask: atom mask, consider only subset of atoms (default: all) 70 @type aMask: [0|1] 71 @param hosts: slave hosts to be used 72 (default: L{Biskit.hosts.cpus_all}) 73 @type hosts: [str] 74 @param niceness: { str:int, 'default':int }, nice value for each 75 host 76 @type niceness: dict 77 @param show_output: open xterm window for each slave (default: 0) 78 @type show_output: 0|1 79 @param add_hosts: add hosts to PVM before starting (default: 0) 80 @type add_hosts: 1|0 81 @param log: log file for master (default: None-> StdErr ) 82 @type log: LogFile 83 @param slaveLog: slave log (default: None->'TrajFlexSlave_errors.log') 84 @type slaveLog: LogFile 85 @param verbose: print progress infos (default: 1) 86 @type verbose: 0|1 87 @param only_off_diagonal: Don't calculate self-rms of frames. 88 Only considered for a single trajectory 89 (default: 1) 90 @type only_off_diagonal: 0|1 91 @param only_cross_member: Don't calculate rms between frames from 92 same member trajectory only considered for 93 a single trajectory(requires EnsembleTraj) 94 (default: 0) 95 @type only_cross_member: 0|1 96 """ 97 ## create temporary folder accessible to all slaves 98 self.outFolder = tempfile.mktemp('trajFlex_', 99 dir=settings.tempDirShared ) 100 os.mkdir( self.outFolder ) 101 102 self.log = log or ErrLog() 103 self.slaveLog = slaveLog or LogFile('TrajFlexSlave_errors.log') 104 self.verbose = verbose 105 self.hosts = hosts 106 107 self.traj_1 = traj1 108 self.traj_2 = traj2 109 110 self.only_off_diagonal = traj2 is None and only_off_diagonal 111 self.only_cross_member = traj2 is None and only_cross_member 112 113 self.trajMap = None 114 if traj2 is None: 115 self.trajMap = self.memberMap( traj1 ) 116 117 ## pickle chunks of coordinate frames 118 frame_files_1 = self.__dumpFrames( traj1, self.outFolder, 'traj1' ) 119 ## None if traj2 is None 120 frame_files_2 = self.__dumpFrames( traj2, self.outFolder, 'traj2' ) 121 122 ## assemble job dict 123 self.tasks = self.__taskDict( frame_files_1, frame_files_2) 124 125 chunk_size = 1 126 TrackingJobMaster.__init__(self, self.tasks, chunk_size, 127 hosts, niceness, self.slave_script, 128 show_output=show_output, verbose=verbose, 129 add_hosts=add_hosts)
130 131
132 - def getInitParameters(self, slave_tid):
133 """ 134 @param slave_tid: slave task id 135 @type slave_tid: int 136 137 @return: dictionary with init parameters 138 @rtype: {param:value} 139 """ 140 return {'ferror':self.slaveLog.fname, 141 'trajMap':self.trajMap, 142 'only_off_diagonal':self.only_off_diagonal, 143 'only_cross_member':self.only_cross_member }
144 145
146 - def __windowSize( self, n_per_node, n_nodes, n_frames ):
147 """ 148 @param n_per_node: how many chunks should be generated per node 149 @type n_per_node: int 150 @param n_nodes: number of slave nodes 151 @type n_nodes: int 152 @param n_frames: length of trajectory 153 @type n_frames: int 154 155 @return: calculate number of frames per chunk 156 @rtype: int 157 """ 158 r = int(round( n_frames * 1.0 / N.sqrt(n_per_node * n_nodes) )) 159 if r > 25: 160 return r 161 162 return 25
163 164
165 - def cleanup( self ):
166 """ 167 Tidy up. 168 """ 169 T.tryRemove( self.outFolder, verbose=self.verbose, tree=1 )
170 171
172 - def __getFrameWindows( self, traj, n_frames ):
173 """ 174 Divide frame indices into chunks. 175 176 @param n_frames: number of frames per chunk 177 @type n_frames: int 178 179 @return: list with start and stop frame index of each chunk 180 @rtype: [(int, int)] 181 """ 182 if traj is None: 183 return None 184 185 l = len( traj ) 186 ## number of windows 187 n, rest = l / n_frames, l % n_frames 188 if rest: 189 n += 1 190 191 ## get start and end frame for all windows 192 i_windows = [] 193 194 for i in range(0,n): 195 196 start, stop = i*n_frames, i*n_frames + n_frames 197 198 if stop > l: stop = l 199 if i == n-1: 200 stop = l 201 202 i_windows.append( (start, stop) ) 203 204 return i_windows
205 206
207 - def __taskDict( self, f_frames_1, f_frames_2 ):
208 """ 209 @param f_frames_1: file name of chunk 1 of frames 210 @type f_frames_1: {(int, int) : str} 211 @param f_frames_2: file name of chunk 2 of frames 212 @type f_frames_2: {(int, int) : str} 213 214 @return: { ((int, int),(int, int)) : (str, str) } 215 @rtype: {((int, int),(int, int)) : (str, str)} 216 """ 217 intra_traj = f_frames_2 is None 218 if intra_traj: 219 if self.verbose: 220 self.log.add('Intra-trajectory calculation requested.') 221 f_frames_2 = f_frames_1 222 223 if self.verbose: self.log.add_nobreak('setting up task list...') 224 225 i_windows = f_frames_1.keys() 226 j_windows = f_frames_2.keys() 227 228 ## assemble dict with all combinations of windows 229 tasks = {} 230 231 for i in range( len(i_windows) ): 232 for j in range( i * intra_traj, len(j_windows) ): 233 234 start_i, stop_i = i_windows[i] 235 start_j, stop_j = j_windows[j] 236 237 key = ((i_windows[i], j_windows[j])) 238 239 tasks[key] = ( f_frames_1[ i_windows[i] ], 240 f_frames_2[ j_windows[j] ] ) 241 242 if self.verbose: self.log.add('done') 243 244 return tasks
245 246
247 - def __dumpFrames(self, traj, outFolder, prefix ):
248 """ 249 @param traj: Trajectory 250 @type traj: Trajectory 251 @param outFolder: folder for pickled arrays 252 @type outFolder: str 253 @param prefix: file name prefix 254 @type prefix: str 255 @return: { (int,int) : str } OR None, if traj is None 256 @rtype: {(int,int) : str} 257 """ 258 if traj is None: 259 return None 260 261 if self.verbose: self.log.add_nobreak('dumping frame chunks...') 262 263 n_frames = self.__windowSize( 20, len( self.hosts ), len( traj ) ) 264 265 i_windows = self.__getFrameWindows( traj, n_frames ) 266 267 r = {} 268 269 for i in range( len(i_windows) ): 270 271 w = i_windows[i] 272 273 a = traj.frames[ w[0]:w[1] ] 274 f = outFolder + '/%s_%i_to_%i.dat' % ((prefix,) + w) 275 T.Dump( a, f ) 276 r[w] = f 277 278 if self.verbose and i % (len(i_windows)/50 + 1) == 0: 279 self.log.add_nobreak('#') 280 281 if self.verbose: self.log.add('done') 282 283 return r
284 285
286 - def memberMap(self, traj):
287 """ 288 Tell which traj frame belongs to which member trajectory. 289 290 @param traj: Trajectory 291 @type traj: Trajectory 292 293 @return: member index of each frame OR None if traj is 294 not a EnsembleTraj 295 @rtype: [ int ] OR None 296 """ 297 if not isinstance( traj, EnsembleTraj ): 298 return None 299 300 r = N.zeros( len(traj), 'i' ) 301 302 for i in range( traj.n_members ): 303 304 mi = traj.memberIndices( i ) 305 N.put( r, mi, i ) 306 307 return r.tolist()
308 309
310 - def getResult( self, mirror=0 ):
311 """ 312 Get result matrix ordered such as input trajectory. 313 314 @param mirror: mirror the matrix at diagonal (default: 1) 315 (only for intra-traj) 316 @type mirror: 1|0 317 318 @return: array( (n_frames, n_frames), 'f'), matrix of pairwise rms 319 @rtype: array 320 """ 321 if self.verbose: self.log.add_nobreak('assembling result matrix...') 322 323 intra_traj = self.traj_2 is None 324 325 n1 = n2 = len( self.traj_1 ) 326 if self.traj_2 is not None: 327 n2 = len( self.traj_2 ) 328 329 a = N.zeros( (n1,n2), 'f' ) 330 331 if self.verbose: self.log.add_nobreak('#') 332 333 for key, value in self.result.items(): 334 i_start, i_stop = key[0] 335 j_start, j_stop = key[1] 336 337 window = N.reshape( value, (i_stop-i_start, j_stop-j_start) ) 338 window = window.astype('f') 339 340 a[i_start:i_stop, j_start:j_stop] = window 341 342 if self.verbose: self.log.add_nobreak('#') 343 344 if intra_traj: 345 for i in range( N.shape(a)[0] ): 346 for j in range( i, N.shape(a)[1] ): 347 if a[j,i] == 0: 348 a[j,i] = a[i,j] 349 else: 350 a[i,j] = a[j,i] 351 352 if self.verbose: self.log.add_nobreak('#') 353 354 if intra_traj and not mirror: 355 for i in range( N.shape(a)[0] ): 356 for j in range( i, N.shape(a)[1] ): 357 a[j,i] = 0. 358 359 if self.verbose: self.log.add('done') 360 361 return a
362 363
364 - def rmsMatrixByMember( self, mirror=0, step=1 ):
365 """ 366 Get result matrix ordered first by member then by time. (requires 367 EnsembleTraj) 368 369 @param mirror: mirror matrix at diagonal (only for intra-traj. rms) 370 (default: 0) 371 @type mirror: 0|1 372 373 @param step: take only every step frame [1] 374 @type step: int 375 """ 376 intra_traj = self.traj_2 is None 377 378 m = self.getResult( mirror=intra_traj ) 379 380 i1 = i2 = self.traj_1.argsortMember( step=step ) 381 382 if self.traj_2 is not None: 383 i2 = self.traj_2.argsortMember( step=step ) 384 385 a = N.take( m, i1, 0 ) 386 a = N.take( a, i2, 1 ) 387 388 if intra_traj and not mirror: 389 for i in range( N.shape(a)[0] ): 390 for j in range( i, N.shape(a)[1] ): 391 a[j,i] = 0. 392 393 return a
394 395
396 - def rmsList( self ):
397 """ 398 @return: list of all calculated pairwise rms values 399 @rtype: [float] 400 401 @raise FlexError: if there are no results yet 402 """ 403 r = [] 404 for v in self.result.values(): 405 r.extend( v ) 406 407 if not r: 408 raise FlexError, "No results yet." 409 410 return r
411 412
413 - def averageRms( self ):
414 """ 415 @return: average pairwise rmsd and it's standard deviation 416 @rtype: (float, float) 417 418 @raise FlexError: if there are no results yet 419 """ 420 r = self.rmsList() 421 return N.average(r), mathUtils.SD(r)
422 423 424 ############# 425 ## TESTING 426 ############# 427
428 -class Test:
429 """ 430 Test class 431 """ 432
433 - def run( self, local=0 ):
434 """ 435 run function test 436 437 @param local: transfer local variables to global and perform 438 other tasks only when run locally 439 @type local: 1|0 440 441 @return: 1 442 @rtype: int 443 """ 444 from Biskit.MatrixPlot import MatrixPlot 445 from RandomArray import random 446 447 traj_1 = T.Load( T.testRoot() + '/lig_pcr_00/traj.dat' ) 448 traj_1 = traj2ensemble( traj_1 ) 449 450 ## create fake second trajectory by adding 451 ## increasing noise to first 452 frames = [] 453 for i in range( len( traj_1 ) ): 454 f = traj_1.frames[i] 455 d = N.zeros( N.shape( f ), 'f') 456 if i > 0: 457 d = random( N.shape( f ) ) * ((i / 10) + 1) 458 frames += [f + d] 459 460 traj_2 = traj_1.clone() 461 traj_2.frames = frames 462 463 master = TrajFlexMaster( traj_1, traj_2, 464 hosts=hosts.cpus_all, 465 show_output=local, 466 add_hosts=1, 467 log=None, 468 slaveLog=None, 469 verbose=local, 470 only_cross_member=0 ) 471 472 r = master.calculateResult( mirror=0 ) 473 474 if local: 475 p = MatrixPlot( r, palette='plasma2', legend=1 ) 476 p.show() 477 globals().update( locals() ) 478 479 return 1
480 481
482 - def expected_result( self ):
483 """ 484 Precalculated result to check for consistent performance. 485 486 @return: 1 487 @rtype: int 488 """ 489 return 1
490 491 492 493 if __name__ == '__main__': 494 495 test = Test() 496 497 assert test.run( local=1 ) == test.expected_result() 498