Coverage for /builds/ase/ase/ase/io/trajectory.py : 89.23%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import warnings
2from typing import Tuple
4import numpy as np
6from ase import __version__
7from ase.calculators.singlepoint import SinglePointCalculator, all_properties
8from ase.constraints import dict2constraint
9from ase.calculators.calculator import PropertyNotImplementedError
10from ase.atoms import Atoms
11from ase.io.jsonio import encode, decode
12from ase.io.pickletrajectory import PickleTrajectory
13from ase.parallel import world
14from ase.utils import tokenize_version
17__all__ = ['Trajectory', 'PickleTrajectory']
20def Trajectory(filename, mode='r', atoms=None, properties=None, master=None):
21 """A Trajectory can be created in read, write or append mode.
23 Parameters:
25 filename: str
26 The name of the file. Traditionally ends in .traj.
27 mode: str
28 The mode. 'r' is read mode, the file should already exist, and
29 no atoms argument should be specified.
30 'w' is write mode. The atoms argument specifies the Atoms
31 object to be written to the file, if not given it must instead
32 be given as an argument to the write() method.
33 'a' is append mode. It acts as write mode, except that
34 data is appended to a preexisting file.
35 atoms: Atoms object
36 The Atoms object to be written in write or append mode.
37 properties: list of str
38 If specified, these calculator properties are saved in the
39 trajectory. If not specified, all supported quantities are
40 saved. Possible values: energy, forces, stress, dipole,
41 charges, magmom and magmoms.
42 master: bool
43 Controls which process does the actual writing. The
44 default is that process number 0 does this. If this
45 argument is given, processes where it is True will write.
47 The atoms, properties and master arguments are ignores in read mode.
48 """
49 if mode == 'r':
50 return TrajectoryReader(filename)
51 return TrajectoryWriter(filename, mode, atoms, properties, master=master)
54class TrajectoryWriter:
55 """Writes Atoms objects to a .traj file."""
57 def __init__(self, filename, mode='w', atoms=None, properties=None,
58 extra=[], master=None):
59 """A Trajectory writer, in write or append mode.
61 Parameters:
63 filename: str
64 The name of the file. Traditionally ends in .traj.
65 mode: str
66 The mode. 'r' is read mode, the file should already exist, and
67 no atoms argument should be specified.
68 'w' is write mode. The atoms argument specifies the Atoms
69 object to be written to the file, if not given it must instead
70 be given as an argument to the write() method.
71 'a' is append mode. It acts as write mode, except that
72 data is appended to a preexisting file.
73 atoms: Atoms object
74 The Atoms object to be written in write or append mode.
75 properties: list of str
76 If specified, these calculator properties are saved in the
77 trajectory. If not specified, all supported quantities are
78 saved. Possible values: energy, forces, stress, dipole,
79 charges, magmom and magmoms.
80 master: bool
81 Controls which process does the actual writing. The
82 default is that process number 0 does this. If this
83 argument is given, processes where it is True will write.
84 """
85 if master is None:
86 master = (world.rank == 0)
87 self.master = master
88 self.atoms = atoms
89 self.properties = properties
91 self.description = {}
92 self.header_data = None
93 self.multiple_headers = False
95 self._open(filename, mode)
97 def __enter__(self):
98 return self
100 def __exit__(self, exc_type, exc_value, tb):
101 self.close()
103 def set_description(self, description):
104 self.description.update(description)
106 def _open(self, filename, mode):
107 import ase.io.ulm as ulm
108 if mode not in 'aw':
109 raise ValueError('mode must be "w" or "a".')
110 if self.master:
111 self.backend = ulm.open(filename, mode, tag='ASE-Trajectory')
112 if len(self.backend) > 0 and mode == 'a':
113 with Trajectory(filename) as traj:
114 atoms = traj[0]
115 self.header_data = get_header_data(atoms)
116 else:
117 self.backend = ulm.DummyWriter()
119 def write(self, atoms=None, **kwargs):
120 """Write the atoms to the file.
122 If the atoms argument is not given, the atoms object specified
123 when creating the trajectory object is used.
125 Use keyword arguments to add extra properties::
127 writer.write(atoms, energy=117, dipole=[0, 0, 1.0])
128 """
129 if atoms is None:
130 atoms = self.atoms
132 for image in atoms.iterimages():
133 self._write_atoms(image, **kwargs)
135 def _write_atoms(self, atoms, **kwargs):
136 b = self.backend
138 if self.header_data is None:
139 b.write(version=1, ase_version=__version__)
140 if self.description:
141 b.write(description=self.description)
142 # Atomic numbers and periodic boundary conditions are written
143 # in the header in the beginning.
144 #
145 # If an image later on has other numbers/pbc, we write a new
146 # header. All subsequent images will then have their own header
147 # whether or not their numbers/pbc change.
148 self.header_data = get_header_data(atoms)
149 write_header = True
150 else:
151 if not self.multiple_headers:
152 header_data = get_header_data(atoms)
153 self.multiple_headers = not headers_equal(self.header_data,
154 header_data)
155 write_header = self.multiple_headers
157 write_atoms(b, atoms, write_header=write_header)
159 calc = atoms.calc
161 if calc is None and len(kwargs) > 0:
162 calc = SinglePointCalculator(atoms)
164 if calc is not None:
165 if not hasattr(calc, 'get_property'):
166 calc = OldCalculatorWrapper(calc)
167 c = b.child('calculator')
168 c.write(name=calc.name)
169 if hasattr(calc, 'todict'):
170 c.write(parameters=calc.todict())
171 for prop in all_properties:
172 if prop in kwargs:
173 x = kwargs[prop]
174 else:
175 if self.properties is not None:
176 if prop in self.properties:
177 x = calc.get_property(prop, atoms)
178 else:
179 x = None
180 else:
181 try:
182 x = calc.get_property(prop, atoms,
183 allow_calculation=False)
184 except (PropertyNotImplementedError, KeyError):
185 # KeyError is needed for Jacapo.
186 # XXX We can perhaps remove this.
187 x = None
188 if x is not None:
189 if prop in ['stress', 'dipole']:
190 x = x.tolist()
191 c.write(prop, x)
193 info = {}
194 for key, value in atoms.info.items():
195 try:
196 encode(value)
197 except TypeError:
198 warnings.warn('Skipping "{0}" info.'.format(key))
199 else:
200 info[key] = value
201 if info:
202 b.write(info=info)
204 b.sync()
206 def close(self):
207 """Close the trajectory file."""
208 self.backend.close()
210 def __len__(self):
211 return world.sum(len(self.backend))
214class TrajectoryReader:
215 """Reads Atoms objects from a .traj file."""
217 def __init__(self, filename):
218 """A Trajectory in read mode.
220 The filename traditionally ends in .traj.
221 """
223 self.numbers = None
224 self.pbc = None
225 self.masses = None
227 self._open(filename)
229 def __enter__(self):
230 return self
232 def __exit__(self, exc_type, exc_value, tb):
233 self.close()
235 def _open(self, filename):
236 import ase.io.ulm as ulm
237 self.backend = ulm.open(filename, 'r')
238 self._read_header()
240 def _read_header(self):
241 b = self.backend
242 if b.get_tag() != 'ASE-Trajectory':
243 raise IOError('This is not a trajectory file!')
245 if len(b) > 0:
246 self.pbc = b.pbc
247 self.numbers = b.numbers
248 self.masses = b.get('masses')
249 self.constraints = b.get('constraints', '[]')
250 self.description = b.get('description')
251 self.version = b.version
252 self.ase_version = b.get('ase_version')
254 def close(self):
255 """Close the trajectory file."""
256 self.backend.close()
258 def __getitem__(self, i=-1):
259 if isinstance(i, slice):
260 return SlicedTrajectory(self, i)
261 b = self.backend[i]
262 if 'numbers' in b:
263 # numbers and other header info was written alongside the image:
264 atoms = read_atoms(b, traj=self)
265 else:
266 # header info was not written because they are the same:
267 atoms = read_atoms(b,
268 header=[self.pbc, self.numbers, self.masses,
269 self.constraints],
270 traj=self)
271 if 'calculator' in b:
272 results = {}
273 implemented_properties = []
274 c = b.calculator
275 for prop in all_properties:
276 if prop in c:
277 results[prop] = c.get(prop)
278 implemented_properties.append(prop)
279 calc = SinglePointCalculator(atoms, **results)
280 calc.name = b.calculator.name
281 calc.implemented_properties = implemented_properties
283 if 'parameters' in c:
284 calc.parameters.update(c.parameters)
285 atoms.calc = calc
287 return atoms
289 def __len__(self):
290 return len(self.backend)
292 def __iter__(self):
293 for i in range(len(self)):
294 yield self[i]
297class SlicedTrajectory:
298 """Wrapper to return a slice from a trajectory without loading
299 from disk. Initialize with a trajectory (in read mode) and the
300 desired slice object."""
302 def __init__(self, trajectory, sliced):
303 self.trajectory = trajectory
304 self.map = range(len(self.trajectory))[sliced]
306 def __getitem__(self, i):
307 if isinstance(i, slice):
308 # Map directly to the original traj, not recursively.
309 traj = SlicedTrajectory(self.trajectory, slice(0, None))
310 traj.map = self.map[i]
311 return traj
312 return self.trajectory[self.map[i]]
314 def __len__(self):
315 return len(self.map)
318def get_header_data(atoms):
319 return {'pbc': atoms.pbc.copy(),
320 'numbers': atoms.get_atomic_numbers(),
321 'masses': atoms.get_masses() if atoms.has('masses') else None,
322 'constraints': list(atoms.constraints)}
325def headers_equal(headers1, headers2):
326 assert len(headers1) == len(headers2)
327 eq = True
328 for key in headers1:
329 eq &= np.array_equal(headers1[key], headers2[key])
330 return eq
333class VersionTooOldError(Exception):
334 pass
337def read_atoms(backend,
338 header: Tuple = None,
339 traj: TrajectoryReader = None,
340 _try_except: bool = True) -> Atoms:
342 if _try_except:
343 try:
344 return read_atoms(backend, header, traj, False)
345 except Exception as ex:
346 if (traj is not None and tokenize_version(__version__) <
347 tokenize_version(traj.ase_version)):
348 msg = ('You are trying to read a trajectory file written '
349 f'by ASE-{traj.ase_version} from ASE-{__version__}. '
350 'It might help to update your ASE')
351 raise VersionTooOldError(msg) from ex
352 else:
353 raise
355 b = backend
356 if header:
357 pbc, numbers, masses, constraints = header
358 else:
359 pbc = b.pbc
360 numbers = b.numbers
361 masses = b.get('masses')
362 constraints = b.get('constraints', '[]')
364 atoms = Atoms(positions=b.positions,
365 numbers=numbers,
366 cell=b.cell,
367 masses=masses,
368 pbc=pbc,
369 info=b.get('info'),
370 constraint=[dict2constraint(d)
371 for d in decode(constraints)],
372 momenta=b.get('momenta'),
373 magmoms=b.get('magmoms'),
374 charges=b.get('charges'),
375 tags=b.get('tags'))
376 return atoms
379def write_atoms(backend, atoms, write_header=True):
380 b = backend
382 if write_header:
383 b.write(pbc=atoms.pbc.tolist(),
384 numbers=atoms.numbers)
385 if atoms.constraints:
386 if all(hasattr(c, 'todict') for c in atoms.constraints):
387 b.write(constraints=encode(atoms.constraints))
389 if atoms.has('masses'):
390 b.write(masses=atoms.get_masses())
392 b.write(positions=atoms.get_positions(),
393 cell=atoms.get_cell().tolist())
395 if atoms.has('tags'):
396 b.write(tags=atoms.get_tags())
397 if atoms.has('momenta'):
398 b.write(momenta=atoms.get_momenta())
399 if atoms.has('initial_magmoms'):
400 b.write(magmoms=atoms.get_initial_magnetic_moments())
401 if atoms.has('initial_charges'):
402 b.write(charges=atoms.get_initial_charges())
405def read_traj(fd, index):
406 trj = TrajectoryReader(fd)
407 for i in range(*index.indices(len(trj))):
408 yield trj[i]
411def write_traj(fd, images):
412 """Write image(s) to trajectory."""
413 trj = TrajectoryWriter(fd)
414 if isinstance(images, Atoms):
415 images = [images]
416 for atoms in images:
417 trj.write(atoms)
420class OldCalculatorWrapper:
421 def __init__(self, calc):
422 self.calc = calc
423 try:
424 self.name = calc.name
425 except AttributeError:
426 self.name = calc.__class__.__name__.lower()
428 def get_property(self, prop, atoms, allow_calculation=True):
429 try:
430 if (not allow_calculation and
431 self.calc.calculation_required(atoms, [prop])):
432 return None
433 except AttributeError:
434 pass
436 method = 'get_' + {'energy': 'potential_energy',
437 'magmom': 'magnetic_moment',
438 'magmoms': 'magnetic_moments',
439 'dipole': 'dipole_moment'}.get(prop, prop)
440 try:
441 result = getattr(self.calc, method)(atoms)
442 except AttributeError:
443 raise PropertyNotImplementedError
444 return result
447def convert(name):
448 import os
449 t = TrajectoryWriter(name + '.new')
450 for atoms in PickleTrajectory(name, _warn=False):
451 t.write(atoms)
452 t.close()
453 os.rename(name, name + '.old')
454 os.rename(name + '.new', name)
457def main():
458 import optparse
459 parser = optparse.OptionParser(usage='python -m ase.io.trajectory '
460 'a1.traj [a2.traj ...]',
461 description='Convert old trajectory '
462 'file(s) to new format. '
463 'The old file is kept as a1.traj.old.')
464 opts, args = parser.parse_args()
465 for name in args:
466 convert(name)
469if __name__ == '__main__':
470 main()