I show you how to use Python for compressing MPI-ESM output. You only need to think about two things: what you want to store and how many MISTRAL resources you will use to that end. To distribute the compression, I use the ideas given in the Reserve and Use method.
MPI-ESM has several streams of output that come from four submodels: ECHAM, JSBACH, MPIOM and HAMOCC. Depending on your application, the model output can be massive. For storage in the HPSS tape system, DKRZ recommends that each file you upload be larger than 10 GB but smaller than 500 GB. Files smaller than 1 GB count as 1 GB against your project's quota. Thus, you should be careful about what you want to store.
Therefore, before any compression, think about what you need (or will need) from the output data. Once you know that, you can check the size of your dataset with commands such as du or ls. Knowing the size enables you to prepare the data for storage. I offer the following scripts to make the task as simple as possible.
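For instance, a quick way to see how much a single experiment occupies is a summary du over its output directory (the path is the dummy one used throughout this page; <experiment> is a placeholder):

# total size of one experiment's output
du -sh /work/yy0000/x000000/mpiesm-1.2.01p1/experiments/<experiment>/outdata
# size of a single stream
du -sh /work/yy0000/x000000/mpiesm-1.2.01p1/experiments/<experiment>/outdata/mpiom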
The main script is straightforward yet extensible. I describe its sections below and provide the full script at the end. First, we need to import the following modules:
import os
import subprocess as sp
from dask.distributed import Client
The os module allows us to perform operating-system and file-system tasks, such as changing directories. The subprocess module makes it possible to launch subprocesses that run programs outside the Python session, in particular shell programs. The dask.distributed module enables the use of MISTRAL to distribute and parallelise computations. On this last point, the script is a direct application of the Reserve and Use method.
Once we have imported the modules, we connect to the computing resources of MISTRAL (section 0 of the code). I will come back to this point later; for now, let us take it for granted.
# 0. Acquire the reserved resources in the cluster
client=Client(scheduler_file=('/scratch/x/x000000/dask-work/Storage3d'+
                              '/scheduler_Storage3d.json'
                              )
              )
print(client)
With the resources secured, we define the paths to the model output and to the temporary space where we keep the compressed files (section I). I provide dummy paths. I strongly recommend that the temporary space be in /scratch, because of the large amount of disk space that the compressed files occupy.
# I. Definition of paths to the data and the scratch space
exps_path=os.path.abspath(("/work/yy0000/x000000/"+
                           "mpiesm-1.2.01p1/experiments"
                           )
                          )+"/"
outs_path=os.path.abspath("/scratch/x/x000000")+"/foobar/experiments/"
Now, in section II, we set the variables that describe which experiments, which output streams, and which substreams of files we will process. The assumption is that the experiments directory has the following structure: <experiment>/outdata/<stream>/<experiment>_<stream>_<substream>_<date>.<extension>. exps is the list of experiment names. strms defines the list of output streams; an output stream corresponds to the output of one component of the model. The substreams, defined in the sstrms dictionary, define subsets of the output. In this example, I have sixteen experiments. I have four streams, but I commented out all but the mpiom stream. For the sstrms dictionary, I registered all the substreams of the echam6, jsbach and mpiom streams, but I only want to process the "data_3d_mm" substream and, thus, I commented out all the other mpiom substreams.
# II. Definition of experiments, streams and substreams
exps=["djc_cntrl_stnd","djc_cnt4x.0.1_stnd"] # Experiment names
for i in range(7):
    exps.append("djc_cld4x.{0}.1_stnd".format(i))
    exps.append("djc_cldfc.{0}_stnd".format(i))
strms=[#"echam6",
       #"jsbach",
       #"hamocc",
       "mpiom"
       ] # Streams of output
sstrms={"echam6" : ["ATM_mm", "BOT_mm", "LOG_mm", "echam",
                    "accw", "co2", "co2_mm"
                    ],
        "jsbach" : ["jsbach_mm", "land_mm", "nitro_mm",
                    "surf_mm", "veg_mm", "yasso_mm"
                    ],
        "mpiom"  : [#"data_2d_3h",
                    #"data_2d_dmax",
                    #"data_2d_dm",
                    #"data_2d_mm",
                    #"data_2d_mmax",
                    #"data_2d_mmin",
                    #"data_2d_ym",
                    "data_3d_mm"#,
                    #"data_3d_ym",
                    #"data_moc_mm",
                    #"monitoring_ym",
                    #"timeser_mm",
                    #"fx",
                    #"map"
                    ]
        } # Dictionary with substreams for each output

# III. Strings with evaluable fields used in the helper function
path="{0}{1}/{2}"
file="{0}_{1}.tar.gz"
In section III, we define two template strings with format fields that we will use in the helper function, which is section IV of the code. The helper function is the core of the script: it is the function that processes the files, which is why I called it tar_and_zip_output. It has five inputs: the path to the experiments, the path to the compressed output, the name of the experiment, and the names of the stream and the substream to be processed. The first step is to construct the string that contains the path to the outdata directory where the data live, as well as the path to the outdata directory where the compressed files will be stored. Then we create the latter directory if it does not exist yet. The third step is to add the file name to the path of the compressed output, with the following structure: <output_path>/<experiment>/outdata/<stream>_<substream>.tar.gz. The fourth step begins by changing to the experiment output directory and applying a find operation with some filters to obtain the list of files to compress. After that, we apply tar to them and pipe the result to a pigz -9 command to compress with the highest compression level. After a successful compression, the function returns the path to the output file.
# IV. Helper function
def tar_and_zip_output(experiments_path,
                       output_path,
                       experiment,
                       stream,
                       substream
                       ):
    # 1 Path construction
    path_to_files=path.format(experiments_path,experiment,"outdata")
    path_to_output=path.format(output_path,experiment,"outdata")
    # 2 Construct output directory if missing
    command=["mkdir","-p",path_to_output]
    sp.run(command,check=True)
    # 3 Add the file name to the output path
    path_to_output+="/"
    path_to_output+=file.format(stream,substream)
    # 4 tar and zip all files of the stream-substream pair
    os.chdir(path_to_files)
    with open(path_to_output,"w") as out:
        pattern=experiment+"_"+stream+"_"+substream+"_*" # Pattern to find
        c0=["find",stream,"-type","f","-name",pattern] # Find command
        c1=["tar","cvf","-"] # Tar command
        c2=["pigz","-9"] # Parallel gzip command with the maximum compression level
        r0=sp.run(c0,check=True,capture_output=True,text=True).stdout # Find
        r0=r0.split("\n")[:-1] # Convert output to list and cut the last element
        r0.sort() # Order the list
        r0=r0[-500:] # Cut the list to only up to 500 files
        [ c1.append(i) for i in r0 ] # Append the file list to the tar command
        r1=sp.Popen(c1,stdout=sp.PIPE) # Execute tar and pipe the output...
        r2=sp.Popen(c2,stdin=r1.stdout,stdout=out) # ...to the pigz command
        r1.stdout.close() # When pigz ends, close stdout of tar
        r3=r2.communicate()[0] # Check whether pigz have ended or not
    return path_to_output
The next step depends heavily on (i) the MISTRAL resources that your group has and (ii) how fast you want the result. If you answer these two questions, then you know how many resources to ask for and how to set up section V of the following code. In the example that I provide, I process only one substream of mpiom for each of the sixteen experiments. I can therefore afford to reserve sixteen nodes of MISTRAL (later I show you how) and process the sixteen substreams in parallel. To do that, the code first defines two dictionaries, one for the tasks and one for the results: list_tasks and list_results. I then cycle through the experiments and create the subdictionaries list_tasks[exp] and list_results[exp]. Next, I cycle over the streams and create a list for each stream; the list comprehension submits the helper function to the cluster with the corresponding arguments for the experiment, stream and substream. With this for-loop, we send sixteen tasks concurrently to the cluster. A second for-loop waits for the sixteen tasks to complete. If you want a different ordering, you should plan where to wait for the results.
# V. Parallel compilation of files
# In this case, the settings of the workers are for sixteen nodes. One node for
# each experiment. There is only a single substream for each stream and only a
# stream for each experiment. Then we parallelise at the level of experiments.
# For that reason, we gather the results in a separate for-loop.
list_tasks={} # Dictionary of tasks
list_results={} # Dictionary of results
for exp in exps: # Cycle throughout experiments...
    print("Concatenating experiment {}...".format(exp))
    list_tasks[exp]={} # Initialize the dictionary of the exp-stream pair
    list_results[exp]={}
    for strm in strms: # Cycle throughout the streams...
        print(" Concatenating stream {}...".format(strm))
        list_tasks[exp][strm]=[ client.submit(tar_and_zip_output,
                                              exps_path,
                                              outs_path,
                                              exp,
                                              strm,
                                              sstrm,
                                              pure=False
                                              ) for sstrm in sstrms[strm] ]
for exp in exps:
    for strm in strms:
        list_results[exp][strm]=[ i.result() for i in list_tasks[exp][strm] ]
The last two sections of the code (VI and VII) only close the connection to the MISTRAL resources and cancel the jobs in the cluster.
# VI. Relinquish control of cluster reserved resources
client.close()
print(client)

# VII. Cancel the processes in the cluster
command=["scancel","-u","x000000","--name","dask-workers-Storage3d"]
sp.run(command,check=True)
command=["scancel","-u","x000000","--name","dask-scheduler-Storage3d"]
sp.run(command,check=True)
import os
import subprocess as sp
from dask.distributed import Client

# 0. Acquire the reserved resources in the cluster
client=Client(scheduler_file=('/scratch/x/x000000/dask-work/Storage3d'+
                              '/scheduler_Storage3d.json'
                              )
              )
print(client)

# I. Definition of paths to the data and the scratch space
exps_path=os.path.abspath(("/work/yy0000/x000000/"+
                           "mpiesm-1.2.01p1/experiments"
                           )
                          )+"/"
outs_path=os.path.abspath("/scratch/x/x000000")+"/foobar/experiments/"

# II. Definition of experiments, streams and substreams
exps=["djc_cntrl_stnd","djc_cnt4x.0.1_stnd"] # Experiment names
for i in range(7):
    exps.append("djc_cld4x.{0}.1_stnd".format(i))
    exps.append("djc_cldfc.{0}_stnd".format(i))
strms=[#"echam6",
       #"jsbach",
       #"hamocc",
       "mpiom"
       ] # Streams of output
sstrms={"echam6" : ["ATM_mm", "BOT_mm", "LOG_mm", "echam",
                    "accw", "co2", "co2_mm"
                    ],
        "jsbach" : ["jsbach_mm", "land_mm", "nitro_mm",
                    "surf_mm", "veg_mm", "yasso_mm"
                    ],
        "mpiom"  : [#"data_2d_3h",
                    #"data_2d_dmax",
                    #"data_2d_dm",
                    #"data_2d_mm",
                    #"data_2d_mmax",
                    #"data_2d_mmin",
                    #"data_2d_ym",
                    "data_3d_mm"#,
                    #"data_3d_ym",
                    #"data_moc_mm",
                    #"monitoring_ym",
                    #"timeser_mm",
                    #"fx",
                    #"map"
                    ]
        } # Dictionary with substreams for each output

# III. Strings with evaluable fields used in the helper function
path="{0}{1}/{2}"
file="{0}_{1}.tar.gz"

# IV. Helper function
def tar_and_zip_output(experiments_path,
                       output_path,
                       experiment,
                       stream,
                       substream
                       ):
    # 1 Path construction
    path_to_files=path.format(experiments_path,experiment,"outdata")
    path_to_output=path.format(output_path,experiment,"outdata")
    # 2 Construct output directory if missing
    command=["mkdir","-p",path_to_output]
    sp.run(command,check=True)
    # 3 Add the file name to the output path
    path_to_output+="/"
    path_to_output+=file.format(stream,substream)
    # 4 tar and zip all files of the stream-substream pair
    os.chdir(path_to_files)
    with open(path_to_output,"w") as out:
        pattern=experiment+"_"+stream+"_"+substream+"_*" # Pattern to find
        c0=["find",stream,"-type","f","-name",pattern] # Find command
        c1=["tar","cvf","-"] # Tar command
        c2=["pigz","-9"] # Parallel gzip command with the maximum compression level
        r0=sp.run(c0,check=True,capture_output=True,text=True).stdout # Find
        r0=r0.split("\n")[:-1] # Convert output to list and cut the last element
        r0.sort() # Order the list
        r0=r0[-500:] # Cut the list to only up to 500 files
        [ c1.append(i) for i in r0 ] # Append the file list to the tar command
        r1=sp.Popen(c1,stdout=sp.PIPE) # Execute tar and pipe the output...
        r2=sp.Popen(c2,stdin=r1.stdout,stdout=out) # ...to the pigz command
        r1.stdout.close() # When pigz ends, close stdout of tar
        r3=r2.communicate()[0] # Check whether pigz have ended or not
    return path_to_output

# V. Parallel compilation of files
# In this case, the settings of the workers are for sixteen nodes. One node for
# each experiment. There is only a single substream for each stream and only a
# stream for each experiment. Then we parallelise at the level of experiments.
# For that reason, we gather the results in a separate for-loop.
list_tasks={} # Dictionary of tasks
list_results={} # Dictionary of results
for exp in exps: # Cycle throughout experiments...
    print("Concatenating experiment {}...".format(exp))
    list_tasks[exp]={} # Initialize the dictionary of the exp-stream pair
    list_results[exp]={}
    for strm in strms: # Cycle throughout the streams...
        print(" Concatenating stream {}...".format(strm))
        list_tasks[exp][strm]=[ client.submit(tar_and_zip_output,
                                              exps_path,
                                              outs_path,
                                              exp,
                                              strm,
                                              sstrm,
                                              pure=False
                                              ) for sstrm in sstrms[strm] ]
for exp in exps:
    for strm in strms:
        list_results[exp][strm]=[ i.result() for i in list_tasks[exp][strm] ]

# VI. Relinquish control of cluster reserved resources
client.close()
print(client)

# VII. Cancel the processes in the cluster
command=["scancel","-u","x000000","--name","dask-workers-Storage3d"]
sp.run(command,check=True)
command=["scancel","-u","x000000","--name","dask-scheduler-Storage3d"]
sp.run(command,check=True)
To reserve the resources, follow the steps described in the Reserve and Use method. In this case, I reserve one node in the shared partition for the scheduler and sixteen nodes in the compute2 partition for the workers. In both cases, I allow only one task to run on each node. The scheduler node manages the sixteen worker nodes and sends your requests to them; thus, when I submit the requests in section V of the Python code, I am sending the instructions to the workers. As I said, if you want more details, look at the Reserve and Use method. In the following, I provide the two scripts that reserve the nodes: launch_scheduler_Storage3d.sh for the scheduler and launch_workers_Storage3d.sh for the workers.
I recommend that you have a dedicated dask-work directory (possibly in /scratch) as the place to store all the log files of the dask.distributed operations, with a subdirectory for each dask processing task. In case there is a problem, you will be able to debug it better and faster.
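For example, the working directory used throughout this page (for the dummy user x000000 of the scripts below) could be prepared with:

mkdir -p /scratch/x/x000000/dask-work/Storage3d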
#!/bin/bash
#SBATCH --account=yy0000
#SBATCH --job-name=dask-scheduler-Storage3d
#SBATCH --workdir=/scratch/x/x000000/dask-work/Storage3d
#SBATCH --output=/scratch/x/x000000/dask-work/Storage3d/LOG_dask.%j.o
#SBATCH --error=/scratch/x/x000000/dask-work/Storage3d/LOG_dask.%j.o
#SBATCH --time=08:00:00
#SBATCH --partition=shared
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1

rm -rf worker-*
rm *.o
rm scheduler*.json
rm global.lock
rm purge.lock

module purge
module load pigz

export PATH=/work/yy0000/x000000/miniconda3/bin:$PATH
conda activate glamdring

srun dask-scheduler --scheduler-file /scratch/x/x000000/dask-work/Storage3d/scheduler_Storage3d.json
#!/bin/bash
#SBATCH --account=yy0000
#SBATCH --job-name=dask-workers-Storage3d
#SBATCH --workdir=/scratch/x/x000000/dask-work/Storage3d
#SBATCH --output=/scratch/x/x000000/dask-work/Storage3d/LOG_dask.%j.o
#SBATCH --error=/scratch/x/x000000/dask-work/Storage3d/LOG_dask.%j.o
#SBATCH --time=08:00:00
#SBATCH --partition=compute2
#SBATCH --nodes=16
#SBATCH --ntasks-per-node=1

module purge
module load pigz

export PATH=/work/yy0000/x000000/miniconda3/bin:$PATH
conda activate glamdring

srun dask-worker --scheduler-file /scratch/x/x000000/dask-work/Storage3d/scheduler_Storage3d.json
First, you will want to use screen (or tmux) to create a shell session that you can detach from without killing your long-lived processing. Inside your screen session, you submit your request for the scheduler node to Slurm.
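If you have not used screen before, a minimal workflow looks like this (the session name storage3d is only an example):

screen -S storage3d   # create a named session
# work inside the session; detach with Ctrl-a d, the session keeps running
screen -r storage3d   # reattach to the session later

Once the session is running, submit the scheduler job: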
sbatch launch_scheduler_Storage3d.sh
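You can check whether the scheduler job is already running with squeue; the user and job name below match the ones in the Slurm script above:

squeue -u x000000 --name=dask-scheduler-Storage3d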
After some time it should be online; then you submit the request for the workers.
sbatch launch_workers_Storage3d.sh
Once the workers are online, they will connect to the scheduler. Then you can run your script
python Storage3d.py
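If you want to keep a record of the run, you can redirect the output of the script to a log file, for example in the dask-work directory (the file name is only a suggestion):

python Storage3d.py > /scratch/x/x000000/dask-work/Storage3d/Storage3d.log 2>&1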
Then you can wait and do other things. As an example, the task that I described took one hour to compress 16 TB of data (1 TB per experiment). Finally, I only remark that this script is the solution only once you have answered the two fundamental questions!
Remark: the Slurm scripts contain lines that put a miniconda distribution in the search path and initialise an environment called glamdring. If you are using the anaconda3 from the system, just delete those lines and use the module command to load anaconda3. If you are using your own miniconda distribution, change those lines so that they load the proper environment.
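As a sketch of that change, assuming the system provides an anaconda3 module (check module avail for the exact name), the relevant lines in both Slurm scripts would become:

# instead of:
#   export PATH=/work/yy0000/x000000/miniconda3/bin:$PATH
#   conda activate glamdring
module load anaconda3   # module name is an assumption; verify with "module avail"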