Python疯狂练习60天——第十八天

B站影视 电影资讯 2025-09-28 09:19 1

摘要:import requestsfrom Requests.adapters import HTTPAdapterfrom requests.packages.urllib3.util.retry import Retryimport timeimport js

今天我们将学习使用Python进行Web爬虫和数据采集,包括requests、BeautifulSoup、Scrapy等工具的使用。

import requestsfrom Requests.adapters import HTTPAdapterfrom requests.packages.urllib3.util.retry import Retryimport timeimport jsondef requests_basics:"""Requests库基础使用"""print("=== Requests库基础 ===")# 1. 基本GET请求print("1. 基本GET请求")# 测试用的公共APIurl = "https://httpbin.org/get"try:response = requests.get(url)print(f"状态码: {response.status_code}")print(f"响应头: {dict(response.headers)}")print(f"响应内容前200字符: {response.text[:200]}...")# JSON响应处理if response.headers.get('Content-Type', '').startswith('application/json'):data = response.jsonprint(f"解析的JSON数据: {data}")except requests.Exceptions.RequestException as e:print(f"请求错误: {e}")# 2. 带参数的GET请求print("\n2. 带参数的GET请求")params = {'key1': 'value1','key2': 'value2','page': 1,'limit': 10}response = requests.get("https://httpbin.org/get", params=params)print(f"请求URL: {response.url}")print(f"请求参数: {params}")# 3. POST请求print("\n3. POST请求")data = {'username': 'testuser','password': 'testpass','email': 'test@example.com'}response = requests.post("https://httpbin.org/post", data=data)print(f"状态码: {response.status_code}")if response.status_code == 200:result = response.jsonprint(f"表单数据: {result.get('form', {})}")# 4. 带JSON数据的POST请求print("\n4. JSON POST请求")json_data = {'title': '测试文章','content': '这是测试内容','author': '测试作者','tags': ['python', '爬虫', '测试']}response = requests.post("https://httpbin.org/post",json=json_data,headers={'Content-Type': 'application/json'})if response.status_code == 200:result = response.jsonprint(f"JSON数据: {result.get('json', {})}")# 5. 设置请求头print("\n5. 自定义请求头")headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36','Accept': 'application/json','Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8','Referer': 'https://www.example.com'}response = requests.get("https://httpbin.org/headers", headers=headers)if response.status_code == 200:result = response.jsonprint(f"请求头信息: {result.get('headers', {})}")# 6. 处理cookiesprint("\n6. Cookies处理")# 设置cookiescookies = {'Session_id': 'abc123', 'user_id': '456'}response = requests.get("https://httpbin.org/cookies", cookies=cookies)if response.status_code == 200:result = response.jsonprint(f"发送的Cookies: {result.get('cookies', {})}")# 获取cookiesresponse = requests.get("https://httpbin.org/cookies/set/sessionid/123456")print(f"服务器设置的Cookies: {response.cookies.get_dict}")def advanced_requests:"""Requests高级功能"""print("\n=== Requests高级功能 ===")# 1. 会话保持print("1. 会话保持")with requests.Session as session:# 第一次请求设置cookiesresponse1 = session.get("https://httpbin.org/cookies/set/sessionid/789012")print(f"第一次请求Cookies: {session.cookies.get_dict}")# 第二次请求会携带cookiesresponse2 = session.get("https://httpbin.org/cookies")if response2.status_code == 200:result = response2.jsonprint(f"第二次请求Cookies: {result.get('cookies', {})}")# 2. 超时设置print("\n2. 超时设置")try:# 设置连接超时和读取超时response = requests.get("https://httpbin.org/delay/2", timeout=(3.05, 5))print("请求成功完成")except requests.exceptions.Timeout:print("请求超时")except requests.exceptions.RequestException as e:print(f"请求错误: {e}")# 3. 重试机制print("\n3. 重试机制")def create_session_with_retries:session = requests.Session# 定义重试策略retry_strategy = Retry(total=3, # 总重试次数backoff_factor=1, # 退避因子status_forcelist=[429, 500, 502, 503, 504], # 需要重试的状态码)# 创建适配器adapter = HTTPAdapter(max_retries=retry_strategy)session.mount("http://", adapter)session.mount("https://", adapter)return sessionsession = create_session_with_retriestry:response = session.get("https://httpbin.org/status/500")print(f"最终状态码: {response.status_code}")except requests.exceptions.RequestException as e:print(f"请求失败: {e}")# 4. 代理设置print("\n4. 代理设置")# 注意:这里使用免费代理示例,实际使用时需要有效的代理服务器proxies = {'http': 'http://proxy.example.com:8080','https': 'https://proxy.example.com:8080',}# 实际使用时取消注释# try:# response = requests.get("https://httpbin.org/ip", proxies=proxies, timeout=10)# print(f"通过代理获取的IP: {response.json}")# except:# print("代理请求失败")print("代理示例已注释,需要有效代理服务器才能运行")# 5. 文件下载print("\n5. 文件下载")def download_file(url, filename):"""下载文件"""try:response = requests.get(url, stream=True)response.raise_for_statuswith open(filename, 'wb') as f:for chunk in response.iter_content(chunk_size=8192):if chunk:f.write(chunk)print(f"文件下载成功: {filename}")return Trueexcept requests.exceptions.RequestException as e:print(f"下载失败: {e}")return False# 下载一个小图片示例image_url = "https://httpbin.org/image/png"download_file(image_url, "downloaded_image.png")# 运行Requests示例requests_basicsadvanced_requestsfrom bs4 import BeautifulSoupimport requestsimport redef beautifulsoup_basics:"""BeautifulSoup基础使用"""print("=== BeautifulSoup网页解析 ===")# 1. 解析html字符串print("1. 解析HTML字符串")html_doc = """测试网页

