b. Python and Jupyter notebook

Although methods exist that can plot unstructured ICON data from its native data layout (https://psyplot.readthedocs.io/projects/psy-maps/en/latest/examples/example_ugrid.html), these methods become very slow with increasing resolution. The most common approach is to process the data as much as possible and remap the processed field variables to regular latitude-longitude mesh for displaying the data. This tutorial shows how this can be achieved using distributed computing resources.

Again import the libraries we are going to use. Remember to use the Python 3 unstable kernel to be able to import all libraries:

from getpass import getuser # Libaray to copy things
from pathlib import Path # Object oriented libary to deal with paths
import os
from tempfile import NamedTemporaryFile, TemporaryDirectory # Creating temporary Files/Dirs
from subprocess import run, PIPE
import sys
 
import dask # Distributed data libary
from dask_jobqueue import SLURMCluster # Setting up distributed memories via slurm
from distributed import Client, progress, wait # Libaray to orchestrate distributed resources
import xarray as xr # Libary to work with labeled n-dimensional data and dask
import warnings
warnings.filterwarnings(action='ignore')

This step is covered in great detail in the Open/Read section.

# Set some user specific variables
scratch_dir = Path('/scratch') / getuser()[0] / getuser() # Define the users scratch dir
# Create a temp directory where the output of distributed cluster will be written to, after this notebook
# is closed the temp directory will be closed
dask_tmp_dir = TemporaryDirectory(dir=scratch_dir, prefix='PostProc')
cluster = SLURMCluster(memory='500GiB',
                       cores=72,
                       project='mh0731',
                       walltime='1:00:00',
                       queue='gpu',
                       name='PostProc',
                       scheduler_options={'dashboard_address': ':12435'},
                       local_directory=dask_tmp_dir.name,
                       job_extra=[f'-J PostProc', 
                                  f'-D {dask_tmp_dir.name}',
                                  f'--begin=now',
                                  f'--output={dask_tmp_dir.name}/LOG_cluster.%j.o',
                                  f'--output={dask_tmp_dir.name}/LOG_cluster.%j.o'
                                 ],
                       interface='ib0')
cluster.scale(jobs=2)
dask_client = Client(cluster)
dask_client.wait_for_workers(18)
data_path = Path('/work/mh0287/k203123/GIT/icon-aes-dyw_albW/experiments/dpp0016/')
glob_pattern_2d = 'atm_2d_ml'
 
# Collect all file names with pathlib's rglob and list compressions 
file_names = sorted([str(f) for f in data_path.rglob(f'*{glob_pattern_2d}*.nc')])[1:]
dset = xr.open_mfdataset(file_names, combine='by_coords', parallel=True)
dset_subset = dset.sel(time=slice('2020-01-20', '2020-11-30')).resample(time='1D').mean()
var_names = ['ts', 'rsus', 'rlus', 'rlds', 'rsdt', 'hfls', 'hfss']
dset_subset = dset_subset[var_names].persist()
dset_subset
<xarray.Dataset>
Dimensions:  (ncells: 20971520, time: 72)
Coordinates:
  * time     (time) datetime64[ns] 2020-01-20 2020-01-21 ... 2020-03-31
Dimensions without coordinates: ncells
Data variables:
    ts       (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    rsus     (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    rlus     (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    rlds     (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    rsdt     (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    hfls     (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
    hfss     (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray>
progress(dset_subset, notebook=False)
[########################################] | 100% Completed | 51.6s

To quickly recap we've gotten all 6 hourly single level data files and opened them in a combined virtual view using xarray. We then created daily averages using the resample method. We then pushed the data to the distributed memory, where it is available for processing.

Before regridding we are going to create averages along the field and time dimensions for later visualization use. Averages along the field dimensions will result in a time series. We obviously don't need to regrid this data. But the averages along the time dimensions will have to be regridded. Since ICON data is stored on a equal area triangular mesh we do not have to care about weighting when averaging on the native grid. Applying averages is easy to achieve using the mean method

# Create averges along the time and field dimensions
time_mean = dset_subset.mean(dim='time').persist()
field_mean = dset_subset.mean(dim='ncells').persist()

We have immediately triggered the calculation on the cluster using the persist method. We can watch the status of the calculation with progress.

progress([time_mean, field_mean], notebook=False)
[########################################] | 100% Completed | 29.5s

Since the data is already on the distributed memory the calculation is quite fast in our case just under 30 seconds.

Since we are going to use distance weighted remapping with CDO, we need a target grid description and a weight file. The target grid description is a text file that contains the information of the regular latitude-longitude grid. Let's assume that we want to create a grid of 0.1 x 0.1 degrees. The target grid file would look likes this:

#
# gridID 1
#
gridtype  = lonlat
gridsize  = 6480000
xsize     = 3600
ysize     = 1800
xname     = lon
xlongname = "longitude"
xunits    = "degrees_east"
yname     = lat
ylongname = "latitude"
yunits    = "degrees_north"
xfirst    = -179.95
xinc      = 0.1
yfirst    = -89.95
yinc      = 0.1

instead of hard coding this grid information we can create a function that calculates the grid information:

def get_griddes(y_res, x_res, x_first=-180, y_first=-90):
    """Create a description for a regular global grid at given x, y resolution."""
 
    xsize = 360 / x_res
    ysize = 180 / y_res
    xfirst = -180 + x_res / 2
    yfirst = -90 + x_res / 2
 
    return f'''
#
# gridID 1
#
gridtype  = lonlat
gridsize  = {int(xsize * ysize)}
xsize     = {int(xsize)}
ysize     = {int(ysize)}
xname     = lon
xlongname = "longitude"
xunits    = "degrees_east"
yname     = lat
ylongname = "latitude"
yunits    = "degrees_north"
xfirst    = {xfirst}
xinc      = {x_res}
yfirst    = {yfirst}
yinc      = {y_res}
 
 
    '''

The best way to remap unstructured data is a weighted remap. We do not have a weight file let's create one first. For this we define a function that will call the cdo gendis command. To execute the function on the cluster we use dask.delayed to tell the code it should be executed remotely. We will create another function run_cmd that executes a shell command.

@dask.delayed
def gen_dis(dataset, xres, yres, gridfile):
    '''Create a distance weights using cdo.'''
    scratch_dir = Path('/scratch') / getuser()[0] / getuser() # Define the users scratch dir
    with TemporaryDirectory(dir=scratch_dir, prefix='Weights_') as td:
        in_file = Path(td) / 'in_file.nc'
        weightfile = Path(td) / 'weight_file.nc'
        griddes = Path(td) / 'griddes.txt'
        with griddes.open('w') as f:
            f.write(get_griddes(xres, yres))
        dataset.to_netcdf(in_file, mode='w') # Write the file to a temorary netcdf file
        cmd = ('cdo', '-O', f'gendis,{griddes}', f'-setgrid,{gridfile}', str(in_file), str(weightfile))
        run_cmd(cmd)
        df = xr.open_dataset(weightfile).load()
        wait(df)
        return df
 
def run_cmd(cmd, path_extra=Path(sys.exec_prefix)/'bin'):
    '''Run a bash command.'''
    env_extra = os.environ.copy()
    env_extra['PATH'] = str(path_extra) + ':' + env_extra['PATH']
    status = run(cmd, check=False, stderr=PIPE, stdout=PIPE, env=env_extra)
    if status.returncode != 0:
        error = f'''{' '.join(cmd)}: {status.stderr.decode('utf-8')}'''
        raise RuntimeError(f'{error}')
    return status.stdout.decode('utf-8')

Dask collects the future results before executing them. This way we can setup a task tree. For example we could now define a function that gets the output of gen_dis and applies the remapping.

@dask.delayed
def remap(dataset, x_res, y_res, weights, gridfile):
    """Perform a weighted remapping.
 
    Parameters
    ==========
 
    dataset : xarray.dataset
        The dataset that will be regridded
    griddes : Path, str
        Path to the grid description file
    weights : xarray.dataset
        Distance weights
 
    Returns
    =======
    xarray.dataset : Remapped dataset
    """
    if isinstance(dataset, xr.DataArray):
        # If a dataArray is given create a dataset
        dataset = xr.Dataset(data_vars={dataset.name: dataset})
    scratch_dir = Path('/scratch') / getuser()[0] / getuser() # Define the users scratch dir
    with TemporaryDirectory(dir=scratch_dir, prefix='Remap_') as td:
        infile = Path(td) / 'input_file.nc'
        weightfile = Path(td) / 'weight_file.nc'
        griddes = Path(td) / 'griddes.txt'
        outfile = Path(td) / 'remaped_file.nc'
        with griddes.open('w') as f:
            f.write(get_griddes(x_res, y_res))
        dataset.to_netcdf(infile, mode='w') # Write the file to a temorary netcdf file
        weights.to_netcdf(weightfile, mode='w')
        cmd = ('cdo', '-O', f'remap,{griddes},{weightfile}', f'-setgrid,{gridfile}',
               str(infile), str(outfile))
        run_cmd(cmd)
        return xr.open_dataset(outfile).load()

Calling only thegen_dis function defined in step 2 will not trigger any computation. But first we need a grid description file of our dpp0016 simulation:

grid_file = '/pool/data/ICON/grids/public/mpim/0015/icon_grid_0015_R02B09_G.nc'
weights_future = gen_dis(time_mean, 0.5, 0.5, grid_file)
weights_future
Delayed('gen_dis-379f0fee-deb2-480e-8527-82914b9de2a8')
remap_futures = []
# Process each variable in parallel.
for var_name in time_mean.data_vars:
    remap_futures.append(remap(time_mean[var_name], 0.5, 0.5, weights_future, grid_file))
remap_futures
[Delayed('remap-bf89d25d-1bdc-4eb9-b0ad-3a40ac2c328d'),
 Delayed('remap-d8576790-d46a-4d3c-b4d9-05041720c17c'),
 Delayed('remap-76e0316f-0ab5-473c-b353-df99a69647a3'),
 Delayed('remap-7787725d-98f4-4b45-9edf-b3a019c91401'),
 Delayed('remap-aaee97d7-3cc8-41c6-81ec-b6e7c22d2438'),
 Delayed('remap-8d0c091f-92d4-4705-8e0d-a4b2d14de6b1'),
 Delayed('remap-f4d38530-1477-460d-9e7f-98ab22dbeefc')]

The remap futures list if a collection of task that will be executed, nothing has been executed yet. The execution of each task depends on the outcome of gen_dis. Dask will take care of this when we execute the tasks. Calling the dask.persist will trigger the computation in the cluster in parallel:

remap_jobs = dask.persist(remap_futures)
progress(remap_jobs, notebook=False)
[########################################] | 100% Completed |  30.5s

Now we just have to merge the results back together into one dataset

time_mean_remap = xr.merge(list(dask.compute(*remap_futures)))
time_mean_remap
<xarray.Dataset>
Dimensions:  (lat: 360, lon: 720)
Coordinates:
  * lon      (lon) float64 -179.8 -179.2 -178.8 -178.2 ... 178.8 179.2 179.8
  * lat      (lat) float64 -89.75 -89.25 -88.75 -88.25 ... 88.75 89.25 89.75
Data variables:
    ts       (lat, lon) float32 232.50095 232.49681 ... 237.39883 237.39789
    rsus     (lat, lon) float32 130.34554 130.34743 ... 2.7430184 2.7431903
    rlus     (lat, lon) float32 165.59589 165.58539 ... 180.20906 180.20578
    rlds     (lat, lon) float32 120.99219 120.99084 ... 157.13567 157.13335
    rsdt     (lat, lon) float32 225.16904 225.16885 ... 7.7745104 7.774646
    hfls     (lat, lon) float32 -0.0069637517 -0.006892924 ... -1.5673956
    hfss     (lat, lon) float32 5.974602 5.947811 ... 1.8291237 1.8200014

Finally we are going to save the remapped data along with the time series (field_mean) to a new netcdf file for further processing in the next session about visualizing. To do this we will use two different groups one for the time series and one for the remapped data:

# 1 Save the time-series
out_file = Path(scratch_dir) / 'dpp0016_PreProc.nc'
field_mean.to_netcdf(out_file, mode='w', group='field_mean')
time_mean_remap.to_netcdf(out_file, mode='a', group='time_mean')