1. Relationship between programs, processes, and threads
Program: an application is a program, for example QQ or a web crawler.
Process: the smallest unit of resource allocation; a running instance of a program.
A program can start one or more processes.
Thread: the smallest scheduling unit of the CPU; a thread can only exist inside a process. A process has at least one thread. Threads own no independent resources: all threads in a process share that process's resources.
In short: a program has at least one process, and a process has at least one thread.
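This relationship is easy to observe from inside a script. A minimal sketch using only the standard library: the script itself is one process, and the code below runs in that process's main thread.

```python
import os
import threading

# A running script is one process...
print("process id:", os.getpid())
# ...and that process always contains at least one thread: the main thread.
print("thread count:", threading.active_count())
print("current thread:", threading.current_thread().name)  # MainThread
```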
2. Understanding multithreading and multiprocessing
Purpose of multitasking: make full use of the computer's hardware to speed the program up.
In a single-threaded program, CPU utilization is low while the program runs: whenever it blocks or waits on something, the CPU sits idle.
There are two ways to improve CPU utilization:
1. A CPU has multiple cores; multiprocess programming puts every core to work.
2. Multithreaded programming improves the utilization of a single core.
3. Concurrency and Parallelism
An analogy: concurrency is two queues taking turns at one coffee machine; parallelism is two queues using two coffee machines at the same time; serial execution is one queue using one coffee machine.
Two ways to scale:
1. Vertical scaling:
Buy a better CPU and upgrade the hardware. Disadvantage: there is always a limit.
2. Horizontal scaling:
Add more computers: the idea behind distributed systems.
Parallelism: tasks truly execute at the same time.
In Python, parallelism is achieved with multiple processes.
Concurrency: a CPU core runs only one task at any single instant, but it switches between tasks; because the time slices are so short, the programs appear to execute together.
Python's multithreaded execution is concurrent.
Serial: there is one execution unit and, physically, only one task runs at a time, one after another.
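A minimal sketch of the difference; the `square` task and the pool sizes are illustrative. Thread pools give concurrent execution (threads take turns on the CPU under CPython), while process pools give true parallelism (each process has its own interpreter):

```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def square(n):
    # an illustrative, CPU-bound task
    return n * n

if __name__ == '__main__':
    nums = [1, 2, 3, 4]
    # Threads: concurrent execution within one process
    with ThreadPoolExecutor(max_workers=2) as pool:
        print(list(pool.map(square, nums)))  # → [1, 4, 9, 16]
    # Processes: parallel execution across interpreter processes
    with ProcessPoolExecutor(max_workers=2) as pool:
        print(list(pool.map(square, nums)))  # → [1, 4, 9, 16]
```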
Prerequisite: the CPython interpreter.
GIL: the Global Interpreter Lock. It prevents multiple threads from executing simultaneously: at any moment only one thread can hold the GIL, and only the thread holding it gets to run on the CPU. Its purpose is to simplify multithreaded programming inside the interpreter and prevent threads from corrupting shared data.
Disadvantage: it severely limits the efficiency of multithreaded execution.
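The effect of the GIL can be seen by timing a CPU-bound loop. A minimal sketch (the loop size is arbitrary); on CPython the threaded run is usually no faster than the serial run, because only the thread holding the GIL executes bytecode:

```python
import threading
import time

def count(n):
    # purely CPU-bound work: no I/O, so threads gain nothing under the GIL
    while n > 0:
        n -= 1

N = 5_000_000

start = time.perf_counter()
count(N)
count(N)
serial = time.perf_counter() - start

start = time.perf_counter()
t1 = threading.Thread(target=count, args=(N,))
t2 = threading.Thread(target=count, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
threaded = time.perf_counter() - start

print(f'serial: {serial:.2f}s  threaded: {threaded:.2f}s')
```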
Suppose we already use multithreading and multiprocessing to maximize execution efficiency and CPU utilization reaches 100%, yet it is still not enough. How do we solve that?
Two solutions:
1. Vertical scaling:
Buy a better CPU and upgrade the hardware. Disadvantage: there is always a limit.
2. Horizontal scaling:
Add more computers: distributed thinking.
4. Thread Creation Methods
There are two ways to implement any piece of functionality, and creating a thread is no exception:
1. Use a module Python already provides: the threading module.
(1) Create a thread:
t = threading.Thread(
    target=...,  # what the thread will do: usually a reference to a function (no parentheses)
    args=(...),  # the function's positional arguments, in order, as a tuple
)
(2) Start the thread:
t.start()  # after starting, the new thread and the main thread are both running threads
After start(), whether the new thread or the main thread gets the CPU first is decided entirely by the scheduler.
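Putting the two steps together, a minimal runnable sketch (`greet` and its arguments are illustrative):

```python
import threading

def greet(name, times):
    # the function the new thread will run; its arguments come from args
    for _ in range(times):
        print(f'hello from {name}')

# target is a reference to the function (no parentheses, no call);
# args is a tuple of positional arguments, in order
t = threading.Thread(target=greet, args=('worker-1', 3))
t.start()   # the new thread and the main thread now both run
t.join()    # wait for the worker to finish before the script exits
```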
2. Build the wheel yourself.
Just as building a car from scratch is hard in real life, implementing a thread class from scratch is hard in code. But object-oriented programming offers an idea that makes wheel-building easy: inheritance.
Inheritance:
- A subclass inherits all non-private properties and methods of its parent class.
- A subclass can override the parent's properties and methods and add its own. (After the subclass is instantiated, calls to those properties and methods resolve to the subclass's own versions.)
The programming mindset behind inheritance:
1. If a module already provides the functionality we want, use it.
2. When the module does not quite meet our needs, inherit from it and add our own behavior.
3. If a particular method still does not satisfy us, override it.
The second way to create a thread:
1. Write a class.
2. Inherit from threading.Thread.
3. Override the run method, because run is what the thread executes under the hood after it starts.
4. Instantiating the class creates a thread object; calling start() on the object starts the thread.
Make sure the parent's __init__ runs when you use a thread subclass: if the custom thread class defines its own __init__, it must call the parent's __init__ explicitly, with either
super().__init__()
or
threading.Thread.__init__(self)
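The four steps, in a minimal sketch (the class name and url attribute are illustrative):

```python
import threading

class DownloadThread(threading.Thread):        # 1-2. a class inheriting Thread
    def __init__(self, url):
        super().__init__()                     # the parent __init__ must run
        self.url = url

    def run(self):                             # 3. run() executes after start()
        print(f'downloading {self.url} in {self.name}')

t = DownloadThread('https://example.com')      # 4. instantiating creates the thread object
t.start()                                      # start() invokes run() in the new thread
t.join()
```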
5. Thread Status
6. Writing to MongoDB with Python
```python
import pymongo

# 1. Create a connection
client = pymongo.MongoClient(host='localhost', port=27017)
# 2. Connect to a database (created automatically on first write)
db = client['tencent_data']
# 3. db is now a reference to that database
# upsert=True: update the document if it exists, insert it if it does not
# (item is a dict holding one job posting, with a 'PostId' key)
db['Recruitment Information'].update_one(
    {'PostId': item['PostId']}, {'$set': item}, upsert=True)
print(item, 'Saved successfully!')
```
7. Multi-threaded implementation ideas
Method 1: create threads with threading.Thread directly
Code implementation:
```python
import threading

for i in range(1, 100):
    # parse_page(i)  ->  replaced with one thread per page
    t = threading.Thread(target=parse_page, args=(i,))
    t.start()
```
Method 2: Create a thread class
Code implementation:
```python
import pymongo
import requests, threading
from queue import Queue


class Tencent(threading.Thread):
    def __init__(self, url, name, q_task):
        super().__init__()
        self.base_url = url
        self.name = name
        self.q_task = q_task
        # 1. Create a connection
        self.client = pymongo.MongoClient(host='localhost', port=27017)
        # 2. Connect to a database (created automatically on first write)
        self.db = self.client['tencent_data']

    def get_json(self, url, page):
        '''Request the ajax endpoint and return its JSON data.'''
        headers = {
            'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36',
        }
        params = {
            'timestamp': '1595207127325',
            'countryId': '',
            'cityId': '',
            'bgIds': '',
            'productId': '',
            'categoryId': '',
            'parentCategoryId': '',
            'attrId': '',
            'keyword': '',
            'pageIndex': page,
            'pageSize': '10',
            'language': 'zh-cn',
            'area': 'cn',
        }
        response = requests.get(url, params=params, headers=headers)
        return response.json()

    def write_to_mongo(self, item):
        # 3. db is a reference to the database
        # upsert=True: update if the document exists, insert if it does not
        self.db['Recruitment Information'].update_one(
            {'PostId': item['PostId']}, {'$set': item}, upsert=True)
        print(item, 'Saved successfully!')

    def parse_json(self, json_data):
        for data in json_data['Data']['Posts']:
            self.write_to_mongo(data)

    def parse_page(self, page):
        base_url = 'https://careers.tencent.com/tencentcareer/api/post/Query?'
        json_data = self.get_json(base_url, page)
        self.parse_json(json_data)

    def run(self):
        while True:
            if self.q_task.empty():
                break
            # 1. Take a page number from the queue
            page = self.q_task.get()
            print(f'===========No.{page} page====================@{self.name}')
            # 2. Request and parse it
            self.parse_page(page)


if __name__ == '__main__':
    base_url = 'https://careers.tencent.com/tencentcareer/api/post/Query?'
    # 1. Create a task queue
    q_page = Queue()
    # 2. Initialize the task queue with page numbers
    for i in range(1, 200):
        q_page.put(i)
    # 3. Create a thread control list: four threads
    crawl_list = ['aa', 'bb', 'cc', 'dd']
    # 4. Loop through the list to create and start each thread
    for crawl in crawl_list:
        t = Tencent(base_url, crawl, q_page)
        t.start()
```
8. Producer-consumer model crawler
What is the producer-consumer model?
The producer-consumer model removes the tight coupling between producer and consumer by putting a container between them. Instead of communicating directly, they communicate through a blocking queue: after producing a piece of data, the producer does not wait for the consumer to process it, but drops it straight into the queue; the consumer does not ask the producer for data, but pulls it straight from the queue. The blocking queue acts as a buffer that balances the processing capacity of producers and consumers. Below, the producer-consumer model is implemented on top of queues.
Definition: using the producer-consumer pattern in concurrent programming solves the vast majority of concurrency problems. The pattern improves a program's overall data-processing speed by balancing the working capacity of producer and consumer threads.
Problem it solves: direct producer-consumer coupling is too tight; this pattern decouples them.
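Before the crawler version, the pattern can be shown in a minimal sketch: one producer, one consumer, and a blocking queue between them (the sentinel value is an illustrative way to signal shutdown):

```python
import threading
from queue import Queue

buffer = Queue(maxsize=5)   # the container that decouples the two sides
SENTINEL = None             # tells the consumer to stop

def producer():
    for i in range(10):
        buffer.put(i)       # blocks if the buffer is full
    buffer.put(SENTINEL)

def consumer(results):
    while True:
        item = buffer.get() # blocks if the buffer is empty
        if item is SENTINEL:
            break
        results.append(item * item)

results = []
p = threading.Thread(target=producer)
c = threading.Thread(target=consumer, args=(results,))
p.start(); c.start()
p.join(); c.join()
print(results)  # → [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```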
Steps to implement:
1. Create a shared pool (a queue).
2. Producer threads fetch pages and put the JSON results into the pool.
3. Consumer threads take JSON out of the pool, parse it, and save it to MongoDB.
4. Once every producer has finished, set a flag so the consumers know they can exit when the pool is empty.
Code implementation:
```python
import pymongo
import requests
from queue import Queue
import threading


class Product(threading.Thread):
    def __init__(self, base_url, q_page, name=None):
        super().__init__()
        self.base_url = base_url
        self.q_page = q_page
        # self.name = name

    def get_json(self, url, page):
        '''Request the ajax endpoint and return its JSON data.'''
        headers = {
            'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36',
        }
        params = {
            'timestamp': '1595207127325',
            'countryId': '',
            'cityId': '',
            'bgIds': '',
            'productId': '',
            'categoryId': '',
            'parentCategoryId': '',
            'attrId': '',
            'keyword': '',
            'pageIndex': page,
            'pageSize': '10',
            'language': 'zh-cn',
            'area': 'cn',
        }
        response = requests.get(url, params=params, headers=headers)
        return response.json()

    def run(self):
        while True:
            if self.q_page.empty():
                break
            page = self.q_page.get()
            print(f'Producer thread:=======page {page}===============@{self.name}')
            json_data = self.get_json(self.base_url, page)
            # q_json is the shared data pool
            q_json.put(json_data)


class Consumer(threading.Thread):
    def __init__(self):
        super().__init__()
        # 1. Create a connection
        self.client = pymongo.MongoClient(host='localhost', port=27017)
        # 2. Connect to a database (created automatically on first write)
        self.db = self.client['tencent_data2']

    def write_to_mongo(self, item):
        # 3. db is a reference to the database
        # upsert=True: update if the document exists, insert if it does not
        self.db['Recruitment Information'].update_one(
            {'PostId': item['PostId']}, {'$set': item}, upsert=True)
        print(item, 'Saved successfully!')

    def parse_json(self, json_data):
        for data in json_data['Data']['Posts']:
            self.write_to_mongo(data)

    def run(self):
        while True:
            # Exit only when the pool is empty AND the producers are done;
            # checking emptiness alone is not enough.
            if q_json.empty() and flag:
                break
            try:
                # 1. Get data from the pool (non-blocking)
                json_data = q_json.get(block=False)
                print(f'Consumer thread:===@{self.name}==================data:{json_data}')
                # 2. Parse and save it
                self.parse_json(json_data)
            except Exception:
                continue


if __name__ == '__main__':
    base_url = 'https://careers.tencent.com/tencentcareer/api/post/Query?'
    # Polling flag: False means the producers are still working
    flag = False
    # 1. Create the pool
    q_json = Queue()
    # 2. Create separate producer and consumer threads
    # 2.1 Start the producers: initialize the task queue with page numbers
    q_page = Queue()
    for page in range(1, 200):
        q_page.put(page)
    # Keep a reference to every producer thread
    crawl_p = []
    for i in range(3):
        t = Product(base_url, q_page)
        t.start()
        crawl_p.append(t)
    # 2.2 Create the consumers
    for i in range(3):
        t = Consumer()
        t.start()
    # 3. Once every producer has finished, set flag to True
    # join() blocks here until each producer is done
    [p.join() for p in crawl_p]
    flag = True
```