Coverage for /builds/ase/ase/ase/optimize/optimize.py : 92.31%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Structure optimization. """
3import collections.abc
4import time
5from math import sqrt
6from os.path import isfile
8from ase.calculators.calculator import PropertyNotImplementedError
9from ase.io.jsonio import read_json, write_json
10from ase.io.trajectory import Trajectory
11from ase.parallel import barrier, world
12from ase.utils import IOContext
15class RestartError(RuntimeError):
16 pass
19class Dynamics(IOContext):
20 """Base-class for all MD and structure optimization classes."""
22 def __init__(
23 self, atoms, logfile, trajectory, append_trajectory=False, master=None
24 ):
25 """Dynamics object.
27 Parameters:
29 atoms: Atoms object
30 The Atoms object to operate on.
32 logfile: file object or str
33 If *logfile* is a string, a file with that name will be opened.
34 Use '-' for stdout.
36 trajectory: Trajectory object or str
37 Attach trajectory object. If *trajectory* is a string a
38 Trajectory will be constructed. Use *None* for no
39 trajectory.
41 append_trajectory: boolean
42 Defaults to False, which causes the trajectory file to be
43 overwriten each time the dynamics is restarted from scratch.
44 If True, the new structures are appended to the trajectory
45 file instead.
47 master: boolean
48 Defaults to None, which causes only rank 0 to save files. If
49 set to true, this rank will save files.
50 """
52 self.atoms = atoms
53 self.logfile = self.openfile(logfile, mode='a', comm=world)
54 self.observers = []
55 self.nsteps = 0
56 # maximum number of steps placeholder with maxint
57 self.max_steps = 100000000
59 if trajectory is not None:
60 if isinstance(trajectory, str):
61 mode = "a" if append_trajectory else "w"
62 trajectory = self.closelater(Trajectory(
63 trajectory, mode=mode, master=master
64 ))
65 self.attach(trajectory, atoms=atoms)
67 self.trajectory = trajectory
69 def get_number_of_steps(self):
70 return self.nsteps
72 def insert_observer(
73 self, function, position=0, interval=1, *args, **kwargs
74 ):
75 """Insert an observer."""
76 if not isinstance(function, collections.abc.Callable):
77 function = function.write
78 self.observers.insert(position, (function, interval, args, kwargs))
80 def attach(self, function, interval=1, *args, **kwargs):
81 """Attach callback function.
83 If *interval > 0*, at every *interval* steps, call *function* with
84 arguments *args* and keyword arguments *kwargs*.
86 If *interval <= 0*, after step *interval*, call *function* with
87 arguments *args* and keyword arguments *kwargs*. This is
88 currently zero indexed."""
90 if hasattr(function, "set_description"):
91 d = self.todict()
92 d.update(interval=interval)
93 function.set_description(d)
94 if not hasattr(function, "__call__"):
95 function = function.write
96 self.observers.append((function, interval, args, kwargs))
98 def call_observers(self):
99 for function, interval, args, kwargs in self.observers:
100 call = False
101 # Call every interval iterations
102 if interval > 0:
103 if (self.nsteps % interval) == 0:
104 call = True
105 # Call only on iteration interval
106 elif interval <= 0:
107 if self.nsteps == abs(interval):
108 call = True
109 if call:
110 function(*args, **kwargs)
112 def irun(self):
113 """Run dynamics algorithm as generator. This allows, e.g.,
114 to easily run two optimizers or MD thermostats at the same time.
116 Examples:
117 >>> opt1 = BFGS(atoms)
118 >>> opt2 = BFGS(StrainFilter(atoms)).irun()
119 >>> for _ in opt2:
120 >>> opt1.run()
121 """
123 # compute initial structure and log the first step
124 self.atoms.get_forces()
126 # yield the first time to inspect before logging
127 yield False
129 if self.nsteps == 0:
130 self.log()
131 self.call_observers()
133 # run the algorithm until converged or max_steps reached
134 while not self.converged() and self.nsteps < self.max_steps:
136 # compute the next step
137 self.step()
138 self.nsteps += 1
140 # let the user inspect the step and change things before logging
141 # and predicting the next step
142 yield False
144 # log the step
145 self.log()
146 self.call_observers()
148 # finally check if algorithm was converged
149 yield self.converged()
151 def run(self):
152 """Run dynamics algorithm.
154 This method will return when the forces on all individual
155 atoms are less than *fmax* or when the number of steps exceeds
156 *steps*."""
158 for converged in Dynamics.irun(self):
159 pass
160 return converged
162 def converged(self, *args):
163 """" a dummy function as placeholder for a real criterion, e.g. in
164 Optimizer """
165 return False
167 def log(self, *args):
168 """ a dummy function as placeholder for a real logger, e.g. in
169 Optimizer """
170 return True
172 def step(self):
173 """this needs to be implemented by subclasses"""
174 raise RuntimeError("step not implemented.")
177class Optimizer(Dynamics):
178 """Base-class for all structure optimization classes."""
180 # default maxstep for all optimizers
181 defaults = {'maxstep': 0.2}
183 def __init__(
184 self,
185 atoms,
186 restart,
187 logfile,
188 trajectory,
189 master=None,
190 append_trajectory=False,
191 force_consistent=False,
192 ):
193 """Structure optimizer object.
195 Parameters:
197 atoms: Atoms object
198 The Atoms object to relax.
200 restart: str
201 Filename for restart file. Default value is *None*.
203 logfile: file object or str
204 If *logfile* is a string, a file with that name will be opened.
205 Use '-' for stdout.
207 trajectory: Trajectory object or str
208 Attach trajectory object. If *trajectory* is a string a
209 Trajectory will be constructed. Use *None* for no
210 trajectory.
212 master: boolean
213 Defaults to None, which causes only rank 0 to save files. If
214 set to true, this rank will save files.
216 append_trajectory: boolean
217 Appended to the trajectory file instead of overwriting it.
219 force_consistent: boolean or None
220 Use force-consistent energy calls (as opposed to the energy
221 extrapolated to 0 K). If force_consistent=None, uses
222 force-consistent energies if available in the calculator, but
223 falls back to force_consistent=False if not.
224 """
225 Dynamics.__init__(
226 self,
227 atoms,
228 logfile,
229 trajectory,
230 append_trajectory=append_trajectory,
231 master=master,
232 )
234 self.force_consistent = force_consistent
235 if self.force_consistent is None:
236 self.set_force_consistent()
238 self.restart = restart
240 # initialize attribute
241 self.fmax = None
243 if restart is None or not isfile(restart):
244 self.initialize()
245 else:
246 self.read()
247 barrier()
249 def todict(self):
250 description = {
251 "type": "optimization",
252 "optimizer": self.__class__.__name__,
253 }
254 # add custom attributes from subclasses
255 for attr in ('maxstep', 'alpha', 'max_steps', 'restart'):
256 if hasattr(self, attr):
257 description.update({attr: getattr(self, attr)})
258 return description
260 def initialize(self):
261 pass
263 def irun(self, fmax=0.05, steps=None):
264 """ call Dynamics.irun and keep track of fmax"""
265 self.fmax = fmax
266 if steps:
267 self.max_steps = steps
268 return Dynamics.irun(self)
270 def run(self, fmax=0.05, steps=None):
271 """ call Dynamics.run and keep track of fmax"""
272 self.fmax = fmax
273 if steps:
274 self.max_steps = steps
275 return Dynamics.run(self)
277 def converged(self, forces=None):
278 """Did the optimization converge?"""
279 if forces is None:
280 forces = self.atoms.get_forces()
281 if hasattr(self.atoms, "get_curvature"):
282 return (forces ** 2).sum(
283 axis=1
284 ).max() < self.fmax ** 2 and self.atoms.get_curvature() < 0.0
285 return (forces ** 2).sum(axis=1).max() < self.fmax ** 2
287 def log(self, forces=None):
288 if forces is None:
289 forces = self.atoms.get_forces()
290 fmax = sqrt((forces ** 2).sum(axis=1).max())
291 e = self.atoms.get_potential_energy(
292 force_consistent=self.force_consistent
293 )
294 T = time.localtime()
295 if self.logfile is not None:
296 name = self.__class__.__name__
297 if self.nsteps == 0:
298 args = (" " * len(name), "Step", "Time", "Energy", "fmax")
299 msg = "%s %4s %8s %15s %12s\n" % args
300 self.logfile.write(msg)
302 # if self.force_consistent:
303 # msg = "*Force-consistent energies used in optimization.\n"
304 # self.logfile.write(msg)
306 # XXX The "force consistent" handling is really arbitrary.
307 # Let's disable the special printing for now.
308 #
309 # ast = {1: "*", 0: ""}[self.force_consistent]
310 ast = ''
311 args = (name, self.nsteps, T[3], T[4], T[5], e, ast, fmax)
312 msg = "%s: %3d %02d:%02d:%02d %15.6f%1s %15.6f\n" % args
313 self.logfile.write(msg)
315 self.logfile.flush()
317 def dump(self, data):
318 if world.rank == 0 and self.restart is not None:
319 with open(self.restart, 'w') as fd:
320 write_json(fd, data)
322 def load(self):
323 with open(self.restart) as fd:
324 try:
325 return read_json(fd, always_array=False)
326 except Exception as ex:
327 msg = ('Could not decode restart file as JSON. '
328 'You may need to delete the restart file '
329 f'{self.restart}')
330 raise RestartError(msg) from ex
332 def set_force_consistent(self):
333 """Automatically sets force_consistent to True if force_consistent
334 energies are supported by calculator; else False."""
335 try:
336 self.atoms.get_potential_energy(force_consistent=True)
337 except PropertyNotImplementedError:
338 self.force_consistent = False
339 else:
340 self.force_consistent = True