[Python] Introduction to Multithreading and Synchronization


A thread is the smallest unit of execution that the operating system can schedule. A thread owns almost no system resources itself, only the few it needs to run (such as its own stack and registers). Instead, it shares all the resources owned by its process with the other threads of that process.

thread implementation

Python offers two ways to create threads: pass a function to a Thread object, or subclass Thread. The threading module also provides several helper functions:

  • threading.current_thread(): returns the Thread object of the calling thread (currentThread() is an older, deprecated alias);
  • threading.enumerate(): returns a list of all Thread objects that are currently alive;
  • threading.active_count(): returns the number of Thread objects that are currently alive, equal to len(threading.enumerate()) (activeCount() is a deprecated alias).
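A quick way to see these helpers in action is to start a few short-lived threads and inspect them from the main thread. This is a minimal sketch using the snake_case names that current Python versions document; the worker function, the thread names and the 0.2-second sleep are illustrative assumptions:

```python
import threading
import time


def worker():
    # current_thread() returns the Thread object of the thread that calls it
    print(f"running in {threading.current_thread().name}")
    time.sleep(0.2)


threads = [threading.Thread(target=worker, name=f"worker-{i}") for i in range(3)]
for t in threads:
    t.start()

# While the workers sleep, they are listed alongside the main thread
alive_names = [t.name for t in threading.enumerate()]
alive_count = threading.active_count()
print(alive_names, alive_count)

for t in threads:
    t.join()
```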

Thread class

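Threads are represented by the Thread class, which provides the following methods:

  • run(): the method that represents the thread's activity (override it in a subclass to implement the actual work);
  • start(): start the thread; it arranges for run() to be called in a separate thread of control;
  • join([timeout]): wait until the thread terminates (or until the optional timeout, in seconds, expires);
  • is_alive(): return whether the thread is alive (the old isAlive() spelling was removed in Python 3.9);
  • getName(): return the thread name (reading the name attribute is preferred);
  • setName(): set the thread name (assigning to the name attribute is preferred);
  • setDaemon(True): mark the thread as a daemon (background) thread; the program exits when only daemon threads are left. This must be done before start() is called (assigning to the daemon attribute is preferred).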
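The attribute spellings mentioned above can be sketched as follows. The slow function and the timings are assumptions made for the illustration; the point is that join() with a timeout simply returns, so is_alive() is the way to find out whether the thread actually finished:

```python
import threading
import time


def slow():
    time.sleep(0.5)


t = threading.Thread(target=slow)
t.name = "slow-worker"   # instead of setName()
t.daemon = True          # instead of setDaemon(True); must happen before start()
t.start()

t.join(timeout=0.1)      # returns after about 0.1 s even though slow() is still sleeping
still_running = t.is_alive()
print(f"{t.name} alive after timed join: {still_running}")

t.join()                 # now wait until the thread really finishes
finished = not t.is_alive()
print(f"{t.name} finished: {finished}")
```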

Function-based approach

Construct a Thread directly, passing the function to run as target, and then start it with the start() method:
threading.Thread(group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)
Description of each parameter:

  • group: reserved for a future ThreadGroup class; should be None;
  • target: the callable invoked by run() (the function that does the actual work);
  • name: the thread name; by default a unique name such as "Thread-1" is generated;
  • args: positional arguments passed to target, as a tuple;
  • kwargs: keyword arguments passed to target, as a dictionary;
  • daemon: whether the thread is a daemon thread; None (the default) inherits the daemon flag of the creating thread.

import threading
import time


def simpleRoutine(name, delay):
    print(f"routine {name} starting...")
    time.sleep(delay)
    print(f"routine {name} finished")


if __name__ == '__main__':
    # target is the function to run; args are passed to it positionally
    thrOne = threading.Thread(target=simpleRoutine, args=("First", 1))
    thrTwo = threading.Thread(target=simpleRoutine, args=("Second", 2))

    thrOne.start()
    thrTwo.start()
    # Block the main thread until both threads have finished
    thrOne.join()
    thrTwo.join()
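The example above passes positional arguments through args. As a sketch, the same routine can also receive keyword arguments through kwargs and be given an explicit name. The results dictionary is an assumption added only so that the main thread can read back what each thread did, because Thread discards the return value of its target:

```python
import threading
import time

results = {}  # hypothetical: filled in by the threads, read by the main thread


def simpleRoutine(name, delay):
    print(f"routine {name} starting...")
    time.sleep(delay)
    # Record which named thread ran this routine
    results[name] = threading.current_thread().name
    print(f"routine {name} finished")


thrOne = threading.Thread(target=simpleRoutine, name="thr-one",
                          kwargs={"name": "First", "delay": 0.1})
thrTwo = threading.Thread(target=simpleRoutine, name="thr-two",
                          args=("Second",), kwargs={"delay": 0.2})
thrOne.start()
thrTwo.start()
thrOne.join()
thrTwo.join()
print(results)
```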

Inheritance

Subclass Thread directly and override its run method:

import threading
import time


class SimpleThread(threading.Thread):
    def __init__(self, name, delay):
        # The base-class constructor must be called first
        super().__init__()
        self.name = name  # Thread.name is a property, so this names the thread
        self.delay = delay

    def run(self):
        # start() calls run() in the new thread
        print(f"thread {self.name} starting...")
        time.sleep(self.delay)
        print(f"thread {self.name} finished")


if __name__ == '__main__':
    thrOne = SimpleThread("First", 2)
    thrTwo = SimpleThread("Second", 2)
    thrOne.start()
    thrTwo.start()

    thrOne.join()
    thrTwo.join()
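Because run() cannot hand a value back to the caller, one common pattern is to store the result on the thread object and read it after join(). A minimal sketch; the SquareThread class is hypothetical:

```python
import threading


class SquareThread(threading.Thread):
    def __init__(self, value):
        super().__init__()
        self.value = value
        self.result = None  # set by run()

    def run(self):
        # Executed in the new thread once start() is called
        self.result = self.value ** 2


threads = [SquareThread(n) for n in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # after join() returns, run() has completed and result is set

squares = [t.result for t in threads]
print(squares)  # [0, 1, 4, 9, 16]
```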

synchronization mechanism

When multiple threads modify the same data at the same time, the data can end up corrupted (a race condition). Locks prevent this by allowing only one thread at a time to execute the code that touches the shared data.
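To make this concrete, consider several threads incrementing a shared counter. The statement counter += 1 is a read, an add and a write, and another thread may run in between, so updates can be lost; whether that actually happens on a given run depends on the interpreter version and timing. The locked version always reaches the expected total. The function names and iteration counts are illustrative:

```python
import threading

N_THREADS = 4
N_INCREMENTS = 100_000

counter = 0
lock = threading.Lock()


def unsafe_increment():
    global counter
    for _ in range(N_INCREMENTS):
        counter += 1          # read, add, write: threads may interleave here


def safe_increment():
    global counter
    for _ in range(N_INCREMENTS):
        with lock:            # only one thread at a time executes this line
            counter += 1


def run(target):
    global counter
    counter = 0
    threads = [threading.Thread(target=target) for _ in range(N_THREADS)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter


print("without lock:", run(unsafe_increment))  # may be less than 400000
print("with lock:   ", run(safe_increment))    # always 400000
```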

Sync Lock

The threading module provides two lock classes, Lock and RLock (a reentrant lock, which the thread holding it may acquire again). Both provide the following two methods:

  • acquire(blocking=True, timeout=-1): acquire the lock; timeout is the maximum number of seconds to wait for it (-1 means wait forever), and the return value is True if the lock was acquired, False otherwise;
  • release(): release the lock.
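The return value of acquire() matters when blocking is False or a timeout is given, because it reports whether the lock was actually obtained. A small sketch in which the main thread already holds the lock:

```python
import threading

lock = threading.Lock()
lock.acquire()  # the main thread now holds the lock

# Non-blocking attempt: returns False immediately because the lock is held
got_nonblocking = lock.acquire(blocking=False)

# Timed attempt: gives up after about 0.1 s and returns False
got_timed = lock.acquire(timeout=0.1)

lock.release()

# The lock is free now, so this attempt succeeds
got_after_release = lock.acquire(blocking=False)
lock.release()

print(got_nonblocking, got_timed, got_after_release)  # False False True
```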

There are two ways to use locks: as a context manager in a with statement, or with explicit acquire() and release() calls:

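import threading

gCount = 0


def PlusOne(locker):
    global gCount
    # Way 1: the with statement acquires the lock and always releases it
    with locker:
        gCount += 1


def MinusOne(locker):
    global gCount
    # Way 2: explicit acquire()/release(); try/finally guarantees the release
    locker.acquire()
    try:
        gCount -= 1
    finally:
        locker.release()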
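The difference between Lock and RLock shows up when the same thread acquires the lock twice, for example when one locked function calls another. An RLock counts the nested acquisitions and is only freed once its owner has released it the same number of times; a plain Lock would block forever on the second acquire. A sketch with hypothetical functions (a timeout is used so the Lock case returns instead of deadlocking):

```python
import threading

rlock = threading.RLock()
balance = 0


def deposit(amount):
    global balance
    with rlock:
        balance += amount


def deposit_twice(amount):
    # Holds rlock, then deposit() acquires it again from the same thread
    with rlock:
        deposit(amount)
        deposit(amount)


deposit_twice(10)
print(balance)  # 20

# With a plain Lock the nested acquire cannot succeed
plain = threading.Lock()
with plain:
    nested_ok = plain.acquire(timeout=0.1)  # False: this thread already holds it
print(nested_ok)  # False
```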

Condition variable

A Condition object internally maintains a lock (a Lock or RLock can be passed to the constructor; otherwise an RLock is created internally) and a pool of waiting threads:

  • a thread acquires the Condition's lock with acquire() (or a with statement);
  • calling wait() releases that lock, blocks the thread and records it in the waiting pool;
  • calling notify() picks a thread from the waiting pool and wakes it; the woken thread re-acquires the lock before its wait() call returns.

Condition methods:

  • __init__(self, lock=None): a Condition is always associated with a lock (if lock is not given, a new RLock is created for it);
  • acquire(*args): call acquire() on the associated lock, with the same blocking/timeout arguments;
  • release(): call release() on the associated lock;
  • wait(timeout=None): release the lock and block until notified or until the timeout expires, then re-acquire the lock before returning; it must be called while holding the lock, otherwise RuntimeError is raised;
  • notify(n=1): wake up at most n threads waiting on the condition:
    • a woken thread returns from wait() only after re-acquiring the lock, so it cannot proceed until the notifying thread releases it;
    • if more than n threads are waiting, only n of them are woken, and which ones should not be relied upon;
    • it must be called while holding the lock, otherwise RuntimeError is raised.
  • notify_all(): wake up all threads waiting on the condition.
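
import threading
import time


class Producer(threading.Thread):
    def __init__(self, cond, storage):
        # daemon=True so the demo ends when the main thread exits
        super().__init__(daemon=True)
        self.cond = cond
        self.storage = storage

    def run(self):
        label = 1
        while True:
            with self.cond:
                if len(self.storage) < 10:
                    self.storage.append(label)
                    print(f"<- produced product {label}")
                    label += 1
                    # Wake up to two consumers waiting for products
                    self.cond.notify(2)
                else:
                    print(f"<- storage full: {label - 1} products produced so far")
                    self.cond.notify_all()
                    # Releases the lock while waiting, re-acquires it before returning
                    self.cond.wait()
            # Sleep outside the lock so consumers can run in the meantime
            time.sleep(0.4)


class Consumer(threading.Thread):
    def __init__(self, name, cond, storage):
        super().__init__(name=name, daemon=True)
        self.cond = cond
        self.storage = storage

    def run(self):
        while True:
            with self.cond:
                if len(self.storage) > 0:
                    pro = self.storage.pop(0)
                    print(f"-> {self.name} consumed {pro}")
                    # Tell the producer there is free space again
                    self.cond.notify()
                else:
                    print(f"-> {self.name} storage empty: no product to consume")
                    self.cond.wait()
            time.sleep(1)


if __name__ == '__main__':
    cond = threading.Condition()
    storage = []
    Producer(cond, storage).start()
    for i in range(3):
        Consumer(f"consumer-{i}", cond, storage).start()
    time.sleep(5)  # let the daemon threads run for a while, then exit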
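Because a woken thread must re-check the condition it was waiting for, Condition also offers wait_for(predicate, timeout=None), which wraps wait() in that re-check loop. The following bounded-buffer sketch uses it; the BoundedBuffer class and its capacity are assumptions, not part of the original example:

```python
import threading


class BoundedBuffer:
    def __init__(self, capacity):
        self.capacity = capacity
        self.items = []
        self.cond = threading.Condition()

    def put(self, item):
        with self.cond:
            # Waits (releasing the lock) until there is free space
            self.cond.wait_for(lambda: len(self.items) < self.capacity)
            self.items.append(item)
            self.cond.notify_all()  # wake consumers waiting for items

    def get(self):
        with self.cond:
            # Waits until at least one item is available
            self.cond.wait_for(lambda: len(self.items) > 0)
            item = self.items.pop(0)
            self.cond.notify_all()  # wake producers waiting for space
            return item


buf = BoundedBuffer(capacity=2)
consumed = []


def consumer(n):
    for _ in range(n):
        consumed.append(buf.get())


c = threading.Thread(target=consumer, args=(5,))
c.start()
for i in range(5):
    buf.put(i)
c.join()
print(consumed)  # [0, 1, 2, 3, 4]
```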

Semaphore

A semaphore object internally maintains a counter:

  • acquire(blocking=True, timeout=None): if the counter is greater than 0, decrement it by 1 and return immediately; if it is 0, block until another thread calls release();
  • release(): increment the counter by 1; if threads are blocked in acquire(), one of them is woken up.

The threading module provides two semaphores, Semaphore and BoundedSemaphore. BoundedSemaphore additionally limits releases so that the counter can never exceed its initial value: release() checks the counter, and if it is already at the initial value, ValueError is raised.
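The difference between the two classes is easiest to see with an extra release(). In this sketch a plain Semaphore silently grows past its initial value, while a BoundedSemaphore raises ValueError:

```python
import threading

# A plain Semaphore has no upper bound: an extra release() raises its capacity
plain = threading.Semaphore(1)
plain.release()                        # counter 1 -> 2, no error
extra_acquires = [plain.acquire(blocking=False) for _ in range(2)]
print(extra_acquires)                  # [True, True]

# A BoundedSemaphore refuses to go above its initial value
bounded = threading.BoundedSemaphore(1)
bounded.acquire()                      # counter 1 -> 0
bounded.release()                      # counter 0 -> 1 (back to the initial value)
try:
    bounded.release()                  # would make the counter 2
    over_release_error = False
except ValueError:
    over_release_error = True
print(over_release_error)              # True
```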

A Semaphore can track produced items: each production calls release() (adding one) and each consumption calls acquire() (taking one, and waiting if none are available):

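import threading

products = threading.Semaphore(0)  # counter starts at 0: nothing produced yet


def produceOne(label, sem: threading.Semaphore):
    sem.release()  # counter += 1; wakes one blocked consumer, if any
    print(f"{label} produce one")


def consumeOne(label, sem: threading.Semaphore):
    sem.acquire()  # blocks while the counter is 0, then counter -= 1
    print(f"{label} consume one")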
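A sketch that wires these two functions to threads: each consumer blocks in acquire() until a producer has released once, so every consumption is matched by an earlier production. The functions are repeated so the block runs on its own; the thread counts and the 0.2-second pause are arbitrary choices for the demonstration:

```python
import threading
import time


def produceOne(label, sem: threading.Semaphore):
    sem.release()
    print(f"{label} produce one")


def consumeOne(label, sem: threading.Semaphore):
    sem.acquire()
    print(f"{label} consume one")


products = threading.Semaphore(0)

consumers = [threading.Thread(target=consumeOne, args=(f"C{i}", products))
             for i in range(3)]
for c in consumers:
    c.start()

time.sleep(0.2)
# Nothing has been produced yet, so all three consumers are still blocked
blocked_before = sum(c.is_alive() for c in consumers)

for i in range(3):
    produceOne(f"P{i}", products)
for c in consumers:
    c.join()
print(blocked_before)  # 3
```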

A BoundedSemaphore can limit concurrency: at most as many threads as its initial value can be inside the guarded block at the same time:

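import threading
import time

runner = threading.BoundedSemaphore(3)  # at most 3 threads inside the with block


def runBound(name, sem: threading.BoundedSemaphore):
    with sem:  # acquire() on entry, release() on exit
        print(f"{name} is running")
        time.sleep(1)
        print(f"{name} finished")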
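To check that the BoundedSemaphore really caps concurrency, this sketch starts more threads than the semaphore's initial value and records the highest number of threads inside the with block at the same time. The running/max_running counters and their lock are additions for measurement only, and the sleep is shortened from the original 1 second:

```python
import threading
import time

runner = threading.BoundedSemaphore(3)
stats_lock = threading.Lock()
running = 0
max_running = 0


def runBound(name, sem: threading.BoundedSemaphore):
    global running, max_running
    with sem:  # at most 3 threads get past this point at once
        with stats_lock:
            running += 1
            max_running = max(max_running, running)
        print(f"{name} is running")
        time.sleep(0.1)
        with stats_lock:
            running -= 1
        print(f"{name} finished")


threads = [threading.Thread(target=runBound, args=(f"T{i}", runner)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(max_running)  # 3 on a typical run, never more than 3
```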

Event

An Event object manages an internal flag that threads can wait on until the event occurs:

  • is_set(): return the current value of the flag (isSet() is a deprecated alias);
  • wait(timeout=None): block while the flag is False; return immediately if it is True;
  • set(): set the flag to True and wake up all threads waiting for it;
  • clear(): reset the flag to False.
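wait() also accepts a timeout and returns the value of the flag, which lets a thread give up instead of blocking forever. A short sketch of the flag's life cycle:

```python
import threading

evt = threading.Event()
initial = evt.is_set()              # a new Event starts with the flag cleared

timed_out = evt.wait(timeout=0.1)   # nobody sets it, so this returns False after ~0.1 s

evt.set()
after_set = evt.wait(timeout=0.1)   # returns True immediately once the flag is set

evt.clear()
after_clear = evt.is_set()

print(initial, timed_out, after_set, after_clear)  # False False True False
```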

Several threads wait for an event and start executing once it is set:

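import threading
import time


def waiters(name, evt: threading.Event):
    evt.wait()  # blocks until another thread calls evt.set()
    print(f"{name} is running")
    time.sleep(1)
    print(f"{name} finished")


def starting(evt: threading.Event):
    evt.set()  # releases every thread blocked in evt.wait()
    print("event is set")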
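A possible way to drive these two functions: start several waiters, confirm they are all blocked, then set the event from another thread. The thread names and shortened sleeps are assumptions for the demonstration:

```python
import threading
import time


def waiters(name, evt: threading.Event):
    evt.wait()
    print(f"{name} is running")
    time.sleep(0.1)
    print(f"{name} finished")


def starting(evt: threading.Event):
    evt.set()
    print("event is set")


evt = threading.Event()
workers = [threading.Thread(target=waiters, args=(f"W{i}", evt)) for i in range(3)]
for w in workers:
    w.start()

time.sleep(0.2)
# All three workers are still blocked in evt.wait()
waiting_before = sum(w.is_alive() for w in workers)

setter = threading.Thread(target=starting, args=(evt,))
setter.start()
setter.join()
for w in workers:
    w.join()
print(waiting_before)  # 3
```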

Barrier

A barrier makes a fixed number of threads (parties) wait for each other: threads block in wait() until parties threads have arrived, and then all of them are released together. The optional action callable is run by one of the threads when the barrier opens:
threading.Barrier(parties, action=None, timeout=None)

Barrier properties and methods:

  • wait(timeout=None): wait to pass the barrier; the thread blocks until parties threads are waiting, then all of them are released at once (each call returns a distinct integer in range(parties));
  • reset(): return the barrier to its default, empty state; threads currently waiting receive a BrokenBarrierError;
  • abort(): put the barrier into the broken state, so that current and future calls to wait() raise BrokenBarrierError;
  • parties: the number of threads required to pass the barrier;
  • n_waiting: the number of threads currently waiting at the barrier;
  • broken: True if the barrier is in the broken state.
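
import threading
import time


def waitBarrier(name, barr: threading.Barrier):
    print(f"{name} waiting for open")
    try:
        barr.wait()  # blocks until barr.parties threads have arrived
        print(f"{name} running")
        time.sleep(5)
    except threading.BrokenBarrierError:
        # Raised if the barrier is aborted, reset, or times out
        print(f"{name} exception")
    print(f"{name} finished")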
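A runnable sketch of the barrier in use, including the action callable and the parties, n_waiting and broken attributes. The on_open function, the results list and the 2-second timeout are assumptions for the illustration; the second barrier shows that after abort() every wait() fails:

```python
import threading

results = []
results_lock = threading.Lock()


def on_open():
    # Called once, by one of the threads, before any of them is released
    results.append("barrier opened")


barr = threading.Barrier(3, action=on_open, timeout=2)


def waitBarrier(name, barr: threading.Barrier):
    print(f"{name} waiting for open")
    try:
        index = barr.wait()  # a distinct value in range(parties) for each thread
        with results_lock:
            results.append(index)
        print(f"{name} running")
    except threading.BrokenBarrierError:
        print(f"{name} exception")
    print(f"{name} finished")


threads = [threading.Thread(target=waitBarrier, args=(f"T{i}", barr)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(barr.parties, barr.n_waiting, barr.broken)  # 3 0 False

# After abort(), wait() raises BrokenBarrierError immediately
aborted_barrier = threading.Barrier(2)
aborted_barrier.abort()
try:
    aborted_barrier.wait()
    aborted = False
except threading.BrokenBarrierError:
    aborted = True
print(aborted, aborted_barrier.broken)  # True True
```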

GIL (global interpreter lock)

In CPython, a thread must hold the GIL (Global Interpreter Lock) before it can execute Python bytecode; the GIL can be thought of as a "passport". Every time the GIL is released, threads compete for it again, and each thread switch costs some overhead.

Because of the GIL, a CPython process executes Python bytecode in only one thread at a time (the thread that holds the GIL). This is why Python multithreading does not speed up CPU-bound work on multi-core CPUs:

  • CPU-bound code: the threads do a lot of computation, so each one quickly uses up its time slice, which triggers a release of the GIL and a new round of competition; the threads effectively take turns and only add switching overhead;
  • I/O-bound code (file processing, web crawlers, etc.): multithreading can improve efficiency. A thread releases the GIL while it waits on blocking I/O, so while thread A is waiting, the interpreter can switch to thread B instead of leaving the CPU idle.
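The I/O-bound case can be illustrated with time.sleep standing in for a blocking I/O call (an assumption made to keep the sketch self-contained; like real blocking I/O, it releases the GIL). Five threads that each wait 0.2 s finish in roughly 0.2 s in total rather than 1 s:

```python
import threading
import time


def fake_io(delay):
    # Stand-in for blocking file or network I/O; the GIL is released while sleeping
    time.sleep(delay)


start = time.perf_counter()
threads = [threading.Thread(target=fake_io, args=(0.2,)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start
print(f"5 waits of 0.2 s took {elapsed:.2f} s")
```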

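Python threads are native OS threads (CPython uses the platform's C thread API, for example POSIX threads on Linux). Roughly, a Python thread runs as follows:

  • it needs to access shared data;
  • it requests the GIL;
  • once it holds the GIL, the interpreter runs its code on the underlying OS thread;
  • the OS schedules that thread onto a CPU;
  • when its time slice is used up, it releases the GIL and another thread can take over (a context switch).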
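CPython exposes the time slice from the last step through sys.getswitchinterval() (and sys.setswitchinterval()), and threading.get_native_id() (Python 3.8+) returns the OS-level thread ID, which shows that each Python thread is backed by its own native thread. A small sketch; the barrier is only there to keep the three threads alive at the same time so the OS cannot reuse an ID:

```python
import sys
import threading

# Seconds a thread may run before the interpreter asks it to release the GIL
interval = sys.getswitchinterval()
print(interval)  # 0.005 by default in CPython

native_ids = set()
ids_lock = threading.Lock()
all_started = threading.Barrier(3)


def record_native_id():
    with ids_lock:
        native_ids.add(threading.get_native_id())
    # Wait until all three threads have recorded their IDs
    all_started.wait()


threads = [threading.Thread(target=record_native_id) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

native_ids.add(threading.get_native_id())  # the main thread's own ID
print(len(native_ids))  # 4: one native thread per Python thread
```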


Posted by VLE79E on Tue, 04 Oct 2022 14:22:42 +0300