Search icon CANCEL
Subscription
0
Cart icon
Your Cart (0 item)
Close icon
You have no products in your basket yet
Save more on your purchases! discount-offer-chevron-icon
Savings automatically calculated. No voucher code required.
Arrow left icon
All Products
Best Sellers
New Releases
Books
Videos
Audiobooks
Learning Hub
Newsletter Hub
Free Learning
Arrow right icon
timer SALE ENDS IN
0 Days
:
00 Hours
:
00 Minutes
:
00 Seconds

Asynchronous Programming with Python

Save for later
  • 20 min read
  • 26 Aug 2015

article-image

 In this article by Giancarlo Zaccone, the author of the book Python Parallel Programming Cookbook, we will cover the following topics:

  • Introducing Asyncio
  • GPU programming with Python
  • Introducing PyCUDA
  • Introducing PyOpenCL

(For more resources related to this topic, see here.)

An asynchronous model is of fundamental importance along with the concept of event programming. The execution model of asynchronous activities can be implemented using a single stream of main control, both in uniprocessor systems and multiprocessor systems. In the asynchronous model of a concurrent execution, various tasks intersect with each other along the timeline, and all of this happens under the action of a single flow of control (single-threaded). The execution of a task can be suspended and then resumed alternating in time with any other task.

asynchronous-programming-python-img-0

The asynchronous programming model

As you can see in the preceding figure, the tasks (each with a different color) are interleaved with one another, but they are in a single thread of control. This implies that when one task is in execution, the other tasks are not. A key difference between a multithreaded programming model and single-threaded asynchronous concurrent model is that in the first case, the operating system decides on the timeline whether to suspend the activity of a thread and start another, while in the second case, the programmer must assume that a thread may be suspended and replaced with another at almost any time.

Introducing Asyncio

The Python module Asyncio provides facilities to manage events, coroutines, tasks and threads, and synchronization primitives to write concurrent code. When a program becomes very long and complex, it is convenient to divide it into subroutines, each of which realizes a specific task, for which the program implements a suitable algorithm. The subroutine cannot be executed independently but only at the request of the main program, which is then responsible for coordinating the use of subroutines. Coroutines are a generalization of the subroutine. Like a subroutine, the coroutine computes a single computational step, but unlike subroutines, there is no main program that is used to coordinate the results. This is because the coroutines link themselves together to form a pipeline without any supervising function responsible for calling them in a particular order. In a coroutine, the execution point can be suspended and resumed later, having kept track of its local state in the intervening time.

In this example, we see how to use the coroutine mechanism of Asyncio to simulate a finite state machine of five states. A Finite-state automaton (FSA) is a mathematical model that is not only widely used in engineering disciplines but also in sciences, such as mathematics and computer science. The automata we want to simulate the behavior is as follows:

asynchronous-programming-python-img-1

Finite State Machine

We have indicated with S0, S1, S2, S3, and S4 the states of the system, with 0 and 1 as the values for which the automata can pass from one state to the next state (this operation is called a transition). So for example, the state S0 can be passed to the state S1 only for the value 1 and S0 can pass the state S2 only to the value 0. The Python code that follows simulates a transition of the automaton from the state S0, the so-called Start State, up to the state S4, the End State:

#Asyncio Finite State Machine
import asyncio
import time
from random import randint
@asyncio.coroutine
def StartState():
    print ("Start State called n")
    input_value = randint(0,1)
    time.sleep(1)
    if (input_value == 0):
        result = yield from State2(input_value)
    else :
        result = yield from State1(input_value)
    print("Resume of the Transition : nStart State calling "
          + result)

