Coverage for /builds/ase/ase/ase/ga/multiprocessingrun.py : 29.63%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1""" Class for handling several simultaneous jobs.
2The class has been tested on Niflheim-opteron4.
3"""
4from multiprocessing import Pool
5import time
6from ase.io import write, read
9class MultiprocessingRun:
10 """Class that allows for the simultaneous relaxation of
11 several candidates on a cluster. Best used if each individual
12 calculation is too small for using a queueing system.
14 Parameters:
16 data_connection: DataConnection object.
18 tmp_folder: Folder for temporary files.
20 n_simul: The number of simultaneous relaxations.
22 relax_function: The relaxation function. This needs to return
23 the filename of the relaxed structure.
24 """
26 def __init__(self, data_connection, relax_function,
27 tmp_folder, n_simul=None):
28 self.dc = data_connection
29 self.pool = Pool(n_simul)
30 self.relax_function = relax_function
31 self.tmp_folder = tmp_folder
32 self.results = []
34 def relax(self, a):
35 """Relax the atoms object a by submitting the relaxation
36 to the pool of cpus."""
37 self.dc.mark_as_queued(a)
38 fname = '{0}/cand{1}.traj'.format(self.tmp_folder,
39 a.info['confid'])
40 write(fname, a)
41 self.results.append(self.pool.apply_async(self.relax_function,
42 [fname]))
43 self._cleanup()
45 def _cleanup(self):
46 for r in self.results:
47 if r.ready() and r.successful():
48 fname = r.get()
49 a = read(fname)
50 self.dc.add_relaxed_step(a)
51 self.results.remove(r)
53 def finish_all(self):
54 """Checks that all calculations are finished, if not
55 wait and check again. Return when all are finished."""
56 while len(self.results) > 0:
57 self._cleanup()
58 time.sleep(2.)