Source code for holotomocupy.chunking

import cupy as cp
import numpy as np

global_chunk = 16


[docs]def gpu_batch(func): """ Decorator for processing data by chunks on GPU Parameters ---------- func : function for processing on GPU. The function should have syntax: [out1,out2,...] = func(in1, in2,.., par1,par2..), where arrays in1,in2,.., out1,out2,.. have the same shape in the first dimension treated as the dimension for chunking data processing on GPU. The func() should perform processing on cupy arrays and return the result as a list of ndarray or 1 ndarray. Example: nn=180 a = np.zeros([nn,5,3]) b = np.zeros([nn,5,3,2]) def func(in1,in2,par): out1 = in1+in2[:,0]*par out2 = in1+in2[:,1]*par return [out1,out2] """ def inner(*args, **kwargs): nn = args[0].shape[0] chunk = min(global_chunk, nn) nchunk = int(np.ceil(nn/chunk)) # if array is on gpu then just run the function if isinstance(args[0], cp.ndarray): out = func(*args, **kwargs) return out #else do processing by chunks inp_gpu = [] out = [] # determine the number o inputs ninp = 0 for k in range(0, len(args)): if (isinstance(args[k], np.ndarray) or isinstance(args[k], cp.ndarray)) and args[k].shape[0] == nn: inp_gpu.append( cp.empty([chunk, *args[k].shape[1:]], dtype=args[k].dtype)) ninp += 1 else: break # run by chunks for k in range(nchunk): st, end = k*chunk, min(nn, (k+1)*chunk) s = end-st # copy to gpu for j in range(ninp): inp_gpu[j][:s].set(args[j][st:end]) inp_gpu0 = [a for a in inp_gpu] #run function out_gpu = func(*inp_gpu0, *args[ninp:], **kwargs) if not isinstance(out_gpu, list): out_gpu = [out_gpu] if k == 0: # first time we know the out shape nout = len(out_gpu) for j in range(nout): out.append( np.empty([nn, *out_gpu[j].shape[1:]], dtype=out_gpu[j].dtype)) # copy from gpu for j in range(nout): out_gpu[j][:s].get(out=out[j][st:end]) # contiguous copy, fast if nout == 1: out = out[0] return out return inner
#####TO TRY WITh PINNED MEMORY #cp.cuda.set_pinned_memory_allocator(cp.cuda.PinnedMemoryPool().malloc) # streams for overlapping data transfers with computations # stream1 = cp.cuda.Stream(non_blocking=False) # stream2 = cp.cuda.Stream(non_blocking=False) # stream3 = cp.cuda.Stream(non_blocking=False) # def pinned_array(array): # """Allocate pinned memory and associate it with numpy array""" # mem = cp.cuda.alloc_pinned_memory(array.nbytes) # src = np.frombuffer( # mem, array.dtype, array.size).reshape(array.shape) # src[...] = array # return src # def gpu_batch(func): # #cp.cuda.set_pinned_memory_allocator(cp.cuda.PinnedMemoryPool().malloc) # def inner(*args, **kwargs): # nn = args[0].shape[0] # chunk = min(global_chunk, nn) # calculate based on data sizes # nchunk = int(np.ceil(nn/chunk)) # if isinstance(args[0], cp.ndarray): # out = func(*args, **kwargs) # return out # inp_gpu = [] # out_gpu = [] # out = [] # ninp = 0 # for k in range(0, len(args)): # if isinstance(args[k], np.ndarray) and args[k].shape[0] == nn: # inp_gpu.append( # cp.empty([2, chunk, *args[k].shape[1:]], dtype=args[k].dtype)) # ninp += 1 # else: # break # for k in range(nchunk+2): # if (k > 0 and k < nchunk+1): # with stream2: # st, end = (k-1)*chunk, min(nn, k*chunk) # inp_gpu0 = [a[(k-1) % 2] for a in inp_gpu] # tmp = func(*inp_gpu0, *args[ninp:], **kwargs) # if not isinstance(tmp, list): # tmp = [tmp] # if k == 1: # first time we know the out shape # nout = len(tmp) # for j in range(nout): # out_gpu.append( # cp.empty([2, chunk, *tmp[j].shape[1:]], dtype=tmp[j].dtype)) # out.append( # np.empty([nn, *tmp[j].shape[1:]], dtype=tmp[j].dtype)) # for j in range(nout): # out_gpu[j][(k-1) % 2] = tmp[j] # if (k > 1): # with stream3: # gpu->cpu copy # for j in range(nout): # # out_gpu[j][(k-2) % 2].get(out=out_pinned[j] # # [(k-2) % 2]) # contiguous copy, fast # st, end = (k-2)*chunk, min(nn, (k-1)*chunk) # s = end-st # out_gpu[j][(k-2) % 2,:s].get(out=out[j][st:end]) # contiguous copy, fast # if (k < nchunk): # with stream1: # cpu->gpu copy # st, end = k*chunk, min(nn, (k+1)*chunk) # s = end-st # for j in range(ninp): # # inp_pinned[j][k % 2, :s] = args[j][st:end] # # # contiguous copy, fast # # inp_gpu[j][k % 2].set(inp_pinned[j][k % 2]) # #inp_pinned[j][k % 2, :s] = args[j][st:end] # # contiguous copy, fast # inp_gpu[j][k % 2,:s].set(args[j][st:end]) # # stream3.synchronize() # # if (k > 1): # # st, end = (k-2)*chunk, min(nn, (k-1)*chunk) # # s = end-st # # for j in range(nout): # # out[j][st:end] = out_pinned[j][(k-2) % 2, :s] # stream1.synchronize() # stream2.synchronize() # stream3.synchronize() # if nout == 1: # out = out[0] # return out # return inner # @gpu_batch(8) # def S(psi, shift): # """Shift operator""" # n = psi.shape[-1] # p = shift.copy()#[st:end] # res = psi.copy() # # if p.shape[0]!=res.shape[0]: # # res = cp.tile(res,(shift.shape[0],1,1)) # res = cp.pad(res,((0,0),(n//2,n//2),(n//2,n//2)),'symmetric') # x = cp.fft.fftfreq(2*n).astype('float32') # [x, y] = cp.meshgrid(x, x) # pp = cp.exp(-2*cp.pi*1j * (x*p[:, 1, None, None]+y*p[:, 0, None, None])) # res = cp.fft.ifft2(pp*cp.fft.fft2(res)) # res = res[:,n//2:-n//2,n//2:-n//2] # return [res,res] # cp.random.seed(10) # a = tifffile.imread('../../tests/data/delta-chip-192.tiff') # a = a+1j*a/2 # b = np.empty_like(a) # shift = np.array(np.random.random([a.shape[0], 2]), dtype='float32')+3 # [b,b0] = S(a,shift) # [bb,bb0] = S(cp.array(a),cp.array(shift)) # import matplotlib.pyplot as plt # plt.figure() # plt.imshow(b[19].real,cmap='gray') # plt.colorbar() # plt.savefig('t1.png') # plt.figure() # plt.imshow(bb[19].real.get(),cmap='gray') # plt.colorbar() # plt.savefig('t.png') # # # print(np.linalg.norm(c)) # print(np.linalg.norm(b)) # print(cp.linalg.norm(bb)) # print(np.linalg.norm(b.real-bb.get().real)) # # print(np.linalg.norm(b-c))