网页标题

这是一个段落。

这是另一个段落。

项目1项目2项目3示例链接嵌套内容"""soup = BeautifulSoup(html_doc, 'html.parser')# 基本操作print(f"网页标题: {soup.title.string}")print(f"第一个p标签: {soup.p.text}")print(f"所有p标签数量: {len(soup.find_all('p'))}")# 2. 标签选择方法print("\n2. 标签选择方法")# 通过标签名print("所有li标签:")for li in soup.find_all('li'):print(f" - {li.text}")# 通过classprint("\nclass为content的标签:")for p in soup.find_all(class_='content'):print(f" - {p.text}")# 通过idmain_title = soup.find(id='main-title')print(f"\nid为main-title的标签: {main_title.text}")# 通过属性link = soup.find('a', href='https://www.example.com')print(f"链接: {link.text} -> {link['href']}")# 3. CSS选择器print("\n3. CSS选择器")# 选择所有item类items = soup.select('.item')print("CSS选择器 .item:")for item in items:print(f" - {item.text}")# 选择active类的itemactive_item = soup.select('.item.active')print(f"激活的项目: {active_item[0].text if active_item else '无'}")# 选择嵌套内容nested = soup.select('.nested span')print(f"嵌套内容: {nested[0].text if nested else '无'}")# 4. 实际网页解析示例print("\n4. 实际网页解析示例")def parse_example_website:"""解析示例网站"""try:# 使用一个简单的测试网站url = "https://httpbin.org/html"response = requests.get(url)if response.status_code == 200:soup = BeautifulSoup(response.text, 'html.parser')# 提取标题title = soup.find('h1')if title:print(f"页面标题: {title.text}")# 提取所有段落paragraphs = soup.find_all('p')print("页面段落:")for i, p in enumerate(paragraphs, 1):print(f" {i}. {p.text.strip}")# 提取所有链接links = soup.find_all('a')print("页面链接:")for link in links:href = link.get('href', '')text = link.text.stripif text or href:print(f" - {text} -> {href}")except requests.exceptions.RequestException as e:print(f"请求错误: {e}")parse_example_websitedef advanced_beautifulsoup:"""BeautifulSoup高级功能"""print("\n=== BeautifulSoup高级功能 ===")# 创建测试HTMLhtml_content = """

