vasp.runners module


Runners for executing VASP calculations.

Runners handle the execution of VASP jobs, whether locally, on a cluster scheduler (SLURM), or on Kubernetes. They provide a uniform interface for submitting jobs and checking status.

Example

>>> from vasp.runners import LocalRunner, SlurmRunner
>>>
>>> # For local execution
>>> runner = LocalRunner(vasp_command='vasp_std')
>>>
>>> # For SLURM cluster
>>> runner = SlurmRunner(partition='compute', nodes=2)
class vasp.runners.Runner[source]#

Bases: ABC

Abstract base class for VASP execution backends.

Runners handle submitting and monitoring VASP calculations. They are designed for non-blocking operation:
  • run() submits or starts a job and returns immediately.
  • status() checks the current state without blocking.
  • Exceptions signal state transitions to calling code.

Subclasses must implement:
  • run(): Submit or start a calculation.
  • status(): Check current job status.
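The contract above can be sketched with a toy backend. This is a minimal illustration, not the library's code: JobState, JobStatus, and Runner below are reduced stand-ins for the documented classes, and InMemoryRunner is a hypothetical subclass that only records state in a dict.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class JobState(Enum):  # stand-in with a subset of the documented values
    NOT_STARTED = "not_started"
    RUNNING = "running"
    COMPLETE = "complete"


@dataclass
class JobStatus:  # stand-in with only the fields used here
    state: JobState
    message: Optional[str] = None


class Runner(ABC):  # stand-in for the documented abstract base
    @abstractmethod
    def run(self, directory: str) -> JobStatus: ...

    @abstractmethod
    def status(self, directory: str) -> JobStatus: ...


class InMemoryRunner(Runner):
    """Hypothetical backend that tracks state in a dict instead of running VASP."""

    def __init__(self) -> None:
        self._states = {}

    def run(self, directory: str) -> JobStatus:
        # "Submit" the job and return immediately without waiting.
        self._states[directory] = JobState.RUNNING
        return JobStatus(JobState.RUNNING, message="started")

    def status(self, directory: str) -> JobStatus:
        # Report only; never start a calculation from here.
        return JobStatus(self._states.get(directory, JobState.NOT_STARTED))


runner = InMemoryRunner()
before = runner.status("calc")  # nothing submitted yet
after = runner.run("calc")      # returns right away
```

Keeping status() free of side effects is what lets callers poll it as often as they like.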

Example

>>> from vasp.runners import JobState, LocalRunner
>>> runner = LocalRunner(vasp_command='vasp_std')
>>> status = runner.run('/path/to/calc')
>>> if status.state == JobState.COMPLETE:
...     print("Done!")
abstract run(directory)[source]#

Start or submit a VASP calculation.

This method should NOT block waiting for completion. For async runners (SLURM, K8s), it submits the job. For local runners, behavior depends on configuration: LocalRunner blocks until completion by default and returns immediately only with background=True.

Parameters:

directory (str) – Path to calculation directory containing input files.

Returns:

JobStatus with current state after submission.

Return type:

JobStatus

abstract status(directory)[source]#

Check status of a calculation without blocking.

This method never starts a new calculation. It only reports the current state based on job scheduler status and output files.

Parameters:

directory (str) – Path to calculation directory.

Returns:

JobStatus with current state.

Return type:

JobStatus

cancel(directory)[source]#

Cancel a running or queued job.

Parameters:

directory (str) – Path to calculation directory.

Returns:

True if cancellation was successful or job wasn’t running.

Return type:

bool

wait(directory, timeout=None, poll_interval=30.0)[source]#

Block until calculation completes (for interactive use).

This is a convenience method that polls status() until the job is done. For most workflows, you should use status() directly with your own retry logic.

Parameters:
  • directory (str) – Path to calculation directory.

  • timeout (float | None) – Maximum seconds to wait (None = forever).

  • poll_interval (float) – Seconds between status checks.

Returns:

Final JobStatus.

Raises:

TimeoutError – If timeout is reached before completion.

Return type:

JobStatus
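The polling behaviour described for wait() can be sketched as follows. This is an illustration under assumptions, not the library's implementation: wait_until_done is a hypothetical helper, and is_done stands in for a call such as checking status(directory).is_done.

```python
import time
from typing import Callable, Optional


def wait_until_done(
    is_done: Callable[[], bool],
    timeout: Optional[float] = None,
    poll_interval: float = 30.0,
) -> float:
    """Poll is_done() until it returns True; return the elapsed seconds."""
    start = time.monotonic()
    while not is_done():
        elapsed = time.monotonic() - start
        # The timeout is checked between polls, so the real wait can
        # overshoot it by up to one poll_interval.
        if timeout is not None and elapsed >= timeout:
            raise TimeoutError(f"not finished after {elapsed:.1f} s")
        time.sleep(poll_interval)
    return time.monotonic() - start


# Fake job that reports done on the third check.
checks = {"count": 0}


def fake_is_done() -> bool:
    checks["count"] += 1
    return checks["count"] >= 3


elapsed = wait_until_done(fake_is_done, timeout=5.0, poll_interval=0.01)
```

time.monotonic() is used rather than time.time() so that system clock adjustments cannot shorten or extend the timeout.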

get_logs(directory, tail_lines=100)[source]#

Get log output from the calculation.

Parameters:
  • directory (str) – Path to calculation directory.

  • tail_lines (int) – Number of lines to return from end of log.

Returns:

Log content as string.

Return type:

str

class vasp.runners.JobState[source]#

Bases: Enum

Possible states of a VASP calculation.

NOT_STARTED = 'not_started'#
SUBMITTED = 'submitted'#
QUEUED = 'queued'#
RUNNING = 'running'#
COMPLETE = 'complete'#
FAILED = 'failed'#
UNKNOWN = 'unknown'#
class vasp.runners.JobStatus(state, jobid=None, message=None, metadata=<factory>)[source]#

Bases: object

Status information for a VASP calculation.

state#

Current state of the job.

Type:

vasp.runners.base.JobState

jobid#

Job identifier from the scheduler (if applicable).

Type:

str | None

message#

Additional status message or error description.

Type:

str | None

metadata#

Additional runner-specific data.

Type:

dict[str, Any]

state: JobState#
jobid: str | None = None#
message: str | None = None#
metadata: dict[str, Any]#
property is_done: bool#

Check if calculation has finished (success or failure).

property is_active: bool#

Check if calculation is queued or running.

property is_success: bool#

Check if calculation completed successfully.
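One plausible mapping from JobState to the three properties is sketched below. The classes are stand-ins that mirror the documented fields. The state sets are an assumption read off the one-line descriptions above; in particular, whether SUBMITTED counts as active in the real class is not stated, and it is treated as inactive here.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional


class JobState(Enum):
    NOT_STARTED = "not_started"
    SUBMITTED = "submitted"
    QUEUED = "queued"
    RUNNING = "running"
    COMPLETE = "complete"
    FAILED = "failed"
    UNKNOWN = "unknown"


@dataclass
class JobStatus:
    state: JobState
    jobid: Optional[str] = None
    message: Optional[str] = None
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def is_done(self) -> bool:
        # Finished either way: success or failure.
        return self.state in (JobState.COMPLETE, JobState.FAILED)

    @property
    def is_active(self) -> bool:
        # Assumption: only QUEUED and RUNNING count as active.
        return self.state in (JobState.QUEUED, JobState.RUNNING)

    @property
    def is_success(self) -> bool:
        return self.state is JobState.COMPLETE


failed = JobStatus(JobState.FAILED, jobid="12345", message="ZBRENT error")
queued = JobStatus(JobState.QUEUED, jobid="12346")
```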

class vasp.runners.LocalRunner(vasp_executable=None, nprocs='auto', mpi_command='mpirun', mpi_extra_args=None, background=False, vasp_command=None)[source]#

Bases: Runner

Run VASP on the local machine.

By default runs synchronously (blocking until complete). Use background=True for non-blocking execution.

The runner determines the number of MPI processes as follows:
  • If nprocs='auto' (default): detects a count based on available CPUs and system size.
  • If nprocs is an int: uses that exact number.
  • Respects NCORE settings in INCAR.

Environment variables (in order of precedence):
  • VASP_EXECUTABLE: Path to VASP binary (e.g., /opt/vasp/bin/vasp_std).
  • VASP_COMMAND: Full command (legacy, e.g., 'mpirun -np 8 vasp_std').
  • VASP_NPROCS: Number of MPI processes (or 'auto').
  • VASP_MPI_EXTRA_ARGS: Extra MPI arguments (e.g., '--map-by hwthread').

Parameters:
  • vasp_executable (str | None) – Path to VASP binary. If omitted, falls back to $VASP_EXECUTABLE, then to the executable extracted from $VASP_COMMAND, then to 'vasp_std'.

  • nprocs (int | str) – Number of MPI processes. ‘auto’ (default) detects optimal count, or specify an integer. Set to 1 for serial execution.

  • mpi_command (str | None) – MPI launcher (default: ‘mpirun’). Set to None to disable MPI and run serial VASP.

  • mpi_extra_args (str | None) – Extra arguments for MPI launcher.

  • background (bool) – If True, run in background and return immediately.

  • vasp_command (str | None)
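The fallback order for the VASP binary can be sketched like this. resolve_vasp_executable is a hypothetical helper, not part of the package, and the rule for extracting the binary from the legacy VASP_COMMAND (take the last token) is an assumption; the real runner may parse the command differently.

```python
import os
import shlex
from typing import Mapping, Optional


def resolve_vasp_executable(
    vasp_executable: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> str:
    """Pick the VASP binary: argument, $VASP_EXECUTABLE, $VASP_COMMAND, default."""
    env = os.environ if env is None else env
    if vasp_executable:
        return vasp_executable
    if env.get("VASP_EXECUTABLE"):
        return env["VASP_EXECUTABLE"]
    tokens = shlex.split(env.get("VASP_COMMAND", ""))
    if tokens:
        # Assumption: the binary is the last token of the legacy command,
        # e.g. 'mpirun -np 8 vasp_std' -> 'vasp_std'.
        return tokens[-1]
    return "vasp_std"


from_legacy = resolve_vasp_executable(env={"VASP_COMMAND": "mpirun -np 8 vasp_std"})
```

Passing env explicitly keeps the function testable without touching the real process environment.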

Example

>>> # Auto-detect optimal parallelization
>>> runner = LocalRunner()
>>> calc = Vasp('my_calc', runner=runner, ...)
>>> # Specify exact process count
>>> runner = LocalRunner(nprocs=16)
>>> # Serial execution (no MPI)
>>> runner = LocalRunner(nprocs=1, mpi_command=None)
>>> # Custom MPI settings
>>> runner = LocalRunner(
...     nprocs=8,
...     mpi_extra_args='--bind-to core --map-by socket'
... )
run(directory)[source]#

Run VASP in the specified directory.

Parameters:

directory (str) – Path to calculation directory with input files.

Returns:

JobStatus indicating outcome.

Return type:

JobStatus

status(directory)[source]#

Check status of calculation.

Parameters:

directory (str)

Return type:

JobStatus

cancel(directory)[source]#

Kill running VASP process.

Parameters:

directory (str)

Return type:

bool

vasp.runners.get_optimal_nprocs(directory=None)[source]#

Determine optimal number of MPI processes.

Uses heuristics based on available CPUs and system size:
  • Reads NCORE/NPAR from INCAR if available.
  • Checks the number of atoms from POSCAR.
  • Returns a reasonable default based on CPU count.

Parameters:

directory (str | None) – Calculation directory (optional, for reading INCAR/POSCAR)

Returns:

Recommended number of MPI processes

Return type:

int
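The exact heuristic is not documented here, so the following is only an illustrative sketch of how the three inputs could be combined. read_ncore and suggest_nprocs are hypothetical names, and the rule (cap by CPU and atom count, then round down to a multiple of NCORE) is an assumption, not the library's actual logic.

```python
import re


def read_ncore(incar_text: str) -> int:
    """Return the NCORE value from INCAR text, or 1 if it is not set."""
    match = re.search(
        r"^\s*NCORE\s*=\s*(\d+)", incar_text, flags=re.MULTILINE | re.IGNORECASE
    )
    return int(match.group(1)) if match else 1


def suggest_nprocs(cpu_count: int, natoms: int, ncore: int = 1) -> int:
    """Illustrative heuristic only; not the library's actual rule."""
    # Never use more ranks than CPUs or atoms, and always at least one.
    nprocs = max(1, min(cpu_count, natoms))
    # Round down to a multiple of NCORE so ranks divide evenly into groups.
    if ncore > 1 and nprocs >= ncore:
        nprocs -= nprocs % ncore
    return nprocs


ncore = read_ncore("ENCUT = 520\nNCORE = 4\n")
nprocs = suggest_nprocs(cpu_count=16, natoms=10, ncore=ncore)
```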

class vasp.runners.MockRunner(results=None, energy=None, forces=None, stress=None, state_sequence=None, delay_calls=0, delay=0, fail_on_run=False, error_message='Mock error', write_outputs=True)[source]#

Bases: Runner

Mock runner for testing without VASP.

Simulates VASP execution by writing fake output files. Useful for testing calculator logic, workflow integration, and development when VASP is not available.

Parameters:
  • results (MockResults | None) – MockResults object with calculation outputs.

  • energy (float | None) – Shortcut to set results.energy.

  • forces (list | np.ndarray | None) – Shortcut to set results.forces.

  • state_sequence (list[JobState] | None) – List of JobStates to cycle through on each status() call. Useful for testing async workflows.

  • delay_calls (int) – Number of status() calls before returning COMPLETE. Alternative to state_sequence for simple cases.

  • fail_on_run (bool) – If True, immediately fail when run() is called.

  • error_message (str) – Error message if fail_on_run is True.

  • write_outputs (bool) – If True, write mock OUTCAR/vasprun.xml files.

  • stress (list | np.ndarray | None)

  • delay (float)

Example

>>> # Simple successful calculation
>>> runner = MockRunner(energy=-15.3)
>>>
>>> # Simulate job that takes 3 status checks to complete
>>> runner = MockRunner(
...     energy=-15.3,
...     state_sequence=[
...         JobState.QUEUED,
...         JobState.RUNNING,
...         JobState.RUNNING,
...         JobState.COMPLETE,
...     ]
... )
>>>
>>> # Simulate failed calculation
>>> runner = MockRunner(fail_on_run=True, error_message="ZBRENT error")
run(directory)[source]#

Simulate running VASP.

Creates mock output files and tracks job state.

Parameters:

directory (str)

Return type:

JobStatus

status(directory)[source]#

Check mock job status.

Cycles through state_sequence or delay_calls to simulate job progression.

Parameters:

directory (str)

Return type:

JobStatus
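The scripted progression can be pictured with a small helper that hands out one state per call. StateSequence is a hypothetical class, JobState is a reduced stand-in, and staying on the final state once the list is exhausted is an assumption about what happens after the last entry.

```python
from enum import Enum


class JobState(Enum):  # stand-in with a subset of the documented values
    QUEUED = "queued"
    RUNNING = "running"
    COMPLETE = "complete"


class StateSequence:
    """Hypothetical helper: returns one scripted state per call."""

    def __init__(self, states):
        if not states:
            raise ValueError("states must not be empty")
        self._states = list(states)
        self._index = 0

    def next_state(self):
        state = self._states[self._index]
        # Assumption: stay on the final state once the script is exhausted.
        if self._index < len(self._states) - 1:
            self._index += 1
        return state


seq = StateSequence([JobState.QUEUED, JobState.RUNNING, JobState.COMPLETE])
observed = [seq.next_state() for _ in range(5)]
```

A test can then assert on the exact order of states its workflow code observed.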

cancel(directory)[source]#

Cancel mock job.

Parameters:

directory (str)

Return type:

bool

reset()[source]#

Reset all state tracking (useful between tests).

Return type:

None

class vasp.runners.MockResults(energy=-10.0, forces=None, stress=None, magmom=0.0, magmoms=None, fermi_level=-5.0, eigenvalues=None, occupations=None, ibz_kpts=None, kpt_weights=None, converged=True, metadata=<factory>)[source]#

Bases: object

Container for mock calculation results.

energy#

Total energy in eV.

Type:

float

forces#

Forces array of shape (natoms, 3) in eV/Angstrom.

Type:

list[list[float]] | numpy.ndarray | None

stress#

Stress tensor as 6-element Voigt array in kBar.

Type:

list[float] | numpy.ndarray | None

magmom#

Total magnetic moment in Bohr magnetons.

Type:

float

magmoms#

Per-atom magnetic moments.

Type:

list[float] | numpy.ndarray | None

fermi_level#

Fermi energy in eV.

Type:

float

eigenvalues#

Eigenvalue array (nspin, nkpts, nbands).

Type:

numpy.ndarray | None

occupations#

Occupation numbers (nspin, nkpts, nbands).

Type:

numpy.ndarray | None

ibz_kpts#

IBZ k-points array (nkpts, 3).

Type:

numpy.ndarray | None

kpt_weights#

K-point weights (nkpts,).

Type:

numpy.ndarray | None

converged#

Whether calculation converged.

Type:

bool

metadata#

Additional data.

Type:

dict[str, Any]

energy: float = -10.0#
forces: list[list[float]] | ndarray | None = None#
stress: list[float] | ndarray | None = None#
magmom: float = 0.0#
magmoms: list[float] | ndarray | None = None#
fermi_level: float = -5.0#
eigenvalues: ndarray | None = None#
occupations: ndarray | None = None#
ibz_kpts: ndarray | None = None#
kpt_weights: ndarray | None = None#
converged: bool = True#
metadata: dict[str, Any]#
class vasp.runners.SlurmRunner(partition='normal', nodes=1, ntasks_per_node=24, time='24:00:00', memory=None, account=None, qos=None, vasp_command=None, modules=None, extra_sbatch=None, constraint=None)[source]#

Bases: Runner

Run VASP via SLURM scheduler.

Submits jobs to SLURM and monitors status. All operations are non-blocking: run() submits and returns immediately.

Parameters:
  • partition (str) – SLURM partition name.

  • nodes (int) – Number of nodes.

  • ntasks_per_node (int) – MPI tasks per node.

  • time (str) – Wall time limit (HH:MM:SS format).

  • memory (str | None) – Memory per node (e.g., ‘64G’).

  • account (str | None) – SLURM account for billing.

  • qos (str | None) – Quality of service.

  • vasp_command (str | None) – Command to run VASP. Defaults to ‘srun $VASP_COMMAND’ if VASP_COMMAND is set, or ‘srun vasp_std’ otherwise.

  • modules (list[str] | None) – List of modules to load before running.

  • extra_sbatch (list[str] | None) – Additional #SBATCH directives as list of strings.

  • constraint (str | None) – Node constraint (e.g., ‘gpu’ or ‘skylake’).
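To make the parameters concrete, here is a sketch of how they might map onto a batch script. render_sbatch_script is a hypothetical function, and the script the runner actually writes may differ in layout and content; only the #SBATCH options themselves are standard SLURM flags.

```python
from typing import Optional, Sequence


def render_sbatch_script(
    partition: str = "normal",
    nodes: int = 1,
    ntasks_per_node: int = 24,
    time: str = "24:00:00",
    memory: Optional[str] = None,
    account: Optional[str] = None,
    modules: Sequence[str] = (),
    vasp_command: str = "srun vasp_std",
) -> str:
    """Build a minimal sbatch script from runner-style parameters."""
    lines = [
        "#!/bin/bash",
        f"#SBATCH --partition={partition}",
        f"#SBATCH --nodes={nodes}",
        f"#SBATCH --ntasks-per-node={ntasks_per_node}",
        f"#SBATCH --time={time}",
    ]
    # Optional directives are emitted only when a value was given.
    if memory:
        lines.append(f"#SBATCH --mem={memory}")
    if account:
        lines.append(f"#SBATCH --account={account}")
    lines.extend(f"module load {name}" for name in modules)
    lines.append(vasp_command)
    return "\n".join(lines) + "\n"


script = render_sbatch_script(
    partition="compute", nodes=2, ntasks_per_node=48, modules=["vasp/6.3.0"]
)
```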

Example

>>> runner = SlurmRunner(
...     partition='compute',
...     nodes=2,
...     ntasks_per_node=48,
...     time='24:00:00',
...     modules=['vasp/6.3.0'],
... )
>>>
>>> calc = Vasp('my_calc', runner=runner, atoms=atoms)
>>> try:
...     energy = calc.potential_energy
... except VaspSubmitted as e:
...     print(f"Submitted job {e.jobid}")
run(directory)[source]#

Submit VASP job to SLURM.

Parameters:

directory (str)

Return type:

JobStatus

status(directory)[source]#

Check SLURM job status.

Parameters:

directory (str)

Return type:

JobStatus

cancel(directory)[source]#

Cancel SLURM job.

Parameters:

directory (str)

Return type:

bool

get_logs(directory, tail_lines=100)[source]#

Get SLURM job output.

Parameters:
  • directory (str)

  • tail_lines (int)

Return type:

str

class vasp.runners.KubernetesRunner(namespace='default', image='vasp:latest', pvc_name='vasp-pvc', pvc_mount_path='/data', cpu_request='4', cpu_limit=None, memory_request='8Gi', memory_limit=None, gpu_limit=None, node_selector=None, tolerations=None, service_account=None, image_pull_secrets=None, vasp_command='mpirun -np $NPROCS vasp_std', env_vars=None, ttl_seconds_after_finished=86400, active_deadline_seconds=None, backoff_limit=0, use_in_cluster_config=False)[source]#

Bases: Runner

Run VASP on Kubernetes cluster.

Submits VASP calculations as Kubernetes Job resources. Input/output files must be on shared storage (PVC).

Parameters:
  • namespace (str) – Kubernetes namespace.

  • image (str) – Container image with VASP.

  • pvc_name (str) – PersistentVolumeClaim name for data.

  • pvc_mount_path (str) – Mount path in container.

  • cpu_request (str) – CPU request (e.g., ‘4’).

  • cpu_limit (str | None) – CPU limit (defaults to request).

  • memory_request (str) – Memory request (e.g., ‘16Gi’).

  • memory_limit (str | None) – Memory limit (defaults to request).

  • gpu_limit (int | None) – Number of GPUs (for vasp_gpu).

  • node_selector (dict[str, str] | None) – Dict of node labels for scheduling.

  • tolerations (list[dict] | None) – List of tolerations for scheduling.

  • service_account (str | None) – ServiceAccount name.

  • image_pull_secrets (list[str] | None) – List of image pull secret names.

  • vasp_command (str) – Command to run VASP.

  • env_vars (dict[str, str] | None) – Environment variables dict.

  • ttl_seconds_after_finished (int) – Job cleanup time (default 1 day).

  • active_deadline_seconds (int | None) – Job timeout.

  • backoff_limit (int) – Retry count (default 0 = no retries).

  • use_in_cluster_config (bool) – Use in-cluster config (for pods).
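The sketch below shows how these parameters could translate into a Kubernetes Job manifest. build_job_manifest, the container name, and the volume name are hypothetical; the manifest the runner really submits is not shown in this reference. The field names (ttlSecondsAfterFinished, activeDeadlineSeconds, backoffLimit, persistentVolumeClaim) are standard batch/v1 Job fields.

```python
from typing import Any, Optional


def build_job_manifest(
    name: str,
    workdir: str,
    namespace: str = "default",
    image: str = "vasp:latest",
    pvc_name: str = "vasp-pvc",
    pvc_mount_path: str = "/data",
    cpu_request: str = "4",
    memory_request: str = "8Gi",
    vasp_command: str = "mpirun -np $NPROCS vasp_std",
    ttl_seconds_after_finished: int = 86400,
    active_deadline_seconds: Optional[int] = None,
    backoff_limit: int = 0,
) -> dict[str, Any]:
    """Build a batch/v1 Job manifest as a plain dict."""
    requests = {"cpu": cpu_request, "memory": memory_request}
    container = {
        "name": "vasp",  # hypothetical container name
        "image": image,
        "workingDir": workdir,
        "command": ["/bin/sh", "-c", vasp_command],
        # Limits default to the requests, as documented for cpu_limit/memory_limit.
        "resources": {"requests": requests, "limits": dict(requests)},
        "volumeMounts": [{"name": "data", "mountPath": pvc_mount_path}],
    }
    spec: dict[str, Any] = {
        "ttlSecondsAfterFinished": ttl_seconds_after_finished,
        "backoffLimit": backoff_limit,
        "template": {
            "spec": {
                "restartPolicy": "Never",
                "containers": [container],
                "volumes": [
                    {"name": "data", "persistentVolumeClaim": {"claimName": pvc_name}}
                ],
            }
        },
    }
    if active_deadline_seconds is not None:
        spec["activeDeadlineSeconds"] = active_deadline_seconds
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": name, "namespace": namespace},
        "spec": spec,
    }


manifest = build_job_manifest(
    "vasp-my-calc",
    "/data/calculations/my_calc",
    namespace="vasp-jobs",
    image="my-registry/vasp:6.3.0",
    pvc_name="vasp-data",
    cpu_request="8",
    memory_request="32Gi",
)
```

The working directory sits under the PVC mount path because input and output files must live on the shared volume.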

Example

>>> runner = KubernetesRunner(
...     namespace='vasp-jobs',
...     image='my-registry/vasp:6.3.0',
...     pvc_name='vasp-data',
...     cpu_request='8',
...     memory_request='32Gi',
... )
>>>
>>> calc = Vasp('/data/calculations/my_calc', runner=runner, ...)
run(directory)[source]#

Submit VASP job to Kubernetes.

Parameters:

directory (str)

Return type:

JobStatus

status(directory)[source]#

Check Kubernetes job status.

Parameters:

directory (str)

Return type:

JobStatus

cancel(directory)[source]#

Delete Kubernetes job.

Parameters:

directory (str)

Return type:

bool

get_logs(directory, tail_lines=100)[source]#

Get logs from VASP pod.

Parameters:
  • directory (str)

  • tail_lines (int)

Return type:

str

class vasp.runners.InteractiveRunner(vasp_command=None, mpi_command=None, timeout=3600, parse_stress=False)[source]#

Bases: Runner

Run VASP in interactive mode with persistent process.

This runner maintains a live VASP process and communicates via stdin/stdout. It’s designed for geometry optimizations where reusing wavefunctions between steps provides significant speedup.

Parameters:
  • vasp_command (str | None) – Command to run VASP. Defaults to $VASP_COMMAND environment variable, or ‘vasp_std’ if not set.

  • mpi_command (str | None) – MPI launcher command, e.g., ‘mpirun -np 4’.

  • timeout (float) – Seconds to wait for VASP response (default: 3600).

  • parse_stress (bool) – Whether to parse stress tensor (default: False).

Example

>>> runner = InteractiveRunner(vasp_command='vasp_std')
>>> runner.start('/path/to/calc', atoms)
>>>
>>> # Optimization loop
>>> for step in range(100):
...     results = runner.step(atoms)
...     if converged(results.forces):
...         break
...     atoms.positions = optimize(atoms, results.forces)
>>>
>>> runner.close()

Note

The INCAR must not set NSW > 0 or IBRION, because this runner drives the ionic loop externally.
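Because the ionic loop runs in user code, the caller has to supply the stopping test. The example above calls a converged() helper that this module does not define; one possible sketch is a maximum-force criterion. The 0.05 eV/Angstrom threshold is an assumed default, not a value taken from the library.

```python
import math
from typing import Sequence


def max_force(forces: Sequence[Sequence[float]]) -> float:
    """Largest per-atom force magnitude; 0.0 for an empty structure."""
    return max(
        (math.sqrt(fx * fx + fy * fy + fz * fz) for fx, fy, fz in forces),
        default=0.0,
    )


def converged(forces: Sequence[Sequence[float]], fmax: float = 0.05) -> bool:
    """True when every atom's force magnitude is below fmax (assumed in eV/Angstrom)."""
    return max_force(forces) < fmax


relaxed = converged([[0.0, 0.0, 0.01], [0.02, 0.0, 0.0]])
strained = converged([[0.3, 0.4, 0.0]])
```

The function iterates row by row, so it accepts nested lists as well as a NumPy array of shape (natoms, 3).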

property is_running: bool#

Check if VASP process is active.

start(directory, atoms)[source]#

Start VASP interactive session.

Parameters:
  • directory (str) – Path to calculation directory with input files.

  • atoms (Atoms) – Initial atomic structure.

Returns:

Results from initial SCF calculation.

Return type:

InteractiveResults

step(atoms)[source]#

Send new positions and get updated results.

Parameters:

atoms (Atoms) – Updated atomic structure.

Returns:

Energy, forces, and optionally stress from new SCF.

Raises:

VaspError – If VASP process died or communication failed.

Return type:

InteractiveResults

close()[source]#

Gracefully close the VASP session.

Writes STOPCAR to trigger clean shutdown, then waits for VASP to finish writing output files.

Return type:

None

run(directory)[source]#

Standard runner interface - not used for interactive mode.

For interactive mode, use start(), step(), close() directly. This method exists for compatibility with the Runner interface.

Parameters:

directory (str)

Return type:

JobStatus

status(directory)[source]#

Check status of interactive session.

Parameters:

directory (str)

Return type:

JobStatus

class vasp.runners.InteractiveResults(energy, forces, stress=None, converged=True)[source]#

Bases: object

Results from a single interactive step.

energy: float#
forces: ndarray#
stress: ndarray | None = None#
converged: bool = True#
class vasp.runners.SocketServer(config=None)[source]#

Bases: object

Socket server for driving VASP calculations.

This server implements the driver side of the i-PI protocol. It sends atomic positions to a VASP client and receives energies and forces.

Parameters:

config (SocketConfig | None) – Socket configuration.
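For orientation, i-PI messages start with a fixed-width ASCII header such as STATUS, POSDATA, or GETFORCE. The sketch below assumes the 12-byte, space-padded header format of the i-PI protocol. encode_header and decode_header are hypothetical helpers, not functions of this module.

```python
HEADER_LENGTH = 12  # assumed fixed header width of the i-PI protocol


def encode_header(message: str) -> bytes:
    """Pad a message name with spaces to the fixed header width."""
    if len(message) > HEADER_LENGTH:
        raise ValueError(f"header too long: {message!r}")
    return message.ljust(HEADER_LENGTH).encode("ascii")


def decode_header(raw: bytes) -> str:
    """Strip the padding from a received header."""
    if len(raw) != HEADER_LENGTH:
        raise ValueError(f"expected {HEADER_LENGTH} bytes, got {len(raw)}")
    return raw.decode("ascii").strip()


posdata = encode_header("POSDATA")
roundtrip = decode_header(encode_header("GETFORCE"))
```

A fixed width lets the receiver read exactly 12 bytes to learn which message follows before it parses any payload.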

Example

>>> from vasp.runners import SocketConfig, SocketServer
>>> server = SocketServer(SocketConfig(port=31415))
>>> server.start()
>>> server.wait_for_client()
>>>
>>> # Send positions, get forces
>>> results = server.calculate(atoms)
>>>
>>> server.close()
start()[source]#

Start the socket server.

Return type:

None

wait_for_client()[source]#

Wait for a client to connect.

Return type:

None

calculate(atoms)[source]#

Send positions and receive energy/forces.

Parameters:

atoms (Atoms) – Atomic structure with positions.

Returns:

Dict with ‘energy’, ‘forces’, and optionally ‘stress’.

Return type:

dict

close()[source]#

Close the server and client connections.

Return type:

None

class vasp.runners.SocketClient(config, runner=None)[source]#

Bases: object

Socket client for VASP to receive positions and send forces.

This is typically run by a wrapper around VASP. It receives positions from a driver, passes them to VASP, and sends back energies and forces.

Example

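>>> from vasp.runners import InteractiveRunner, SocketClient, SocketConfig
>>>
>>> config = SocketConfig(host='localhost', port=31415)
>>> runner = InteractiveRunner()
>>> client = SocketClient(config, runner)
>>> client.connect()
>>> # atoms_template: Atoms object supplying the element types
>>> client.run(atoms_template, '/path/to/calc')  # Main loop
>>> client.close()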
connect()[source]#

Connect to the server.

Return type:

None

run(atoms_template, directory)[source]#

Main client loop.

Parameters:
  • atoms_template (Atoms) – Template atoms object (for element types).

  • directory (str) – VASP calculation directory.

Return type:

None

close()[source]#

Close the connection.

Return type:

None

class vasp.runners.SocketConfig(host='localhost', port=31415, unix_socket=None, timeout=3600.0)[source]#

Bases: object

Configuration for socket connection.

host: str = 'localhost'#
port: int = 31415#
unix_socket: str | None = None#
timeout: float = 3600.0#
