爬虫的实战

356次阅读
没有评论
爬虫的实战

多线程多进程总结

import time

def func1(num): time.sleep(3) print(f'子线程任务{num}')

# 方案一:直接使用多线程 – 用于需要同时执行的任务个数确定并且量小(比如,一两个任务) from threading import Thread

# 1. 直接使用Thread # t = Thread(target=func1, args=(1,)) # t.start() # # join中的timeout是设置最长等待时间,如果超时子线程中的任务还没有完成,那么后面代码会直接执行(不等任务完成才执行) # t.join(timeout=1) # print('——线程任务完成!—–')

# 2. 使用Thread类的子类 # class FThread(Thread): # def __init__(self, num): # super().__init__() # self.num = num # # def run(self) -> None: # time.sleep(1) # print(f'子线程任务{self.num}') # # # t = FThread(1) # t.start() # t.join() # print('—————任务完成—————-')

# 2. 直接使用多进程 from multiprocessing import Process

# class FProcess(Process): # def __init__(self, num): # super().__init__() # self.num = num # # def run(self) -> None: # time.sleep(3) # print(f'子线程任务{self.num}') # # # if __name__ == '__main__': # p = Process(target=func1, args=(1,)) # p.start() # # p.join() # # print('—————任务完成—————-') # # p2 = FProcess(2) # p2.start() # print(p2.num)

# 方案一、方案二数据获取问题:直接使用多线程、多进程的数据获取问题 # 1. 数据跨线程:定义全局变量,直接获取相关数据;(如果要用全局队列,可以用线程队列:queue模块中的Queue) # 2. 数据跨进程:只能使用进程队列(multiprocessing模块中的Queue) # 3. 线程队列和进程队列用法的区别 """ 线程队列: 1)定义全局队列对象 2)添加数据:队列对象.put(数据) 3)获取数据:队列对象.get() / 队列对象.get(timeout=超时时间)

进程队列: 1)定义全局队列对象 2)添加数据:队列对象.put(数据) 3)获取数据:队列对象.get() / 队列对象.get(timeout=超时时间) 注意:队列对象在子进程中使用的时候必须通过进程任务对应的函数的参数来传递 """

def func2(num): print(f'任务{num}开始') time.sleep(3) print(f'子线程任务{num}完成') return f'任务数据{num}'

# 方案三:使用线程池 from concurrent.futures import ThreadPoolExecutor, wait, as_completed, ALL_COMPLETED # # 1.创建线程池 # pool = ThreadPoolExecutor(40) # # 2.添加任务 # ts = [pool.submit(func2, x) for x in range(1, 101)] # # # 3. 关闭线程池 # # pool.shutdown() # # # 4.单纯等待线程池中所有的任务完成 # # wait(ts, return_when=ALL_COMPLETED) # # print('————–所有任务都完成了——————') # # # 5. 等待任务完成的过程中实时获取数据 # for t in as_completed(ts): # print(t.result())

# 方案四:进程池 from multiprocessing import Pool if __name__ == '__main__': # 1. 创建进程池对象 pool = Pool(10) # 2. 添加任务 # 1)同步添加 – 任务和主进程中的其他任务串行执行 # result1 = pool.apply(func2, (1,)) # result2 = pool.map(func2, [1, 2, 3, 4, 5]) # print(result1) # print(result2)

# 2)异步添加 – 任务和主线程中的任务同时执行(必须关键进程池并且等待,否则任务无法执行) result1 = pool.apply_async(func2, (1, )) result2 = pool.map_async(func2, [1, 2, 3, 4, 5])

# 3.关闭进程池 pool.close()

# 4. 等待进程池中的任务都结束 pool.join() print(result1.get()) print(result2.get())

爬51job详情页

from concurrent.futures import ThreadPoolExecutor, as_completed, wait, ALL_COMPLETED import requests, json, csv from re import findall import time from queue import Queue from threading import Thread

headers = { 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.131 Safari/537.36' }

def get_one_page(page: int): """获取一页岗位信息""" # 获取数据 url = f'https://search.51job.com/list/000000,000000,0000,00,9,99,数据分析,2,{page}.html?lang=c&postchannel=0000&workyear=99&cotype=99&degreefrom=99&jobterm=99&companysize=99&ord_field=0&dibiaoid=0&line=&welfare=' response = requests.get(url, headers=headers) json_data = json.loads(findall(r'window.__SEARCH_RESULT__ = (.+?)</script>', response.text)[0])

# 解析数据 page_data = [] for job in json_data['engine_search_result']: job_data = {} # 获取岗位相关信息 job_data['name'] = job['job_name'] job_data['providesalary_text'] = job['providesalary_text'] job_data['workarea_text'] = job['workarea_text'] job_data['company_name'] = job['company_name'] job_data['companytype_text'] = job['companytype_text'] job_data['workarea_text'] = job['workarea_text'] job_data['job_href'] = job['job_href'] job_data['jobwelf'] = job['jobwelf']

# 将工资信息中的金额分离,并且将单位统一转换成元 salary = findall(r'(d+.?d*)-?(d+.?d*)([万千])/([月年天])', job_data['providesalary_text']) if salary: salary = salary[0] unit = salary[2] time = salary[3] min_salary = (float(salary[0]) * (10000 if unit == '万' else 1000)) / (1 if time == '月' else 12) max_salary = (float(salary[1]) * (10000 if unit == '万' else 1000)) / (1 if time == '月' else 12) else: min_salary = max_salary = 0

job_data['min_salary'] = min_salary job_data['max_salary'] = max_salary job_data['salary'] = (min_salary + max_salary) / 2 page_data.append(job_data) return page_data

# 获取每个工作的详情页(模拟) def get_details_page(data): print(f'请求工作的详情页:{data["job_href"]}') time.sleep(1) print(f'—————–请求成功!——————')

# response = requests.get(data["job_href"]) # response.text # data['详情页信息字段'] = 详情页解析出的相关数据 data['详情页'] = '详情页数据' queue.put(data)

def save_data():

writer = csv.DictWriter( open('files/数据分析.csv', 'a', encoding='utf-8'), ['name', 'providesalary_text', 'workarea_text', 'company_name', 'companytype_text', 'workarea_text', 'job_href', 'jobwelf', 'min_salary', 'max_salary', 'salary', '详情页'] ) writer.writeheader() while True: data = queue.get() if data == 'end': break writer.writerow(data)

if __name__ == '__main__': # 1.创建线程池请求每个工作的详情页 pool2 = ThreadPoolExecutor(50) ts2 = []

queue = Queue() save_thread = Thread(target=save_data) save_thread.start()

# 2.创建线程池请求搜索结果的每一页数据 pool1 = ThreadPoolExecutor(50) ts = [pool1.submit(get_one_page, page) for page in range(1, 101)] pool1.shutdown()

# 3.如果请求到一个数据,就将一页数据中的50个岗位的详情请求作为任务添加到pool2中 for t in as_completed(ts): for job in t.result(): print(job['job_href']) t2 = pool2.submit(get_details_page, job) ts2.append(t2) pool2.shutdown()

wait(ts2, return_when=ALL_COMPLETED) queue.put('end') print('———————————-全部结束—————————–')

爬京东商品评论

import requests from bs4 import BeautifulSoup from functools import reduce from re import findall from json import loads from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, as_completed

headers = { 'cookie': '__jdv=76161171|direct|-|none|-|1628560053224; __jdu=16285600532191992332185; shshshfpb=l7hwQrGrqIi4eCr58XglTaQ%3D%3D; shshshfpa=6f4d3f44-5d90-7676-86d0-e6c4df742265-1617089801; user-key=478b38e2-62c2-4780-abdc-7ee92a3a8757; pinId=vgq34oMxHnsjt7JTMb9B0A; pin=15300022703_p; unick=jd153000xiz; _tp=SS6A2kOuyFHrS5Nm%2BHtauA%3D%3D; _pst=15300022703_p; qrsc=3; areaId=22; ipLoc-djd=22-1930-50947-0; PCSYCityID=CN_510000_510100_510107; rkv=1.0; TrackID=1erZ0XUO8xw_6hUL-f2Hc_nktnWuiTcO4mpb6IvZqKkILGf2FhGrEnjJXwSsmjbmZDcgSFyPaBnzaWacgmu38rV1O4_-kXXn13bNGTCAY6qQ8UbWIlrn9D4FEAlJ2l4Tc; thor=CCB112E41BF7B8A18299559069E1826B2FE40F79A9C40AB9CF6C7ACF4FE9B0D189CA2691BCCB844F401D55F533423E22D93DB74BC974DE5CBAD06E96E66DC8968A44F9A96268E9F0FDC91512405DD5D2CA092862CB2AA3D051F92E8B9714C2A9F110FBDA52F37F63703ECAFFD70A5E993920298C08ABB5DEC447DD3F4611CAE31BB210BC0953C98497CFCDD5B086F499; ceshi3.com=201; 3AB9D23F7A4B3C9B=MBL5WIIFBJTQE7QBGVZP75S4LKWL4F6HAICGTLHP5C7ASWPR3ARXWOZP5CCBRG5MWZV2B7IB6VAIGDPHH4XMIDMXGI; __jda=122270672.16285600532191992332185.1628560053.1628838707.1629443811.8; __jdb=122270672.8.16285600532191992332185|8.1629443811; __jdc=122270672; shshshfp=3e4645d35360fc979933ecbd8a88e64d; shshshsID=d26b51263e90f337994443223e764969_5_1629444224406', 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.131 Safari/537.36' }

def get_goods_list(page): url = f'https://search.jd.com/Search?keyword=%E7%94%B5%E8%84%91&wq=%E7%94%B5%E8%84%91&pvid=f1269a60000642a7bdda4078a12138d6&page={page}&s=56&click=0' response = requests.get(url, headers=headers) soup = BeautifulSoup(response.text, 'lxml') all_goods_li = soup.select('.gl-warp>li') all_goods_id = [] for goods_li in all_goods_li: all_goods_id.append(goods_li.attrs['data-sku']) return all_goods_id

def get_one_page_comments(goods_id: str, page: int): url = f'https://club.jd.com/comment/productPageComments.action?callback=fetchJSON_comment98&productId={goods_id}&score=0&sortType=5&page={page}&pageSize=10&isShadowSku=0&fold=1' response = requests.get(url, headers=headers) json_str = findall(r'fetchJSON_comment98((.+))', response.text)[0] comment_info = loads(json_str) comments = comment_info['comments'] for x in comments: print(x['content'])

def get_goods_comments(goods_id: str): url = f'https://club.jd.com/comment/productPageComments.action?callback=fetchJSON_comment98&productId={goods_id}&score=0&sortType=5&page=1&pageSize=10&isShadowSku=0&fold=1' response = requests.get(url, headers=headers) json_str = findall(r'fetchJSON_comment98((.+))', response.text)[0] comment_info = loads(json_str) comment_summary = comment_info['productCommentSummary'] count = reduce(lambda x, y: x+comment_summary[f'score{y}Count'], range(1, 6), 0) for x in range(count//10+1): t = pool.submit(get_one_page_comments, goods_id, x) ts.append(t)

pool.shutdown()

if __name__ == '__main__': # 获取评论信息的线程池 pool = ThreadPoolExecutor(50) ts = []

# 获取数据 all_goods_id = get_goods_list(1) print(all_goods_id) get_goods_comments(all_goods_id[0])

wait(ts, return_when=ALL_COMPLETED)

神龙|纯净稳定代理IP免费测试>>>>>>>>天启|企业级代理IP免费测试>>>>>>>>IPIPGO|全球住宅代理IP免费测试

相关文章:

版权声明:Python教程2022-10-28发表,共计8704字。
新手QQ群:570568346,欢迎进群讨论 Python51学习