@asyncio.coroutine def State1(transition_value): outputValue = str(("State 1 with transition value = %s n" %(transition_value))) input_value = randint(0,1) time.sleep(1) print("...Evaluating...") if (input_value == 0): result = yield from State3(input_value) else : result = yield from State2(input_value) result = "State 1 calling " + result return (outputValue + str(result)) @asyncio.coroutine def State2(transition_value): outputValue = str(("State 2 with transition value = %s n" %(transition_value))) input_value = randint(0,1) time.sleep(1) print("...Evaluating...") if (input_value == 0): result = yield from State1(input_value) else : result = yield from State3(input_value) result = "State 2 calling " + result return (outputValue + str(result)) @asyncio.coroutine def State3(transition_value): outputValue = str(("State 3 with transition value = %s n" %(transition_value))) input_value = randint(0,1) time.sleep(1) print("...Evaluating...") if (input_value == 0): result = yield from State1(input_value) else : result = yield from EndState(input_value) result = "State 3 calling " + result return (outputValue + str(result)) @asyncio.coroutine def EndState(transition_value): outputValue = str(("End State with transition value = %s n" %(transition_value))) print("...Stop Computation...") return (outputValue ) if __name__ == "__main__": print("Finite State Machine simulation with Asyncio Coroutine") loop = asyncio.get_event_loop() loop.run_until_complete(StartState())

After running the code, we have an output similar to this:

C:Python CookBookChapter 4- Asynchronous Programmingcodes - Chapter 4>python asyncio_state_machine.py
Finite State Machine simulation with Asyncio Coroutine
Start State called
...Evaluating...
...Evaluating...
...Evaluating...
...Evaluating...
...Evaluating...
...Evaluating...
...Evaluating...
...Evaluating...
...Evaluating...
...Evaluating...
...Evaluating...
...Evaluating...
...Stop Computation...
Resume of the Transition :
Start State calling State 1 with transition value = 1
State 1 calling State 3 with transition value = 0
State 3 calling State 1 with transition value = 0
State 1 calling State 2 with transition value = 1
State 2 calling State 3 with transition value = 1
State 3 calling State 1 with transition value = 0
State 1 calling State 2 with transition value = 1
State 2 calling State 1 with transition value = 0
State 1 calling State 3 with transition value = 0
State 3 calling State 1 with transition value = 0
State 1 calling State 2 with transition value = 1
State 2 calling State 3 with transition value = 1
State 3 calling End State with transition value = 1

Each state of the automata has been defined with the annotation @asyncio.coroutine. For example, the state S0 is:

@asyncio.coroutine
def StartState():
    print ("Start State called n")
    input_value = randint(0,1)
    time.sleep(1)
    if (input_value == 0):
        result = yield from State2(input_value)
    else :
        result = yield from State1(input_value)

The transition to the next state is determined by input_value, which is defined by the randint(0,1) function of Python's module random. This function randomly provides the value 0 or 1, where it randomly determines to which state the finite-state machine will be passed:

input_value = randint(0,1)

After determining the value at which state the finite state machine will be passed, the coroutine calls the next coroutine using the command yield from:

if (input_value == 0):
        result = yield from State2(input_value)
    else :
        result = yield from State1(input_value)

The variable result is the value that each coroutine returns. It is a string, and at the end of the computation, we can reconstruct [NV1] the transition from the initial state of the automaton, the Start State, up to the final state, the End State.

The main program starts the evaluation inside the event loop:

if __name__ == "__main__":
    print("Finite State Machine simulation with Asyncio Coroutine")
    loop = asyncio.get_event_loop()
    loop.run_until_complete(StartState())

GPU programming with Python

A graphics processing unit (GPU) is an electronic circuit that specializes in processing data to render images from polygonal primitives. Although they were designed to carry out rendering images, GPUs have continued to evolve, becoming more complex and efficient in serving both real-time and offline rendering community. GPUs have continued to evolve, becoming more complex and efficient in performing any scientific computation. Each GPU is indeed composed of several processing units called streaming multiprocessor (SM), representing the first logic level of parallelism; each SM in fact, works simultaneously and independently from the others.

asynchronous-programming-python-img-2

The GPU architecture

Each SM is in turn divided into a group of Stream Processors (SP), each of which has a core of real execution and can run a thread sequentially. SP represents the smallest unit of execution logic and the level of finer parallelism. The division in SM and SP is structural in nature, but it is possible to outline a further logical organization of the SP of a GPU, which are grouped together in logical blocks characterized by a particular mode of execution—all cores that make up a group run at the same time with the same instructions. This is just the SIMD (Single Instruction, Multiple Data) model. The programming paradigm that characterizes GPU computing is also called stream processing because the data can be viewed as a homogeneous flow of values that are applied synchronously to the same operations.

