Distributed Computing

What is distributed computing?

Distributed computing (or parallelism) is the practice of dividing computing tasks among multiple processing resources to speed up computations. See Li, Z., Qi, Z., Liu, Y., Zheng, Y., & Yang, Y. (2023). A modularized parallel distributed High–Performance computing framework for simulating seasonal frost dynamics in Canadian croplands. Computers and Electronics in Agriculture, 212, 108057.

In apsimNGpy, this is achieved through the MultiCoreManager API, which abstracts away most of the setup required for distributing tasks.

Below, we walk through a step-by-step example of using this API.

from apsimNGpy.core.multi_cores import MultiCoreManager
from apsimNGpy.core.apsim import ApsimModel
from pathlib import Path

In this example (v0.39.03.11+), we assume your APSIM files are already prepared (or available in various locations) and you simply want a speed boost when running them and processing results. For demonstration purposes, we’ll generate some example jobs:

# Best to supply a generator so jobs are created on the fly
create_jobs = (
    ApsimModel('Maize').path
    for _ in range(100)
)

Here we use the ApsimModel class to clone the template Maize model shipped with APSIM. If you do not specify the out_path argument, each file is assigned a random filename. This is critical in multiprocessing: no two processes may share the same filename, otherwise the run will fail.

To explicitly set unique filenames for each simulation:

create_jobs = (ApsimModel('Maize', out_path=Path(f"_{i}.apsimx").resolve()).path for i in range(100))

Tip

The key idea: every file must have a unique identifier to avoid race conditions during parallel execution.
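One way to guarantee unique filenames is to derive each one from the job index inside a dedicated working directory. The following is a minimal sketch using only the standard library; the helper name unique_job_paths and the maize_{i}.apsimx naming pattern are illustrative assumptions, not part of apsimNGpy. Each resulting path could then be passed as out_path to ApsimModel.

```python
from pathlib import Path
import tempfile


def unique_job_paths(n_jobs, work_dir, stem="maize"):
    """Yield one distinct .apsimx path per job (hypothetical helper)."""
    work_dir = Path(work_dir).resolve()
    work_dir.mkdir(parents=True, exist_ok=True)
    for i in range(n_jobs):
        # The job index makes every filename unique within work_dir
        yield work_dir / f"{stem}_{i}.apsimx"


work_dir = tempfile.mkdtemp(prefix="apsim_jobs_")
paths = list(unique_job_paths(100, work_dir))

# No two jobs share a filename, so no two processes write to the same file
assert len(set(paths)) == len(paths)
```

Because the helper is a generator, paths are produced on the fly, which matches the recommendation above to supply jobs lazily.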

Job batching

In newer apsimNGpy versions, each job must specify the APSIM .apsimx model to execute and may include additional metadata.

Supported job definitions include:

1. Plain job batching (no metadata, no edits)

This assumes that each model file is unique and has already been edited externally.

jobs = {
    'model_0.apsimx',
    'model_1.apsimx',
    'model_2.apsimx',
    'model_3.apsimx',
    'model_4.apsimx',
    'model_5.apsimx',
    'model_6.apsimx',
    'model_7.apsimx'
}
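If the pre-edited models already sit in one folder, the plain job collection can be gathered rather than typed by hand. This is a sketch under stated assumptions: collect_jobs is a hypothetical helper, and the empty placeholder files below merely stand in for real .apsimx models.

```python
from pathlib import Path
import tempfile


def collect_jobs(folder):
    """Return the sorted .apsimx paths found in folder (hypothetical helper)."""
    files = sorted(Path(folder).resolve().glob("*.apsimx"))
    return [str(p) for p in files]


# Demonstration with empty placeholder files standing in for real models
demo_dir = Path(tempfile.mkdtemp())
for i in range(8):
    (demo_dir / f"model_{i}.apsimx").touch()

jobs = collect_jobs(demo_dir)
print(len(jobs))  # 8
```

Files within a single directory cannot share a name, so the uniqueness requirement is met by construction.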

Note

In apsimNGpy v1.1.0+, when engine='csharp', each job must be defined with an ID, as in the formats described below.

2. Job batching with metadata

This format allows attaching identifiers or other metadata to each job. Models are assumed to be unique and pre-edited.

jobs = [
    {'model': 'model_0.apsimx', 'ID': 0},
    {'model': 'model_1.apsimx', 'ID': 1},
    {'model': 'model_2.apsimx', 'ID': 2},
    {'model': 'model_3.apsimx', 'ID': 3},
    {'model': 'model_4.apsimx', 'ID': 4},
    {'model': 'model_5.apsimx', 'ID': 5},
    {'model': 'model_6.apsimx', 'ID': 6},
    {'model': 'model_7.apsimx', 'ID': 7}
]
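For many models, the same structure can be generated instead of written out. The sketch below assumes only the 'model' and 'ID' keys shown above; with_ids is a hypothetical helper name.

```python
def with_ids(model_paths):
    """Attach a sequential ID to each model path (hypothetical helper)."""
    return [{"model": str(path), "ID": i} for i, path in enumerate(model_paths)]


jobs = with_ids(f"model_{i}.apsimx" for i in range(8))
print(jobs[0])  # {'model': 'model_0.apsimx', 'ID': 0}
```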

3. Job batching with internal model edits

In this format, each job specifies an inputs list of dicts, each describing one node to be edited internally by the runner. These edits must follow the rules of edit_model_by_path(). The inputs dictionaries are also treated as metadata and attached to the results tables. When both inputs and additional metadata are provided, they are merged into a single metadata mapping before attachment, with the former (inputs) entries overriding metadata keys of the same name, which avoids duplicate keys in the results tables.

jobs = [
     {
         'model': 'model_0.apsimx',
         'ID': 0,
         'inputs': [{
             'path': '.Simulations.Simulation.Field.Fertilise at sowing',
             'Amount': 0
         }]
     },
     {
         'model': 'model_1.apsimx',
         'ID': 1,
         'inputs': [{
             'path': '.Simulations.Simulation.Field.Fertilise at sowing',
             'Amount': 50
         }]
     },
     {
         'model': 'model_2.apsimx',
         'ID': 2,
         'inputs': [{
             'path': '.Simulations.Simulation.Field.Fertilise at sowing',
             'Amount': 100
         }]
     }
 ]
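When the jobs differ only in one parameter, the list above can be built in a loop. This is a minimal sketch, assuming the same job keys and node path as the example; fertiliser_jobs and its model_template argument are hypothetical names introduced for illustration.

```python
FERTILISER_PATH = ".Simulations.Simulation.Field.Fertilise at sowing"


def fertiliser_jobs(amounts, model_template="model_{i}.apsimx"):
    """Build one job per fertiliser amount (hypothetical helper)."""
    jobs = []
    for i, amount in enumerate(amounts):
        jobs.append({
            "model": model_template.format(i=i),  # one model file per job
            "ID": i,
            "inputs": [{"path": FERTILISER_PATH, "Amount": amount}],
        })
    return jobs


jobs = fertiliser_jobs([0, 50, 100])
print(jobs[1]["inputs"][0]["Amount"])  # 50
```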

Instantiating and Running the batches

if __name__ == '__main__': # a guard is required
    # create jobs
    create_jobs = (ApsimModel('Maize', out_path=Path(f"_{i}.apsimx").resolve()).path for i in range(100))
    # initialize
    task_manager = MultiCoreManager(db_path=Path('test.db').resolve(), agg_func=None)
    # Run all the jobs
    task_manager.run_all_jobs(create_jobs, n_cores=16, threads=False, clear_db=True)

If agg_func is specified, it can be one of: mean, median, sum, min, or max. Each results table will then be summarized using the selected aggregation function.

clear_db (bool): If True, the database tables are cleared before any new entries are added.

threads (bool): If True, use threads; if False, use processes. For CPU-bound tasks like this one, processes are preferred as they prevent resource contention and blocking inherent to threads.

n_cores (int): Specifies the number of worker cores to use for the task. The workload will be divided among these workers. If the number of cores is large but the number of tasks is small, some scheduling overhead will occur, and workers may remain idle while waiting for available tasks.
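The trade-off between n_cores and the number of jobs can be estimated with simple arithmetic. The sketch below is illustrative only: plan_workers is a hypothetical helper, and the "rounds" figure assumes every job takes the same time, which real schedulers and real simulations do not guarantee.

```python
import math
import os


def plan_workers(n_jobs, n_cores):
    """Summarise how n_jobs spread over n_cores workers (illustrative arithmetic only)."""
    busy = min(n_jobs, n_cores)   # workers that receive at least one job
    idle = n_cores - busy         # workers with nothing to do
    rounds = math.ceil(n_jobs / n_cores) if n_jobs else 0  # sequential "waves" of jobs
    return {"busy": busy, "idle": idle, "rounds": rounds}


print(plan_workers(100, 16))  # {'busy': 16, 'idle': 0, 'rounds': 7}
print(plan_workers(4, 16))    # {'busy': 4, 'idle': 12, 'rounds': 1}

# A common heuristic: never request more workers than jobs or available CPUs
n_cores = min(100, os.cpu_count() or 1)
```

In the second case, 12 of the 16 workers are started but never receive a job, which is the idle-worker overhead described above.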

Tracking completed jobs

The MultiCoreManager API displays a progress bar showing the number of failed jobs (f), the percentage and count of completed jobs out of the total submitted, the average time per simulation (s/sim), and the elapsed time.

APSIM running![0f] :  |██████████| 100.0%| [100/100]| Complete | 1.07s/sim | Elapsed time: 00:01:46.850
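The figures in the progress line are related by simple arithmetic, sketched below. The function progress_summary is a hypothetical illustration, not the library's implementation; the elapsed time is taken from the line above.

```python
def progress_summary(done, total, elapsed_seconds):
    """Return the percentage complete and the average seconds per simulation."""
    percent = 100.0 * done / total
    sec_per_sim = elapsed_seconds / done if done else float("nan")
    return percent, sec_per_sim


# Elapsed time 00:01:46.850 is 1 min 46.85 s = 106.85 s
percent, sec_per_sim = progress_summary(done=100, total=100, elapsed_seconds=106.85)
print(f"{percent:.1f}% | {sec_per_sim:.2f}s/sim")  # 100.0% | 1.07s/sim
```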

Retrieving results

Results can be loaded into memory with get_simulated_output() or through the results attribute:

df = task_manager.get_simulated_output(axis=0)
# same as
data = task_manager.results  # defaults to axis=0

Results can also be transferred to an SQL database or to a CSV file as follows:

from sqlite3 import connect
# pass an open connection ...
db = connect(':memory:')
# ... or a path to a database file
db = 'test.db'
task_manager.save_tosql(db_or_con=db, table_name='agg_table', if_exist='replace', chunk_size=1000)
# to CSV
task_manager.save_to_csv('test.csv')
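Once saved, a table can be inspected with the standard sqlite3 module. The sketch below runs without APSIM by using an in-memory database and a stand-in agg_table; the column names and the values are placeholders, not simulation output. With a real run, you would connect to the saved database file instead.

```python
import sqlite3

con = sqlite3.connect(":memory:")
# Stand-in for the table written by the task manager; values are placeholders
con.execute("CREATE TABLE agg_table (ID INTEGER, Yield REAL)")
con.executemany("INSERT INTO agg_table VALUES (?, ?)", [(0, 1.0), (1, 2.0), (2, 3.0)])
con.commit()

# List the tables, then read the rows back in ID order
tables = [name for (name,) in con.execute("SELECT name FROM sqlite_master WHERE type='table'")]
rows = con.execute("SELECT ID, Yield FROM agg_table ORDER BY ID").fetchall()
print(tables)  # ['agg_table']
print(rows)    # [(0, 1.0), (1, 2.0), (2, 3.0)]
con.close()
```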

Selecting the csharp or python engine

The latest apsimNGpy versions allow the user to select between the python and csharp engines as follows. In addition, either of the keys inputs or payload is accepted when providing the editing data.

import time
Parallel = MultiCoreManager(db_path=db, agg_func='mean', table_prefix='di')
jobs = ({'model': 'Maize', 'ID': i, 'payload': [{'path': '.Simulations.Simulation.Field.Fertilise at sowing',
                                                'Amount': i}]} for i in range(200))
start = time.perf_counter()
Parallel.run_all_jobs(jobs=jobs, n_cores=8, engine='csharp', threads=False, chunk_size=100,
                     subset=['Yield'],
                     progressbar=True)
dff = Parallel.results
      Yield source_table   ID  Amount  MetaProcessID
80   1747.866065       Report    0       0          63672
81   1773.798050       Report    1       1          62028
40   1792.630425       Report    2       2          60976
3    1822.193813       Report    3       3          36152
184  1854.471650       Report    4       4          13056
..           ...          ...  ...     ...            ...
103  5602.499247       Report  195     195          57804
94   5601.896106       Report  196     196          61980
93   5601.294697       Report  197     197          69492
130  5600.687519       Report  198     198          64844
101  5600.078263       Report  199     199          66580
[200 rows x 5 columns]

Note

To select the pure Python engine, pass engine='python' as follows:

Parallel.run_all_jobs(jobs=jobs, n_cores=8, engine='python', threads=False, chunk_size=100,
                      subset=['Yield'],
                      progressbar=True)
Out[4]:
          Yield  MetaProcessID   ID  Amount                   MetaExecutionID
0   1747.866065           5572    0       0  09f90023a10aa7408899b57e420180b7
0   1773.798050          60752    1       1                                 1
0   1792.630425          69532    2       2                                 2
0   1822.193813          59972    3       3                                 3
0   1854.471650          47032    4       4                                 4
..          ...            ...  ...     ...                               ...
24  5602.499247          68876  195     195                               195
24  5601.896106          69532  196     196                               196
24  5601.294697          69432  197     197                               197
24  5600.687519          47032  198     198                               198
24  5600.078263           5572  199     199                               199
[200 rows x 5 columns]

When no aggregation is applied, the number of rows increases because each simulation contributes multiple records. For example, if each simulation spans 10 years, the resulting DataFrame will contain 10 × 200 = 2,000 rows.
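The row counts can be checked with a small standard-library sketch. The records below are arbitrary placeholders (not APSIM output), and the grouping loop only illustrates what a mean aggregation per simulation does; it is not how apsimNGpy implements agg_func.

```python
from statistics import mean

n_sims, n_years = 200, 10

# Placeholder records: one row per simulation per year (values are arbitrary)
records = [
    {"ID": sim_id, "year": year, "Yield": float(sim_id + year)}
    for sim_id in range(n_sims)
    for year in range(n_years)
]
print(len(records))  # 2000

# Aggregating by ID collapses each simulation to a single row
by_id = {}
for row in records:
    by_id.setdefault(row["ID"], []).append(row["Yield"])
aggregated = [{"ID": sim_id, "Yield": mean(values)} for sim_id, values in by_id.items()]
print(len(aggregated))  # 200
```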

Benchmarking computation speed across the different simulation engines

Batch size    Python (m)    C# (m)    Speedup (×)
100           2:30          1:25      ~1.76
200           4:44          2:54      ~1.63
300           7:13          4:23      ~1.65
400           9:24          5:26      ~1.73
500           11:55         6:58      ~1.71

m = minutes, C# = csharp
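The speedup column is the Python time divided by the C# time. The sketch below reproduces it from the benchmark values, assuming the times are written as minutes:seconds; to_seconds is a hypothetical helper.

```python
def to_seconds(mmss):
    """Convert an 'm:ss' string to seconds (assumes minutes:seconds)."""
    minutes, seconds = mmss.split(":")
    return int(minutes) * 60 + int(seconds)


# (batch size, Python time, C# time) copied from the benchmark values above
benchmarks = [
    (100, "2:30", "1:25"),
    (200, "4:44", "2:54"),
    (300, "7:13", "4:23"),
    (400, "9:24", "5:26"),
    (500, "11:55", "6:58"),
]

speedups = {
    batch: round(to_seconds(py) / to_seconds(cs), 2)
    for batch, py, cs in benchmarks
}
print(speedups)  # {100: 1.76, 200: 1.63, 300: 1.65, 400: 1.73, 500: 1.71}
```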

Note

Benchmark results were generated on the following system:

  • Processor: 12th Gen Intel® Core™ i7-12700 @ 2.10 GHz

  • Installed RAM: 32.0 GB (31.7 GB usable)

  • System type: 64-bit operating system, x64-based processor

Tip

Reported speedups are indicative and may vary depending on system hardware, operating system, available memory, number of CPU cores, background workload, and simulation configuration.