Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import warnings 

2from typing import Tuple 

3 

4import numpy as np 

5 

6from ase import __version__ 

7from ase.calculators.singlepoint import SinglePointCalculator, all_properties 

8from ase.constraints import dict2constraint 

9from ase.calculators.calculator import PropertyNotImplementedError 

10from ase.atoms import Atoms 

11from ase.io.jsonio import encode, decode 

12from ase.io.pickletrajectory import PickleTrajectory 

13from ase.parallel import world 

14from ase.utils import tokenize_version 

15 

16 

17__all__ = ['Trajectory', 'PickleTrajectory'] 

18 

19 

20def Trajectory(filename, mode='r', atoms=None, properties=None, master=None): 

21 """A Trajectory can be created in read, write or append mode. 

22 

23 Parameters: 

24 

25 filename: str 

26 The name of the file. Traditionally ends in .traj. 

27 mode: str 

28 The mode. 'r' is read mode, the file should already exist, and 

29 no atoms argument should be specified. 

30 'w' is write mode. The atoms argument specifies the Atoms 

31 object to be written to the file, if not given it must instead 

32 be given as an argument to the write() method. 

33 'a' is append mode. It acts as write mode, except that 

34 data is appended to a preexisting file. 

35 atoms: Atoms object 

36 The Atoms object to be written in write or append mode. 

37 properties: list of str 

38 If specified, these calculator properties are saved in the 

39 trajectory. If not specified, all supported quantities are 

40 saved. Possible values: energy, forces, stress, dipole, 

41 charges, magmom and magmoms. 

42 master: bool 

43 Controls which process does the actual writing. The 

44 default is that process number 0 does this. If this 

45 argument is given, processes where it is True will write. 

46 

47 The atoms, properties and master arguments are ignores in read mode. 

48 """ 

49 if mode == 'r': 

50 return TrajectoryReader(filename) 

51 return TrajectoryWriter(filename, mode, atoms, properties, master=master) 

52 

53 

54class TrajectoryWriter: 

55 """Writes Atoms objects to a .traj file.""" 

56 

57 def __init__(self, filename, mode='w', atoms=None, properties=None, 

58 extra=[], master=None): 

59 """A Trajectory writer, in write or append mode. 

60 

61 Parameters: 

62 

63 filename: str 

64 The name of the file. Traditionally ends in .traj. 

65 mode: str 

66 The mode. 'r' is read mode, the file should already exist, and 

67 no atoms argument should be specified. 

68 'w' is write mode. The atoms argument specifies the Atoms 

69 object to be written to the file, if not given it must instead 

70 be given as an argument to the write() method. 

71 'a' is append mode. It acts as write mode, except that 

72 data is appended to a preexisting file. 

73 atoms: Atoms object 

74 The Atoms object to be written in write or append mode. 

75 properties: list of str 

76 If specified, these calculator properties are saved in the 

77 trajectory. If not specified, all supported quantities are 

78 saved. Possible values: energy, forces, stress, dipole, 

79 charges, magmom and magmoms. 

80 master: bool 

81 Controls which process does the actual writing. The 

82 default is that process number 0 does this. If this 

83 argument is given, processes where it is True will write. 

84 """ 

85 if master is None: 

86 master = (world.rank == 0) 

87 self.master = master 

88 self.atoms = atoms 

89 self.properties = properties 

90 

91 self.description = {} 

92 self.header_data = None 

93 self.multiple_headers = False 

94 

95 self._open(filename, mode) 

96 

97 def __enter__(self): 

98 return self 

99 

100 def __exit__(self, exc_type, exc_value, tb): 

101 self.close() 

102 

103 def set_description(self, description): 

104 self.description.update(description) 

105 

106 def _open(self, filename, mode): 

107 import ase.io.ulm as ulm 

108 if mode not in 'aw': 

109 raise ValueError('mode must be "w" or "a".') 

110 if self.master: 

111 self.backend = ulm.open(filename, mode, tag='ASE-Trajectory') 