Currently, the most efficient solutions to exploit the computing power provided by the GPU cards are the software libraries CUDA and OpenCL.

Introducing PyCUDA

PyCUDA is a Python wrapper for CUDA (Compute Unified Device Architecture), the software library developed by NVIDIA for GPU programming.

The PyCuda programming model is designed for the common execution of a program on the CPU and GPU so as to allow you to perform the sequential parts on the CPU and the numeric parts that are more intensive on the GPU. The phases to be performed in the sequential mode are implemented and executed on the CPU (host), while the steps to be performed in parallel are implemented and executed on the GPU (device). The functions to be performed in parallel on the device are called kernels. The skeleton general for the execution of a generic function kernel on the device is as follows:

  1. Allocation of memory on the device.
  2. Transfer of data from the host memory to that allocated on the device.
  3. Running the device:
    1. Running the configuration.
    2. Invocation of the kernel function.
  4. Transfer of the results from the memory on the device to the host memory.
  5. Release of the memory allocated on the device.

    asynchronous-programming-python-img-3

The PyCUDA programming model

To show the PyCuda workflow, let's consider a 5 × 5 random array and the following procedure:

  1. Create the array 5×5 on the CPU.
  2. Transfer the array to the GPU.
  3. Perform a Task[NV2]  on the array in the GPU (double all the items in the array).
  4. Transfer the array from the GPU to the CPU.
  5. Print the results.

The code for this is as follows:

import pycuda.driver as cuda
import pycuda.autoinit
from pycuda.compiler import SourceModule

import numpy

a = numpy.random.randn(5,5)
a = a.astype(numpy.float32)

a_gpu = cuda.mem_alloc(a.nbytes)
cuda.memcpy_htod(a_gpu, a)

mod = SourceModule("""
  __global__ void doubleMatrix(float *a)
  {
    int idx = threadIdx.x + threadIdx.y*4;
    a[idx] *= 2;
  }
  """)

func = mod.get_function("doubleMatrix")
func(a_gpu, block=(5,5,1))

a_doubled = numpy.empty_like(a)
cuda.memcpy_dtoh(a_doubled, a_gpu)
print ("ORIGINAL MATRIX")
print a
print ("DOUBLED MATRIX AFTER PyCUDA EXECUTION")
print a_doubled

The example output should be like this :

C:Python CookBookChapter 6 - GPU Programming with Python >python PyCudaWorkflow.py
ORIGINAL MATRIX
[[-0.59975582  1.93627465  0.65337795  0.13205571 -0.46468592]
 [ 0.01441949  1.40946579  0.5343408  -0.46614054 -0.31727529]
 [-0.06868593  1.21149373 -0.6035406  -1.29117763  0.47762445]
 [ 0.36176383 -1.443097    1.21592784 -1.04906416 -1.18935871]
 [-0.06960868 -1.44647694 -1.22041082  1.17092752  0.3686313 ]]

DOUBLED MATRIX AFTER PyCUDA EXECUTION
[[-1.19951165  3.8725493   1.3067559   0.26411143 -0.92937183]
 [ 0.02883899  2.81893158  1.0686816  -0.93228108 -0.63455057]
 [-0.13737187  2.42298746 -1.2070812  -2.58235526  0.95524889]
 [ 0.72352767 -1.443097    1.21592784 -1.04906416 -1.18935871]
 [-0.06960868 -1.44647694 -1.22041082  1.17092752  0.3686313 ]]

The code starts with the following imports:

import pycuda.driver as cuda
import pycuda.autoinit
from pycuda.compiler import SourceModule

The pycuda.autoinit import automatically picks a GPU to run on based on the availability and number. It also creates a GPU context for subsequent code to run in. Both the chosen device and the created context are available from pycuda.autoinit as importable symbols if needed. While the SourceModule component is the object where a C-like code for the GPU must be written.

The first step is to generate the input 5 × 5 matrix. Since most GPU computations involve large arrays of data, the NumPy module must be imported:

import numpy
a = numpy.random.randn(5,5)

Then, the items in the matrix are converted in a single precision mode, many NVIDIA cards support only single precision:

a = a.astype(numpy.float32)

