Python producer-consumer multithreaded data scraping

The producer-consumer model

The crawling step and the data-cleaning step each run in their own Thread, and the two threads exchange data through a FIFO queue (queue.Queue): the crawler thread fetches pages from the website and puts the raw HTML into the queue, while the cleaning thread reads the raw data from the queue in insertion order and extracts the useful content.
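As a warm-up, here is a minimal, self-contained sketch of the pattern with queue.Queue; the names (producer, consumer, SENTINEL) are illustrative and not part of the crawler below:

import queue
import threading

q = queue.Queue()        # FIFO queue shared by both threads
SENTINEL = None          # marker telling the consumer to stop

def producer():
    for i in range(5):
        q.put(f"raw item {i}")   # hand raw data to the consumer
    q.put(SENTINEL)              # signal that production is finished

def consumer():
    while True:
        item = q.get()           # blocks until an item is available
        if item is SENTINEL:
            break
        print("cleaned:", item.upper())

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()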

queue.Queue for inter-thread communication

# Import
import queue

# Create a Queue
q = queue.Queue()

# Put an element (blocks if the queue is full, until there is room)
q.put(item)

# Get an element (blocks if the queue is empty, until an item is available)
item = q.get()

# Check how many elements are in the queue
q.qsize()

# Check whether the queue is empty
q.empty()

# Check whether the queue is full
q.full()
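One caveat worth noting: empty() and full() are only snapshots, so with several consumers another thread may take the last item between your empty() check and the following get(). A more defensive pattern (a general illustration, not code from the original post) is a non-blocking or timed get() with the queue.Empty exception handled:

import queue

q = queue.Queue()

# Non-blocking: raises queue.Empty immediately if nothing is available
try:
    item = q.get_nowait()
except queue.Empty:
    item = None

# Timed: waits up to 2 seconds for an item before giving up
try:
    item = q.get(timeout=2)
except queue.Empty:
    item = None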

Code walkthrough

First, create a crawler class and a parser class.

The crawler class

# Crawler class (producer): fetches list pages and puts the raw HTML into html_queue
class CrawlDz(Thread):
    def __init__(self, url_queue, html_queue):
        Thread.__init__(self)
        self.url_queue = url_queue
        self.html_queue = html_queue

    # Override the thread's run() method
    def run(self) -> None:
        # Random User-Agent for this thread
        headers = {
            "User-Agent": UserAgent().random
        }

        # Keep fetching until the URL queue is drained
        while not self.url_queue.empty():
            url = self.url_queue.get()
            response = requests.get(url=url, headers=headers)
            if response.status_code == 200:
                self.html_queue.put(response.text)
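One thing to keep in mind: the while not self.url_queue.empty() loop is only safe here because every URL is put into url_queue before any crawler thread starts, so the queue can only shrink while the threads run. If URLs were still being produced concurrently, the empty() check could end a thread too early, and a sentinel value or get(timeout=...) would be the more robust stopping condition.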

The parser class

# Parser class (consumer): reads raw HTML from html_queue and extracts the content
class Parse(Thread):
    def __init__(self, html_queue):
        # super().__init__()
        Thread.__init__(self)
        self.html_queue = html_queue
        self.headers = {
            "User-Agent": UserAgent().random
        }

    # Override the thread's run() method
    def run(self) -> None:
        while not self.html_queue.empty():
            # Parse one list page taken from the queue
            tree = etree.HTML(self.html_queue.get())
            div_list = tree.xpath("/html/body/section/div/div")

            for div in div_list:
                title = div.xpath("./article/header/h2/a/text()")
                subpage = div.xpath("./article/header/h2/a/@href")

                for titles, url in zip(title, subpage):
                    # Fetch the detail page and parse it directly
                    # (round-tripping it through the shared queue would race with other threads)
                    response = requests.get(url, headers=self.headers).text
                    tree1 = etree.HTML(response)
                    content = tree1.xpath("/html/body/section/div/div/article/p/text()")
                    contents = "".join(content)
                    # print("Title:", titles, "Detail page:", url, "Content:", contents)

                    # Append the extracted text to a local file
                    # (the with block closes the file, so no explicit close() is needed)
                    with open("./段子.txt", mode="a", encoding="utf-8") as f:
                        f.write(contents + "\n")
                        print("Saved:", contents)
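Note that the XPath expressions are tied to the markup of duanzixing.com at the time the post was written; if the site layout changes, div_list and content simply come back as empty lists rather than raising an error, so printing their lengths is a quick sanity check when debugging.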

The main block

if __name__ == "__main__":
    url_queue = Queue()
    html_queue = Queue()
    base_url = "https://duanzixing.com/%E6%AE%B5%E5%AD%90/{}/"
    for page in range(1, 3):
        new_url = base_url.format(page)
        url_queue.put(new_url)

    # Keep the crawler threads in a list so they can be joined
    crawl_list = []
    for i in range(0, 3):
        crawl = CrawlDz(url_queue, html_queue)
        crawl_list.append(crawl)
        crawl.start()

    for i in crawl_list:
        i.join()

    # Keep the parser threads in a list so they can be joined
    parse_list = []
    for i in range(0, 3):
        parse = Parse(html_queue)
        parse_list.append(parse)
        parse.start()

    for i in parse_list:
        i.join()
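As written, the two stages run one after the other: every crawler thread is joined before the first parser thread starts, so html_queue acts as a hand-off buffer rather than a live pipeline. If you want crawling and parsing to overlap, one common variant (a sketch with illustrative worker functions, not the classes above) is to let consumers block on get() and shut them down with sentinel values once the producers are done:

from queue import Queue
from threading import Thread

def crawl_worker(url_queue, html_queue):
    while True:
        url = url_queue.get()                 # blocks until a URL or a sentinel arrives
        if url is None:                       # sentinel: no more URLs
            break
        html_queue.put(f"<html for {url}>")   # stand-in for requests.get(url).text

def parse_worker(html_queue):
    while True:
        html = html_queue.get()               # blocks until a page or a sentinel arrives
        if html is None:                      # sentinel: no more pages
            break
        print("parsing", html)

url_queue, html_queue = Queue(), Queue()
for page in range(1, 3):
    url_queue.put(f"https://example.com/{page}/")

crawlers = [Thread(target=crawl_worker, args=(url_queue, html_queue)) for _ in range(3)]
parsers = [Thread(target=parse_worker, args=(html_queue,)) for _ in range(3)]
for t in crawlers + parsers:
    t.start()

for _ in crawlers:
    url_queue.put(None)                       # one sentinel per crawler thread
for t in crawlers:
    t.join()
for _ in parsers:
    html_queue.put(None)                      # one sentinel per parser thread
for t in parsers:
    t.join()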

Complete code

# -*- coding: utf-8 -*-
# @Time : 2022/8/11 21:24
# @Author : 火之意志拥有者
# @File : 生产消费爬虫.py
# @Software : PyCharm

from threading import Thread
from queue import Queue
from fake_useragent import UserAgent
import requests
from lxml import etree


# Crawler class (producer): fetches list pages and puts the raw HTML into html_queue
class CrawlDz(Thread):
    def __init__(self, url_queue, html_queue):
        Thread.__init__(self)
        self.url_queue = url_queue
        self.html_queue = html_queue

    # Override the thread's run() method
    def run(self) -> None:
        # Random User-Agent for this thread
        headers = {
            "User-Agent": UserAgent().random
        }

        # Keep fetching until the URL queue is drained
        while not self.url_queue.empty():
            url = self.url_queue.get()
            response = requests.get(url=url, headers=headers)
            if response.status_code == 200:
                self.html_queue.put(response.text)


# Parser class (consumer): reads raw HTML from html_queue and extracts the content
class Parse(Thread):
    def __init__(self, html_queue):
        # super().__init__()
        Thread.__init__(self)
        self.html_queue = html_queue
        self.headers = {
            "User-Agent": UserAgent().random
        }

    # Override the thread's run() method
    def run(self) -> None:
        while not self.html_queue.empty():
            # Parse one list page taken from the queue
            tree = etree.HTML(self.html_queue.get())
            div_list = tree.xpath("/html/body/section/div/div")

            for div in div_list:
                title = div.xpath("./article/header/h2/a/text()")
                subpage = div.xpath("./article/header/h2/a/@href")

                for titles, url in zip(title, subpage):
                    # Fetch the detail page and parse it directly
                    # (round-tripping it through the shared queue would race with other threads)
                    response = requests.get(url, headers=self.headers).text
                    tree1 = etree.HTML(response)
                    content = tree1.xpath("/html/body/section/div/div/article/p/text()")
                    contents = "".join(content)
                    # print("Title:", titles, "Detail page:", url, "Content:", contents)

                    # Append the extracted text to a local file
                    # (the with block closes the file, so no explicit close() is needed)
                    with open("./段子.txt", mode="a", encoding="utf-8") as f:
                        f.write(contents + "\n")
                        print("Saved:", contents)


if __name__ == "__main__":
    url_queue = Queue()
    html_queue = Queue()
    base_url = "https://duanzixing.com/%E6%AE%B5%E5%AD%90/{}/"
    for page in range(1, 3):
        new_url = base_url.format(page)
        url_queue.put(new_url)

    # Keep the crawler threads in a list so they can be joined
    crawl_list = []
    for i in range(0, 3):
        crawl = CrawlDz(url_queue, html_queue)
        crawl_list.append(crawl)
        crawl.start()

    for i in crawl_list:
        i.join()

    # Keep the parser threads in a list so they can be joined
    parse_list = []
    for i in range(0, 3):
        parse = Parse(html_queue)
        parse_list.append(parse)
        parse.start()

    for i in parse_list:
        i.join()

 

Run results

(console output screenshot omitted)

Data preview (saved to the local text file)

(saved-file screenshot omitted)
Original post: https://www.cnblogs.com/tanling/p/16578444.html