Python Multithreaded Crawler Details

1. Relationship between programs, processes, and threads

Program: an application, such as QQ or a crawler.

Process: the smallest unit of resource allocation; a running instance of a program.



A running program has at least one process.

Thread: the smallest scheduling unit of the CPU. A thread must belong to a process, and a process has at least one thread. Threads do not own independent resources: all threads within a process share that process's resources.

In short: a program has at least one process, and a process has at least one thread.
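A minimal sketch (the variable names are illustrative, not from the original code) showing that all threads in one process share the same memory:

import threading

counter = 0                  # lives in the process's memory
lock = threading.Lock()      # shared too, so we use it to guard concurrent writes

def worker():
    global counter
    for _ in range(100_000):
        with lock:
            counter += 1     # every thread mutates the same variable

threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 200000 -- two threads updated one shared variable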

2. Understanding multithreading and multiprocessing

Purpose of multitasking: make full use of the computer's physical capability to speed up the program.

Single-threaded programs: CPU utilization is low while the program runs.

Whenever the program blocks or waits (on I/O, for example), the CPU sits idle.
There are two ways to improve CPU utilization (see the sketch after this list):

1. The CPU has multiple cores; to make good use of every core, use multi-process programming.

2. To improve the utilization of a single CPU core, use multi-threaded programming.
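As a sketch of the first approach (the function name and the numbers are illustrative), multiprocessing.Pool spreads CPU-bound work across cores; the threading examples later in this article cover the second approach:

import multiprocessing

def cpu_task(n):
    # CPU-bound work: a busy arithmetic loop
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    # roughly one worker process per CPU core
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(cpu_task, [10_000_000] * 4)  # runs on several cores at once
    print(results)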

3. Concurrency and Parallelism


Concurrency is two queues taking turns at one coffee machine; parallelism is two queues using two coffee machines at the same time; serial is one queue using one coffee machine.


Parallel: truly simultaneous execution.

In Python, parallelism is achieved with multiple processes.

Concurrency: at any single moment the CPU runs only one task, but it switches between tasks so quickly (the time slices are tiny) that multiple programs appear to execute together.

Python's multithreaded execution is concurrent.

Serial: there is one execution unit; physically only one task exists at a time, and tasks run one after another.

Prerequisite: the CPython interpreter.

GIL: Global Interpreter Lock. It does not let multiple threads execute at the same time: at any moment only one thread can hold the GIL, and only the thread holding it can run on the CPU. Its purpose is to simplify multi-threaded programming inside the interpreter and avoid data corruption between threads.

Disadvantage: it severely constrains multithreaded execution efficiency, as the sketch below shows.
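A minimal sketch of that constraint (the numbers are illustrative): for a CPU-bound function, two threads take about as long as two serial calls, because only the thread holding the GIL can execute bytecode:

import threading
import time

def count(n):
    while n > 0:
        n -= 1

N = 10_000_000

start = time.time()
count(N); count(N)                       # serial: two runs back to back
print('serial :', time.time() - start)

start = time.time()
t1 = threading.Thread(target=count, args=(N,))
t2 = threading.Thread(target=count, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()                     # threaded: no speedup under the GIL
print('threads:', time.time() - start)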
If we use multithreading and multiprocessing to squeeze the most out of our program's execution efficiency, and even 100% CPU utilization is still not enough, how do we solve that?

Two solutions:

1. Vertical expansion:

Buy a better CPU and upgrade the hardware. Disadvantage: there is always a limit.

2. Horizontal expansion:

Add more computers: the distributed idea.

4. Thread Creation Methods

There are two ways to implement any piece of functionality; here the functionality is thread creation.

1. Use the module Python already provides:

the threading module

(1) Create a thread

t = threading.Thread(
    target=...,  # what the thread does; usually a reference to a function
    args=(...),  # the positional arguments, in order, as a tuple
)

(2) Start the thread

t.start() -- after starting, both the new thread and the main thread run as ordinary threads

Whether the new thread or the main thread executes first after startup is decided solely by the CPU's scheduler. A complete runnable sketch of this first approach follows.
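Putting the two steps together, a minimal runnable sketch (the download function and the fake URLs are illustrative, not part of the crawler below):

import threading
import time

def download(url):
    time.sleep(1)            # stands in for blocking network I/O
    print(f'finished {url}')

for url in ['page1', 'page2', 'page3']:
    # (1) create the thread: target is the function, args is a tuple
    t = threading.Thread(target=download, args=(url,))
    # (2) start it; the three "downloads" overlap instead of running serially
    t.start()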

2. Make your own wheel

In real life, it is not easy to build a car from scratch.

Likewise, it is not easy to implement a thread class yourself in code. But object-oriented programming has an idea for this: inheritance makes wheel-building easy.

Inheritance:

  • Subclasses inherit all non-private properties and methods of the parent class

  • Subclasses can override the parent class's properties and methods with their own. (After the subclass is instantiated, calls to these properties and methods actually dispatch to the subclass's own versions.)

    The programming mindset behind inheritance:

    1. When a module provides the functionality we want, use that module directly.

    2. When the module's functionality cannot meet our needs, inherit from it and add our own features.

    3. If a particular method does not satisfy us, override it.

    The second way to create threads (a sketch follows these steps):

    1. Write a class.

    2. Inherit from threading.Thread.

    3. Override the run method,

    because the underlying run method is what executes after the thread starts.

    4. Instantiating the class creates a thread object; calling start() on it starts the thread.

    Be sure the parent's __init__ method is triggered when you use the thread class:

    if the custom thread class defines its own __init__ method, the parent's __init__ must be called manually inside it, with either

    super().__init__()

    or

    threading.Thread.__init__(self)

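A minimal sketch of this second approach, following the four steps above (the class and attribute names are illustrative):

import threading

class CrawlThread(threading.Thread):           # 2. inherit threading.Thread
    def __init__(self, task_name):
        super().__init__()                     # trigger the parent __init__ first
        self.task_name = task_name             # then add our own attributes

    def run(self):                             # 3. override run
        print(f'{self.task_name} is working')  # executes after start() is called

t = CrawlThread('crawler-1')                   # 4. instantiating creates the thread
t.start()                                      # start() makes run() execute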
5. Thread Status

6. Writing to MongoDB with Python

import pymongo

# 1. Create a connection
client = pymongo.MongoClient(host='localhost', port=27017)
# 2. Connect to a database (it is created automatically if it does not exist)
db = client['tencent_data']
# 3. db is now a reference to the database
# item is a dict parsed from the crawled JSON (see the full code below)
# upsert=True -- update the document if it exists, insert it if it does not
db['Recruitment Information'].update_one({'PostId': item['PostId']}, {'$set': item}, upsert=True)
print(item, 'saved successfully!')

7. Multi-threaded implementation ideas

Method 1: create threads directly

Code implementation:

for i in range(1, 100):
    # parse_page(i) ---> replaced with one thread per page
    t = threading.Thread(target=parse_page, args=(i,))
    t.start()
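One caveat with the loop above: the main thread does not wait for the workers. If the program must block until every page is finished, a common variant (sketched here, not from the original) keeps a reference to each thread and joins them:

threads = []
for i in range(1, 100):
    t = threading.Thread(target=parse_page, args=(i,))
    t.start()
    threads.append(t)
for t in threads:
    t.join()  # block until every page thread has finished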

Method 2: Create a thread class

Code implementation:

import pymongo
import requests, threading
from queue import Queue


class Tencent(threading.Thread):
    def __init__(self, url, name, q_task):
        super().__init__()
        self.base_url = url
        self.name = name
        self.q_task = q_task
        # 1. Create a connection
        self.client = pymongo.MongoClient(host='localhost', port=27017)
        # 2. Connect to a database
        self.db = self.client['tencent_data']  # Create directly if the database does not exist

    def get_json(self, url, page):
        '''
        Request the AJAX endpoint and return the JSON data.
        :param url:
        :param page:
        :return:
        '''
        headers = {
            'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36',

        }
        params = {
            'timestamp': '1595207127325',
            'countryId': '',
            'cityId': '',
            'bgIds': '',
            'productId': '',
            'categoryId': '',
            'parentCategoryId': '',
            'attrId': '',
            'keyword': '',
            'pageIndex': page,
            'pageSize': '10',
            'language': 'zh-cn',
            'area': 'cn',
        }
        response = requests.get(url, params=params, headers=headers)
        return response.json()

    def write_to_mongo(self, item):
        # 3. db is a reference to the database
        # upsert=True -- update the document if it exists, insert it if it does not
        self.db['Recruitment Information'].update_one({'PostId': item['PostId']}, {'$set': item}, upsert=True)
        print(item, 'saved successfully!')

    def parse_json(self, json_data):
        '''

        :param json_data:
        :return:
        '''
        for data in json_data['Data']['Posts']:
            self.write_to_mongo(data)

    def parse_page(self, page):
        # use the base_url stored on the instance at construction time
        json_data = self.get_json(self.base_url, page)
        self.parse_json(json_data)

    def run(self):
        while True:
            # 1. Take a page number off the task queue (non-blocking, in case
            #    another thread grabbed the last item after our check)
            try:
                page = self.q_task.get(block=False)
            except Exception:
                break  # the queue is empty; nothing left to do
            print(f'=========== page {page} ===========@{self.name}')
            # 2. Request and parse the page
            self.parse_page(page)


if __name__ == '__main__':
    base_url = 'https://careers.tencent.com/tencentcareer/api/post/Query?'
    # 1. Create a task queue
    q_page = Queue()
    # 2. Initialize Task Queue - Page Number
    for i in range(1, 200):
        q_page.put(i)
    # 3. A list of thread names, used as a simple thread-control switch
    crawl_list = ['aa', 'bb', 'cc', 'dd']  # four threads
    # 4. Loop through the list above to create a thread
    for crawl in crawl_list:
        t = Tencent(base_url, crawl, q_page)
        t.start()

8. Producer-consumer model crawler

What is the producer-consumer model?

The producer-consumer model resolves the strong coupling between producer and consumer through a container. Producers and consumers do not communicate with each other directly; they communicate through a blocking queue. A producer does not wait for a consumer to process its output after producing it, but throws it straight into the blocking queue; a consumer does not ask a producer for data, but pulls it straight from the blocking queue. The queue acts as a buffer that balances the processing capacity of producers and consumers. Below, the producer-consumer model is implemented on top of a queue.

Definition: using the producer-consumer pattern in concurrent programming solves the vast majority of concurrency problems. The pattern improves a program's overall data-processing speed by balancing the working capacity of producer threads and consumer threads.

Problem it addresses: coupling that is too tight; this pattern decouples producers from consumers.

Steps to implement it:

1. Create a shared data pool (a queue).

2. Producer threads request the pages and put the returned JSON into the pool.

3. Consumer threads take JSON out of the pool, parse it, and save it to MongoDB.

4. Use a flag, plus join() on the producers, so the consumers know when production is finished.

Code implementation:

import pymongo
import requests
from queue import Queue
import threading


class Product(threading.Thread):
    def __init__(self, base_url, q_page, name=None):
        super().__init__()
        self.base_url = base_url
        self.q_page = q_page
        # self.name = name

    def get_json(self, url, page):
        '''
        Request the AJAX endpoint and return the JSON data.
        :param url:
        :param page:
        :return:
        '''
        headers = {
            'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36',

        }
        params = {
            'timestamp': '1595207127325',
            'countryId': '',
            'cityId': '',
            'bgIds': '',
            'productId': '',
            'categoryId': '',
            'parentCategoryId': '',
            'attrId': '',
            'keyword': '',
            'pageIndex': page,
            'pageSize': '10',
            'language': 'zh-cn',
            'area': 'cn',
        }
        response = requests.get(url, params=params, headers=headers)
        return response.json()

    def run(self):
        while True:
            # take a page number off the task queue (non-blocking, in case
            # another producer grabbed the last item after our check)
            try:
                page = self.q_page.get(block=False)
            except Exception:
                break
            print(f'Producer thread: ====== page {page} ======@{self.name}')
            json_data = self.get_json(self.base_url, page)
            # q_json is the shared data pool
            q_json.put(json_data)


class Consumer(threading.Thread):
    def __init__(self):
        super().__init__()
        # 1. Create a connection
        self.client = pymongo.MongoClient(host='localhost', port=27017)
        # 2. Connect to a database
        self.db = self.client['tencent_data2']  # Create directly if the database does not exist

    def write_to_mongo(self, item):
        # 3. db is a reference to the database
        # upsert=True -- update the document if it exists, insert it if it does not
        self.db['Recruitment Information'].update_one({'PostId': item['PostId']}, {'$set': item}, upsert=True)
        print(item, 'saved successfully!')

    def parse_json(self, json_data):
        '''
        :param json_data:
        :return:
        '''
        for data in json_data['Data']['Posts']:
            self.write_to_mongo(data)

    def run(self):
        while True:
            # exit only when the pool is empty AND the producers are done (flag is True)
            if q_json.empty() and flag:
                break
            try:
                # 1. Get data from the pool (non-blocking)
                json_data = q_json.get(block=False)
                print(f'Consumer thread:===@{self.name}================== data: {json_data}')
                # 2. Parse and save it
                self.parse_json(json_data)
            except Exception:
                continue


if __name__ == '__main__':
    base_url = 'https://careers.tencent.com/tencentcareer/api/post/Query?'
    # Polling flag
    flag = False  # the producers have not finished yet
    # 1. Create a pool
    q_json = Queue()

    # 2. Create separate producer (p) and consumer (c) threads to carry out the process above
    # 2.1 Start the producers
    # Initialize the task queue with page numbers
    q_page = Queue()
    for page in range(1, 200):
        q_page.put(page)
    # Keep a reference to each producer thread so we can join them later
    crawl_p = []
    for i in range(3):
        t = Product(base_url, q_page)
        t.start()
        crawl_p.append(t)

    # 2.2 Create c
    for i in range(3):
        t = Consumer()
        t.start()

    # 3. Make sure the producers are done before setting flag to True
    # Block here and monitor whether the producers have finished --- join()
    for p in crawl_p:
        p.join()
    flag = True
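As an aside, the flag variable is not the only way to tell consumers that production is over. A common alternative (a minimal self-contained sketch, not the author's code) uses a sentinel value: after the producers are joined, one sentinel per consumer is pushed into the pool, and a consumer exits when it receives one:

import threading
from queue import Queue

q = Queue()
SENTINEL = None

def producer():
    for i in range(5):
        q.put(i)             # produce data into the shared pool

def consumer():
    while True:
        item = q.get()       # blocks until data (or a sentinel) arrives
        if item is SENTINEL:
            break            # production is over; exit cleanly
        print('consumed', item)

p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start(); c.start()
p.join()                     # wait for the producer to finish
q.put(SENTINEL)              # then tell the (single) consumer to stop
c.join()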
 