The first operation to be done in order to implement a GPU loads the input array from the host memory (CPU) to the device (GPU). This is done at the beginning of the operation and consists two steps that are performed by invoking two functions provided PyCuda[NV3] .

  • Memory allocation on the device is done via the cuda.mem_alloc function. The device and host memory may not ever communicate while performing a function kernel. This means that to run a function in parallel on the device, the data relating to it must be present in the memory of the device itself. Before you copy data from the host memory to the device memory, you must allocate the memory required on the device: a_gpu = cuda.mem_alloc(a.nbytes).
  • Copy of the matrix from the host memory to that of the device with the function:
call cuda.memcpy_htod : cuda.memcpy_htod(a_gpu, a).

We also note that a_gpu is one dimensional, and on the device, we need to handle it as such. All these operations do not require the invocation of a kernel and are made directly by the main processor. The SourceModule entity serves to define the (C-like) kernel function doubleMatrix that multiplies each array entry by 2:

mod = SourceModule("""
  __global__ void doubleMatrix(float *a)
  {
    int idx = threadIdx.x + threadIdx.y*4;
    a[idx] *= 2;
  }
  """)

The __global__ qualifier is a directive that indicates that the doubleMatrix function will be processed on the device. It will be just the compiler Cuda nvcc that will be used to perform this task.

Let's look at the function's body, which is as follows:

Unlock access to the largest independent learning library in Tech for FREE!
Get unlimited access to 7500+ expert-authored eBooks and video courses covering every tech area you can think of.
Renews at £15.99/month. Cancel anytime
int idx = threadIdx.x + threadIdx.y*4;

The idx parameter is the matrix index that is identified by the thread coordinates threadIdx.x and threadIdx.y. Then, the element matrix with the index idx is multiplied by 2:

a[idx] *= 2;

We also note that this kernel function will be executed once in 16 different threads. Both the variables threadIdx.x and threadIdx.y contain indices between 0 and 3

, and the pair[NV4]  is different for each thread. Threads scheduling is directly linked to the GPU architecture and its intrinsic parallelism. A block of threads is assigned to a single SM. Here, threads are further divided into groups called warps. The size of a warp depends on the architecture under consideration. The threads of the same warp are managed by the control unit called the warp scheduler. To take full advantage of the inherent parallelism of the SM, the threads of the same warp must execute the same instruction. If this condition does not occur, we speak of divergence of threads. If the same warp threads execute different instructions, the control unit cannot handle all the warps. It must follow the sequences of instructions for every single thread (or for homogeneous subsets of threads) in a serial mode. Let's observe how the thread block is divided in various warps—threads are divided by the value of threadIdx.

The threadIdx structure consists of 3 fields: threadIdx.x, threadIdx.y, and threadIdx.z.

asynchronous-programming-python-img-4

Thread blocks subdivision: T(x,y), where x = threadIdx.x and y = threadIdx.y

We can see again that the code in the kernel function will be automatically compiled by the nvcc cuda compiler. If there are no errors, a pointer to this compiled function will be created. In fact, the mod.get_function[NV5] ("doubleMatrix") returns an identifier to the function created func:

func = mod.get_function("doubleMatrix ")

To perform a function on the device, you must first configure the execution appropriately. This means that we need to determine the size of the coordinates to identify and distinguish the thread belonging to different blocks. This will be done using the block parameter inside the func call:

func(a_gpu, block = (5, 5, 1))

The block = (5, 5, 1) tells us that we are calling a kernel function with a_gpu linearized input matrix and a single thread block of size, 5 threads in the x direction, 5 threads in the y direction, and 1 thread in the z direction, 16 threads in total. This structure is designed with parallel implementation of the algorithm of interest. The division of the workload results is an early form of parallelism that is sufficient and necessary to make use of the computing resources provided by the GPU. Once you've configured the kernel's invocation, you can invoke the kernel function that executes instructions in parallel on the device. Each thread executes the same code kernel.

After the computation in the gpu device, we use an array to store the results:

a_doubled = numpy.empty_like(a)
cuda.memcpy_dtoh(a_doubled, a_gpu)

Introducing PyOpenCL

