





















































In this article by Giancarlo Zaccone, the author of the book Python Parallel Programming Cookbook, we will cover the following topics:
- Asynchronous programming with Asyncio
- GPU programming with Python using PyCUDA
- GPU programming with Python using PyOpenCL
The asynchronous model is of fundamental importance, together with the concept of event-driven programming. The execution model of asynchronous activities can be implemented with a single main stream of control, both in uniprocessor and multiprocessor systems. In the asynchronous model of concurrent execution, the various tasks are interleaved with one another along the timeline, and all of this happens under the action of a single flow of control (a single thread). The execution of a task can be suspended and then resumed, alternating in time with any other task.
The asynchronous programming model
As you can see in the preceding figure, the tasks (each with a different color) are interleaved with one another, but they run in a single thread of control. This implies that when one task is in execution, the other tasks are not. A key difference between the multithreaded programming model and the single-threaded asynchronous concurrent model is that in the first case, the operating system decides on the timeline when to suspend the activity of a thread and start another, so the programmer must assume that a thread may be suspended and replaced at almost any time; in the second case, a task keeps running until it explicitly yields control, so the programmer knows exactly where a switch between tasks can occur.
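As an aside, the following minimal sketch shows this single-threaded interleaving using the asyncio module that is introduced next. It uses the modern async/await syntax for brevity, while the listings in this article use the older generator-based form; the task names and pauses here are purely illustrative:
import asyncio

async def task(name, pause):
    # Each await is a point where this task suspends and another one may resume
    for step in range(3):
        print("%s - step %d" % (name, step))
        await asyncio.sleep(pause)

async def main():
    # The two tasks interleave along the timeline, yet there is a single thread of control
    await asyncio.gather(task("task A", 0.1), task("task B", 0.15))

asyncio.run(main())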
The Python module Asyncio provides facilities to manage events, coroutines, tasks, and threads, as well as synchronization primitives to write concurrent code. When a program becomes very long and complex, it is convenient to divide it into subroutines, each of which carries out a specific task by implementing a suitable algorithm. A subroutine cannot be executed independently, but only at the request of the main program, which is responsible for coordinating the use of subroutines. Coroutines are a generalization of the subroutine. Like a subroutine, a coroutine computes a single computational step, but unlike subroutines, there is no main program to coordinate the results. This is because coroutines link themselves together to form a pipeline without any supervising function responsible for calling them in a particular order. In a coroutine, the execution point can be suspended and resumed later, since the coroutine keeps track of its local state in the intervening time.
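To make the difference from a subroutine concrete, here is a minimal generator-based coroutine in plain Python (no asyncio involved); its execution is suspended at each yield and resumed later, with its local state, the counter, preserved in the meantime:
def counter():
    count = 0          # local state that survives across suspensions
    while True:
        count += 1
        yield count    # execution suspends here and resumes on the next call

c = counter()
print(next(c))   # 1
print(next(c))   # 2 - the coroutine resumed exactly where it left off
print(next(c))   # 3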
In this example, we see how to use the coroutine mechanism of Asyncio to simulate a finite state machine of five states. A finite state automaton (FSA) is a mathematical model that is widely used not only in engineering disciplines but also in sciences such as mathematics and computer science. The automaton whose behavior we want to simulate is as follows:
Finite State Machine
We have indicated the states of the system with S0, S1, S2, S3, and S4, and with 0 and 1 the values on which the automaton can pass from one state to the next (this operation is called a transition). So, for example, the state S0 can pass to the state S1 only on the value 1, and S0 can pass to the state S2 only on the value 0. The Python code that follows simulates a transition of the automaton from the state S0, the so-called Start State, up to the state S4, the End State:
# Asyncio Finite State Machine
import asyncio
import time
from random import randint

@asyncio.coroutine
def StartState():
    print("Start State called\n")
    # Randomly choose the transition value (0 or 1)
    input_value = randint(0, 1)
    time.sleep(1)
    if input_value == 0:
        result = yield from State2(input_value)
    else:
        result = yield from State1(input_value)
    print("Resume of the Transition :\nStart State calling " + result)

@asyncio.coroutine
def State1(transition_value):
    outputValue = "State 1 with transition value = %s\n" % transition_value
    input_value = randint(0, 1)
    time.sleep(1)
    print("...Evaluating...")
    if input_value == 0:
        result = yield from State3(input_value)
    else:
        result = yield from State2(input_value)
    result = "State 1 calling " + result
    return outputValue + result

@asyncio.coroutine
def State2(transition_value):
    outputValue = "State 2 with transition value = %s\n" % transition_value
    input_value = randint(0, 1)
    time.sleep(1)
    print("...Evaluating...")
    if input_value == 0:
        result = yield from State1(input_value)
    else:
        result = yield from State3(input_value)
    result = "State 2 calling " + result
    return outputValue + result

@asyncio.coroutine
def State3(transition_value):
    outputValue = "State 3 with transition value = %s\n" % transition_value
    input_value = randint(0, 1)
    time.sleep(1)
    print("...Evaluating...")
    if input_value == 0:
        result = yield from State1(input_value)
    else:
        result = yield from EndState(input_value)
    result = "State 3 calling " + result
    return outputValue + result

@asyncio.coroutine
def EndState(transition_value):
    outputValue = "End State with transition value = %s\n" % transition_value
    print("...Stop Computation...")
    return outputValue

if __name__ == "__main__":
    print("Finite State Machine simulation with Asyncio Coroutine")
    loop = asyncio.get_event_loop()
    loop.run_until_complete(StartState())
After running the code, we have an output similar to this:
C:\Python CookBook\Chapter 4- Asynchronous Programming\codes - Chapter 4>python asyncio_state_machine.py
Finite State Machine simulation with Asyncio Coroutine
Start State called
...Evaluating...
...Evaluating...
...Evaluating...
...Evaluating...
...Evaluating...
...Evaluating...
...Evaluating...
...Evaluating...
...Evaluating...
...Evaluating...
...Evaluating...
...Evaluating...
...Stop Computation...
Resume of the Transition :
Start State calling State 1 with transition value = 1
State 1 calling State 3 with transition value = 0
State 3 calling State 1 with transition value = 0
State 1 calling State 2 with transition value = 1
State 2 calling State 3 with transition value = 1
State 3 calling State 1 with transition value = 0
State 1 calling State 2 with transition value = 1
State 2 calling State 1 with transition value = 0
State 1 calling State 3 with transition value = 0
State 3 calling State 1 with transition value = 0
State 1 calling State 2 with transition value = 1
State 2 calling State 3 with transition value = 1
State 3 calling End State with transition value = 1
Each state of the automaton has been defined with the @asyncio.coroutine decorator. For example, the state S0 is:
@asyncio.coroutine
def StartState():
    print("Start State called\n")
    input_value = randint(0, 1)
    time.sleep(1)
    if input_value == 0:
        result = yield from State2(input_value)
    else:
        result = yield from State1(input_value)
The transition to the next state is determined by input_value, which is obtained from the randint(0,1) function of Python's random module. This function randomly provides the value 0 or 1, which determines the state to which the finite state machine will pass:
input_value = randint(0,1)
After determining the state to which the finite state machine will pass, the coroutine calls the next coroutine using the yield from statement:
if input_value == 0:
    result = yield from State2(input_value)
else:
    result = yield from State1(input_value)
The variable result is the value that each coroutine returns. It is a string, and at the end of the computation, we can reconstruct the transition from the initial state of the automaton, the Start State, up to the final state, the End State.
The main program starts the evaluation inside the event loop:
if __name__ == "__main__":
    print("Finite State Machine simulation with Asyncio Coroutine")
    loop = asyncio.get_event_loop()
    loop.run_until_complete(StartState())
A graphics processing unit (GPU) is an electronic circuit that specializes in processing data to render images from polygonal primitives. Although GPUs were designed to render images, they have continued to evolve, becoming more complex and efficient both in serving the real-time and offline rendering community and in performing any scientific computation. Each GPU is indeed composed of several processing units called streaming multiprocessors (SMs), which represent the first logical level of parallelism; each SM works simultaneously and independently of the others.
The GPU architecture
Each SM is in turn divided into a group of Stream Processors (SPs), each of which has a real execution core and can sequentially run a thread. The SP represents the smallest unit of execution logic and the finest level of parallelism. The division into SMs and SPs is structural in nature, but it is possible to outline a further logical organization of the SPs of a GPU: they are grouped together in logical blocks characterized by a particular mode of execution, where all the cores that make up a group run the same instruction at the same time. This is just the SIMD (Single Instruction, Multiple Data) model. The programming paradigm that characterizes GPU computing is also called stream processing because the data can be viewed as a homogeneous stream of values to which the same operations are applied synchronously.
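Purely as an illustration of this data-parallel idea (and not of how the GPU hardware actually executes it), the same "one operation applied to a whole stream of values" style can be written on the CPU with NumPy:
import numpy as np

data = np.arange(10, dtype=np.float32)   # a homogeneous stream of values
result = data * 2.0                      # the same operation applied to every element
print(result)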
Currently, the most efficient solutions to exploit the computing power provided by the GPU cards are the software libraries CUDA and OpenCL.
PyCUDA is a Python wrapper for CUDA (Compute Unified Device Architecture), the software library developed by NVIDIA for GPU programming.
The PyCUDA programming model is designed for the joint execution of a program on the CPU and the GPU, so that the sequential parts run on the CPU and the more intensive numeric parts run on the GPU. The phases to be performed in sequential mode are implemented and executed on the CPU (host), while the steps to be performed in parallel are implemented and executed on the GPU (device). The functions to be performed in parallel on the device are called kernels. The general skeleton for the execution of a generic kernel function on the device is shown in the following figure:
The PyCUDA programming model
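In code form, the same skeleton can be sketched as follows. This is a minimal, self-contained outline of the typical steps; the addOne kernel and the sizes used here are illustrative and are not part of the book's listings:
import numpy
import pycuda.autoinit                     # initializes the device and creates a context
import pycuda.driver as cuda
from pycuda.compiler import SourceModule

# 1. Prepare the input data on the host (CPU)
data = numpy.arange(8, dtype=numpy.float32)

# 2. Allocate device (GPU) memory and copy the input from host to device
data_gpu = cuda.mem_alloc(data.nbytes)
cuda.memcpy_htod(data_gpu, data)

# 3. Define and compile the kernel that will run on the device
mod = SourceModule("""
__global__ void addOne(float *v)
{
    int idx = threadIdx.x;
    v[idx] += 1.0f;
}
""")

# 4. Launch the kernel: one block of 8 threads, one thread per element
addOne = mod.get_function("addOne")
addOne(data_gpu, block=(8, 1, 1))

# 5. Copy the result back from device to host and use it
result = numpy.empty_like(data)
cuda.memcpy_dtoh(result, data_gpu)
print(result)   # [1. 2. 3. 4. 5. 6. 7. 8.]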
To show the PyCUDA workflow, let's consider a 5 × 5 random array and the following procedure:
1. Create the 5 × 5 array on the CPU.
2. Transfer the array from the CPU to the GPU.
3. Double each item of the array on the GPU.
4. Transfer the resulting array from the GPU back to the CPU and print it.
The code for this is as follows:
import pycuda.driver as cuda
import pycuda.autoinit
from pycuda.compiler import SourceModule
import numpy

a = numpy.random.randn(5, 5)
a = a.astype(numpy.float32)
a_gpu = cuda.mem_alloc(a.nbytes)
cuda.memcpy_htod(a_gpu, a)

mod = SourceModule("""
__global__ void doubleMatrix(float *a)
{
    int idx = threadIdx.x + threadIdx.y * 5;   /* 5 is the row width of the matrix */
    a[idx] *= 2;
}
""")

func = mod.get_function("doubleMatrix")
func(a_gpu, block=(5, 5, 1))

a_doubled = numpy.empty_like(a)
cuda.memcpy_dtoh(a_doubled, a_gpu)
print("ORIGINAL MATRIX")
print(a)
print("DOUBLED MATRIX AFTER PyCUDA EXECUTION")
print(a_doubled)
The example output should be like this:
C:\Python CookBook\Chapter 6 - GPU Programming with Python>python PyCudaWorkflow.py
ORIGINAL MATRIX
[[-0.59975582 1.93627465 0.65337795 0.13205571 -0.46468592]
[ 0.01441949 1.40946579 0.5343408 -0.46614054 -0.31727529]
[-0.06868593 1.21149373 -0.6035406 -1.29117763 0.47762445]
[ 0.36176383 -1.443097 1.21592784 -1.04906416 -1.18935871]
[-0.06960868 -1.44647694 -1.22041082 1.17092752 0.3686313 ]]
DOUBLED MATRIX AFTER PyCUDA EXECUTION
[[-1.19951165 3.8725493 1.3067559 0.26411143 -0.92937183]
[ 0.02883899 2.81893158 1.0686816 -0.93228108 -0.63455057]
[-0.13737187 2.42298746 -1.2070812 -2.58235526 0.95524889]
[ 0.72352767 -1.443097 1.21592784 -1.04906416 -1.18935871]
[-0.06960868 -1.44647694 -1.22041082 1.17092752 0.3686313 ]]
The code starts with the following imports:
import pycuda.driver as cuda
import pycuda.autoinit
from pycuda.compiler import SourceModule
The pycuda.autoinit import automatically picks a GPU to run on, based on the availability and number of devices. It also creates a GPU context for the subsequent code to run in. Both the chosen device and the created context are available from pycuda.autoinit as importable symbols, if needed. The SourceModule component, on the other hand, is the object where C-like code for the GPU must be written.
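If more control is needed, the same initialization can also be performed by hand instead of importing pycuda.autoinit. The following is only a sketch, and it assumes that device 0 is the card you want to use:
import pycuda.driver as cuda

cuda.init()                      # initialize the CUDA driver API
device = cuda.Device(0)          # pick the first available device
context = device.make_context()  # create a context on it
try:
    print(device.name())         # for example, the name of the installed GPU
finally:
    context.pop()                # release the context when done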
The first step is to generate the input 5 × 5 matrix. Since most GPU computations involve large arrays of data, the NumPy module must be imported:
import numpy
a = numpy.random.randn(5,5)
Then, the items in the matrix are converted into single-precision mode, since many NVIDIA cards support only single precision:
a = a.astype(numpy.float32)
The first operation to be done in order to involve the GPU is to load the input array from the host memory (CPU) to the device (GPU). This is done at the beginning of the computation and consists of two steps, performed by invoking two functions provided by PyCUDA: the memory on the device is first allocated with the call cuda.mem_alloc(a.nbytes), and the matrix is then copied from the host to the device with the call cuda.memcpy_htod(a_gpu, a).
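As a side note, PyCUDA also offers the gpuarray module, which combines allocation and copy in a single call. A minimal sketch, using the same kind of matrix as in the listing (the variable name a_device is ours, not the book's):
import pycuda.autoinit           # creates a context, as in the main listing
import pycuda.gpuarray as gpuarray
import numpy

a = numpy.random.randn(5, 5).astype(numpy.float32)
a_device = gpuarray.to_gpu(a)    # allocates device memory and copies the host array
print(a_device.get())            # get() copies the data back into a new host array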
We also note that a_gpu is one-dimensional, and on the device, we need to handle it as such. All these operations do not require the invocation of a kernel and are carried out directly by the main processor. The SourceModule entity serves to define the (C-like) kernel function doubleMatrix, which multiplies each matrix entry by 2:
mod = SourceModule("""
__global__ void doubleMatrix(float *a)
{
    int idx = threadIdx.x + threadIdx.y * 5;   /* 5 is the row width of the matrix */
    a[idx] *= 2;
}
""")
The __global__ qualifier is a directive that indicates that the doubleMatrix function will be processed on the device. The CUDA nvcc compiler is the tool that will perform this task.
Let's look at the function's body, which is as follows:
int idx = threadIdx.x + threadIdx.y * 5;
The idx parameter is the linear index of the matrix element, computed from the thread coordinates threadIdx.x and threadIdx.y (5 being the row width of the 5 × 5 matrix). Then, the matrix element with the index idx is multiplied by 2:
a[idx] *= 2;
We also note that this kernel function will be executed once in each of the 25 threads. Both the variables threadIdx.x and threadIdx.y contain indices between 0 and 4, and the pair is different for each thread. Thread scheduling is directly linked to the GPU architecture and its intrinsic parallelism. A block of threads is assigned to a single SM. Here, threads are further divided into groups called warps. The size of a warp depends on the architecture under consideration. The threads of the same warp are managed by a control unit called the warp scheduler. To take full advantage of the inherent parallelism of the SM, the threads of the same warp must execute the same instruction. If this condition does not occur, we speak of divergence of threads: when the threads of the same warp execute different instructions, the control unit cannot handle all of them together, and it must follow the sequences of instructions for every single thread (or for homogeneous subsets of threads) in a serial mode. Let's observe how the thread block is divided into various warps; threads are grouped by the value of threadIdx.
The threadIdx structure consists of 3 fields: threadIdx.x, threadIdx.y, and threadIdx.z.
Thread blocks subdivision: T(x,y), where x = threadIdx.x and y = threadIdx.y
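To make this mapping concrete, the following small Python sketch (not part of the book's listing) enumerates the (threadIdx.x, threadIdx.y) pairs of a (5, 5, 1) block and the linear index each thread computes:
row_width = 5                # row width of the 5 x 5 matrix
for y in range(5):           # threadIdx.y
    for x in range(5):       # threadIdx.x
        idx = x + y * row_width
        print("T(%d,%d) -> a[%d]" % (x, y, idx))
# The 25 indices cover 0..24, one for each element of the matrix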
We can see again that the code in the kernel function will be automatically compiled by the nvcc CUDA compiler. If there are no errors, a pointer to the compiled function will be created. In fact, mod.get_function("doubleMatrix") returns an identifier to the compiled function, func:
func = mod.get_function("doubleMatrix")
To execute a function on the device, you must first configure its execution appropriately. This means that we need to determine the sizes of the coordinates that identify and distinguish the threads belonging to a block. This will be done using the block parameter inside the func call:
func(a_gpu, block = (5, 5, 1))
The block = (5, 5, 1) argument tells us that we are calling the kernel function on the a_gpu linearized input matrix with a single thread block of 5 threads in the x direction, 5 threads in the y direction, and 1 thread in the z direction, that is, 25 threads in total. This structure is designed around the parallel implementation of the algorithm of interest: the division of the workload is an early form of parallelism that is sufficient and necessary to make use of the computing resources provided by the GPU. Once you have configured the kernel's invocation, you can invoke the kernel function, which executes instructions in parallel on the device. Each thread executes the same kernel code.
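For inputs larger than a single block can cover, PyCUDA also accepts a grid parameter, which launches several blocks whose threads are then distinguished through blockIdx and blockDim. The following is a minimal, self-contained sketch; the doubleVector kernel and the sizes are illustrative and are not from the book's listings:
import numpy
import pycuda.autoinit
import pycuda.driver as cuda
from pycuda.compiler import SourceModule

v = numpy.ones(1024, dtype=numpy.float32)
v_gpu = cuda.mem_alloc(v.nbytes)
cuda.memcpy_htod(v_gpu, v)

mod = SourceModule("""
__global__ void doubleVector(float *v)
{
    int idx = blockIdx.x * blockDim.x + threadIdx.x;   /* global thread index */
    v[idx] *= 2;
}
""")

doubleVector = mod.get_function("doubleVector")
doubleVector(v_gpu, block=(256, 1, 1), grid=(4, 1))   # 4 x 256 = 1024 threads

out = numpy.empty_like(v)
cuda.memcpy_dtoh(out, v_gpu)
print(out[:5])   # [2. 2. 2. 2. 2.]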
After the computation on the GPU device, we use an array to store the results:
a_doubled = numpy.empty_like(a)
cuda.memcpy_dtoh(a_doubled, a_gpu)
As with PyCUDA, the first step to build a program for PyOpenCL is the encoding of the host application. This is performed on the host computer (typically, the user's PC), which then dispatches the kernel application to the connected devices (GPU cards).
The host application must contain five data structures: the device, the kernel, the program object, the command queue, and the context.
PyOpenCL programming
The preceding figure shows how these data structures can work in a host application. Let's remember again that a program can contain multiple functions that are to be executed on the device and each kernel encapsulates only a single function from the program.
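A minimal sketch of these host-side structures in PyOpenCL is shown below; the doNothing kernel is a made-up placeholder, used only to show where the program object and the kernel fit:
import pyopencl as cl

# Device and context: the card that will run the kernels and the container
# that ties devices, memory, and programs together
platform = cl.get_platforms()[0]
device = platform.get_devices()[0]
context = cl.Context([device])

# Command queue: the channel through which work is submitted to the device
queue = cl.CommandQueue(context)

# Program object: the compiled device code; each kernel wraps one of its functions
program = cl.Program(context, """
__kernel void doNothing() { }
""").build()
kernel = program.doNothing

print("Device used = %s" % device.name)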
In this example, we show you the basic steps to build a PyOpenCL program. The task to be executed is the parallel sum of two vectors. In order to maintain a readable output, let's consider two vectors, each of 100 elements: the ith element of the resulting vector is the sum of the ith element of vector_a and the ith element of vector_b.
Of course, to be able to appreciate the parallel execution of this code, you can also increase the size of the input, vector_dimension, by some orders of magnitude:
import numpy as np
import pyopencl as cl
import numpy.linalg as la

vector_dimension = 100
vector_a = np.random.randint(vector_dimension, size=vector_dimension)
vector_b = np.random.randint(vector_dimension, size=vector_dimension)

platform = cl.get_platforms()[0]
device = platform.get_devices()[0]
context = cl.Context([device])
queue = cl.CommandQueue(context)

mf = cl.mem_flags
a_g = cl.Buffer(context, mf.READ_ONLY | mf.COPY_HOST_PTR, hostbuf=vector_a)
b_g = cl.Buffer(context, mf.READ_ONLY | mf.COPY_HOST_PTR, hostbuf=vector_b)

program = cl.Program(context, """
__kernel void vectorSum(__global const int *a_g, __global const int *b_g, __global int *res_g) {
    int gid = get_global_id(0);
    res_g[gid] = a_g[gid] + b_g[gid];
}
""").build()

res_g = cl.Buffer(context, mf.WRITE_ONLY, vector_a.nbytes)
program.vectorSum(queue, vector_a.shape, None, a_g, b_g, res_g)

res_np = np.empty_like(vector_a)
cl.enqueue_copy(queue, res_np, res_g)

print("PyOPENCL SUM OF TWO VECTORS")
print("Platform Selected = %s" % platform.name)
print("Device Selected = %s" % device.name)
print("VECTOR LENGTH = %s" % vector_dimension)
print("INPUT VECTOR A")
print(vector_a)
print("INPUT VECTOR B")
print(vector_b)
print("OUTPUT VECTOR RESULT A + B ")
print(res_np)

assert la.norm(res_np - (vector_a + vector_b)) < 1e-5
The output from Command Prompt should be like this:
C:\Python CookBook\Chapter 6 - GPU Programming with Python\Chapter 6 - codes>python PyOpenCLParallellSum.py
Platform Selected = NVIDIA CUDA
Device Selected = GeForce GT 240
VECTOR LENGTH = 100
INPUT VECTOR A
[ 0 29 88 46 68 93 81 3 58 44 95 20 81 69 85 25 89 39 47 29 47 48 20 86 59 99 3 26 68 62 16 13 63 28 77 57 59 45 52 89 16 6 18 95 30 66 19 29 31 18 42 34 70 21 28 0 42 96 23 86 64 88 20 26 96 45 28 53 75 53 39 83 85 99 49 93 23 39 1 89 39 87 62 29 51 66 5 66 48 53 66 8 51 3 29 96 67 38 22 88]
INPUT VECTOR B
[98 43 16 28 63 1 83 18 6 58 47 86 59 29 60 68 19 51 37 46 99 27 4 94 5 22 3 96 18 84 29 34 27 31 37 94 13 89 3 90 57 85 66 63 8 74 21 18 34 93 17 26 9 88 38 28 14 68 88 90 18 6 40 30 70 93 75 0 45 86 15 10 29 84 47 74 22 72 69 33 81 31 45 62 81 66 69 14 71 96 91 51 35 4 63 36 28 65 10 41]
OUTPUT VECTOR RESULT A + B
[ 98 72 104 74 131 94 164 21 64 102 142 106 140 98 145 93 108 90
84 75 146 75 24 180 64 121 6 122 86 146 45 47 90 59 114 151
72 134 55 179 73 91 84 158 38 140 40 47 65 111 59 60 79 109
66 28 56 164 111 176 82 94 60 56 166 138 103 53 120 139 54 93
114 183 96 167 45 111 70 122 120 118 107 91 132 132 74 80 119 149
157 59 86 7 92 132 95 103 32 129]
In the first lines of the code, after the required module imports, we define the input vectors:
vector_dimension = 100
vector_a = np.random.randint(vector_dimension, size= vector_dimension)
vector_b = np.random.randint(vector_dimension, size= vector_dimension)
Each vector contains 100 integer items that are randomly selected through the NumPy function:
np.random.randint(max_integer, size=number_of_elements)
Here, the first argument is the (exclusive) upper bound of the random integers, and size is the number of elements of the vector.
Then, we must select the device to run the kernel code. To do this, we must first select the platform using the get_platforms() PyOpenCL function:
platform = cl.get_platforms()[0]
This platform, as you can see from the output, corresponds to the NVIDIA CUDA platform. Then, we must select the device using the platform's get_devices() method:
device = platform.get_devices()[0]
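If more than one platform or device is installed, you can also list them all and choose explicitly; the index 0 used in this example simply takes the first entry found:
import pyopencl as cl

for p_index, platform in enumerate(cl.get_platforms()):
    print("Platform %d: %s" % (p_index, platform.name))
    for d_index, device in enumerate(platform.get_devices()):
        print("  Device %d: %s" % (d_index, device.name))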
In the following steps, the context and the queue are defined; PyOpenCL provides the Context class (built from the selected device) and the CommandQueue class (built from the context) for this purpose:
context = cl.Context([device])
queue = cl.CommandQueue(context)
To perform the computation on the device, the input vectors must be transferred to the device's memory. So, two input buffers in the device memory must be created:
mf = cl.mem_flags
a_g = cl.Buffer(context, mf.READ_ONLY | mf.COPY_HOST_PTR, hostbuf=vector_a)
b_g = cl.Buffer(context, mf.READ_ONLY | mf.COPY_HOST_PTR, hostbuf=vector_b)
Also, we prepare the buffer for the resulting vector:
res_g = cl.Buffer(context, mf.WRITE_ONLY, vector_a.nbytes)
Finally, we come to the core of the script: the kernel code, which is defined inside a program as follows:
program = cl.Program(context, """
__kernel void vectorSum(__global const int *a_g, __global const int *b_g, __global int *res_g) {
    int gid = get_global_id(0);
    res_g[gid] = a_g[gid] + b_g[gid];
}
""").build()
The kernel's name is vectorSum. The parameter list defines the data types of the input arguments (vectors of integers) and of the output (again a vector of integers).
Inside the kernel, the sum of the two vectors is simply defined as:
res_g[gid] = a_g[gid] + b_g[gid];
In OpenCL and PyOpenCL, buffers are attached to a context and are only moved to a device once the buffer is used on that device. Finally, we execute vectorSum in the device:
program.vectorSum(queue, vector_a.shape, None, a_g, b_g, res_g)
To visualize the results, an empty vector is built:
res_np = np.empty_like(vector_a)
Then, the result is copied into this vector:
cl.enqueue_copy(queue, res_np, res_g)
Finally, the results are displayed:
print ("VECTOR LENGTH = %s" %vector_dimension)
print ("INPUT VECTOR A")
print vector_a
print ("INPUT VECTOR B")
print vector_b
print ("OUTPUT VECTOR RESULT A + B ")
print res_np
To check the result, we use the assert statement. It tests the result and triggers an error if the condition is false:
assert la.norm(res_np - (vector_a + vector_b)) < 1e-5
In this article, we discussed Asyncio, GPU programming with Python, PyCUDA, and PyOpenCL.