商品A

¥100.00

这是商品A的描述

热卖新品

商品B

¥200.00

这是商品B的描述

折扣

商品C

¥150.00

这是商品C的描述

"""soup = BeautifulSoup(html_content, 'html.parser')# 1. 数据提取和清洗print("1. 数据提取和清洗")products = for product_div in soup.find_all('div', class_='product'):product = {}# 提取商品名称name_tag = product_div.find('h3')product['name'] = name_tag.text.strip if name_tag else '未知'# 提取价格(使用正则表达式清理)price_tag = product_div.find('p', class_='price')if price_tag:# 提取数字price_match = re.search(r'[\d.]+', price_tag.text)product['price'] = float(price_match.group) if price_match else 0.0else:product['price'] = 0.0# 提取描述desc_tag = product_div.find('p', class_='description')product['description'] = desc_tag.text.strip if desc_tag else ''# 提取标签tags = [tag.text.strip for tag in product_div.find_all('span', class_='tag')]product['tags'] = tags# 提取自定义属性product['id'] = product_div.get('data-id', '')products.append(product)print("提取的商品信息:")for i, product in enumerate(products, 1):print(f"商品{i}: {product}")# 2. 导航方法print("\n2. 导航方法")# 父节点和子节点first_product = soup.find('div', class_='product')if first_product:print(f"第一个商品的父节点: {first_product.parent.get('class', )}")print(f"第一个商品的直接子节点:")for child in first_product.children:if child.name: # 过滤掉文本节点print(f" - {child.name}: {child.text.strip}")# 兄弟节点second_product = soup.find_all('div', class_='product')[1]if second_product:next_sibling = second_product.find_next_sibling('div', class_='product')prev_sibling = second_product.find_previous_sibling('div', class_='product')print(f"第二个商品的前一个兄弟: {prev_sibling.get('data-id') if prev_sibling else '无'}")print(f"第二个商品的后一个兄弟: {next_sibling.get('data-id') if next_sibling else '无'}")# 3. 修改HTMLprint("\n3. 修改HTML")# 修改内容first_h3 = soup.find('h3')if first_h3:original_text = first_h3.textfirst_h3.string = "修改后的商品A"print(f"修改前: {original_text}")print(f"修改后: {first_h3.text}")# 添加新元素new_product = soup.new_tag('div', **{'class': 'product', 'data-id': '4'})new_product.append(soup.new_tag('h3'))new_product.h3.string = "商品D"products_div = soup.find('div', class_='products')if products_div:products_div.append(new_product)print(f"添加新商品后商品数量: {len(products_div.find_all('div', class_='product'))}")# 4. 输出美化print("\n4. 美化输出")print(soup.prettify[:500] + "...")# 运行BeautifulSoup示例beautifulsoup_basicsadvanced_beautifulsoupimport csvimport jsonimport osfrom datetime import datetimedef news_crawler:"""新闻爬虫实战项目"""print("=== 新闻爬虫实战项目 ===")# 创建数据存储目录os.makedirs('news_data', exist_ok=True)class NewsCrawler:def __init__(self):self.session = requests.Sessionself.session.headers.update({'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'})def crawl_news(self, url):"""爬取新闻页面"""try:response = self.session.get(url, timeout=10)response.raise_for_statussoup = BeautifulSoup(response.text, 'html.parser')# 提取新闻信息(这里使用模拟数据,实际需要根据目标网站调整)news_data = self.extract_news_data(soup)return news_dataexcept requests.exceptions.RequestException as e:print(f"爬取失败 {url}: {e}")return Nonedef extract_news_data(self, soup):"""提取新闻数据(示例实现)"""# 这里是一个示例实现,实际爬取时需要根据目标网站结构调整# 模拟提取数据news = {'title': '示例新闻标题','content': '这是新闻内容...','publish_time': datetime.now.strftime('%Y-%m-%d %H:%M:%S'),'author': '示例作者','source': '示例来源','url': 'https://example.com/news/1','crawl_time': datetime.now.isoformat}# 实际爬取时,需要根据网站结构提取真实数据# 例如:# title = soup.find('h1', class_='news-title')# content = soup.find('div', class_='news-content')# ...return newsdef save_to_json(self, news_list, filename):"""保存到JSON文件"""filepath = os.path.join('news_data', filename)with open(filepath, 'w', encoding='utf-8') as f:json.dump(news_list, f, ensure_ascii=False, indent=2)print(f"已保存 {len(news_list)} 条新闻到 {filepath}")def save_to_csv(self, news_list, filename):"""保存到CSV文件"""filepath = os.path.join('news_data', filename)if news_list:fieldnames = news_list[0].keyswith open(filepath, 'w', encoding='utf-8', newline='') as f:writer = csv.DictWriter(f, fieldnames=fieldnames)writer.writeheaderwriter.writerows(news_list)print(f"已保存 {len(news_list)} 条新闻到 {filepath}")# 使用示例crawler = NewsCrawler# 模拟爬取多个新闻页面news_list = base_url = "https://httpbin.org/html" # 使用测试页面print("开始爬取新闻...")for i in range(5): # 模拟爬取5个页面print(f"爬取第 {i+1} 个页面...")# 在实际项目中,这里应该是真实的新闻URL列表news_data = crawler.crawl_news(base_url)if news_data:# 为每个新闻生成唯一数据news_data['title'] = f"示例新闻标题 {i+1}"news_data['content'] = f"这是第 {i+1} 个新闻的内容..."news_data['url'] = f"https://example.com/news/{i+1}"news_list.append(news_data)# 添加延迟,避免请求过快time.sleep(1)# 保存数据if news_list:crawler.save_to_json(news_list, 'news.json')crawler.save_to_csv(news_list, 'news.csv')# 显示爬取结果print("\n爬取结果摘要:")for i, news in enumerate(news_list[:3], 1): # 显示前3条print(f"{i}. {news['title']}")print(f" 来源: {news['source']}")print(f" 时间: {news['publish_time']}")print(f" 内容预览: {news['content'][:50]}...")printelse:print("没有爬取到新闻数据")# 运行新闻爬虫news_crawlerfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.support.ui import WebDriverWaitfrom selenium.webdriver.support import expected_conditions as ECfrom selenium.webdriver.chrome.options import Optionsimport timedef selenium_basics:"""Selenium基础使用"""print("=== Selenium动态内容爬取 ===")# 设置Chrome选项chrome_options = Optionschrome_options.add_argument('--headless') # 无头模式chrome_options.add_argument('--no-sandbox')chrome_options.add_argument('--disable-dev-shm-usage')try:# 初始化浏览器驱动print("初始化浏览器驱动...")driver = webdriver.Chrome(options=chrome_options)# 1. 基本页面操作print("1. 基本页面操作")# 访问页面driver.get("https://httpbin.org/html")print(f"页面标题: {driver.title}")print(f"当前URL: {driver.current_url}")# 2. 元素查找和交互print("\n2. 元素查找和交互")# 等待元素加载wait = WebDriverWait(driver, 10)# 查找元素try:# 查找h1元素h1_element = wait.until(EC.presence_of_element_located((By.TAG_NAME, "h1")))print(f"找到h1元素: {h1_element.text}")# 查找所有p元素p_elements = driver.find_elements(By.TAG_NAME, "p")print(f"找到 {len(p_elements)} 个p元素")for i, p in enumerate(p_elements[:3], 1): # 显示前3个print(f" {i}. {p.text}")except Exception as e:print(f"元素查找错误: {e}")# 3. 执行JavaScriptprint("\n3. 执行JavaScript")# 执行JavaScript代码script_result = driver.execute_script("return document.title;")print(f"通过JavaScript获取标题: {script_result}")# 滚动页面driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")print("已滚动到页面底部")# 4. 表单操作示例(在测试页面上)print("\n4. 表单操作示例")# 访问一个包含表单的测试页面driver.get("https://httpbin.org/forms/post")# 查找表单元素try:# 填写输入框name_input = wait.until(EC.presence_of_element_located((By.NAME, "custname")))name_input.clearname_input.send_keys("测试用户")print("已填写姓名字段")# 选择单选按钮size_radio = driver.find_element(By.CSS_SELECTOR, "input[value='medium']")size_radio.clickprint("已选择中等尺寸")# 选择复选框toppings = driver.find_elements(By.NAME, "topping")for topping in toppings[:2]: # 选择前两个配料topping.clickprint("已选择配料")# 在实际爬虫中,这里可以提交表单# submit_button = driver.find_element(By.TAG_NAME, "button")# submit_button.clickexcept Exception as e:print(f"表单操作错误: {e}")# 5. 截图功能print("\n5. 截图功能")# 保存截图screenshot_path = "news_data/selenium_screenshot.png"driver.save_screenshot(screenshot_path)print(f"截图已保存到: {screenshot_path}")# 6. 处理弹窗和窗口print("\n6. 窗口处理")# 打开新窗口driver.execute_script("window.open('https://httpbin.org/html');")print("已打开新窗口")# 切换窗口windows = driver.window_handlesprint(f"当前窗口数量: {len(windows)}")if len(windows) > 1:driver.switch_to.window(windows[1])print(f"切换到新窗口,标题: {driver.title}")# 关闭新窗口并切换回原窗口driver.closedriver.switch_to.window(windows[0])print("已关闭新窗口并切换回原窗口")except Exception as e:print(f"Selenium错误: {e}")finally:# 关闭浏览器if 'driver' in locals:driver.quitprint("浏览器已关闭")def dynamic_content_crawler:"""动态内容爬虫示例"""print("\n=== 动态内容爬虫示例 ===")# 这个示例需要实际的动态网站,这里使用模拟说明class DynamicCrawler:def __init__(self):chrome_options = Optionschrome_options.add_argument('--headless')self.driver = webdriver.Chrome(options=chrome_options)self.wait = WebDriverWait(self.driver, 10)def crawl_dynamic_content(self, url):"""爬取动态加载的内容"""try:self.driver.get(url)# 等待动态内容加载time.sleep(3) # 简单等待,实际应该使用明确的等待条件# 获取渲染后的页面源码page_source = self.driver.page_source# 使用BeautifulSoup解析soup = BeautifulSoup(page_source, 'html.parser')# 提取动态加载的内容# 这里根据实际网站结构编写提取逻辑return soupexcept Exception as e:print(f"动态内容爬取错误: {e}")return Nonedef close(self):"""关闭浏览器"""self.driver.quit# 使用说明print("动态内容爬虫使用说明:")print("1. 对于JavaScript渲染的内容,使用Selenium")print("2. 等待动态内容加载完成")print("3. 获取渲染后的页面源码")print("4. 使用BeautifulSoup解析内容")print("5. 注意添加适当的延迟和等待条件")# 由于需要实际网站,这里不执行具体爬取print("\n注意: 实际动态爬虫需要指定具体的目标网站")# 运行Selenium示例(需要安装ChromeDriver)try:selenium_basicsdynamic_content_crawlerexcept Exception as e:print(f"Selenium示例运行失败: {e}")print("请确保已安装Chrome浏览器和ChromeDriver")def api_data_collection:"""API数据采集"""print("=== API数据采集 ===")# 1. 公共API数据采集print("1. 公共API数据采集")class APICollector:def __init__(self):self.session = requests.Sessionself.session.headers.update({'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36','Accept': 'application/json'})def get_public_data(self, api_url, params=None):"""获取公共API数据"""try:response = self.session.get(api_url, params=params, timeout=10)response.raise_for_statusif response.headers.get('Content-Type', '').startswith('application/json'):return response.jsonelse:return response.textexcept requests.exceptions.RequestException as e:print(f"API请求错误: {e}")return Nonedef collect_weather_data(self, city='Beijing'):"""收集天气数据示例"""# 使用开放的天气API(示例,需要注册获取API key)# 这里使用模拟数据print(f"获取 {city} 的天气数据...")# 模拟天气数据weather_data = {'city': city,'temperature': 25.5,'humidity': 60,'description': '晴朗','wind_speed': 3.2,'timestamp': datetime.now.isoformat}return weather_datadef collect_financial_data(self, symbol='AAPL'):"""收集金融数据示例"""print(f"获取 {symbol} 的金融数据...")# 模拟金融数据financial_data = {'symbol': symbol,'price': 150.25,'change': 2.5,'change_percent': 1.69,'volume': 12500000,'timestamp': datetime.now.isoformat}return financial_datadef save_api_data(self, data, filename):"""保存API数据"""os.makedirs('api_data', exist_ok=True)filepath = os.path.join('api_data', filename)with open(filepath, 'w', encoding='utf-8') as f:json.dump(data, f, ensure_ascii=False, indent=2)print(f"数据已保存到: {filepath}")# 使用示例collector = APICollector# 收集多种数据weather_data = collector.collect_weather_data('Beijing')financial_data = collector.collect_financial_data('AAPL')# 保存数据if weather_data:collector.save_api_data(weather_data, 'weather.json')if financial_data:collector.save_api_data(financial_data, 'financial.json')# 2. 分页数据采集print("\n2. 分页数据采集")def paginated_api_collection:"""分页API数据采集"""all_data = for page in range(1, 4): # 模拟采集3页数据print(f"采集第 {page} 页数据...")# 模拟API响应page_data = {'page': page,'data': [{'id': i, 'name': f'项目{(page-1)*10 + i}'} for i in range(1, 11) # 每页10条数据],'total_pages': 3,'total_items': 30}all_data.extend(page_data['data'])# 添加延迟避免请求过快time.sleep(1)# 检查是否还有下一页if page >= page_data['total_pages']:breakprint(f"共采集 {len(all_data)} 条数据")return all_datapaginated_data = paginated_api_collectioncollector.save_api_data(paginated_data, 'paginated_data.json')# 3. 实时数据监控print("\n3. 实时数据监控")class RealTimeMonitor:def __init__(self):self.data_points = def monitor_api(self, api_url, interval=60, duration=300):"""监控API数据"""print(f开始监控 {api_url},间隔 {interval}秒,持续 {duration}秒")start_time = time.timepoint_count = 0while time.time - start_time def crawler_ethics_best_practices:"""爬虫伦理与最佳实践"""print("=== 爬虫伦理与最佳实践 ===")# 1. 遵守robots.txtprint("1. 遵守robots.txt")def check_robots_txt(base_url):"""检查robots.txt"""robots_url = f"{base_url}/robots.txt"try:response = requests.get(robots_url, timeout=5)if response.status_code == 200:print(f"{base_url} 的robots.txt内容:")print(response.text[:500] + "..." if len(response.text) > 500 else response.text)else:print(f"无法获取 {robots_url},状态码: {response.status_code}")except requests.exceptions.RequestException as e:print(f"获取robots.txt失败: {e}")# 检查示例网站的robots.txtcheck_robots_txt("https://www.example.com")# 2. 设置合理的请求间隔print("\n2. 请求频率控制")class PoliteCrawler:def __init__(self, delay=1.0):self.delay = delay # 请求间隔(秒)self.last_request_time = 0self.session = requests.Sessiondef polite_get(self, url, **kwargs):"""礼貌的GET请求"""# 计算需要等待的时间current_time = time.timetime_since_last = current_time - self.last_request_timewait_time = max(0, self.delay - time_since_last)if wait_time > 0:print(f"等待 {wait_time:.2f} 秒...")time.sleep(wait_time)self.last_request_time = time.timereturn self.session.get(url, **kwargs)# 使用礼貌爬虫polite_crawler = PoliteCrawler(delay=2.0)print("使用礼貌爬虫示例:")for i in range(3):print(f"第 {i+1} 次请求...")try:response = polite_crawler.polite_get("https://httpbin.org/delay/1")print(f"状态码: {response.status_code}")except Exception as e:print(f"请求失败: {e}")# 3. 错误处理和重试机制print("\n3. 健壮的错误处理")def robust_crawler(url, max_retries=3):"""健壮的爬虫函数"""for attempt in range(max_retries):try:response = requests.get(url, timeout=10)response.raise_for_statusreturn responseexcept requests.exceptions.Timeout:print(f"请求超时,第 {attempt+1} 次重试...")except requests.exceptions.HTTPError as e:print(f"HTTP错误: {e}")if e.response.status_code == 404:print("页面不存在,停止重试")breakexcept requests.exceptions.RequestException as e:print(f"请求错误: {e}")if attempt

创建一个完整的爬虫项目,爬取一个实际网站的数据并进行存储和分析。

# 挑战练习:完整爬虫项目 - 图书信息爬取def complete_crawler_project:"""完整爬虫项目:图书信息爬取"""print("=== 完整爬虫项目:图书信息爬取 ===")class BookCrawler:def __init__(self):self.session = requests.Sessionself.session.headers.update({'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36','Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8','Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8'})self.data_manager = DataManager('book_data')def crawl_book_list(self, base_url, pages=3):"""爬取图书列表"""print(f"开始爬取图书列表,共 {pages} 页...")all_books = for page in range(1, pages + 1):print(f"爬取第 {page} 页...")# 构建URL(这里使用模拟URL)url = f"{base_url}?page={page}"try:response = self.session.get(url, timeout=10)response.raise_for_status# 解析页面soup = BeautifulSoup(response.text, 'html.parser')# 提取图书信息(模拟实现)books = self.extract_books_from_page(soup, page)all_books.extend(books)print(f"第 {page} 页爬取完成,获得 {len(books)} 本图书")# 礼貌延迟time.sleep(2)except Exception as e:print(f"爬取第 {page} 页失败: {e}")continuereturn all_booksdef extract_books_from_page(self, soup, page_num):"""从页面提取图书信息(模拟实现)"""# 在实际项目中,这里需要根据目标网站的实际HTML结构编写提取逻辑# 这里使用模拟数据books = for i in range(5): # 每页模拟5本书book = {'id': f"book_{page_num}_{i+1}",'title': f"图书标题 {page_num}-{i+1}",'author': f"作者 {i+1}",'price': round(20 + i * 5 + page_num, 2),'rating': round(3 + i * 0.5, 1),'description': f"这是第 {page_num} 页第 {i+1} 本书的描述",'category': ['小说', '文学', '科技'][i % 3],'publish_date': f"2023-{page_num:02d}-{i+1:02d}",'page_count': 200 + i * 50,'crawl_time': datetime.now.isoformat}books.append(book)return booksdef crawl_book_detail(self, book_id):"""爬取图书详细信息"""print(f"爬取图书详情: {book_id}")# 模拟详细爬取detail = {'book_id': book_id,'title': f"图书标题 {book_id}",'isbn': f"978-7-{book_id.replace('_', '-')}",'publisher': "示例出版社",'summary': "这是图书的详细摘要...",'chapters': [f"第{i+1}章" for i in range(10)],'reviews': [{'user': '用户1', 'rating': 5, 'comment': '很好'},{'user': '用户2', 'rating': 4, 'comment': '不错'}]}time.sleep(1) # 延迟return detaildef analyze_books(self, books):"""分析图书数据"""print("\n进行图书数据分析...")if not books:print("没有数据可分析")return# 基本统计total_books = len(books)avg_price = sum(book['price'] for book in books) / total_booksavg_rating = sum(book['rating'] for book in books) / total_booksprint(f"图书总数: {total_books}")print(f"平均价格: ¥{avg_price:.2f}")print(f"平均评分: {avg_rating:.1f}")# 价格分布price_ranges = {'0-20': 0, '21-40': 0, '41-60': 0, '61+': 0}for book in books:price = book['price']if price

明天我们将学习自动化运维和系统管理!坚持练习,你的爬虫技能会越来越强!

来源:琢磨先生起飞吧

相关推荐