As for programming with PyCuda, the first step to build a program for PyOpenCL is the encoding of the host application. In fact, this is performed on the host computer (typically, the user's PC) and then, it dispatches the kernel application on the connected devices (GPU cards).

The host application must contain five data structures, which are as follows:

  • Device: This identifies the hardware where the kernel code must be executed. A PyOpenCL application can be executed not only on CPU and GPU cards but also on embedded devices such as FPGA (Field Programmable Gate Array).
  • Program: This is a group of kernels. A program selects which kernel must be executed on the device.
  • Kernel: This is the code to be executed on the device. A kernel is essentially a (C-like) function that enables it to be compiled for an execution on any device that supports OpenCL drivers. The kernel is the only way the host can call a function that will run on a device. When the host invokes a kernel, many work items start running on the device. Each work item runs the code of the kernel but works on a different part of the dataset.
  • Command queue: Each device receives kernels through this data structure. A command queue orders the execution of kernels on the device.
  • Context: This is a group of devices. A context allows devices to receive kernels and transfer data.

asynchronous-programming-python-img-5

PyOpenCL programming

The preceding figure shows how these data structures can work in a host application. Let's remember again that a program can contain multiple functions that are to be executed on the device and each kernel encapsulates only a single function from the program.

In this example, we show you the basic steps to build a PyOpenCL program. The task to be executed is the parallel sum of two vectors. In order to maintain a readable output, let's consider two vectors, each of one out of 100 elements. The resulting vector will be for each element's i[NV6] th, which is the sum of the ith element vector_a plus the ith element vector_b.

Of course, to be able to appreciate the parallel execution of this code, you can also increase some orders of magnitude the size of the input vector_dimension:[NV7] 

import numpy as np

import pyopencl as cl
import numpy.linalg as la
vector_dimension = 100

vector_a = np.random.randint(vector_dimension, size=vector_dimension)
vector_b = np.random.randint(vector_dimension, size=vector_dimension)

platform = cl.get_platforms()[0]
device = platform.get_devices()[0]

context = cl.Context([device])
queue = cl.CommandQueue(context)

mf = cl.mem_flags
a_g = cl.Buffer(context, mf.READ_ONLY | mf.COPY_HOST_PTR, hostbuf=vector_a)
b_g = cl.Buffer(context, mf.READ_ONLY | mf.COPY_HOST_PTR, hostbuf=vector_b)

 

program = cl.Program(context, """
__kernel void vectorSum(__global const int *a_g, __global const int *b_g, __global int *res_g) {
  int gid = get_global_id(0);
  res_g[gid] = a_g[gid] + b_g[gid];
}
""").build()

res_g = cl.Buffer(context, mf.WRITE_ONLY, vector_a.nbytes)
program.vectorSum(queue, vector_a.shape, None, a_g, b_g, res_g)

res_np = np.empty_like(vector_a)
cl.enqueue_copy(queue, res_np, res_g)

print ("PyOPENCL SUM OF TWO VECTORS")
print ("Platform Selected = %s" %platform.name )
print ("Device Selected = %s" %device.name)
print ("VECTOR LENGTH = %s" %vector_dimension)
print ("INPUT VECTOR A")
print vector_a
print ("INPUT VECTOR B")
print vector_b
print ("OUTPUT VECTOR RESULT A + B ")
print res_np

assert(la.norm(res_np - (vector_a + vector_b))) < 1e-5

The output from Command Prompt should be like this:

C:Python CookBook Chapter 6 - GPU Programming with PythonChapter 6 - codes>python PyOpenCLParallellSum.py
Platform Selected = NVIDIA CUDA
Device Selected = GeForce GT 240

VECTOR LENGTH = 100
INPUT VECTOR A
[ 0 29 88 46 68 93 81  3 58 44 95 20 81 69 85 25 89 39 47 29 47 48 20 86 59 99  3 26 68 62 16 13 63 28 77 57 59 45 52 89 16  6 18 95 30 66 19 29 31 18 42 34 70 21 28  0 42 96 23 86 64 88 20 26 96 45 28 53 75 53 39 83 85 99 49 93 23 39  1 89 39 87 62 29 51 66  5 66 48 53 66  8 51  3 29 96 67 38 22 88]

INPUT VECTOR B
[98 43 16 28 63  1 83 18  6 58 47 86 59 29 60 68 19 51 37 46 99 27  4 94  5 22 3 96 18 84 29 34 27 31 37 94 13 89  3 90 57 85 66 63  8 74 21 18 34 93 17 26  9 88 38 28 14 68 88 90 18  6 40 30 70 93 75  0 45 86 15 10 29 84 47 74 22 72 69 33 81 31 45 62 81 66 69 14 71 96 91 51 35  4 63 36 28 65 10 41]

OUTPUT VECTOR RESULT A + B
[ 98  72 104  74 131  94 164  21  64 102 142 106 140  98 145  93 108  90
  84  75 146  75  24 180  64 121   6 122  86 146  45  47  90  59 114 151
  72 134  55 179  73  91  84 158  38 140  40  47  65 111  59  60  79 109
  66  28  56 164 111 176  82  94  60  56 166 138 103  53 120 139  54  93
 114 183  96 167  45 111  70 122 120 118 107  91 132 132  74  80 119 149
 157  59  86   7  92 132  95 103  32 129]

In the first line of the code after the required module import, we defined the input vectors:

vector_dimension = 100
vector_a = np.random.randint(vector_dimension, size= vector_dimension)
vector_b = np.random.randint(vector_dimension, size= vector_dimension)

Each vector contain 100 integers items that are randomly selected thought the NumPy function:

np.random.randint(max integer , size of the vector)

Then, we must select the device to run the kernel code. To do this, we must first select the platform using the get_platform() PyOpenCL statement:

platform = cl.get_platforms()[0]

This platform, as you can see from the output, corresponds to the NVIDIA CUDA platform. Then, we must select the device using the get_device() platform's method:

device = platform.get_devices()[0]

In the following steps, the context and the queue are defined, PyOpenCL provides the method context (device selected) and queue (context selected):

context = cl.Context([device])
queue = cl.CommandQueue(context)

To perform the computation in the device, the input vector must be transferred to the device's memory. So, two input buffers in the device memory must be created:

mf = cl.mem_flags
a_g = cl.Buffer(context, mf.READ_ONLY | mf.COPY_HOST_PTR, hostbuf=vector_a)
b_g = cl.Buffer(context, mf.READ_ONLY | mf.COPY_HOST_PTR, hostbuf=vector_b)

Also, we prepare the buffer for the resulting vector:

res_g = cl.Buffer(context, mf.WRITE_ONLY, vector_a.nbytes)

Finally, the core of the script, the kernel code is defined inside a program as follows:

program = cl.Program(context, """
__kernel void vectorSum(__global const int *a_g, __global const int *b_g, __global int *res_g) {
  int gid = get_global_id(0);
  res_g[gid] = a_g[gid] + b_g[gid];
}
""").build()

The kernel's name is vectorSum. The parameter list defines the data types of the input arguments (vectors of integers) and output data type (a vector of integer again).

Inside the kernel, the sum of the two vectors is simply defined as:

  • Initialize the vector index: int gid = get_global_id(0)
  • Sum the vector's components: res_g[gid] = a_g[gid] + b_g[gid];

In OpenCL and PyOpenCL, buffers are attached to a context[NV8]  and are only moved to a device once the buffer is used on that device. Finally, we execute vectorSum in the device:

program.vectorSum(queue, vector_a.shape, None, a_g, b_g, res_g)

To visualize the results, an empty vector is built:

res_np = np.empty_like(vector_a)

Then, the result is copied into this vector:

cl.enqueue_copy(queue, res_np, res_g)

Finally, the results are displayed:

print ("VECTOR LENGTH = %s" %vector_dimension)
print ("INPUT VECTOR A")
print vector_a
print ("INPUT VECTOR B")
print vector_b
print ("OUTPUT VECTOR RESULT A + B ")
print res_np

To check the result, we use the assert statement. It tests the result and triggers an error if the condition is false:

assert(la.norm(res_np - (vector_a + vector_b))) < 1e-5

Summary

In this article we discussed about Asyncio, GPU programming with Python, PyCUDA, and PyOpenCL.

Resources for Article:


Further resources on this subject: