import os
import subprocess as sp

from dask.distributed import Client

# 0. Acquire the reserved resources in the cluster
client=Client(scheduler_file=('/scratch/x/x000000/dask-work/Restarts'+
                              '/scheduler_Restarts.json'))
print(client)

# I. Definition of paths to the data and to the scratch space
exps_path=os.path.abspath(("/work/yy0000/x000000/"+
                           "mpiesm-1.2.01p1-fixvar/experiments"))+"/"
outs_path=os.path.abspath("/scratch/x/x000000")+"/foobar/experiments/"

# II. Definition of experiment names and restart streams
exps=["djc_cntrl_stnd"]                          # Experiment names
for i in range(7):
    exps.append("djc_cldfc.{0}_stnd".format(i))
strms=["echam6",
       "hamocc",
       "jsbach",
       "mpiom",
       "oasis3mct"
       ]                                         # Streams of restarts

# III. Strings with replacement fields used in the helper function
path="{0}{1}/{2}"
file="{0}_{1}.tar.gz"

# IV. Helper function
def tar_and_zip_restarts(experiments_path,
                         output_path,
                         experiment,
                         stream
                         ):
    """Tar and compress the restart files of one stream of one experiment
    and return the full path of the resulting archive."""
    # 1 Path construction
    path_to_files=path.format(experiments_path,experiment,"restart")
    path_to_output=path.format(output_path,experiment,"restart")
    # 2 Create the output directory if it is missing
    command=["mkdir","-p",path_to_output]        # Command
    sp.run(command,check=True)                   # Execute the command
    # 3 Add the file name to the output path
    path_to_output+="/"
    path_to_output+=file.format(stream,"restarts")
    # 4 Concatenate and compress all files of the stream into the output file
    with open(path_to_output,"wb") as out:       # With the output file opened...
        c1=["tar","cvf","-",stream]              # Tar command
        c2=["pigz"]                              # Parallel compression command
        r1=sp.Popen(c1,stdout=sp.PIPE,
                    cwd=path_to_files)           # Run tar in the input directory and pipe its output...
        r2=sp.Popen(c2,stdin=r1.stdout,stdout=out)  # ...to the pigz command
        r1.stdout.close()                        # Let tar receive SIGPIPE if pigz exits early
        r2.communicate()                         # Wait until pigz has finished
        r1.wait()                                # Make sure tar has finished as well
    # 5 Return the full path to the output
    return path_to_output

# V. Parallel compression of the restart files
# In this case the workers are set up on five nodes, one node for each
# stream, so we parallelise at the level of streams. We gather the results
# in the same for loop, which means that it waits until all the streams of
# a given experiment are processed before moving on to the next experiment.
list_tasks={}                                    # Dictionary of tasks
list_results={}                                  # Dictionary of results
for exp in exps:                                 # Loop over the experiments...
    print("Storing restarts of experiment {}...".format(exp))
    list_tasks[exp]=[client.submit(tar_and_zip_restarts,
                                   exps_path,
                                   outs_path,
                                   exp,
                                   strm,
                                   pure=False
                                   )
                     for strm in strms
                     ]                           # Submit one task per stream to the cluster
    list_results[exp]=[i.result() for i in list_tasks[exp]]  # Wait for the results

# VI. Relinquish control of the cluster's reserved resources
client.close()                                   # Close the connection to the scheduler
print(client)                                    # Print to check that it was disconnected

# VII. Cancel the processes in the cluster
command=["scancel","-u","x000000","--name","dask-workers-Restarts"]
sp.run(command,check=True)                       # First kill the workers
command=["scancel","-u","x000000","--name","dask-scheduler-Restarts"]
sp.run(command,check=True)                       # Then the scheduler
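
# VIII. Optional integrity check: a minimal sketch added here for illustration,
# not part of the original workflow. "pigz -t" tests each archive produced above
# without extracting it, using the paths gathered in list_results; it runs
# locally, so it does not need the cluster resources released in VI and VII.
for exp in exps:
    for archive in list_results[exp]:
        sp.run(["pigz","-t",archive],check=True) # Raises CalledProcessError if the archive is corrupt
        print("Archive OK: {}".format(archive))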