import os
import subprocess as sp

from dask.distributed import Client

# 0. Acquire the reserved resources in the cluster
client = Client(scheduler_file=("/scratch/x/x000000/dask-work/Storage3d"
                                + "/scheduler_Storage3d.json"))
print(client)

# I. Definition of paths to the data and the scratch space
exps_path = os.path.abspath("/work/yy0000/x000000/"
                            + "mpiesm-1.2.01p1/experiments") + "/"
outs_path = os.path.abspath("/scratch/x/x000000") + "/foobar/experiments/"

# II. Definition of experiments, streams and substreams
exps = ["djc_cntrl_stnd", "djc_cnt4x.0.1_stnd"]  # Experiment names
for i in range(7):
    exps.append("djc_cld4x.{0}.1_stnd".format(i))
    exps.append("djc_cldfc.{0}_stnd".format(i))

strms = [#"echam6",
         #"jsbach",
         #"hamocc",
         "mpiom"
         ]  # Streams of output

sstrms = {"echam6": ["ATM_mm", "BOT_mm", "LOG_mm", "echam",
                     "accw", "co2", "co2_mm"],
          "jsbach": ["jsbach_mm", "land_mm", "nitro_mm",
                     "surf_mm", "veg_mm", "yasso_mm"],
          "mpiom": [#"data_2d_3h",
                    #"data_2d_dmax",
                    #"data_2d_dm",
                    #"data_2d_mm",
                    #"data_2d_mmax",
                    #"data_2d_mmin",
                    #"data_2d_ym",
                    "data_3d_mm"#,
                    #"data_3d_ym",
                    #"data_moc_mm",
                    #"monitoring_ym",
                    #"timeser_mm",
                    #"fx",
                    #"map"
                    ]
          }  # Dictionary with the substreams of each stream

# III. Strings with replacement fields used in the helper function
path = "{0}{1}/{2}"
file = "{0}_{1}.tar.gz"

# IV. Helper function
def tar_and_zip_output(experiments_path, output_path, experiment, stream,
                       substream):
    # 1. Path construction
    path_to_files = path.format(experiments_path, experiment, "outdata")
    path_to_output = path.format(output_path, experiment, "outdata")
    # 2. Create the output directory if it is missing
    command = ["mkdir", "-p", path_to_output]
    sp.run(command, check=True)
    # 3. Append the file name to the output path
    path_to_output += "/"
    path_to_output += file.format(stream, substream)
    # 4. Tar and zip all files of the stream-substream pair
    os.chdir(path_to_files)
    with open(path_to_output, "wb") as out:  # Binary output written by pigz
        pattern = experiment + "_" + stream + "_" + substream + "_*"  # Pattern to find
        c0 = ["find", stream, "-type", "f", "-name", pattern]  # find command
        c1 = ["tar", "cvf", "-"]  # tar command
        c2 = ["pigz", "-9"]       # Parallel gzip command with the maximum compression level
        r0 = sp.run(c0, check=True, capture_output=True, text=True).stdout  # Run find
        r0 = r0.split("\n")[:-1]  # Convert the output to a list and drop the trailing empty element
        r0.sort()                 # Sort the list
        r0 = r0[-500:]            # Keep at most the last 500 files
        c1.extend(r0)             # Append the file list to the tar command
        r1 = sp.Popen(c1, stdout=sp.PIPE)               # Run tar and pipe its output...
        r2 = sp.Popen(c2, stdin=r1.stdout, stdout=out)  # ...into pigz
        r1.stdout.close()   # Let tar receive SIGPIPE if pigz exits early
        r2.communicate()    # Wait until pigz has finished
    return path_to_output

# V. Parallel compression of the files
# In this case the workers are set up on sixteen nodes, one node per
# experiment. There is only a single substream per stream and a single stream
# per experiment, so we parallelise at the level of experiments. For that
# reason, the results are gathered in a separate for-loop.
list_tasks = {}    # Dictionary of tasks
list_results = {}  # Dictionary of results
for exp in exps:  # Loop over the experiments...
    print("Concatenating experiment {}...".format(exp))
    list_tasks[exp] = {}  # Initialise the dictionaries of the exp-stream pair
    list_results[exp] = {}
    for strm in strms:  # Loop over the streams...
        print("    Concatenating stream {}...".format(strm))
        list_tasks[exp][strm] = [client.submit(tar_and_zip_output,
                                               exps_path,
                                               outs_path,
                                               exp,
                                               strm,
                                               sstrm,
                                               pure=False)
                                 for sstrm in sstrms[strm]]

for exp in exps:
    for strm in strms:
        list_results[exp][strm] = [i.result() for i in list_tasks[exp][strm]]

# VI. Relinquish control of the reserved resources in the cluster
client.close()
print(client)

# VII. Cancel the processes in the cluster
command = ["scancel", "-u", "x000000", "--name", "dask-workers-Storage3d"]
sp.run(command, check=True)
command = ["scancel", "-u", "x000000", "--name", "dask-scheduler-Storage3d"]
sp.run(command, check=True)