Show pageOld revisionsBacklinksExport to PDFBack to top This page is read only. You can view the source, but not change it. Ask your administrator if you think this is wrong. ====== b. Python and Jupyter notebook ====== This tutorial shows how to open ICON output saved in multiple files, setup a distributed computing environment to process the files and read a subset of the output. As an example we will process data from the [[https://swift.dkrz.de/v1/dkrz_34406075a1684be9b56a2669a90730f2/Dyamond_plus/dyamondWinter_html/index.html|Dyamond Winter project]]. It presupposes that you have set up a Jupyter notebook (as explained under [[analysis:postprocessing_icon:0._computing_inf:start|Computing Infrastructure]]). ==== Step 1 : Import all libraries that are needed ==== **Note:** Remember, to be able to import all libraries you'll have to use the //Python 3 unstable// kernel <code python> from getpass import getuser # Libaray to copy things from pathlib import Path # Object oriented libary to deal with paths from tempfile import NamedTemporaryFile, TemporaryDirectory # Creating temporary Files/Dirs from dask_jobqueue import SLURMCluster # Setting up distributed memories via slurm from dask.utils import format_bytes from distributed import Client, progress # Libaray to orchestrate distributed resources import numpy as np # Pythons standard array library import xarray as xr # Libary to work with labeled n-dimensional data </code> <code> import warnings warnings.filterwarnings(action='ignore') </code> ==== Step 2: Setup a distributed computing cluster where we can process the output ==== The data we are going to process is in the order of TB. On a single machine using a single core this can not only be slow but also the fields we are trying to process won't fit into a single computer memory. Therefore we're setting up a distributed cluster using the ''%%dask_jobqueue%%'' library. Specifically we will involve the Slurm workload manager to do that. More information on the ''%%dask_jobqueue%%'' library can be found here: https://jobqueue.dask.org/en/latest/ . To create the slurm cluster we need some information, like the account that is going to be charged and the partition that is going to be used. In this example we are going to use the gpu queue but any other partition can be involved. More information on the hardware queues and their hardware specifications can be found on the [[https://www.dkrz.de/up/systems/mistral/configuration|dkrz user portal]]. <code python> # Set some user specific variables account_name = 'mh0731' # Account that is going to be 'charged' fore the computation queue = 'gpu' # Name of the partition we want to use job_name = 'PostProc' # Job name that is submitted via sbatch memory = "500GiB" # Max memory per node that is going to be used - this depends on the partition cores = 72 # Max number of cores per task that are reserved - also partition dependend walltime = '1:00:00' # Walltime - also partition dependent </code> <code python> scratch_dir = Path('/scratch') / getuser()[0] / getuser() # Define the users scratch dir # Create a temp directory where the output of distributed cluster will be written to, after this notebook # is closed the temp directory will be closed dask_tmp_dir = TemporaryDirectory(dir=scratch_dir, prefix=job_name) cluster = SLURMCluster(memory=memory, cores=cores, project=account_name, walltime=walltime, queue=queue, name=job_name, scheduler_options={'dashboard_address': ':12435'}, local_directory=dask_tmp_dir.name, job_extra=[f'-J {job_name}', f'-D {dask_tmp_dir.name}', f'--begin=now', f'--output={dask_tmp_dir.name}/LOG_cluster.%j.o', f'--output={dask_tmp_dir.name}/LOG_cluster.%j.o' ], interface='ib0') </code> Under the hood the ''%%job_queue%%'' library will create a job script that is going to be submitted via sbatch. Let's have a look at this job-script: <code python> print(cluster.job_script()) </code> <code> #!/usr/bin/env bash #SBATCH -J lab-post #SBATCH -p gpu #SBATCH -A mh0731 #SBATCH -n 1 #SBATCH --cpus-per-task=72 #SBATCH --mem=500G #SBATCH -t 8:00:00 #SBATCH -J PostProc #SBATCH -D /scratch/m/m300765/PostProc9pex9k_y #SBATCH --begin=now #SBATCH --output=/scratch/m/m300765/PostProc9pex9k_y/LOG_cluster.%j.o #SBATCH --output=/scratch/m/m300765/PostProc9pex9k_y/LOG_cluster.%j.o JOB_ID=${SLURM_JOB_ID%;*} /home/mpim/m300765/.anaconda3/bin/python -m distributed.cli.dask_worker tcp://10.50.32.30:46411 --nthreads 72 --memory-limit 536.87GB --name name --nanny --death-timeout 60 --local-directory /scratch/m/m300765/PostProc9pex9k_y --interface ib0 </code> So far nothing has happened, lets order 2 nodes that will give us 1 TB of distributed memory and 144 cores to work on. <code python> cluster.scale(jobs=2) cluster </code> Now we have submitted 2 jobs that establish the distributed computing resources. After some queuing time, depending on how busy the machine is the resources will be available. This can be seen when the above interfaces changes from Workers 0/2 to Workers 2 We can also check the status by calling the squeue command from bash: <code python> ! squeue -u $USER </code> <code> JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON) 23835542 gpu PostProc m300765 R 1:02 1 mg200 23835543 gpu PostProc m300765 R 1:02 1 mg205 </code> Now that the computing resources are made available we have to connect a dask distributed client to it. This client serves as an interface between the commands we are going to use and the cluster where they will be executed. Let's create the client. This can be done by calling the Client function with the created cluster as an argument. This will tell the dask distributed library to do all calculations on the cluster. <code python> dask_client = Client(cluster) dask_client </code> <code> <Client: 'tcp://10.50.32.30:46411' processes=2 threads=144, memory=1.07 TB> </code> The distributed computing resources are ready, now we can do the actual task of this tutorial: ===== Step 3: Reading the data ===== Let's define the paths and the variables we are going to read. We want to read files from a coupled atmosphere-ocean simulation. The data has a resolution of 5 km, the internal experiment ID is ''%%dpp0016%%''. In this example we will make use of six hourly data which is store in ''%%<exp>_2d_atm_ml_<timestep>.nc%%'' file patterns. Let's define the paths and global pattern that will tell python which files to open: <code python> # Define the paths and get all files data_path = Path('/work/mh0287/k203123/GIT/icon-aes-dyw_albW/experiments/dpp0016/') glob_pattern_2d = 'atm_2d_ml' # Collect all file names with pathlib's rglob and list compression file_names = sorted([str(f) for f in data_path.rglob(f'*{glob_pattern_2d}*.nc')])[1:] file_names[:10] </code> <code> ['/work/mh0287/k203123/GIT/icon-aes-dyw_albW/experiments/dpp0016/dpp0016_atm_2d_ml_20200120T000000Z.nc', '/work/mh0287/k203123/GIT/icon-aes-dyw_albW/experiments/dpp0016/dpp0016_atm_2d_ml_20200121T000000Z.nc', '/work/mh0287/k203123/GIT/icon-aes-dyw_albW/experiments/dpp0016/dpp0016_atm_2d_ml_20200122T000000Z.nc', '/work/mh0287/k203123/GIT/icon-aes-dyw_albW/experiments/dpp0016/dpp0016_atm_2d_ml_20200123T000000Z.nc', '/work/mh0287/k203123/GIT/icon-aes-dyw_albW/experiments/dpp0016/dpp0016_atm_2d_ml_20200124T000000Z.nc', '/work/mh0287/k203123/GIT/icon-aes-dyw_albW/experiments/dpp0016/dpp0016_atm_2d_ml_20200125T000000Z.nc', '/work/mh0287/k203123/GIT/icon-aes-dyw_albW/experiments/dpp0016/dpp0016_atm_2d_ml_20200126T000000Z.nc', '/work/mh0287/k203123/GIT/icon-aes-dyw_albW/experiments/dpp0016/dpp0016_atm_2d_ml_20200127T000000Z.nc', '/work/mh0287/k203123/GIT/icon-aes-dyw_albW/experiments/dpp0016/dpp0016_atm_2d_ml_20200128T000000Z.nc', '/work/mh0287/k203123/GIT/icon-aes-dyw_albW/experiments/dpp0016/dpp0016_atm_2d_ml_20200129T000000Z.nc'] </code> The above syntax uses the ''%%rglob%%'' method of the data_path object to find all files with a name according to a specified global pattern. We were also using list compression here. That is combining a ''%%for-loop%%'' into one line. Which makes the execution of that loop much faster. Now we use ''%%xarray.mfdataset%%'' to open all the data and inspect their content: <code python> dset = xr.open_mfdataset(file_names, combine='by_coords', parallel=True) dset </code> <code> <xarray.Dataset> Dimensions: (ncells: 20971520, time: 437) Coordinates: * time (time) datetime64[ns] 2020-01-20 2020-01-20T06:00:00 ... 2020-12-05 Dimensions without coordinates: ncells Data variables: ps (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> psl (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> rsdt (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> rsut (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> rsutcs (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> rlut (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> rlutcs (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> rsds (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> rsdscs (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> rlds (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> rldscs (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> rsus (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> rsuscs (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> rlus (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> ts (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> sic (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> sit (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> clt (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> prlr (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> prls (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> pr (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> prw (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> cllvi (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> clivi (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> qgvi (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> qrvi (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> qsvi (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> hfls (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> hfss (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> evspsbl (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> tauu (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> tauv (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> sfcwind (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> uas (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> vas (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> tas (time, ncells) float32 dask.array<chunksize=(4, 20971520), meta=np.ndarray> Attributes: CDI: Climate Data Interface version 1.8.3rc (http://mpim... Conventions: CF-1.6 number_of_grid_used: 15 grid_file_uri: http://icon-downloads.mpimet.mpg.de/grids/public/mp... uuidOfHGrid: 0f1e7d66-637e-11e8-913b-51232bb4d8f9 title: ICON simulation institution: Max Planck Institute for Meteorology/Deutscher Wett... source: git@gitlab.dkrz.de:icon/icon-aes.git@b582fb87edbd30... history: /work/mh0287/k203123/GIT/icon-aes-dyw_albW/bin/icon... references: see MPIM/DWD publications comment: Sapphire Dyamond (k203123) on m21322 (Linux 2.6.32-... </code> Let's inspect this data a little further: <code python> dset['tas'] </code> <code> <xarray.DataArray 'tas' (time: 437, ncells: 20971520)> dask.array<concatenate, shape=(437, 20971520), dtype=float32, chunksize=(4, 20971520), chunktype=numpy.ndarray> Coordinates: * time (time) datetime64[ns] 2020-01-20 2020-01-20T06:00:00 ... 2020-12-05 Dimensions without coordinates: ncells Attributes: standard_name: tas long_name: temperature in 2m units: K param: 0.0.0 CDI_grid_type: unstructured number_of_grid_in_reference: 1 </code> <code python> dset['tas'].data </code> <code> dask.array<concatenate, shape=(437, 20971520), dtype=float32, chunksize=(4, 20971520), chunktype=numpy.ndarray> </code> An important concept of xarray and dask is that the data hasn't been read into memory yet. We only see a representation of what the data //will// look like. This is also called a //future//, a very important concept for distributed computing. In our case the representation of the data is a dask array. Dask is a library that can split up the data into chunks and evenly spreads the data chunks across different cpu's and computers. In our example we can see that the surface temperature dataset is split up into 314 chunks. Reading the array would take 942 tasks the total dataset would take 36.66 GB of memory. We can also ask xarray how much memory the whole dataset would consume: <code python> format_bytes(dset.nbytes) </code> <code> '1.32 TB' </code> This means that the total dataset (both experiments) would need 1.32 TB of memory. Way too much for a local computer but we do //only// have 1 TB of distributed memory. Let's reduce the data further by creating daily averages and sub setting by only selecting data until April 2020. <code python> timesf = pd.date_range(start='2020-01-20', periods=dset.time.size ,freq='6H') dset['time'] = timesf dset_subset = dset.sel(time=slice('2020-01-20', '2020-02-29')).resample(time='1D').mean() dset_subset </code> <code> <xarray.Dataset> Dimensions: (ncells: 20971520, time: 72) Coordinates: * time (time) datetime64[ns] 2020-01-20 2020-01-21 ... 2020-03-31 Dimensions without coordinates: ncells Data variables: ps (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> psl (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> rsdt (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> rsut (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> rsutcs (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> rlut (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> rlutcs (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> rsds (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> rsdscs (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> rlds (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> rldscs (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> rsus (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> rsuscs (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> rlus (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> ts (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> sic (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> sit (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> clt (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> prlr (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> prls (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> pr (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> prw (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> cllvi (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> clivi (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> qgvi (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> qrvi (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> qsvi (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> hfls (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> hfss (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> evspsbl (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> tauu (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> tauv (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> sfcwind (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> uas (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> vas (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> tas (time, ncells) float32 dask.array<chunksize=(1, 20971520), meta=np.ndarray> </code> The ''%%resample%%'' method splits up the data into daily chunks while ''%%mean%%'' applies an average over the chunks. Consequently we get daily data. The same way we could calculate monthly or any kind of arbitrary averages. So far no data has been read. We can trigger reading the data by using the persist method. Persist will start pushing the data to the distributed memory. There the netcdf-files will be read and the data will be copied into memory and the averaging will be done. If no distributed memory is available persist uses the local memory. Please refer to https://distributed.readthedocs.io/en/latest/manage-computation.html#dask-collections-to-futures for more information. We can subset the data a little further by only selecting certain variables: <code python> var_names = ['ts', 'rsus', 'rlus', 'rlds', 'rsdt'] dset_subset = dset_subset[var_names] format_bytes(dset_subset.nbytes) </code> <code> '30.20 GB' </code> Now let's read the data from files and push it to the cluster with the ''%%persist%%'' method: <code python> dset_subset = dset_subset.persist() # Let's inspect the progress progress(dset_subset, notebook=True) </code> <code> [########################################] | 100% Completed | 3 min 36 s </code> We can see the tasks that are worked on in the background on the cluster. We can continue working with the data. Setting the ''%%notebook=False%%'' keyword will block further executions until all calculations are done. The paradigm is that all calculations are collected and not executed until we explicitly instruct xarray / dask to trigger computations.