Python concurrency

 

@Author: damminhtien :whale:

@LastUpdate: 27/12/2020

Concurrency mechanisms in Python (key features from the Python Cookbook)

As we know, concurrency is the ability of different parts or units of a program, algorithm, or problem to be executed out-of-order or in partial order, without affecting the final outcome.

Concurrency is an advanced topic in every programming language, but it is worth learning: once we understand it, we can do a lot of things, such as multitasking and dealing with I/O-bound and CPU-bound workloads, …

This document covers some key concurrency features, with implementations written in Python.

Reference: Python Cookbook: Recipes for Mastering Python 3

0. The theory

What is a Thread?

A thread is a path of execution within a process. A process can contain multiple threads. @GeeksforGeeks
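To see that several threads live inside one process, here is a minimal sketch that starts a few threads and has each record `os.getpid()` and `threading.get_ident()`. The function name `report` and the constant `N_THREADS` are illustrative. Every thread should report the same process ID but its own thread identifier. A `threading.Barrier` keeps all the threads alive at the same time, because CPython may reuse the identifier of a thread that has already finished.

```python
import os
import threading

N_THREADS = 3
barrier = threading.Barrier(N_THREADS)  # keeps all threads alive together
results = []                            # (pid, thread_id) pairs
results_lock = threading.Lock()


def report():
    # Record which process and which thread this code is running in
    with results_lock:
        results.append((os.getpid(), threading.get_ident()))
    # Wait until every thread has recorded its identity, so no thread
    # exits early and has its identifier reused by another one
    barrier.wait()


threads = [threading.Thread(target=report) for _ in range(N_THREADS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

pids = {pid for pid, _ in results}
tids = {tid for _, tid in results}
print('process IDs:', pids)             # a single PID shared by all threads
print('distinct thread IDs:', len(tids))
```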

Concurrent vs Parallel

A system is said to be concurrent if it can support two or more actions in progress at the same time. A system is said to be parallel if it can support two or more actions executing simultaneously. The key concept and difference between these definitions is the phrase “in progress.” @takuti

Concurrent Execution in Python

There are several different ways to implement concurrent execution in Python.
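This rough sketch shows two of the standard-library options side by side: a thread pool from `concurrent.futures`, and coroutines scheduled by `asyncio`. It is not an exhaustive list. The `multiprocessing` module is another option, which runs work in separate processes. `fetch_blocking` and `fetch_async` are hypothetical stand-ins for I/O, simulated with `sleep`.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def fetch_blocking(i):
    # Hypothetical blocking I/O call, simulated with time.sleep()
    time.sleep(0.2)
    return i * 10


async def fetch_async(i):
    # Hypothetical non-blocking I/O call; `await` hands control back
    # to the event loop while this coroutine waits
    await asyncio.sleep(0.2)
    return i * 10


def with_threads(n):
    # Each call runs in a worker thread; map() keeps results in input order
    with ThreadPoolExecutor(max_workers=n) as pool:
        return list(pool.map(fetch_blocking, range(n)))


async def with_asyncio(n):
    # All coroutines run concurrently inside a single thread
    return await asyncio.gather(*(fetch_async(i) for i in range(n)))


start = time.perf_counter()
print(with_threads(5), f'{time.perf_counter() - start:.2f}s')
start = time.perf_counter()
print(asyncio.run(with_asyncio(5)), f'{time.perf_counter() - start:.2f}s')
```

In both versions the five waits overlap, so each run typically takes roughly 0.2 s rather than 1 s.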

GIL

The Python Global Interpreter Lock or GIL, in simple words, is a mutex (or a lock) that allows only one thread to hold the control of the Python interpreter.

This means that only one thread can be in a state of execution at any point in time. @realpython
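Here is a small experiment to make the GIL's effect visible. It runs a pure-Python CPU-bound loop twice, first sequentially and then in two threads. On a standard CPython build with the GIL, the threaded run usually takes about as long as the sequential one, sometimes longer, because only one thread executes Python bytecode at a time. Exact timings depend on the machine and Python version, and free-threaded builds (PEP 703, experimental in Python 3.13) can behave differently. The function names are illustrative.

```python
import time
from threading import Thread


def count_down(n):
    # Pure-Python CPU-bound loop; the running thread holds the GIL
    while n > 0:
        n -= 1


def run_sequential(n):
    start = time.perf_counter()
    count_down(n)
    count_down(n)
    return time.perf_counter() - start


def run_threaded(n):
    start = time.perf_counter()
    threads = [Thread(target=count_down, args=(n,)) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


N = 3_000_000
print(f'sequential:  {run_sequential(N):.2f}s')
print(f'two threads: {run_threaded(N):.2f}s')
```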

1. The basics

Start the Thread

The threading library can be used to execute any Python callable in its own thread. To do this, you create a Thread instance and supply the callable that you wish to execute as a target.

import time
from threading import Thread


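def countdown(n):
    # Print n, n-1, ..., 1, pausing half a second between numbers
    while n > 0:
        print(n)
        n -= 1
        time.sleep(0.5)


# Each Thread runs countdown() independently, so their output interleaves
t1 = Thread(target=countdown, args=(10,))
t1.start()
t2 = Thread(target=countdown, args=(5,))
t2.start()

# Wait for both threads to finish
t1.join()
t2.join()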
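Once started, a thread runs until its target function returns. This short sketch shows the related `Thread` methods:

- `is_alive()` reports whether the thread is still running.
- `join()` waits for it to finish, optionally with a timeout.
- `daemon=True` marks a thread that the interpreter will not wait for when the main program exits.

The `work` function is only a placeholder.

```python
import time
from threading import Thread


def work(seconds):
    # Placeholder task: just sleep for a while
    time.sleep(seconds)


t = Thread(target=work, args=(0.5,))
t.start()
print('alive after start:', t.is_alive())   # True while work() is sleeping
t.join()                                    # block until work() returns
print('alive after join:', t.is_alive())    # False

# The interpreter does not wait for daemon threads at exit
d = Thread(target=work, args=(5,), daemon=True)
d.start()
d.join(timeout=0.1)                         # gives up after 0.1 s
print('daemon still running:', d.is_alive())  # True
```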

Terminate the Thread

The threading module provides no operation for forcibly terminating a thread. If a thread needs to stop early, it must be programmed to poll for an exit request at selected points and then return on its own, as CountdownTask does in the code below.

Due to a global interpreter lock (GIL), Python threads are restricted to an execution model that only allows one thread to execute in the interpreter at any given time. For this reason, Python threads should generally not be used for computationally intensive tasks where you are trying to achieve parallelism on multiple CPUs. They are much better suited for I/O handling and for concurrent execution in code that performs blocking operations (e.g., waiting for I/O, waiting for results from a database, etc.).

import time
from threading import Thread


class CountdownTask:
    def __init__(self):
        self._running = True

    def terminate(self):
        self._running = False

    def run(self, n):
        while self._running and n > 0:
            print(n)
            n -= 1
            time.sleep(1)


c = CountdownTask()
t = Thread(target=c.run, args=(10,))
t.start()
time.sleep(5)
c.terminate()  # Signal termination
t.join()       # May take up to 1 s: run() only checks the flag between sleeps

time.sleep(5)
print('Main thread terminated')
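A variant of the same polling idea replaces the boolean flag and `time.sleep()` with a `threading.Event`. This is a sketch, not the Cookbook's code. `Event.wait(timeout)` returns `True` as soon as the event is set, so the thread reacts to a stop request immediately instead of finishing its current sleep. The class name `StoppableCountdown` and its `last` attribute are made up for this example.

```python
import threading
import time


class StoppableCountdown:
    def __init__(self):
        self._stop_evt = threading.Event()
        self.last = None  # last number printed, kept for inspection

    def terminate(self):
        # Ask the thread to stop; safe to call from any thread
        self._stop_evt.set()

    def run(self, n, interval=0.5):
        while n > 0:
            print(n)
            self.last = n
            n -= 1
            # Pause up to `interval` seconds, but return early if
            # terminate() is called: wait() returns True once the event is set
            if self._stop_evt.wait(interval):
                break


c = StoppableCountdown()
t = threading.Thread(target=c.run, args=(10,))
t.start()
time.sleep(1.2)

start = time.perf_counter()
c.terminate()        # signal termination
t.join()             # returns almost at once; no full interval to wait out
stop_latency = time.perf_counter() - start
print(f'stopped after printing {c.last}; join took {stop_latency:.3f}s')
```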

Thread’s status

To find out when a thread has actually started running, use an Event object from the threading library. Event instances are similar to a “sticky” flag that allows threads to wait for something to happen. Initially, an event's internal flag is unset (false). If the event is unset and a thread waits on the event, it will block (i.e., go to sleep) until the event gets set. A thread that sets the event will wake up all of the threads that happen to be waiting (if any). If a thread waits on an event that has already been set, it merely moves on, continuing to execute.

from threading import Thread, Event
import time


# Code to execute in an independent thread
def countdown(n, started_evt):
    started_evt.set()
    print('Countdown starting')
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(5)


# Create the event object that will be used to signal startup
started_evt = Event()
# Launch the thread and pass the startup event
print('Launching countdown')
t = Thread(target=countdown, args=(10, started_evt))
t.start()
# Wait for the thread to start
started_evt.wait()
print('Countdown is running')
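Because setting an event wakes every waiting thread, a single `Event` can also act as a “starting gun” for a group of threads. In this short sketch, `runner` and the `finished` list are illustrative names.

```python
import threading

start_evt = threading.Event()
finished = []
finished_lock = threading.Lock()


def runner(name):
    # Block until the main thread sets the event
    start_evt.wait()
    with finished_lock:
        finished.append(name)


threads = [threading.Thread(target=runner, args=(i,)) for i in range(3)]
for t in threads:
    t.start()

print('event set?', start_evt.is_set())   # False: nothing has set it yet
start_evt.set()                           # one call releases every waiter
for t in threads:
    t.join()
print('finished:', sorted(finished))      # [0, 1, 2]

# Waiting on an event that is already set returns immediately
print(start_evt.wait(timeout=1))          # True
```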

Threading condition

Event objects are best used for one-time events. That is, you create an event, threads wait for the event to be set, and once set, the Event is discarded. Although it is possible to clear an event using its clear() method, safely clearing an event and waiting for it to be set again is tricky to coordinate, and can lead to missed events, deadlock, or other problems (in particular, you can’t guarantee that a request to clear an event after setting it will execute before a released thread cycles back to wait on the event again). If a thread is going to repeatedly signal an event over and over, you’re probably better off using a Condition object instead. For example, this code implements a periodic timer that other threads can monitor to see whenever the timer expires:

import threading
import time


class PeriodicTimer:
    def __init__(self, interval):
        self._interval = interval
        self._flag = 0
        self._cv = threading.Condition()

    def start(self):
        t = threading.Thread(target=self.run)
        t.daemon = True
        t.start()

    def run(self):
        '''
        Run the timer and notify waiting threads after each interval
        '''
        while True:
            time.sleep(self._interval)
            with self._cv:
                self._flag ^= 1
                self._cv.notify_all()

    def wait_for_tick(self):
        '''
        Wait for the next tick of the timer
        '''
        with self._cv:
            last_flag = self._flag
            while last_flag == self._flag:
                self._cv.wait()


ptimer = PeriodicTimer(5)
ptimer.start()


# Two threads that synchronize on the timer
def countdown(nticks):
    while nticks > 0:
        ptimer.wait_for_tick()
        print('T-minus', nticks)
        nticks -= 1


def countup(last):
    n = 0
    while n < last:
        ptimer.wait_for_tick()
        print('Counting', n)
        n += 1


threading.Thread(target=countdown, args=(10,)).start()
threading.Thread(target=countup, args=(5,)).start()
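`Condition` also offers `wait_for(predicate, timeout=None)`, which packages the `while ...: self._cv.wait()` loop used in `wait_for_tick()`. This minimal sketch uses it to build a hypothetical `CountdownLatch` helper (not a standard-library class) that lets threads wait until a counter reaches zero.

```python
import threading


class CountdownLatch:
    # Hypothetical helper: wait() blocks until count_down() has been
    # called `count` times
    def __init__(self, count):
        self._count = count
        self._cv = threading.Condition()

    def count_down(self):
        with self._cv:
            self._count -= 1
            if self._count <= 0:
                self._cv.notify_all()   # wake every waiting thread

    def wait(self, timeout=None):
        with self._cv:
            # wait_for() re-checks the predicate after every wake-up,
            # so unrelated notifications are harmless
            return self._cv.wait_for(lambda: self._count <= 0, timeout)


latch = CountdownLatch(3)
workers = [threading.Thread(target=latch.count_down) for _ in range(3)]
for w in workers:
    w.start()
print('all workers checked in:', latch.wait(timeout=5))   # True
for w in workers:
    w.join()
```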

Basic semaphore

A critical feature of Event objects is that they wake all waiting threads. If you are writing a program where you only want to wake up a single waiting thread, it is probably better to use a Semaphore or Condition object instead.

import threading
import time


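# Worker thread
def worker(n, sema):
    # Wait to be signaled
    sema.acquire()
    # Do some work
    print('Working', n)
    # Pass the signal on: wake the next waiting worker
    sema.release()


# Create some threads; the semaphore starts at 0, so they all block
sema = threading.Semaphore(0)
nworkers = 10
for n in range(nworkers):
    t = threading.Thread(target=worker, args=(n, sema))
    t.start()
# Wake exactly one worker; each worker then wakes the next in turn
sema.release()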
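In the code above, each worker releases the semaphore when it finishes, so one `release()` from the main thread lets all ten workers through one after another. To observe that each `release()` wakes exactly one waiting thread, this sketch leaves that chain out. The second semaphore `done` and the `ran` list are additions used only to count finished workers.

```python
import threading

sema = threading.Semaphore(0)   # starts at 0, so every acquire() blocks
done = threading.Semaphore(0)   # workers release this after working
ran = []
ran_lock = threading.Lock()


def worker(n):
    sema.acquire()              # wait for a signal
    with ran_lock:
        ran.append(n)
    done.release()              # report completion to the main thread


threads = [threading.Thread(target=worker, args=(n,), daemon=True)
           for n in range(5)]
for t in threads:
    t.start()

for _ in range(2):
    sema.release()              # each release() lets exactly one worker run
for _ in range(2):
    done.acquire()              # wait until both released workers finish
ran_after_two = len(ran)
print('workers that ran:', ran_after_two)   # 2; the other 3 are still blocked

for _ in range(3):
    sema.release()              # let the remaining workers finish
for t in threads:
    t.join()
print('total:', len(ran))       # 5
```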

Thread communication

Perhaps the safest way to send data from one thread to another is to use a Queue from the queue library. We create a Queue instance that is shared by the threads. Threads then use put() or get() operations to add or remove items from the queue.

When using queues, it can be somewhat tricky to coordinate the shutdown of the producer and consumer. A common solution to this problem is to rely on a special sentinel value, which when placed in the queue, causes consumers to terminate.

Thread communication with a queue is a one-way and nondeterministic process. In general, there is no way to know when the receiving thread has actually received a message and worked on it. However, Queue objects do provide some basic completion features, as illustrated by the task_done() and join() methods.

One caution with thread queues is that putting an item in a queue doesn’t make a copy of the item. Thus, communication actually involves passing an object reference between threads. If you are concerned about shared state, it may make sense to pass only immutable data structures (e.g., integers, strings, or tuples) or to make deep copies of the queued items, for example out_q.put(copy.deepcopy(data)).
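A single-threaded sketch is enough to show these reference semantics. The object passed to `put()` is the same object that `get()` returns, so a later mutation by the sender is visible to the receiver, unless the sender queued a deep copy.

```python
import copy
from queue import Queue

q = Queue()
data = {'readings': [1, 2, 3]}

q.put(data)                     # queues a reference to `data`
q.put(copy.deepcopy(data))      # queues an independent copy

data['readings'].append(4)      # the sender keeps modifying its object

shared = q.get()
copied = q.get()
print(shared is data, shared['readings'])   # True [1, 2, 3, 4]
print(copied is data, copied['readings'])   # False [1, 2, 3]
```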

from queue import Queue
from threading import Thread
import random
import time


# Object that signals shutdown
_sentinel = object()


# A thread that produces a fixed number of items, then the sentinel
def producer(out_q, n_items=10):
    for _ in range(n_items):
        time.sleep(0.01)
        # Produce some data
        data = random.randint(0, 10)
        print('Produce data', data)
        out_q.put(data)
    # Put the sentinel on the queue to indicate completion
    # (with several consumers, put one sentinel per consumer)
    out_q.put(_sentinel)


# A thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()
        try:
            # Check for termination
            if data is _sentinel:
                break
            print('Receive data', data)
        finally:
            # Mark every item, the sentinel included, as processed;
            # q.join() only returns once each put() has a task_done()
            in_q.task_done()


# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()
# Wait for the producer to finish, so q.join() cannot return early
# while the queue is still empty
t2.join()
# Wait for all produced items to be consumed
q.join()
t1.join()
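A queue alone does not tell the producer when one particular item has been handled. One approach, also suggested in the Python Cookbook, is to send an `Event` along with each item and have the consumer set it when the item is done. In this sketch, `handle()` is a hypothetical stand-in for the real work.

```python
import threading
from queue import Queue

_sentinel = object()


def handle(data):
    # Hypothetical processing step
    return data * 2


def consumer(in_q, results):
    while True:
        data, evt = in_q.get()
        if data is _sentinel:
            break
        results.append(handle(data))
        evt.set()               # tell the producer this item is done


def producer(out_q, items):
    for data in items:
        evt = threading.Event()
        out_q.put((data, evt))
        evt.wait()              # block until the consumer has processed it
        print('processed', data)
    out_q.put((_sentinel, None))


q = Queue()
results = []
c = threading.Thread(target=consumer, args=(q, results))
p = threading.Thread(target=producer, args=(q, [1, 2, 3]))
c.start()
p.start()
p.join()
c.join()
print(results)                  # [2, 4, 6]
```

The trade-off is that the producer now waits for each item, so this suits cases where confirmation matters more than throughput.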

Although queues are the most common thread communication mechanism, you can build your own data structures as long as you add the required locking and synchronization. The most common way to do this is to wrap your data structures with a condition variable. For example, here is how you might build a thread-safe priority queue.

import heapq
import threading


class PriorityQueue:
    def __init__(self):
        self._queue = []
        self._count = 0
        self._cv = threading.Condition()

    def put(self, item, priority):
        with self._cv:
            heapq.heappush(self._queue, (-priority, self._count, item))
            self._count += 1
            self._cv.notify()

    def get(self):
        with self._cv:
            while len(self._queue) == 0:
                self._cv.wait()
            return heapq.heappop(self._queue)[-1]
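The standard library also ships a thread-safe `queue.PriorityQueue`, which always returns the smallest entry first. This sketch wraps it with the same convention as the class above: a larger `priority` number means more urgent (hence the negation), and a counter breaks ties so that the items themselves are never compared. The helper names `put` and `get` are illustrative.

```python
import itertools
import threading
from queue import PriorityQueue

pq = PriorityQueue()
counter = itertools.count()     # tie-breaker between equal priorities


def put(item, priority):
    # Negate priority so that higher numbers come out first
    pq.put((-priority, next(counter), item))


def get():
    return pq.get()[-1]


put('low', 1)
put('high', 5)
put('medium', 3)
order = [get() for _ in range(3)]
print(order)                    # ['high', 'medium', 'low']

# get() blocks until another thread supplies an item
threading.Timer(0.1, put, args=('late', 2)).start()
late = get()
print(late)                     # late
```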

Locking Critical Sections

To make mutable objects safe to use by multiple threads, use Lock objects in the threading library, as shown here:

import threading


class SharedCounter:
    # A counter object that can be shared by multiple threads.
    def __init__(self, initial_value=0):
        self._value = initial_value
        self._value_lock = threading.Lock()

    def incr(self, delta=1):
        # Increment the counter with locking
        with self._value_lock:
            self._value += delta

    def decr(self, delta=1):
        # Decrement the counter with locking
        with self._value_lock:
            self._value -= delta
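A quick way to exercise `SharedCounter` is to have several threads update it at the same time and then check the total. The class is repeated so that the sketch runs on its own. The `value` property and the `hammer` helper are additions, since the class above has no accessor. Without the lock, `self._value += delta` is a non-atomic read-modify-write, and concurrent updates could be lost.

```python
import threading


class SharedCounter:
    # Same counter as above, plus a read-only `value` property (an addition)
    def __init__(self, initial_value=0):
        self._value = initial_value
        self._value_lock = threading.Lock()

    def incr(self, delta=1):
        with self._value_lock:
            self._value += delta

    def decr(self, delta=1):
        with self._value_lock:
            self._value -= delta

    @property
    def value(self):
        with self._value_lock:
            return self._value


def hammer(counter, n):
    # Net effect per iteration: +1
    for _ in range(n):
        counter.incr()
        counter.incr()
        counter.decr()


counter = SharedCounter()
threads = [threading.Thread(target=hammer, args=(counter, 10_000))
           for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter.value)   # 40000
```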

Deadlock Avoidance

In multithreaded programs, a common source of deadlock is threads that attempt to acquire multiple locks at once. For instance, if a thread acquires the first lock but then blocks trying to acquire the second, it can block the progress of other threads and make the program freeze. One solution is to assign each lock in the program a unique number and to enforce an ordering rule that only allows multiple locks to be acquired in ascending order.

In the acquire() context manager below, the object identifier returned by id() serves as that number. The key to this recipe lies in its first statement, which sorts the locks by that identifier. Because the locks are sorted, they always get acquired in a consistent order regardless of how the user passed them to acquire(). A thread-local list also records the locks each thread already holds. As a result, a nested acquire() that would break the ascending order raises RuntimeError instead of risking a deadlock.

import threading
from contextlib import contextmanager

_local = threading.local()


@contextmanager
def acquire(*locks):
    locks = sorted(locks, key=lambda x: id(x))

    acquired = getattr(_local, 'acquired', [])
    if acquired and max(id(lock) for lock in acquired) >= id(locks[0]):
        raise RuntimeError('Lock Order Violation')

    # Acquire all of the locks
    acquired.extend(locks)
    _local.acquired = acquired

    try:
        for lock in locks:
            lock.acquire()
        yield
    finally:
        # Release locks in reverse order of acquisition
        for lock in reversed(locks):
            lock.release()
        del acquired[-len(locks):]


x_lock = threading.Lock()
y_lock = threading.Lock()


def thread_1():
    while True:
        with acquire(x_lock, y_lock):
            print('Thread-1')


def thread_2():
    while True:
        with acquire(y_lock, x_lock):
            print('Thread-2')


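t1 = threading.Thread(target=thread_1)
t1.daemon = True
t1.start()
t2 = threading.Thread(target=thread_2)
t2.daemon = True
t2.start()

# Let the daemon threads run for about a second; they are stopped
# when the main thread exits
t1.join(timeout=1)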