112 if len(self.backend) > 0 and mode == 'a': 

113 with Trajectory(filename) as traj: 

114 atoms = traj[0] 

115 self.header_data = get_header_data(atoms) 

116 else: 

117 self.backend = ulm.DummyWriter() 

118 

119 def write(self, atoms=None, **kwargs): 

120 """Write the atoms to the file. 

121 

122 If the atoms argument is not given, the atoms object specified 

123 when creating the trajectory object is used. 

124 

125 Use keyword arguments to add extra properties:: 

126 

127 writer.write(atoms, energy=117, dipole=[0, 0, 1.0]) 

128 """ 

129 if atoms is None: 

130 atoms = self.atoms 

131 

132 for image in atoms.iterimages(): 

133 self._write_atoms(image, **kwargs) 

134 

135 def _write_atoms(self, atoms, **kwargs): 

136 b = self.backend 

137 

138 if self.header_data is None: 

139 b.write(version=1, ase_version=__version__) 

140 if self.description: 

141 b.write(description=self.description) 

142 # Atomic numbers and periodic boundary conditions are written 

143 # in the header in the beginning. 

144 # 

145 # If an image later on has other numbers/pbc, we write a new 

146 # header. All subsequent images will then have their own header 

147 # whether or not their numbers/pbc change. 

148 self.header_data = get_header_data(atoms) 

149 write_header = True 

150 else: 

151 if not self.multiple_headers: 

152 header_data = get_header_data(atoms) 

153 self.multiple_headers = not headers_equal(self.header_data, 

154 header_data) 

155 write_header = self.multiple_headers 

156 

157 write_atoms(b, atoms, write_header=write_header) 

158 

159 calc = atoms.calc 

160 

161 if calc is None and len(kwargs) > 0: 

162 calc = SinglePointCalculator(atoms) 

163 

164 if calc is not None: 

165 if not hasattr(calc, 'get_property'): 

166 calc = OldCalculatorWrapper(calc) 

167 c = b.child('calculator') 

168 c.write(name=calc.name) 

169 if hasattr(calc, 'todict'): 

170 c.write(parameters=calc.todict()) 

171 for prop in all_properties: 

172 if prop in kwargs: 

173 x = kwargs[prop] 

174 else: 

175 if self.properties is not None: 

176 if prop in self.properties: 

177 x = calc.get_property(prop, atoms) 

178 else: 

179 x = None 

180 else: 

181 try: 

182 x = calc.get_property(prop, atoms, 

183 allow_calculation=False) 

184 except (PropertyNotImplementedError, KeyError): 

185 # KeyError is needed for Jacapo. 

186 # XXX We can perhaps remove this. 

187 x = None 

188 if x is not None: 

189 if prop in ['stress', 'dipole']: 

190 x = x.tolist() 

191 c.write(prop, x) 

192 

193 info = {} 

194 for key, value in atoms.info.items(): 

195 try: 

196 encode(value) 

197 except TypeError: 

198 warnings.warn('Skipping "{0}" info.'.format(key)) 

199 else: 

200 info[key] = value 

201 if info: 

202 b.write(info=info) 

203 

204 b.sync() 

205 

206 def close(self): 

207 """Close the trajectory file.""" 

208 self.backend.close() 

209 

210 def __len__(self): 

211 return world.sum(len(self.backend)) 

212 

213 

214class TrajectoryReader: 

215 """Reads Atoms objects from a .traj file.""" 

216 

217 def __init__(self, filename): 

218 """A Trajectory in read mode. 

219 

220 The filename traditionally ends in .traj. 

221 """ 

222 

223 self.numbers = None 

224 self.pbc = None 

225 self.masses = None 

226 

227 self._open(filename) 

228 

229 def __enter__(self): 

230 return self 

231 

232 def __exit__(self, exc_type, exc_value, tb): 

233 self.close() 

234 

235 def _open(self, filename): 

236 import ase.io.ulm as ulm 

237 self.backend = ulm.open(filename, 'r') 

238 self._read_header() 

239 

240 def _read_header(self): 

241 b = self.backend 

242 if b.get_tag() != 'ASE-Trajectory': 

243 raise IOError('This is not a trajectory file!') 

244 

245 if len(b) > 0: 

246 self.pbc = b.pbc 

247 self.numbers = b.numbers 

248 self.masses = b.get('masses') 

249 self.constraints = b.get('constraints', '[]') 

250 self.description = b.get('description') 

251 self.version = b.version 

252 self.ase_version = b.get('ase_version') 

253 

254 def close(self): 

255 """Close the trajectory file.""" 

256 self.backend.close() 

257 

258 def __getitem__(self, i=-1): 

259 if isinstance(i, slice): 

260 return SlicedTrajectory(self, i) 

261 b = self.backend[i] 

262 if 'numbers' in b: 

263 # numbers and other header info was written alongside the image: 

264 atoms = read_atoms(b, traj=self) 

265 else: 

266 # header info was not written because they are the same: 

267 atoms = read_atoms(b, 

268 header=[self.pbc, self.numbers, self.masses, 

269 self.constraints], 

270 traj=self) 

271 if 'calculator' in b: 

272 results = {} 

273 implemented_properties = [] 

274 c = b.calculator 

275 for prop in all_properties: 

276 if prop in c: 

277 results[prop] = c.get(prop) 

278 implemented_properties.append(prop) 

279 calc = SinglePointCalculator(atoms, **results) 

280 calc.name = b.calculator.name 

281 calc.implemented_properties = implemented_properties 

282 

283 if 'parameters' in c: 

284 calc.parameters.update(c.parameters) 

285 atoms.calc = calc 

286 

287 return atoms 

288 

289 def __len__(self): 

290 return len(self.backend) 

291 

292 def __iter__(self): 

293 for i in range(len(self)): 

294 yield self[i] 

295 

296 

297class SlicedTrajectory: 

298 """Wrapper to return a slice from a trajectory without loading 

299 from disk. Initialize with a trajectory (in read mode) and the 

300 desired slice object.""" 

301 

302 def __init__(self, trajectory, sliced): 

303 self.trajectory = trajectory 

304 self.map = range(len(self.trajectory))[sliced] 

305 

306 def __getitem__(self, i): 

307 if isinstance(i, slice): 

308 # Map directly to the original traj, not recursively. 

309 traj = SlicedTrajectory(self.trajectory, slice(0, None)) 

310 traj.map = self.map[i] 

311 return traj 

312 return self.trajectory[self.map[i]] 

313 

314 def __len__(self): 

315 return len(self.map) 

316 

317 

318def get_header_data(atoms): 

319 return {'pbc': atoms.pbc.copy(), 

320 'numbers': atoms.get_atomic_numbers(), 

321 'masses': atoms.get_masses() if atoms.has('masses') else None, 

322 'constraints': list(atoms.constraints)} 

323 

324 

325def headers_equal(headers1, headers2): 

326 assert len(headers1) == len(headers2) 

327 eq = True 

328 for key in headers1: 

329 eq &= np.array_equal(headers1[key], headers2[key]) 

330 return eq 

331 

332 

333class VersionTooOldError(Exception): 

334 pass 

335 

336 

337def read_atoms(backend, 

338 header: Tuple = None, 

339 traj: TrajectoryReader = None, 

340 _try_except: bool = True) -> Atoms: 

341 

342 if _try_except: 

343 try: 

344 return read_atoms(backend, header, traj, False) 

345 except Exception as ex: 

346 if (traj is not None and tokenize_version(__version__) < 

347 tokenize_version(traj.ase_version)): 

348 msg = ('You are trying to read a trajectory file written ' 

349 f'by ASE-{traj.ase_version} from ASE-{__version__}. ' 

350 'It might help to update your ASE') 

351 raise VersionTooOldError(msg) from ex 

