"""Row-by-row matrix-vector product on a Dask cluster, checked against NumPy.

The script connects to an already running Dask scheduler through its
scheduler file, submits one task per element product plus one summation task
per matrix row, gathers the row sums, disconnects, cancels the SLURM jobs
that host the workers and the scheduler, and finally compares the distributed
result with ``np.dot``.
"""

import subprocess as sp

import numpy as np
from dask.distributed import Client

# Location of the scheduler file written by the running Dask scheduler.
SCHEDULER_FILE = "/scratch/m/m300556/dask-work/test/scheduler_test.json"

# SLURM account and job names used when releasing the cluster resources.
SLURM_USER = "m300556"
WORKERS_JOB_NAME = "dask-workers-test"
SCHEDULER_JOB_NAME = "dask-scheduler-test"


def multip(mat_elem, vec_elem):
    """Return the product of one matrix element and one vector element."""
    return mat_elem * vec_elem


def parallel_matvec(client, matrix, vector):
    """Compute ``matrix @ vector`` with one Dask task per element product.

    Args:
        client: Connected ``dask.distributed.Client`` used to submit tasks.
        matrix: 2-D NumPy array.
        vector: 1-D NumPy array with one entry per matrix column.

    Returns:
        1-D NumPy array holding one summed value per matrix row.

    Raises:
        ValueError: If the vector length does not match the column count.
    """
    if matrix.shape[1] != vector.shape[0]:
        raise ValueError(
            f"shape mismatch: matrix has {matrix.shape[1]} columns, "
            f"vector has {vector.shape[0]} entries"
        )

    # The inner futures are passed straight to the ``sum`` task; Dask
    # resolves futures found inside list arguments before calling it.
    row_futures = [
        client.submit(
            sum,
            [client.submit(multip, m, x) for m, x in zip(row, vector)],
        )
        for row in matrix
    ]
    # One gather call collects every row result instead of blocking on each
    # future in turn.
    return np.array(client.gather(row_futures))


def cancel_cluster_jobs():
    """Cancel the SLURM jobs of the Dask workers, then of the scheduler.

    Both cancellations are always attempted, so a failure to cancel the
    workers does not leave the scheduler job running.

    Raises:
        subprocess.CalledProcessError: The first failure reported by
            ``scancel``, re-raised after both jobs have been tried.
    """
    first_error = None
    # Workers first, scheduler second.
    for job_name in (WORKERS_JOB_NAME, SCHEDULER_JOB_NAME):
        command = ["scancel", "-u", SLURM_USER, "--name", job_name]
        try:
            sp.run(command, check=True)
        except sp.CalledProcessError as err:
            if first_error is None:
                first_error = err
    if first_error is not None:
        raise first_error


def main():
    """Run the distributed product, free the cluster, and verify the result."""
    # 0. Preparation: define the matrix and the vector.
    matrix = np.random.rand(8, 8)
    vector = np.array([1, 1, 2, 3, 5, 8, 13, 21])

    try:
        # Connect to the reserved resources; leaving the ``with`` block
        # closes the client even when a task fails.
        with Client(scheduler_file=SCHEDULER_FILE) as client:
            print(client)  # Print out information about the resources.

            # 1. Parallel calculation
            result = parallel_matvec(client, matrix, vector)

        # 2. Disconnection from the cluster resources
        print(client)  # Shows information to verify the disconnection.
    finally:
        # Release the reserved jobs whether or not the calculation succeeded.
        cancel_cluster_jobs()

    # 3. Check the results against an established library
    alt_result = np.dot(matrix, vector)
    # The summation order may differ from np.dot, so compare element-wise
    # with a floating-point tolerance rather than exact equality.
    print(np.isclose(result, alt_result))
    print(result)  # Print out both vectors.
    print(alt_result)


if __name__ == "__main__":
    main()