b. Python and Jupyter notebook

This tutorial shows how to open ICON output that is spread across multiple files, how to set up a distributed computing environment to process those files, and how to read a subset of the output. As an example we will process data from the Dyamond Winter project. It presupposes that you have set up a Jupyter notebook (as explained under Computing Infrastructure).

Note: Remember that to be able to import all libraries you will have to use the Python 3 unstable kernel.

from getpass import getuser # Library to query the current user name
from pathlib import Path # Object-oriented library to deal with paths
from tempfile import NamedTemporaryFile, TemporaryDirectory # Creating temporary files/directories
 
from dask_jobqueue import SLURMCluster # Setting up distributed memory via Slurm
from dask.utils import format_bytes
from distributed import Client, progress # Library to orchestrate distributed resources
import numpy as np # Python's standard array library
import xarray as xr # Library to work with labeled n-dimensional data
import pandas as pd
import warnings
warnings.filterwarnings(action='ignore')

The data we are going to process is on the order of terabytes. On a single machine using a single core this would not only be slow, the fields we are trying to process would also not fit into the memory of a single computer. Therefore we set up a distributed cluster using the dask_jobqueue library. Specifically, we will use the Slurm workload manager to do that. More information on the dask_jobqueue library can be found here: https://jobqueue.dask.org/en/latest/ .

To create the Slurm cluster we need some information, like the account that is going to be charged and the partition that is going to be used. In this example we are going to use the gpu partition, but any other partition can be used. More information on the hardware partitions and their specifications can be found on the DKRZ user portal.

# Set some user specific variables
account_name = 'mh0731' # Account that is going to be 'charged' for the computation
queue = 'gpu' # Name of the partition we want to use
job_name = 'PostProc' # Job name that is submitted via sbatch
memory = "500GiB" # Max memory per node that is going to be used - this depends on the partition
cores = 72 # Max number of cores per task that are reserved - also partition dependent
walltime = '1:00:00' # Walltime - also partition dependent
scratch_dir = Path('/scratch') / getuser()[0] / getuser() # Define the users scratch dir
# Create a temp directory where the output of the distributed cluster will be written to; after this
# notebook is closed the temp directory will be deleted
dask_tmp_dir = TemporaryDirectory(dir=scratch_dir, prefix=job_name)
cluster = SLURMCluster(memory=memory,
                       cores=cores,
                       project=account_name,
                       walltime=walltime,
                       queue=queue,
                       name=job_name,
                       scheduler_options={'dashboard_address': ':12435'},
                       local_directory=dask_tmp_dir.name,
                       job_extra=[f'-J {job_name}', 
                                  f'-D {dask_tmp_dir.name}',
                                  f'--begin=now',
                                   f'--output={dask_tmp_dir.name}/LOG_cluster.%j.o'
                                 ],
                       interface='ib0')
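
The scheduler_options argument pins the dask dashboard to port 12435. The cluster object exposes the resulting URL, which is handy for monitoring the workers later on; a quick check (assuming a dask version that provides the dashboard_link attribute):

# Print the URL of the dask dashboard belonging to this cluster
print(cluster.dashboard_link)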

Under the hood the dask_jobqueue library will create a job script that is going to be submitted via sbatch. Let's have a look at this job script:

print(cluster.job_script())
#!/usr/bin/env bash

#SBATCH -J lab-post
#SBATCH -p gpu
#SBATCH -A mh0731
#SBATCH -n 1
#SBATCH --cpus-per-task=72
#SBATCH --mem=500G
#SBATCH -t 1:00:00
#SBATCH -J PostProc
#SBATCH -D /scratch/m/m300765/PostProc9pex9k_y
#SBATCH --begin=now
#SBATCH --output=/scratch/m/m300765/PostProc9pex9k_y/LOG_cluster.%j.o

JOB_ID=${SLURM_JOB_ID%;*}

/home/mpim/m300765/.anaconda3/bin/python -m distributed.cli.dask_worker tcp://10.50.32.30:46411 --nthreads 72 --memory-limit 536.87GB --name name --nanny --death-timeout 60 --local-directory /scratch/m/m300765/PostProc9pex9k_y --interface ib0

So far nothing has happened. Let's request 2 nodes, which will give us 1 TB of distributed memory and 144 cores to work with.

cluster.scale(jobs=2)
cluster

Now we have submitted 2 jobs that establish the distributed computing resources. After some queuing time, depending on how busy the machine is, the resources will become available. This can be seen when the interface above changes from

Workers 0/2

to

Workers 2

We can also check the status by calling the squeue command from bash:

! squeue -u $USER
             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
          23835542       gpu PostProc  m300765  R       1:02      1 mg200
          23835543       gpu PostProc  m300765  R       1:02      1 mg205

Now that the computing resources are available we have to connect a dask distributed client to them. This client serves as an interface between the commands we are going to use and the cluster where they will be executed. The client is created by calling the Client function with the cluster as an argument, which tells the dask distributed library to do all calculations on the cluster.

dask_client = Client(cluster)
dask_client
<Client: 'tcp://10.50.32.30:46411' processes=2 threads=144, memory=1.07 TB>
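
Instead of watching the widget or squeue we can also block programmatically until the requested workers have connected, using the client's wait_for_workers method (a minimal sketch):

# Block until both requested workers have registered with the scheduler
dask_client.wait_for_workers(n_workers=2)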

The distributed computing resources are ready; now we can turn to the actual task of this tutorial:

Let's define the paths and the variables we are going to read. We want to read files from a coupled atmosphere-ocean simulation. The data has a resolution of 5 km; the internal experiment ID is dpp0016. In this example we will make use of six-hourly data, which is stored in files following the <exp>_atm_2d_ml_<timestep>.nc naming pattern. Let's define the paths and the glob pattern that tell Python which files to open:

# Define the paths and get all files 
data_path = Path('/work/mh0287/k203123/GIT/icon-aes-dyw_albW/experiments/dpp0016/')
glob_pattern_2d = 'atm_2d_ml'
 
# Collect all file names with pathlib's rglob and a list comprehension
file_names = sorted([str(f) for f in data_path.rglob(f'*{glob_pattern_2d}*.nc')])[1:]
file_names[:10]
['/work/mh0287/k203123/GIT/icon-aes-dyw_albW/experiments/dpp0016/dpp0016_atm_2d_ml_20200120T000000Z.nc',
 '/work/mh0287/k203123/GIT/icon-aes-dyw_albW/experiments/dpp0016/dpp0016_atm_2d_ml_20200121T000000Z.nc',
 '/work/mh0287/k203123/GIT/icon-aes-dyw_albW/experiments/dpp0016/dpp0016_atm_2d_ml_20200122T000000Z.nc',
 '/work/mh0287/k203123/GIT/icon-aes-dyw_albW/experiments/dpp0016/dpp0016_atm_2d_ml_20200123T000000Z.nc',
 '/work/mh0287/k203123/GIT/icon-aes-dyw_albW/experiments/dpp0016/dpp0016_atm_2d_ml_20200124T000000Z.nc',
 '/work/mh0287/k203123/GIT/icon-aes-dyw_albW/experiments/dpp0016/dpp0016_atm_2d_ml_20200125T000000Z.nc',
 '/work/mh0287/k203123/GIT/icon-aes-dyw_albW/experiments/dpp0016/dpp0016_atm_2d_ml_20200126T000000Z.nc',
 '/work/mh0287/k203123/GIT/icon-aes-dyw_albW/experiments/dpp0016/dpp0016_atm_2d_ml_20200127T000000Z.nc',
 '/work/mh0287/k203123/GIT/icon-aes-dyw_albW/experiments/dpp0016/dpp0016_atm_2d_ml_20200128T000000Z.nc',
 '/work/mh0287/k203123/GIT/icon-aes-dyw_albW/experiments/dpp0016/dpp0016_atm_2d_ml_20200129T000000Z.nc']

The above syntax uses the rglob method of the data_path object to find all files whose names match the specified glob pattern. We also used a list comprehension here, which expresses a for-loop in a single, compact expression.
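
For reference, the list comprehension is equivalent to the following explicit loop (same result, just spelled out):

# Explicit-loop equivalent of the list comprehension used above
file_names = []
for f in data_path.rglob(f'*{glob_pattern_2d}*.nc'):
    file_names.append(str(f))
file_names = sorted(file_names)[1:]  # drop the first file, as above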

Now we use xarray.open_mfdataset to open all the data and inspect its content:

dset = xr.open_mfdataset(file_names, combine='by_coords', parallel=True)
dset
<xarray.Dataset>
Dimensions:  (ncells: 20971520, time: 437)
Coordinates:
  * time     (time) datetime64[ns] 2020-01-20 2020-01-20T06:00:00 ... 2020-12-05
Dimensions without coordinates: ncells
Data variables:
    ps       (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    psl      (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    rsdt     (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    rsut     (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    rsutcs   (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    rlut     (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    rlutcs   (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    rsds     (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    rsdscs   (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    rlds     (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    rldscs   (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    rsus     (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    rsuscs   (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    rlus     (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    ts       (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    sic      (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    sit      (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    clt      (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    prlr     (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    prls     (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    pr       (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    prw      (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    cllvi    (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    clivi    (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    qgvi     (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    qrvi     (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    qsvi     (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    hfls     (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    hfss     (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    evspsbl  (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    tauu     (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    tauv     (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    sfcwind  (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    uas      (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    vas      (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
    tas      (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray>
Attributes:
    CDI:                  Climate Data Interface version 1.8.3rc (http://mpim...
    Conventions:          CF-1.6
    number_of_grid_used:  15
    grid_file_uri:        http://icon-downloads.mpimet.mpg.de/grids/public/mp...
    uuidOfHGrid:          0f1e7d66-637e-11e8-913b-51232bb4d8f9
    title:                ICON simulation
    institution:          Max Planck Institute for Meteorology/Deutscher Wett...
    source:               git@gitlab.dkrz.de:icon/icon-aes.git@b582fb87edbd30...
    history:              /work/mh0287/k203123/GIT/icon-aes-dyw_albW/bin/icon...
    references:           see MPIM/DWD publications
    comment:              Sapphire Dyamond (k203123) on m21322 (Linux 2.6.32-...

Let's inspect this data a little further:

dset['tas']
<xarray.DataArray 'tas' (time: 437, ncells: 20971520)>
dask.array<concatenate, shape=(437, 20971520), dtype=float32, chunksize=(4, 20971520), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 2020-01-20 2020-01-20T06:00:00 ... 2020-12-05
Dimensions without coordinates: ncells
Attributes:
    standard_name:                tas
    long_name:                    temperature in 2m
    units:                        K
    param:                        0.0.0
    CDI_grid_type:                unstructured
    number_of_grid_in_reference:  1
dset['tas'].data
dask.array<concatenate, shape=(437, 20971520), dtype=float32, chunksize=(4, 20971520), chunktype=numpy.ndarray>
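
Before triggering any reads we can ask how much memory this single array would occupy once loaded, using the format_bytes helper imported above:

format_bytes(dset['tas'].nbytes)  # roughly '36.66 GB' for one 2D variable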

An important concept of xarray and dask is that the data has not been read into memory yet. We only see a representation of what the data will look like. This is also called a future, a very important concept in distributed computing. In our case the representation of the data is a dask array. Dask is a library that splits the data up into chunks and spreads the chunks evenly across different CPUs and computers. In our example we can see that the temperature array is split up into 314 chunks; reading it would take 942 tasks, and the variable alone would occupy 36.66 GB of memory. We can also ask xarray how much memory the whole dataset would consume:

format_bytes(dset.nbytes)
'1.32 TB'

This means that the total dataset would need 1.32 TB of memory. That is far too much for a local computer, and it also exceeds the 1 TB of distributed memory we have available. Let's reduce the data further by creating daily averages and by subsetting, selecting only data up to the end of March 2020.

# Assign a regular six-hourly time axis starting at the first time step
timesf = pd.date_range(start='2020-01-20', periods=dset.time.size, freq='6H')
dset['time'] = timesf
# Select the period up to the end of March 2020 and compute daily means
dset_subset = dset.sel(time=slice('2020-01-20', '2020-03-31')).resample(time='1D').mean()
dset_subset
<xarray.Dataset>
Dimensions:  (ncells: 20971520, time: 72)
Coordinates:
  * time     (time) datetime64[ns] 2020-01-20 2020-01-21 ... 2020-03-31
Dimensions without coordinates: ncells
Data variables:
    ps       (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    psl      (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    rsdt     (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    rsut     (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    rsutcs   (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    rlut     (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    rlutcs   (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    rsds     (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    rsdscs   (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    rlds     (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    rldscs   (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    rsus     (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    rsuscs   (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    rlus     (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    ts       (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    sic      (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    sit      (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    clt      (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    prlr     (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    prls     (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    pr       (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    prw      (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    cllvi    (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    clivi    (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    qgvi     (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    qrvi     (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    qsvi     (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    hfls     (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    hfss     (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    evspsbl  (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    tauu     (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    tauv     (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    sfcwind  (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    uas      (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    vas      (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    tas      (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>

The resample method groups the data into daily bins, while mean averages over each bin. Consequently we get daily data. In the same way we could calculate monthly averages or averages over any other period.
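
For instance, monthly means can be computed with the same pattern, just with a different resampling frequency (a sketch, not executed in this tutorial):

# Monthly instead of daily averages (sketch)
dset_monthly = dset.resample(time='1M').mean()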

So far no data has been read. We can trigger reading the data by using the persist method. Persist starts pushing the data to the distributed memory: the netCDF files are read on the workers, the data is loaded into their memory, and the averaging is carried out there. If no distributed memory is available, persist uses the local memory. Please refer to https://distributed.readthedocs.io/en/latest/manage-computation.html#dask-collections-to-futures for more information.

We can subset the data a little further by only selecting certain variables:

var_names = ['ts', 'rsus', 'rlus', 'rlds', 'rsdt']
dset_subset = dset_subset[var_names]
format_bytes(dset_subset.nbytes)
'30.20 GB'

Now let's read the data from files and push it to the cluster with the persist method:

dset_subset = dset_subset.persist()
# Let's inspect the progress
progress(dset_subset, notebook=True)
[########################################] | 100% Completed |  3 min 36 s

We can see the tasks that are being worked on in the background on the cluster while we continue working with the data. Setting the notebook=False keyword will block further execution until all calculations are done. The paradigm is that all calculations are collected but not executed until we explicitly instruct xarray / dask to trigger the computation.
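
Since the reduced, daily-mean data now lives in the workers' memory, further reductions are cheap and only small results have to be transferred back to the notebook, for example with the compute method. A minimal sketch, computing the (unweighted) global mean time series of the surface temperature:

# Average over all grid cells on the cluster and gather the small result locally
ts_mean = dset_subset['ts'].mean(dim='ncells').compute()
ts_mean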