aiosocksy async crawler: a simple Python crawler for Ethereum Solidity smart contract code
If you have already worked around the domestic network restrictions with SS and want to crawl sites hosted abroad, you cannot rely on aiohttp alone, because aiohttp does not support SOCKS proxies. Use aiosocksy instead; on Windows, open a console with cmd and run pip install aiosocksy.
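The core pattern is small: aiosocksy's ProxyConnector and ProxyClientRequest are plugged into an ordinary aiohttp ClientSession, and the SOCKS proxy URL is passed per request. Below is a minimal sketch, assuming a local SS client listening on 127.0.0.1:1080 (adjust the port to your own setup); the fetch helper and the etherscan.io URL are only for illustration.

import asyncio
import aiohttp
from aiosocksy.connector import ProxyConnector, ProxyClientRequest


async def fetch(url):
    # ProxyConnector + ProxyClientRequest add SOCKS support to aiohttp;
    # the proxy address assumes a local SS client on port 1080
    connector = ProxyConnector()
    async with aiohttp.ClientSession(connector=connector,
                                     request_class=ProxyClientRequest) as session:
        async with session.get(url, proxy="socks5://127.0.0.1:1080") as response:
            return await response.text()


if __name__ == "__main__":
    html = asyncio.get_event_loop().run_until_complete(fetch("https://etherscan.io"))
    print(len(html))

The full crawler below wires this same connector/request_class combination into getsccodecore, which downloads the Solidity source of each verified contract from Etherscan.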
import asyncio
import aiohttp
import requests
# import aiosocksy
from aiosocksy import Socks5Auth
from aiosocksy.connector import ProxyConnector, ProxyClientRequest
from bs4 import BeautifulSoup
import os
import time
from sys import stdin


def printtime():
    # Print a timestamp prefix for log messages
    print(time.strftime("%Y-%m-%d %H:%M:%S:", time.localtime()), end=" ")
    return 0


def getSCAddress(eachurl, filepath):
    # Pretend to be a browser so the server does not refuse the request
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36"}

    # Maximum number of failed attempts; once it is used up, report the error and stop
    failedTimes = 50

    while True:  # keep trying until the page is fetched within the allowed number of attempts
        if failedTimes <= 0:
            printtime()
            print("Too many failures, please check your network environment!")
            break
        failedTimes -= 1  # one attempt used

        try:
            # The except clauses below catch exceptions raised by requests and
            # wait for the network to recover, so the program keeps running
            print("Connecting to " + eachurl)
            response = requests.get(url=eachurl, headers=headers, timeout=5)
            # Reaching this line means the request succeeded, so leave the loop
            break
        except requests.exceptions.ConnectionError:
            printtime()
            print("ConnectionError! Waiting 3 seconds...")
            time.sleep(3)
        except requests.exceptions.ChunkedEncodingError:
            printtime()
            print("ChunkedEncodingError! Waiting 3 seconds...")
            time.sleep(3)
        except:
            printtime()
            print("Unknown error! Waiting 3 seconds...")
            time.sleep(3)

    try:
        # Let requests pick the most likely encoding for the page
        response.encoding = response.apparent_encoding
    except:
        return False

    # Parse the page with BeautifulSoup
    soup = BeautifulSoup(response.text, "html.parser")

    # This div holds the table whose rows contain the URLs of the verified contracts
    targetDiv = soup.find_all("div", "table-responsive mb-2 mb-md-0")
    try:
        targetTBody = targetDiv[0].table.tbody
    except:
        printtime()
        print("Failed to locate targetTBody!")
        return False

    # Open the file in append mode:
    # create it if it does not exist, otherwise append at the end
    fo = open(filepath + "address.txt", "a")

    # Write every contract URL to the file, one per line
    for targetTR in targetTBody:
        if targetTR.name == "tr":
            fo.write("https://etherscan.io"
                     + targetTR.td.find("a", "hash-tag text-truncate").attrs["href"]
                     + "\n")
    fo.close()
    return 0


def updatescurl():
    urlList = ["https://etherscan.io/contractsVerified/1?ps=100",
               "https://etherscan.io/contractsVerified/2?ps=100",
               "https://etherscan.io/contractsVerified/3?ps=100",
               "https://etherscan.io/contractsVerified/4?ps=100",
               "https://etherscan.io/contractsVerified/5?ps=100"]

    # filepath is the directory where the file with the contract URLs is stored.
    # Change it to whatever path suits you.
    filepath = "C:\\Users\\15321\\Desktop\\SmartContract\\address\\"

    # Remove the old address file, if any
    try:
        if os.path.exists(filepath + "address.txt"):
            os.remove(filepath + "address.txt")
            printtime()
            print("Removed the old address file under %s!" % filepath)
    except IOError:
        printtime()
        print("Unrecoverable error, aborting: IOError!")
        # Abnormal termination, return 1
        return 1

    # Collect the contract URLs from every page in urlList;
    # getSCAddress returns False on failure, so retry up to 10 times per page
    for eachurl in urlList:
        retries = 0
        while getSCAddress(eachurl, filepath) is False:
            retries += 1
            if retries == 10:
                break
    # Normal termination, return 0
    return 0


async def parser(eachLine, html, filetotalname):
    # Parse the page with BeautifulSoup
    soup = BeautifulSoup(html, "html.parser")
    try:
        # The <pre> element with this class holds the verified Solidity source
        targetPRE = soup.find_all("pre", "js-sourcecopyarea editor")
        fo = open(filetotalname, "w+", encoding="utf-8")
        try:
            fo.write(targetPRE[0].text)
        except:
            pass
        fo.close()
    except:
        pass

    if os.path.exists(filetotalname):
        fileSize = os.path.getsize(filetotalname)
        if fileSize == 0:
            # An empty file means the download failed, so delete it
            os.remove(filetotalname)
        else:
            printtime()
            print(filetotalname + " saved!")
            return True
    # The file is missing or empty: retry the download
    await getsccodecore(eachLine)


async def getsccodecore(eachLine):
    # Characters 29-70 of the URL are the 42-character contract address (0x + 40 hex digits)
    filename = eachLine[29:71]
    filepath = "C:\\Users\\15321\\Desktop\\SmartContract\\code\\"
    filetotalname = filepath + filename + ".sol"

    printtime()
    print("Fetching smart contract code: " + filename)

    # Skip contracts that have already been downloaded successfully
    if os.path.exists(filetotalname):
        fileSize = os.path.getsize(filetotalname)
        # print(fileSize)
        if fileSize == 0:
            os.remove(filetotalname)
        else:
            printtime()
            print(filename + ".sol already exists!")
            return True

    # headers = {"User-Agent": "Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)"}
    # I was banned once for crawling too hard with the async crawler, so if the
    # network environment is configured correctly but every request still fails,
    # try switching to a different User-Agent here.
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.87 Safari/537.36"}

    auth = Socks5Auth(login="...", password="...")
    connector = ProxyConnector()
    socks = "socks5://127.0.0.1:1080"

    try:
        # The except clauses below catch exceptions raised by the session and
        # wait for the network to recover, so the program keeps running
        printtime()
        print("Connecting to " + eachLine, end="")
        async with aiohttp.ClientSession(connector=connector,
                                         request_class=ProxyClientRequest) as session:
            async with session.get(eachLine, proxy=socks, proxy_auth=auth,
                                   headers=headers) as response:
                html = await response.text(encoding="utf-8")
                # await session.close()
        printtime()
        print("Successfully fetched: " + eachLine)
        await parser(eachLine, html, filetotalname)
        return True
    except aiohttp.ClientConnectionError:
        printtime()
        print("ClientConnectionError! Waiting 3 seconds...")
        await asyncio.sleep(3)
    except:
        printtime()
        print("Unknown error! Waiting 3 seconds...")
        await asyncio.sleep(3)
    # Retry after a failure
    await getsccodecore(eachLine)
    return True


def getsccode():
    try:
        SCAddress = open("C:\\Users\\15321\\Desktop\\SmartContract\\address\\address.txt", "r")
    except:
        printtime()
        print("Failed to open the contract URL file! Please check that the directory is correct!")
        return 1

    # Read every contract URL into a queue
    que = []
    for eachLine in SCAddress:
        que.append(eachLine)
    SCAddress.close()

    # Schedule one download task per contract and run them concurrently
    tasks = []
    loop = asyncio.get_event_loop()
    for eachLine in que:
        task = asyncio.ensure_future(getsccodecore(eachLine))
        tasks.append(task)
    tasks = asyncio.gather(*tasks)
    loop.run_until_complete(tasks)

    printtime()
    print("The latest 500 Solidity smart contracts have been downloaded!")
    return 0


if __name__ == "__main__":
    # Update the list of contract addresses to crawl
    if os.path.exists("C:\\Users\\15321\\Desktop\\SmartContract\\address\\address.txt"):
        print("Update the contract address repository? "
              "Enter a string starting with Y or y to update, anything else to skip.")
        input_string = stdin.readline()
        if input_string[0] == "Y" or input_string[0] == "y":
            print("Updating the contract address repository:")
            # Remove the old address file
            filepath = "C:\\Users\\15321\\Desktop\\SmartContract\\address\\"
            os.remove(filepath + "address.txt")
            printtime()
            print("Removed the old address file under %s!" % filepath)
            updatescurl()
    else:
        print("Creating the contract address repository:")
        updatescurl()

    # Fetch the contract source code for every address
    getsccode()
    input()
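Before running the script, install the libraries it imports (pip install aiohttp aiosocksy requests beautifulsoup4) and make sure the address\ and code\ directories under the SmartContract folder already exist, since the script only creates files, not directories; the C:\Users\15321\... paths are my own and should be changed to match your machine.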