[daily] multiprocessing usage and an example of queue communication in a multi-process crawler

Preface

This post summarizes the use of the multiprocessing library. After the Xici proxy site went down a while ago, many of my previously written multi-process crawler modules stopped working. Recently yy told me he found that the former Xici has been renamed Xila proxy; the interface has improved a lot and some tutorials have been added to enrich the content, which is really a roundabout way of keeping the service going.

The multi-process crawler I wrote before was not very good. The idea was to allocate roughly the same number of IPs to each process and then let every process run independently with its assigned IPs. If the IPs assigned to some process happened to be of poor quality, that process would quickly find that none of its IPs worked and would soon turn into a zombie. In the end only one or two processes might still be alive, so the efficiency was actually very poor. Moreover, a site like Xila proxy benefits everyone, so hammering it with frequent requests is not very nice (in fact the site already gets very heavy traffic; a large number of crawlers must be scraping its IPs every day, and it is easy to get squeezed out).

Therefore, the improved multi-process version uses a Queue (the Queue module in Section 3 of this article). Every process takes IPs from the shared Queue, and when the length of the Queue drops below a certain threshold, the XiLaProxy module is called to fetch a batch of IPs and refill the Queue (the XiLaProxy code is in the complete multi-process crawler project example in Section 7; it is a class that fetches IPs from the Xila proxy site).

After testing, the efficiency is indeed much higher; in theory every process can stay in a healthy state. Section 7 again uses pageview brushing as the example (although it has recently been found that CSDN no longer lets you raise your score by brushing pageviews, even through proxy IPs; is the rule that below 10000 reads every 100 reads = 1 point actually invalid?).
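To make the pattern concrete before the module-by-module notes, here is a minimal sketch of my own of the queue-plus-refill idea; fetch_proxies() is a hypothetical stand-in for the XiLaProxy class, the process counts are arbitrary, and queue.qsize() is used for brevity (it is not available on macOS). The real implementation is in Section 7:

import time
from multiprocessing import Process, Queue

def fetch_proxies(n=50):
	# Hypothetical stand-in: the real project fetches proxies with the XiLaProxy class in Section 7.
	return ['127.0.0.1:8080'] * n

def refill(queue, threshold=15, interval=30):
	# Filler process: tops the queue up with a fresh batch whenever it runs low.
	while True:
		if queue.qsize() < threshold:
			for proxy in fetch_proxies():
				queue.put(proxy)
		time.sleep(interval)

def crawl(queue):
	# Worker process: takes one proxy at a time from the shared queue.
	while True:
		proxy = queue.get()
		print('crawling with', proxy)	# Placeholder for the actual request logic.
		time.sleep(1)

if __name__ == '__main__':
	q = Queue()
	processes = [Process(target=crawl, args=(q, )) for _ in range(4)]
	processes.append(Process(target=refill, args=(q, )))
	for p in processes:
		p.start()
	for p in processes:
		p.join()	# The workers loop forever; stop the sketch with Ctrl+C.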

Here are some common pitfalls when using multiprocessing:

  • Firstly, multi-process code cannot be run in a jupyter notebook;
  • Secondly, in an ordinary script, process creation and the start and join calls must be written in the main block (i.e. inside if __name__ == '__main__':); otherwise an error will be reported, at least when a process pool is used;
  • Then, if the multi-process logic is written inside a class, the target function cannot be a local function defined inside a class method. For example:
    from multiprocessing import Process
    
    class A:
    	def __init__(self):
    		pass
    
    	def test(self):
    		def f1(x):
    			print(x)
    			
    		def f2(x):
    			print(x)
    			
    		p1 = Process(target=f1, args=(1, ))
    		p2 = Process(target=f2, args=(2, ))
    		p1.start()
    		p2.start()
    		p1.join()
    		p2.join()
    
    if __name__ == '__main__':
    	a = A()
    	a.test()
    
    • The above code reports an error when test() is executed (the OSError part may sometimes be an access-denied error instead, but the AttributeError before it is always the same):
    ...
    AttributeError: Can't pickle local object 'A.test.<locals>.f1'
    ...
    OSError: [WinError 87] Parameter error.
    
    • The reason is in fact the same as why process creation and the start and join calls must be written in the main block: writing this part of the code at the same level as the target functions is essentially equivalent to the following:
    from multiprocessing import Process
    
    def f1(x):
    	print(x)
    	
    def f2(x):
    	print(x)
    	
    p1 = Process(target=f1, args=(1, ))
    p2 = Process(target=f2, args=(2, ))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    
    • Written like this, it still reports an error:
    RuntimeError:
    		An attempt has been made to start a new process before the
    		current process has finished its bootstrapping phase.
    
    		This probably means that you are not using fork to start your
    		child processes and you have forgotten to use the proper idiom
    		in the main module:
    
    			if __name__ == '__main__':
    				freeze_support()
    				...
    
    		The "freeze_support()" line can be omitted if the program
    		is not going to be frozen to produce an executable.
    
    • Change to this:
    from multiprocessing import Process
    
    def f1(x):
    	print(x)
    	
    def f2(x):
    	print(x)
    
    if __name__ == '__main__':
    	p1 = Process(target=f1, args=(1, ))
    	p2 = Process(target=f2, args=(2, ))
    	p1.start()
    	p2.start()
    	p1.join()
    	p2.join()
    
    • Therefore, if you want to put the multi-process logic inside a class method from the start, the target functions cannot be defined as local functions of that method; the code has to be rewritten as follows:
    from multiprocessing import Process
    
    class A:
    	def __init__(self):
    		pass
    
    	def f1(self, x):
    		print(x)
    		
    	def f2(self, x):
    		print(x)
    
    	def test(self):
    		p1 = Process(target=self.f1, args=(1, ))
    		p2 = Process(target=self.f2, args=(2, ))
    		p1.start()
    		p2.start()
    		p1.join()
    		p2.join()
    
    if __name__ == '__main__':
    	a = A()
    	a.test()	
    
    • However, written this way you will find that many functions unrelated to the class end up inside it, which makes the class messy and ugly. You could consider moving f1 and f2 outside the class, but then they cannot access class attributes and methods, so it is hard to have it both ways. Two out-of-place functions f1 and f2 sitting in class A really is painful to look at; one possible compromise is sketched below.
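    • A sketch of that compromise (my own, not from the original post): keep the target functions at module level and pass whatever state they need explicitly through args:
    from multiprocessing import Process
    
    def f1(x):
    	print('f1 got', x)
    
    def f2(x):
    	print('f2 got', x)
    
    class A:
    	def __init__(self, value):
    		self.value = value								 # State the target functions need is stored here ...
    
    	def test(self):
    		p1 = Process(target=f1, args=(self.value, ))	 # ... and passed to them explicitly via args.
    		p2 = Process(target=f2, args=(self.value * 2, ))
    		p1.start()
    		p2.start()
    		p1.join()
    		p2.join()
    
    if __name__ == '__main__':
    	a = A(1)
    	a.test()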

In the example in Section 7 of this article you will see the same problem. Taking 16 processes on an 8-core CPU as an example, one process is allocated to the crawler that fetches Xila proxy IPs, and the other 15 processes are all used as proxy crawlers. Two target functions are involved here; the author chose to write both inside the CSDN class, even though it does not look very tidy.

1 Process module

The Process module is used to define a process;

  1. class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None):
  • Instantiating an object: process = Process(target=f, args=(1, ));
  • Object method:
    • process.run(): run the target function directly in the calling process, i.e. execute f(1);
    • process.start(): start the process and make it ready for scheduling;
    • process.terminate(): terminate the process (send SIGTERM signal);
    • process.kill(): kill the process (send SIGKILL signal);
    • process.close(): close the Process object and release its resources;
      • If the underlying process is still running, the call reports an error;
    • process.join(timeout=None):
      • If timeout is None, the method blocks until the process whose join() was called terminates;
      • If timeout is a positive number, it blocks for at most timeout seconds;
      • The method returns either when the process terminates or when the timeout expires;
      • You can check process.exitcode to determine whether the process has actually terminated;
      • Blocking here means the caller does not continue until the method returns; non-blocking means different processes can run asynchronously (effectively at the same time);
    • process.name: process name;
    • process.is_alive(): whether the process is alive;
      • The process object is alive from the moment the start() method returns until the child process terminates;
    • process.daemon: daemon flag of the process;
      • The default value is False;
      • Must be set before the start() call;
      • When it is set to True, the child process is terminated when the main (parent) process exits;
    • process.pid: process ID;
    • process.exitcode: exit code of the process;
      • If the process has not terminated yet, None is returned;
      • Otherwise the exit code is returned; a value of -N means the process was terminated by signal N;
  2. A comparison example for process.daemon:
  • Code example:
    import os
    import time
    from multiprocessing import Process
    
    def child():
    	print('module name: ', __name__)
    	print('parent process: ', os.getppid())
    	print('process id: ', os.getpid())
    
    def parent(name):
    	child()
    	time.sleep(3)
    	print('hello', name)
    
    if __name__ == '__main__':
    	process = Process(target=parent, args=('caoyang', ))
    	#process.daemon = True
    	print('process.daemon: ', process.daemon)
    	process.start()
    	process.join(1)
    	print('name: ', process.name)
    	print('is_alive: ', process.is_alive())
    	print('exitcode: ', process.exitcode)
    
    • Output result:
    process.daemon:  False
    module name:  __mp_main__
    parent process:  8896
    process id:  12480
    name:  Process-1
    is_alive:  True
    exitcode:  None
    hello caoyang
    
    • Uncommenting the process.daemon = True line in the code, the output becomes:
    process.daemon:  True
    module name:  __mp_main__
    parent process:  11720
    process id:  7836
    hello caoyang
    name:  Process-1
    is_alive:  False
    exitcode:  0
    
    • It can be seen that with the daemon flag set, the child process is terminated together with the main process (process.exitcode changes from None to 0); a daemon child that is still running when the main process exits does not get to finish its remaining work;
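  3. A small supplementary example (my own sketch, not from the original post) for terminate() and the exitcode convention:
    import time
    from multiprocessing import Process
    
    def worker():
    	time.sleep(60)									 # Pretend to do long-running work.
    
    if __name__ == '__main__':
    	process = Process(target=worker)
    	process.start()
    	time.sleep(1)
    	process.terminate()								 # Ask the child to terminate (SIGTERM on Unix).
    	process.join()									 # Wait until the child has actually exited.
    	print('is_alive: ', process.is_alive())			 # False
    	print('exitcode: ', process.exitcode)			 # A negative value such as -15 means the child was terminated by (the equivalent of) signal 15, i.e. SIGTERM.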

2 Pool module

The Pool module manages a pool of worker processes;

  1. class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
  • Parameter Description:
    • processes: the total number of worker processes, i.e. the maximum capacity of the pool; the default is os.cpu_count();
    • initializer: an initializer run in each worker; usually left as the default None;
    • maxtasksperchild: the number of tasks a worker process can complete before it exits and is replaced; the default None means workers live as long as the pool and take whatever work comes;
    • context: used to specify the context for starting the worker processes; usually does not need to be set;
  • Instantiated object: pool = Pool(processes=16);
  • Add tasks to the process pool:
    • pool.apply(f, args=(1, )): blocking, i.e. the tasks added to the pool are simply executed one after another;
    • pool.apply_async(f, args=(1, )): non-blocking, i.e. tasks are executed in parallel, effectively running in batches;
  • Map tasks directly to the process pool:
    • pool.map(f, range(500)): execute f(0) to f(499); blocks the caller until all results are ready;
    • pool.map_async(f, range(500)): execute f(0) to f(499); non-blocking, it returns immediately with an AsyncResult;
  • Close the process pool: pool.close() or pool.terminate();
    • close(): prevents any more tasks from being submitted to the pool; once all submitted tasks are completed, the worker processes exit;
    • terminate(): stops the worker processes immediately without completing outstanding work; when the pool object is garbage collected, terminate() is called automatically;
  • Wait for the process pool: pool.join();
    • You must call pool.close() or pool.terminate() before calling pool.join();
  • Code example:
    • Add tasks one by one:
    import os
    import time
    from multiprocessing import Process, Pool
    
    def f(n):
    	print(n)
    	time.sleep(3)
    
    if __name__=="__main__":
    	pool = Pool(processes=16)
    	for i in range(500):
    		pool.apply_async(f, args=(i, ))
    	pool.close()
    	pool.join()
    
    • Mapping tasks:
    import os
    import time
    from multiprocessing import Process, Pool
    
    def f(n):
    	print(n)
    	time.sleep(3)
    
    if __name__=="__main__":
    	pool = Pool(processes=16)
    	pool.map_async(f, range(500))
    	pool.close()
    	pool.join()
    
    • Running the code, you will find that tasks added one by one with apply_async are basically executed in the order they were added, while the output of the mapped tasks looks much more random;
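  2. A supplementary sketch (not from the original post) on retrieving return values: apply_async() returns an AsyncResult whose get() blocks until the result is ready, while map() blocks and returns the results directly in order:
    from multiprocessing import Pool
    
    def square(n):
    	return n * n
    
    if __name__ == '__main__':
    	with Pool(processes=4) as pool:
    		async_result = pool.apply_async(square, args=(3, ))
    		print(async_result.get())				 # 9; get() blocks until this single task finishes.
    		print(pool.map(square, range(5)))		 # [0, 1, 4, 9, 16]; map() blocks and keeps input order.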

3 Queue module

The Queue module provides a queue for sharing data between processes;

  1. class multiprocessing.Queue(maxsize):
  • The parameter maxsize is the maximum length of the queue; use queue = Queue(100) to create a multi-process queue;
  • Common methods:
    • queue.put(obj, block=True, timeout=None): add data obj to the queue;
      • The parameter block defaults to True, i.e. put blocks by default, so items go into the queue one at a time;
      • The parameter timeout defines the maximum blocking time; the default None means wait indefinitely until there is room in the queue for the next item;
      • Note that in non-blocking mode (block=False) you need to handle the queue.Full exception raised when the queue is full (see the sketch at the end of this section);
    • queue.get(block=True, timeout=None): read data from the queue; also blocking by default, and in non-blocking mode it raises queue.Empty when the queue is empty; the parameters have the same meaning as for put;
    • queue.qsize(): get the current queue length;
  • A simple example of put and get:
    import os
    import time
    import random
    from multiprocessing import Process, Queue
    
    def write(q,urls):
    	print('Process({}) is writing...'.format(os.getpid()))
    	for url in urls:
    		q.put(url)
    		print('Put {} to queue...'.format(url), q.qsize())
    
    def read(q):
    	print('Process({}) is reading...'.format(os.getpid()))
    	while True:
    		url = q.get(True)
    		print('Get {} from queue.'.format(url), q.qsize())		
    		
    if __name__=='__main__':
    	q = Queue()
    	
    	writer1 = Process(target=write, args=(q, ['url1', 'url2', 'url3']))
    	writer2 = Process(target=write, args=(q, ['url4', 'url5', 'url6']))
    	
    	reader = Process(target=read, args=(q,))
    	
    	writer1.start()
    	writer2.start()
    	reader.start()
    	
    	writer1.join()
    	writer2.join()
    	
    	reader.terminate()	
    
    • Note that the read() function is an endless loop, so terminate() is needed to forcibly end the reader process;
    • Output result:
    Process(9776) is writing...
    Put url1 to queue...
    Process(13196) is writing...
    Put url4 to queue...
    Process(13892) is reading...
    Get url1 from queue.
    Get url4 from queue.
    Put url5 to queue...
    Get url5 from queue.
    Put url6 to queue...
    Get url6 from queue.
    Put url2 to queue...
    Get url2 from queue.
    Put url3 to queue...
    Get url3 from queue.
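  2. A supplementary sketch (not from the original post) of the non-blocking variants; with block=False you have to catch the queue.Full and queue.Empty exceptions from the standard queue module:
    import time
    import queue									 # queue.Full / queue.Empty are the exceptions raised here.
    from multiprocessing import Queue
    
    if __name__ == '__main__':
    	q = Queue(maxsize=2)
    	try:
    		q.put('url1', block=False)
    		q.put('url2', block=False)
    		q.put('url3', block=False)				 # The queue already holds 2 items, so this raises queue.Full.
    	except queue.Full:
    		print('queue is full')
    	time.sleep(0.5)								 # Give the feeder thread a moment to flush data into the pipe.
    	try:
    		while True:
    			print(q.get(block=False))			 # Raises queue.Empty once the queue is drained.
    	except queue.Empty:
    		print('queue is empty')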
    

4 Pipe module

The Pipe module is used for communication between two processes, which sit at the two ends of the pipe;

  1. Code example of one-way data transfer:
from multiprocessing import Process, Pipe

def send(pipe):
	pipe.send([1, 2, 3, 4, 5])
	pipe.close()

if __name__ == '__main__':
	con1, con2 = Pipe()
	sender = Process(target=send, args=(con1, ))  
	sender.start()                            
	print("con2 got: {}".format(con2.recv()))          
	con2.close()
  2. Code example of two-way sending and receiving of data:
from multiprocessing import Process, Pipe

def talk(pipe):
	pipe.send({'name': 'Bob', 'spam': 42})          
	reply = pipe.recv()                         
	print('talker got: {}'.format(reply))

if __name__ == '__main__':
	parentEnd, childEnd = Pipe()                 
	child = Process(target=talk, args=(childEnd, )) 
	child.start()                                  
	print('parent got: {}'.format(parentEnd.recv()))          
	parentEnd.send({x * 2 for x in 'spam'})    
	child.join()                               
	print('parent exit.')
  • Output result:
    parent got: {'name': 'Bob', 'spam': 42}
    talker got: {'aa', 'pp', 'mm', 'ss'}
    parent exit.
    

5 Lock module

The Lock module provides a lock you control yourself; you add the blocking wherever you need it;

  1. Without a lock for synchronization: the two processes keep competing and may modify the shared variable v at the same time;
import multiprocessing
import time

def job(v, num):
    for i in range(5):
        time.sleep(0.1)
        v.value += num 
        print(v.value, end=",")

def multicore():
    v = multiprocessing.Value('i', 0)  
    p1 = multiprocessing.Process(target=job, args=(v, 1))
    p2 = multiprocessing.Process(target=job, args=(v, 3))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    multicore()
  • Output results: 3, 7, 11, 15, 19, 4, 8, 12, 16, 20,;
  • Logically each step should add +1 or +3, but you will find that the shared variable gets incremented wildly and out of order;
  2. Synchronizing with a lock:
import multiprocessing
import time
# lock = multiprocessing.Lock()
lock = multiprocessing.RLock()
def job(v, num,lock):
    lock.acquire()
    for _ in range(5):
        time.sleep(0.1)
        v.value += num 
        print(v.value, end=", ")
    lock.release()

def multicore():
    v = multiprocessing.Value('i', 0) 
    p1 = multiprocessing.Process(target=job, args=(v, 1, lock))
    p2 = multiprocessing.Process(target=job, args=(v, 3, lock)) 
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
	multicore()
  • Output results: 1, 2, 3, 4, 5, 8, 11, 14, 17, 20,
  3. The difference between Lock and RLock:
  • The former must be released before it can be acquired again; the following code is not feasible and will deadlock:
    lock.acquire()
    lock.acquire() # deadlock
    lock.release()
    lock.release()
    
  • The latter can be acquired repeatedly and then released one by one, as long as the numbers of acquires and releases are equal; that is, code of the above form is feasible with an RLock;
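  4. A minimal sketch (not from the original post) showing the re-entrancy difference within a single process:
from multiprocessing import Lock, RLock

def nested(lock):
    with lock:              # First acquire.
        with lock:          # Second acquire by the same owner.
            print('inside the nested critical section')

if __name__ == '__main__':
    nested(RLock())         # Works: an RLock may be acquired repeatedly by the same process/thread.
    # nested(Lock())        # Would deadlock: a plain Lock cannot be acquired again before release.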

6 Manager module

The Manager module mainly wraps more types of process-shared data; the most commonly used are list and dict, and these two cover almost every case.
Note that the Manager object must be instantiated inside the main block (if __name__ == '__main__'), otherwise a RuntimeError is raised;

  1. Taking dict as an example, a simple code example is given below; the list case is entirely analogous:
import time
from multiprocessing import Process, Manager

def f(mdict, key, value):
	mdict[key] = value
 
if __name__ == '__main__':
	manager = Manager()
	mdict = manager.dict()
	processes = [Process(target=f, args=(mdict, i, i**2)) for i in range(10)]
	for process in processes:
		process.start()
	for process in processes:
		process.join()
	print ('Results: ')
	for key, value in dict(mdict).items():
		print("{}: {}".format(key, value))
  • Output result:
    Results:
    1: 1
    0: 0
    2: 4
    3: 9
    4: 16
    5: 25
    6: 36
    7: 49
    9: 81
    8: 64
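  2. For completeness, a minimal manager.list() sketch of my own (the original only notes that the list case is analogous):
import time
from multiprocessing import Process, Manager

def f(mlist, value):
	mlist.append(value ** 2)						 # Appending to a managed list is process-safe.

if __name__ == '__main__':
	manager = Manager()
	mlist = manager.list()
	processes = [Process(target=f, args=(mlist, i)) for i in range(10)]
	for process in processes:
		process.start()
	for process in processes:
		process.join()
	print(list(mlist))								 # The order depends on process scheduling.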
    

7 A complete multi-process crawler project example

  1. proxy.py
# -*- coding: UTF-8 -*-
# @author: caoyang
# @email: caoyang@163.sufe.edu.cn

import math
import pandas
import requests

from bs4 import BeautifulSoup

class XiLaProxy(object):
	"""
	http://www.xiladaili.com/
	"""
	def __init__(self) -> None:
		
		self.index_url = 'http://www.xiladaili.com'
		self.total_proxy_per_page = 50									 # There are exactly 50 proxies on one page. Please modify the number if the page source changes in the future.
		self.proxy_list_urls = {
			'gaoni': self.index_url + '/gaoni',
			'https': self.index_url + '/https',
			'http': self.index_url + '/http',
		}
		self.headers = {												 # Request headers.
			'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:82.0) Gecko/20100101 Firefox/82.0',
		}

	def get_proxy_table(self, proxy_type: str='https', total: int=50, export_path: str=None) -> pandas.DataFrame:
		"""
		Request for the proxy table on xiladaili website.
		You can call ```table_df.values[:, 0].tolist()```to get the proxy list.
		
		:param proxy_type: range from values {'gaoni', 'https', 'http'}.
		:param total: total number of ip needed.
		:param export_path: export IP table to CSV file. 
		:return table_df: proxy table as the type of ```pandas.DataFrame```.
		"""
		assert proxy_type in self.proxy_list_urls
		total_page = math.ceil(total / self.total_proxy_per_page)
		proxy_list_url = self.proxy_list_urls.get(proxy_type)
		
		def _get_proxy_table(url, table_dict):
			while True:
				try: 
					response = requests.get(url, headers=self.headers, timeout=30)
					break
				except Exception as e: 
					print('Fail to connect {} ...'.format(url))
					print(e)
					continue
			html = response.text
			soup = BeautifulSoup(html, 'lxml')
			table = soup.find('table', class_='fl-table')	
			if table_dict is None:
				table_dict = {}
				for th in table.find('thead').find_all('th'):
					table_dict[str(th.string)] = []
			for tr in table.find('tbody').find_all('tr'):
				for td, column in zip(tr.find_all('td'), table_dict):
					table_dict[column].append(str(td.string))
			return table_dict
		
		table_dict = None
		for page in range(1, total_page+1):
			print('Fetch proxies on page {}'.format(page))
			if page==1: 
				table_dict = _get_proxy_table(proxy_list_url, table_dict)
			else: 
				table_dict = _get_proxy_table(proxy_list_url + '/{}'.format(page), table_dict)
			print('  - There are total {} proxies.'.format(len(table_dict[list(table_dict.keys())[0]])))
		table_df = pandas.DataFrame(table_dict, columns=list(table_dict.keys()))
		if export_path is not None:
			table_df.to_csv(export_path, header=True, index=False, sep='\t')
		return table_df
	
		
if __name__ == '__main__':
	
	xila = XiLaProxy()
	
	#gaoni_df = xila.get_proxy_table(proxy_type='gaoni', export_path='gaoni.csv')
	https_df = xila.get_proxy_table(proxy_type='https', export_path='https.csv')
	#http_df = xila.get_proxy_table(proxy_type='http', export_path='http.csv')
	
	print(https_df.values[:, 0].tolist())

  2. csdn.py:
  • read_article is the non-multiprocess version;
  • read_article_with_proxies is the multiprocess version;
  • _put_proxies_queue and _read_article_with_proxies are the target functions of the two kinds of processes;
  • It is mainly an application of the Queue approach; so far it tests very stably;
  • Basically it handles more than ten requests per minute without problems;
# -*- coding: UTF-8 -*-
# @author: caoyang
# @email: caoyang@163.sufe.edu.cn

import os
import sys

sys.path.append(os.path.dirname(os.getcwd()))

import re
import time
import requests
from functools import wraps
from bs4 import BeautifulSoup
from multiprocessing import Process, Pool, Queue, Lock

from proxy import XiLaProxy

class CSDN(object):

	def __init__(self) -> None:

		self.headers = {												 # Request headers.
			'Host': 'blog.csdn.net',
			'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:82.0) Gecko/20100101 Firefox/82.0',
			'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
			'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
			'Accept-Encoding': 'gzip, deflate, br',
			'Referer': 'https://blog.csdn.net/baidu_39633459',
			'Connection': 'keep-alive',
			# 'Cookie': 'Your Cookies', # 20201217 note: it has recently been found that requests work fine without cookies; add your own here if you want to use them
			'Upgrade-Insecure-Requests': '1',
			'Cache-Control': 'max-age=0',
			'TE': 'Trailers',
		}
		self.article_ids = [											 # Add more article IDs yourself; only one article is listed here.
			'110294311',
		]
		assert self.article_ids											 # It should not be an empty list.
		self.article_url = 'https://blog.csdn.net/baidu_39633459/article/details/{}'.format 		# The URL can be changed to your own; this one is not mine, I asked a friend for it to run tests.

		
		
	
	def read_article(self, proxy: str=None, start_article_id: int=0, end_article_id: int=-1, with_cookie: bool=True, early_stop: bool=True, early_stop_point: int=95, max_read_count: int=10000, regular_interval: int=75, reset_interval: int=300) -> None:
		"""
		A simple crawler used to increase pageview of several articles.
		
		:param proxy: IP address of proxy server, default None means localhost. 
		:param start_article_id: Start index in ```self.article_ids```. 
		:param end_article_id: End index in ```self.article_ids```. 
		:param with_cookie: Whether to use cookie in crawler.
		:param early_stop: Whether to stop before whole hundred(count for bonus).
		:param early_stop_point: Early stop at ```pageview%100 == early_stop_point```, default 95.
		:param max_read_count: Required upper bound of pageview, default 10000.
		:param regular_interval: Sleep interval of seconds between two successive requests, default 75.
		:param reset_interval: Sleep interval of seconds when a request exception occurs, default 300.
		"""
		headers = self.headers.copy()
		if not with_cookie: 
			headers.pop('Cookie', None)									 # Use a default so that a missing 'Cookie' key does not raise KeyError.
		if early_stop:
			early_stop_point = early_stop_point % 100
		article_ids = self.article_ids[start_article_id:] if end_article_id == -1 else self.article_ids[start_article_id:end_article_id+1]
		compiler = re.compile(r'\d+')									 # A regular expression compiler used to parse digit data.
		previous_read_counts = [None] * len(article_ids)				 # A list used to save current page view of each article.
		count = 0
		while True:
			if not article_ids:											 # If it is an empty list, then break the loop and return.
				break
			try: 
				start_time = time.time()
				count += 1
				print('Pid: {} - Count: {} - '.format(os.getpid(), str(count).zfill(3)), end='')
				for i, article_id in enumerate(article_ids):
					if proxy is None:
						response = requests.get(self.article_url(article_id), headers=headers, timeout=30)
					else: 
						try:
							response = requests.get(self.article_url(article_id), headers=headers, proxies={'https': 'https://{}'.format(proxy)}, timeout=30)
						except: 
							print('Proxy {} cannot be used ...'.format(proxy))
							return False
					html = response.text
					soup = BeautifulSoup(html, 'lxml')
					span = soup.find('span', class_='read-count')
					span_string = str(span.string)
					print(span_string, end='')
					read_count = int(compiler.findall(span_string)[0])
					if previous_read_counts[i] is None:
						print('(..)', end='\t')
						previous_read_counts[i] = read_count
					else:
						read_increment = read_count - previous_read_counts[i]
						if read_increment == 0:
							print('(!!)', end='\t')
						elif read_increment == 1:
							print('(..)', end='\t')
						elif read_increment > 1:
							print('(+{})'.format(read_increment), end='\t')
						else:
							print('(??)', end='\t')
						previous_read_counts[i] = read_count		
				div = soup.find('div', id='asideProfile')
				dls = div.find_all('dl', class_='text-center')
				for dl in dls:											 # Parse blog profile data such as 'like', 'favorite', 'followers'.
					try:
						print(int(dl.attrs['title']), end=',')
					except:
						continue
				if proxy is not None: 
					print(proxy, end=',')
				print(time.strftime('%Y-%m-%d %H:%M:%S'))
				if early_stop:											 # Do early stopping.
					index = -1
					for article_id, previous_read_count in zip(article_ids[:], previous_read_counts[:]):
						index += 1
						if previous_read_count % 100 >= early_stop_point or previous_read_count>=10000: 
							previous_read_counts.pop(index)
							article_ids.pop(index)
							index -= 1
				end_time = time.time()
				consumed_time = end_time - start_time
				if consumed_time < regular_interval:
					time.sleep(regular_interval - consumed_time)
			except Exception as exception:
				print(exception)
				time.sleep(reset_interval)

	def read_article_with_proxies(self, total_processes=15, start_article_id: int=0, end_article_id: int=-1, with_cookie: bool=True, early_stop: bool=True, early_stop_point: int=95, max_read_count: int=10000, regular_interval: int=75, reset_interval: int=300) -> None:
		"""
		A multiprocessing crawler used to increase pageview of several articles.
		Note that the proxies are taken from a shared multiprocessing Queue which is refilled by ```_put_proxies_queue```.
	
		:param total_processes: total number of multiprocesses.
		:param start_article_id: Start index in ```self.article_ids```. 
		:param end_article_id: End index in ```self.article_ids```. 
		:param with_cookie: Whether to use cookie in crawler.
		:param early_stop: Whether to stop before whole hundred(count for bonus).
		:param early_stop_point: Early stop at ```pageview%100 == early_stop_point```, default 95.
		:param max_read_count: Required upper bound of pageview, default 10000.
		:param regular_interval: Sleep interval of seconds between two successive requests, default 75.
		:param reset_interval: Sleep interval of seconds when a request exception occurs, default 300.
		"""
		xila = XiLaProxy()
		queue = Queue()
		proxy_table = xila.get_proxy_table(proxy_type='https', total=50, export_path=None)
		proxies = proxy_table.values[:, 0].tolist()
		print('Total {} proxies, list as below: '.format(len(proxies)))
		print(proxies)
		
		for proxy in proxies:
			queue.put(proxy)	
		
		params = {
			'queue': queue,
			'start_article_id': start_article_id,
			'end_article_id': end_article_id,
			'with_cookie': with_cookie,
			'early_stop': early_stop,
			'early_stop_point': early_stop_point,
			'max_read_count': max_read_count,
			'regular_interval': regular_interval,
			'reset_interval': reset_interval,
		}

		processes = [Process(target=self._read_article_with_proxies, kwargs=params) for i in range(total_processes)]
		processes.append(Process(target=self._put_proxies_queue, args=(queue, total_processes, 30, 50, )))
				
		for process in processes:
			process.start()
		for process in processes:
			process.join()

		time.sleep(1000)
		for process in processes:
			process.terminate()
		
	def _put_proxies_queue(self, queue: Queue, min_queue_size: int=15, check_interval: int=30, batch_size: int=50) -> None:
		"""
		Target function for multiprocessing script ```read_article_with_proxies()```.
		"""
		xila = XiLaProxy()
		while True:
			_queue_size = queue.qsize()
			if _queue_size < min_queue_size:
				print('Queue size is {}, which is lower than {} ...'.format(_queue_size, min_queue_size))
				print('  - Fetching a batch of proxies ...')
				_proxy_table = xila.get_proxy_table(proxy_type='https', total=batch_size, export_path=None)
				_proxies = _proxy_table.values[:, 0].tolist()
				print('  - Successfully fetched a batch of {} proxies.'.format(len(_proxies)))
				for _proxy in _proxies:
					queue.put(_proxy)
				print('Queue size is refreshed to {}.'.format(queue.qsize()))
			time.sleep(check_interval)
	

	def _read_article_with_proxies(self, queue: Queue, start_article_id: int=0, end_article_id: int=-1, with_cookie: bool=True, early_stop: bool=True, early_stop_point: int=95, max_read_count: int=10000, regular_interval: int=75, reset_interval: int=300) -> None:
		"""
		Target function for multiprocessing script ```read_article_with_proxies()```.
		"""
		while True:
			_proxy = queue.get()
			self.read_article(
				proxy=_proxy,
				start_article_id=start_article_id,
				end_article_id=end_article_id,
				with_cookie=with_cookie,
				early_stop=early_stop,
				early_stop_point=early_stop_point,
				max_read_count=max_read_count,
				regular_interval=regular_interval,
				reset_interval=reset_interval,
			)
		
if __name__ == '__main__':

	csdn = CSDN()

	#csdn.read_article(early_stop_point=60, early_stop=True)
	#https_df = xila.get_proxy_table(proxy_type='https', export_path='https.csv')
	
	csdn.read_article_with_proxies(total_processes=15, early_stop=False)

Tags: Python, multiprocessing