352 else: 

353 raise 

354 

355 b = backend 

356 if header: 

357 pbc, numbers, masses, constraints = header 

358 else: 

359 pbc = b.pbc 

360 numbers = b.numbers 

361 masses = b.get('masses') 

362 constraints = b.get('constraints', '[]') 

363 

364 atoms = Atoms(positions=b.positions, 

365 numbers=numbers, 

366 cell=b.cell, 

367 masses=masses, 

368 pbc=pbc, 

369 info=b.get('info'), 

370 constraint=[dict2constraint(d) 

371 for d in decode(constraints)], 

372 momenta=b.get('momenta'), 

373 magmoms=b.get('magmoms'), 

374 charges=b.get('charges'), 

375 tags=b.get('tags')) 

376 return atoms 

377 

378 

379def write_atoms(backend, atoms, write_header=True): 

380 b = backend 

381 

382 if write_header: 

383 b.write(pbc=atoms.pbc.tolist(), 

384 numbers=atoms.numbers) 

385 if atoms.constraints: 

386 if all(hasattr(c, 'todict') for c in atoms.constraints): 

387 b.write(constraints=encode(atoms.constraints)) 

388 

389 if atoms.has('masses'): 

390 b.write(masses=atoms.get_masses()) 

391 

392 b.write(positions=atoms.get_positions(), 

393 cell=atoms.get_cell().tolist()) 

394 

395 if atoms.has('tags'): 

396 b.write(tags=atoms.get_tags()) 

397 if atoms.has('momenta'): 

398 b.write(momenta=atoms.get_momenta()) 

399 if atoms.has('initial_magmoms'): 

400 b.write(magmoms=atoms.get_initial_magnetic_moments()) 

401 if atoms.has('initial_charges'): 

402 b.write(charges=atoms.get_initial_charges()) 

403 

404 

405def read_traj(fd, index): 

406 trj = TrajectoryReader(fd) 

407 for i in range(*index.indices(len(trj))): 

408 yield trj[i] 

409 

410 

411def write_traj(fd, images): 

412 """Write image(s) to trajectory.""" 

413 trj = TrajectoryWriter(fd) 

414 if isinstance(images, Atoms): 

415 images = [images] 

416 for atoms in images: 

417 trj.write(atoms) 

418 

419 

420class OldCalculatorWrapper: 

421 def __init__(self, calc): 

422 self.calc = calc 

423 try: 

424 self.name = calc.name 

425 except AttributeError: 

426 self.name = calc.__class__.__name__.lower() 

427 

428 def get_property(self, prop, atoms, allow_calculation=True): 

429 try: 

430 if (not allow_calculation and 

431 self.calc.calculation_required(atoms, [prop])): 

432 return None 

433 except AttributeError: 

434 pass 

435 

436 method = 'get_' + {'energy': 'potential_energy', 

437 'magmom': 'magnetic_moment', 

438 'magmoms': 'magnetic_moments', 

439 'dipole': 'dipole_moment'}.get(prop, prop) 

440 try: 

441 result = getattr(self.calc, method)(atoms) 

442 except AttributeError: 

443 raise PropertyNotImplementedError 

444 return result 

445 

446 

447def convert(name): 

448 import os 

449 t = TrajectoryWriter(name + '.new') 

450 for atoms in PickleTrajectory(name, _warn=False): 

451 t.write(atoms) 

452 t.close() 

453 os.rename(name, name + '.old') 

454 os.rename(name + '.new', name) 

455 

456 

457def main(): 

458 import optparse 

459 parser = optparse.OptionParser(usage='python -m ase.io.trajectory ' 

460 'a1.traj [a2.traj ...]', 

461 description='Convert old trajectory ' 

462 'file(s) to new format. ' 

463 'The old file is kept as a1.traj.old.') 

464 opts, args = parser.parse_args() 

465 for name in args: 

466 convert(name) 

467 

468 

469if __name__ == '__main__': 

470 main()