Other classes in the threading module

Lock

When several threads read and write shared global data and the read-modify-write sequence is not atomic, updates are easily lost and the data becomes corrupted.
For example:

import threading
import time
from threading import Lock

l = [0]


def func():
    """Bump the shared counter ``l[0]`` by one, deliberately without a lock."""
    # Read, pause, then write back: the pause separates the read from the
    # write, so other threads can read the same stale value and updates are lost.
    snapshot = l[0]
    time.sleep(0.001)
    l[0] = snapshot + 1

threads = []
if __name__ == '__main__':
    # Start 100 workers that all run func() concurrently.
    for _ in range(100):
        worker = threading.Thread(target=func)
        threads.append(worker)
        worker.start()
    # Wait for every worker to finish before inspecting the counter.
    for worker in threads:
        worker.join()
    print(l)

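The printed result is a value such as [10] instead of the expected [100] (the exact number varies from run to run), which means that updates were lost and the data has been corrupted.
The solution is to protect the read-modify-write sequence with a lock: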

import threading
import time
from threading import Lock

l = [0]
lock = Lock()


def func():
    """Increment the shared counter ``l[0]`` while holding ``lock``.

    Uses explicit acquire/release. The ``finally`` clause guarantees the lock
    is released even if the body raises, so other threads are never left
    blocked on it.
    """
    lock.acquire()
    try:
        num = l[0]
        time.sleep(0.001)
        l[0] = num + 1
    finally:
        # This must be release(), not a second acquire(): Lock is not
        # reentrant, so acquiring it again from the thread that already
        # holds it would block forever.
        lock.release()


# You can also use the with statement
def func2():
    """Same increment as ``func``; ``with`` acquires and releases the lock."""
    with lock:
        num = l[0]
        time.sleep(0.001)
        l[0] = num + 1

threads = []
if __name__ == '__main__':
    # Spawn 100 workers that all increment the shared counter through func().
    while len(threads) < 100:
        worker = threading.Thread(target=func)
        worker.start()
        threads.append(worker)
    # Block until every worker is done, then show the final counter value.
    for worker in threads:
        worker.join()
    print(l)

The result printed after execution is [100]

RLock

When a program uses several locks, or a function recurses while it is still holding a lock, a thread can end up waiting forever for a lock that can never be released (a deadlock).

import threading
import time
from threading import Lock

num = 5
lock = Lock()


def func():
    """Count the global ``num`` down towards 1 recursively while holding ``lock``.

    Deliberate deadlock demonstration: ``Lock`` is not reentrant, so the
    nested call blocks on ``with lock`` because the outer call still holds it.
    """
    global num
    with lock:
        if num != 1:
            num -= 1
            print(num)
            # Recursing inside the ``with`` block is what triggers the deadlock.
            func()
            return
        print(num)


threads = []
if __name__ == '__main__':
    # Two workers are enough to show the problem; each one runs func().
    for _ in range(2):
        worker = threading.Thread(target=func)
        threads.append(worker)
        worker.start()
    for worker in threads:
        worker.join()

When you run the program it blocks forever: the first call acquires the lock and then recurses while still holding it, so the nested call can never acquire the lock. The only way out is to kill the program.
Solution:
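Use an RLock (reentrant lock), which the thread that already owns it can acquire again without blocking. (In fact, it would be enough to recurse only after the lock has been released; RLock is used here to demonstrate what it does.)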

import threading
import time
from threading import RLock

num = 5
lock = RLock()


def func():
    """Count the global ``num`` down to 1 recursively while holding ``lock``.

    ``lock`` is an ``RLock``, so the nested call can re-acquire it from the
    same thread and the recursion completes instead of blocking.
    """
    global num
    with lock:
        if num != 1:
            num -= 1
            print(num)
            func()


threads = []
if __name__ == '__main__':
    # Start both workers, remembering them so they can be joined afterwards.
    while len(threads) < 2:
        worker = threading.Thread(target=func)
        worker.start()
        threads.append(worker)
    for worker in threads:
        worker.join()

The result of running the program is 4, 3, 2, 1

Condition

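With a plain Lock, a thread that is waiting for some shared state to change has to keep acquiring the lock, checking the state and releasing the lock again, which wastes a lot of CPU when there are many threads. A Condition solves this problem: a thread can wait on it and sleep until another thread notifies it that the state has changed.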

import time
import threading
import random

num = 1000
class Parents(threading.Thread):
    """Producer thread: periodically adds a random amount to the global ``num``."""

    def __init__(self, condition, thread_name):
        """Store the shared condition and the label used in printed messages.

        Args:
            condition: Condition object shared with the consumer threads.
            thread_name: Name shown in this thread's output lines.
        """
        super(Parents, self).__init__()
        self.condition = condition
        self.thread_name = thread_name

    def run(self):
        """Loop forever: add 0-200 to ``num``, notify waiters, sleep 0.5 s."""
        global num
        while True:
            # ``with`` releases the condition's lock even if the body raises,
            # so a failure here cannot leave the other threads blocked.
            with self.condition:
                plus_num = random.randint(0, 200)
                num = num + plus_num
                print('%s has increased%s,The amount is now%s' % (self.thread_name, plus_num, num))
                # Notify all threads that production has ended
                self.condition.notify_all()
            # Sleep outside the lock so other threads can run in the meantime.
            time.sleep(.5)


class Children(threading.Thread):
    """Consumer thread: periodically subtracts a random amount from the global ``num``."""

    def __init__(self, condition, thread_name):
        """Store the shared condition and the label used in printed messages.

        Args:
            condition: Condition object shared with the producer threads.
            thread_name: Name shown in this thread's output lines.
        """
        super(Children, self).__init__()
        self.condition = condition
        self.thread_name = thread_name

    def run(self):
        """Loop forever: take 200-500 from ``num``, waiting while it is too low."""
        global num
        while True:
            # ``with`` releases the condition's lock even if the body raises,
            # so a failure here cannot leave the other threads blocked.
            with self.condition:
                plus_num = random.randint(200, 500)
                # Every time a notification arrives, re-check whether the
                # amount is now sufficient; it may still be too low.
                while num - plus_num < 0:
                    print('%s need to consume%s,Insufficient remaining amount, waiting for production' % (self.thread_name, plus_num))
                    # Wait for a producer (Parents) to call notify_all()
                    self.condition.wait()
                num = num - plus_num
                print('%s have consumed%s,The amount is now%s' % (self.thread_name, plus_num, num))
            # Sleep outside the lock so other threads can run in the meantime.
            time.sleep(.5)

# One Condition is shared by every producer and consumer thread.
condition = threading.Condition()
for producer_name in ('mather', 'father'):
    Parents(condition, producer_name).start()
for consumer_name in ('daughter', 'son'):
    Children(condition, consumer_name).start()

output result

mather 199 has been added,The amount is now 1199
father 63 has been added,The amount is now 1262
daughter 218 have been consumed,The amount is now 1044
son 477 spent,The amount is now 567
father 116 has been added,The amount is now 683
daughter 304 has been consumed,The amount is now 379
mather has increased by 60,The amount is now 439
son 329 spent,The amount is now 110
daughter Need to consume 394, the remaining amount is insufficient, waiting for production
father 74 has been added,The amount is now 184
mather 163 has been added,The amount is now 347
daughter Need to consume 394, the remaining amount is insufficient, waiting for production

Semaphore

Semaphore is similar to Lock. The difference is that a Semaphore can be acquired by several threads at the same time, up to a configured limit. Once that many threads hold it, one of them must release it before another thread can acquire it.

import time
import threading
import random


class Sem(threading.Thread):
    """Worker thread that holds a slot of a shared semaphore while it runs."""

    def __init__(self, thread_name, semaphore):
        """Store the label used in printed messages and the shared semaphore.

        Args:
            thread_name: Value shown in this thread's output lines.
            semaphore: Semaphore that limits how many workers run at once.
        """
        super().__init__()
        self.semaphore = semaphore
        self.thread_name = thread_name

    def run(self):
        """Acquire a slot, pretend to work for 2-5 seconds, then free the slot."""
        # ``with`` releases the semaphore even if the body raises, so a
        # failing worker cannot permanently use up one of the slots.
        with self.semaphore:
            print('%s is runing'% self.thread_name)
            time.sleep(random.randint(2, 5))
            print('%s is end' % self.thread_name)

# At most two Sem workers may hold the semaphore at any one time.
semaphore = threading.Semaphore(2)

for worker_id in range(100):
    worker = Sem(worker_id, semaphore)
    worker.start()

Some of the results are as follows:

0 is runing
1 is runing
1 is end
0 is end
2 is runing
3 is runing
2 is end
4 is runing
3 is end
5 is runing

It can be seen that at most two threads run at the same time: a running thread must finish and release the semaphore before a new thread can start.

Event

The Event class lets one thread control whether other threads block, by setting or clearing a Boolean flag. In the classic traffic-light example, a traffic-light thread toggles the flag, and each car thread waits on it to decide whether it may proceed:

import time
import random
import threading
from threading import Event


class Light(threading.Thread):
    """Traffic-light thread that toggles a shared Event between red and green."""

    def __init__(self, event):
        """Keep a reference to the event object that the cars wait on."""
        super().__init__()
        self.event = event

    def red(self):
        """Announce the red light and clear the event flag (False)."""
        print("Traffic-light is red now!")
        self.event.clear()

    def green(self):
        """Announce the green light and set the event flag (True)."""
        print('Traffic-light is green now!')
        self.event.set()

    def run(self):
        """Cycle forever: 5 seconds of red followed by 20 seconds of green."""
        phases = ((self.red, 5), (self.green, 20))
        while True:
            for switch_on, hold_seconds in phases:
                switch_on()
                time.sleep(hold_seconds)


class Car(threading.Thread):
    """Car thread that crosses once the shared event flag is set."""

    def __init__(self, event, i):
        """Remember the shared event and this car's number ``i``."""
        super(Car, self).__init__()
        self.i = i
        self.event = event

    def run(self):
        """Report waiting if the flag is not set, block on it, then cross."""
        light_is_red = not self.event.is_set()
        if light_is_red:
            # The flag is False, so this car has to wait at the red light.
            print('Car %s is waiting' % self.i)
        # Returns immediately when the flag is already set.
        self.event.wait()
        print('Car %s is crossing' % self.i)


event = Event()

# The light runs in its own thread and keeps toggling the shared event.
light = Light(event)
light.start()
for car_number in range(100):
    # A new car arrives after a random pause of 0 to 10 seconds.
    delay = random.randint(0, 10)
    time.sleep(delay)
    Car(event, car_number).start()

Some of the results are as follows:

Traffic-light is red now!
Traffic-light is green now!
Car 0 is crossing
Car 1 is crossing
Car 2 is crossing
Car 3 is crossing
Car 4 is crossing
Traffic-light is red now!
Car 5 is waiting
Car 6 is waiting
Traffic-light is green now!
Car 6 is crossing
Car 5 is crossing

Timer

Timer is a thread that runs a function once, after a given delay:

import time
import threading


def task():
    """Print the current local time as formatted by ``time.ctime()``."""
    now = time.ctime()
    print(now)

# Run task() once, two seconds from now, on a separate timer thread.
timer = threading.Timer(2.0, task)
timer.start()

If you need the task to run repeatedly, have the task itself start a new Timer each time it runs, so that it is executed again roughly every 2 seconds.

import time
import threading


def task():
    """Print the current time, then schedule the next run two seconds later."""
    print(time.ctime())
    # Each run creates a fresh Timer for the following run.
    next_run = threading.Timer(2.0, task)
    next_run.start()
    
# Schedule the first run; task() re-schedules itself from then on.
threading.Timer(interval=2.0, function=task).start()

Tags: Python Multithreading thread

Posted by Wynder on Thu, 19 May 2022 05:28:58 +0300