# Building a Complete Photography-Site Image Crawler with Modern Python

**Abstract**: This article takes a deep dive into building a fully functional image crawler for photography websites using a modern Python stack. We combine asynchronous programming, adaptive anti-bot strategies, and up-to-date tooling to produce an efficient, reliable image download system.

## 1. Technology Selection and Preparation

### 1.1 Core Technology Stack

- Async framework: aiohttp + asyncio (roughly 3-5x faster than requests for this workload)
- HTML parsing: BeautifulSoup4 with the lxml parser
- Browser automation: Playwright (for JavaScript-rendered pages)
- Concurrency control: asyncio.Semaphore (rate limiting)
- Proxy support: residential proxy rotation
- Data storage: structured records in SQLite plus the local file system

### 1.2 Installing Dependencies

```bash
# Core crawler libraries
pip install aiohttp beautifulsoup4 lxml

# Browser automation
pip install playwright
playwright install chromium

# Data handling
pip install pandas sqlalchemy

# Progress display
pip install tqdm rich
```

## 2. Overall Crawler Architecture

### 2.1 Project Structure

```text
photography_crawler/
├── crawler/
│   ├── __init__.py
│   ├── main.py          # Main entry point
│   ├── downloader.py    # Downloader module
│   ├── parser.py        # Parser module
│   ├── scheduler.py     # Scheduler module
│   └── utils.py         # Utility functions
├── config/
│   └── settings.py      # Configuration
├── data/
│   ├── images/          # Downloaded images
│   └── database.db      # SQLite database
├── logs/                # Log files
└── requirements.txt
```

### 2.2 Configuration (config/settings.py)

```python
import os
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class CrawlerConfig:
    """Crawler configuration."""

    # Target site (example: Unsplash)
    BASE_URL: str = "https://unsplash.com"
    SEARCH_URL: str = "https://unsplash.com/s/photos/{keyword}"

    # Concurrency settings
    MAX_CONCURRENT: int = 10      # Maximum concurrent requests
    REQUEST_DELAY: float = 0.5    # Delay between requests (seconds)
    TIMEOUT: int = 30             # Request timeout (seconds)

    # Download settings
    IMAGE_QUALITY: str = "regular"   # Image quality: raw, full, regular, small
    SAVE_DIR: str = "data/images"
    MAX_IMAGES: int = 1000           # Maximum number of images to download

    # Anti-bot settings
    USER_AGENTS: List[str] = None
    PROXY_POOL: List[str] = None     # Proxy pool
    USE_PLAYWRIGHT: bool = True      # Whether to use browser rendering

    # Database settings
    DB_PATH: str = "data/database.db"

    def __post_init__(self):
        if self.USER_AGENTS is None:
            self.USER_AGENTS = [
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15",
                "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
            ]
        # Create working directories
        os.makedirs(self.SAVE_DIR, exist_ok=True)
        os.makedirs("logs", exist_ok=True)


config = CrawlerConfig()
```
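The project layout above also lists `crawler/scheduler.py` and `crawler/utils.py`, which the article never shows. As a placeholder, a minimal `utils.py` might look like the sketch below; the helper names (`sanitize_filename`, `ensure_dir`) are illustrative assumptions, not part of the original code.

```python
# crawler/utils.py -- hypothetical helpers; names are illustrative only
import re
from pathlib import Path


def sanitize_filename(name: str, max_len: int = 80) -> str:
    """Strip characters that are unsafe in file names."""
    cleaned = re.sub(r'[\\/:*?"<>|\s]+', "_", name).strip("_")
    return cleaned[:max_len] or "unnamed"


def ensure_dir(path: str) -> Path:
    """Create a directory if it does not exist and return it as a Path."""
    p = Path(path)
    p.mkdir(parents=True, exist_ok=True)
    return p
```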
## 3. Core Crawler Implementation

### 3.1 Asynchronous Downloader (crawler/downloader.py)

```python
import aiohttp
import asyncio
from typing import Optional, Dict, Any
import random
import logging
from pathlib import Path
from tqdm.asyncio import tqdm


class AsyncImageDownloader:
    """Asynchronous image downloader."""

    def __init__(self, config):
        self.config = config
        self.session = None
        self.semaphore = asyncio.Semaphore(config.MAX_CONCURRENT)
        self.logger = self._setup_logger()

    def _setup_logger(self):
        """Configure logging."""
        logger = logging.getLogger(__name__)
        logger.setLevel(logging.INFO)

        # File handler
        file_handler = logging.FileHandler("logs/downloader.log")
        file_handler.setFormatter(
            logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
        )

        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(
            logging.Formatter("%(levelname)s: %(message)s")
        )

        logger.addHandler(file_handler)
        logger.addHandler(console_handler)
        return logger

    async def __aenter__(self):
        """Async context manager entry."""
        timeout = aiohttp.ClientTimeout(total=self.config.TIMEOUT)
        connector = aiohttp.TCPConnector(limit=100, ssl=False)
        self.session = aiohttp.ClientSession(
            timeout=timeout,
            connector=connector,
            headers=self._get_headers()
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        if self.session:
            await self.session.close()

    def _get_headers(self) -> Dict[str, str]:
        """Generate randomized request headers."""
        return {
            "User-Agent": random.choice(self.config.USER_AGENTS),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "Accept-Encoding": "gzip, deflate, br",
            "DNT": "1",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
        }

    async def fetch(self, url: str, retries: int = 3) -> Optional[bytes]:
        """Fetch page or image content."""
        async with self.semaphore:
            for attempt in range(retries):
                try:
                    await asyncio.sleep(self.config.REQUEST_DELAY + random.uniform(0, 0.5))
                    async with self.session.get(url) as response:
                        if response.status == 200:
                            return await response.read()
                        elif response.status == 429:  # Too many requests
                            wait_time = 2 ** attempt
                            self.logger.warning(f"Rate limited, waiting {wait_time}s")
                            await asyncio.sleep(wait_time)
                        else:
                            self.logger.error(f"HTTP {response.status} for {url}")
                except Exception as e:
                    self.logger.error(f"Attempt {attempt + 1} failed: {str(e)}")
                    if attempt == retries - 1:
                        return None
                    await asyncio.sleep(1 * (attempt + 1))
        return None

    async def download_image(self, img_url: str, save_path: Path,
                             pbar: Optional[tqdm] = None) -> bool:
        """Download a single image."""
        try:
            img_data = await self.fetch(img_url)
            if img_data:
                save_path.write_bytes(img_data)
                if pbar:
                    pbar.update(1)
                    pbar.set_description(f"Downloaded: {save_path.name[:30]}...")
                self.logger.info(f"Success: {save_path.name}")
                return True
        except Exception as e:
            self.logger.error(f"Failed to download {img_url}: {str(e)}")
        return False

    async def batch_download(self, img_urls: list, keyword: str) -> int:
        """Download images in batch."""
        success_count = 0
        save_dir = Path(self.config.SAVE_DIR) / keyword
        save_dir.mkdir(exist_ok=True)

        tasks = []
        with tqdm(total=len(img_urls), desc=f"Downloading {keyword} images") as pbar:
            for i, img_url in enumerate(img_urls[:self.config.MAX_IMAGES]):
                save_path = save_dir / f"{keyword}_{i:04d}_{hash(img_url) % 10000:04d}.jpg"

                # Skip files that already exist
                if save_path.exists():
                    pbar.update(1)
                    success_count += 1
                    continue

                task = asyncio.create_task(
                    self.download_image(img_url, save_path, pbar)
                )
                tasks.append(task)

            # Wait for all download tasks to finish
            results = await asyncio.gather(*tasks)
            success_count += sum(results)

        self.logger.info(f"Downloaded {success_count}/{len(img_urls)} images")
        return success_count
```
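Before wiring the downloader into the full pipeline, it can be exercised on its own. A minimal standalone run might look like this; the URLs are placeholders and the sketch assumes the `CrawlerConfig` from section 2.2.

```python
# Minimal standalone usage sketch; the URLs are placeholders.
import asyncio
from config.settings import config
from crawler.downloader import AsyncImageDownloader


async def demo():
    urls = [
        "https://example.com/photo1.jpg",
        "https://example.com/photo2.jpg",
    ]
    # The class is an async context manager: the aiohttp session is
    # created on entry and closed on exit.
    async with AsyncImageDownloader(config) as downloader:
        count = await downloader.batch_download(urls, "demo")
    print(f"{count} images saved under {config.SAVE_DIR}/demo")


if __name__ == "__main__":
    asyncio.run(demo())
```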
### 3.2 Smart Parser (crawler/parser.py)

```python
from bs4 import BeautifulSoup
import json
import re
from typing import List, Dict, Any
import logging
from urllib.parse import urljoin, urlparse
import asyncio
from playwright.async_api import async_playwright


class SmartImageParser:
    """Smart image parser with support for multiple sites."""

    def __init__(self, base_url: str):
        self.base_url = base_url
        self.logger = logging.getLogger(__name__)

    def parse_unsplash(self, html: str) -> List[str]:
        """Parse an Unsplash page."""
        soup = BeautifulSoup(html, "lxml")
        img_urls = []

        # Method 1: extract from JSON-LD structured data
        json_ld = soup.find("script", type="application/ld+json")
        if json_ld:
            try:
                data = json.loads(json_ld.string)
                if isinstance(data, list):
                    for item in data:
                        if "image" in item:
                            img_urls.append(item["image"])
                elif isinstance(data, dict):
                    if "image" in data:
                        img_urls.append(data["image"])
            except json.JSONDecodeError:
                pass

        # Method 2: extract from <img> srcset attributes
        for img in soup.find_all("img", {"srcset": True}):
            srcset = img["srcset"]
            # Parse srcset and keep the highest-quality candidate
            urls = re.findall(r"(https?://[^\s,]+)", srcset)
            if urls:
                img_urls.append(urls[-1])  # The last entry is usually the largest

        # Method 3: extract from data-src attributes
        for img in soup.find_all("img", {"data-src": True}):
            img_urls.append(img["data-src"])

        # Deduplicate and filter
        img_urls = list(dict.fromkeys(img_urls))
        img_urls = [url for url in img_urls
                    if url.endswith((".jpg", ".jpeg", ".png", ".webp"))]
        return img_urls

    def parse_flickr(self, html: str) -> List[str]:
        """Parse a Flickr page."""
        soup = BeautifulSoup(html, "lxml")
        img_urls = []

        # Flickr images live inside specific photo divs
        for photo_div in soup.find_all("div", class_=re.compile(r"photo-.*")):
            img = photo_div.find("img")
            if img and img.get("src"):
                # Build the large-image URL from the thumbnail URL
                small_url = img["src"]
                large_url = small_url.replace("_m.", "_b.")  # thumbnail -> large
                img_urls.append(large_url)
        return img_urls

    def parse_generic(self, html: str) -> List[str]:
        """Generic fallback parser."""
        soup = BeautifulSoup(html, "lxml")
        img_urls = []

        patterns = [
            r'https?://[^\s"\']+?\.(?:jpg|jpeg|png|webp|gif)',
            r'url\(["\']?(https?://[^)"\']+)["\']?\)',
        ]

        # Regex match against the raw HTML text
        text = str(soup)
        for pattern in patterns:
            matches = re.findall(pattern, text, re.IGNORECASE)
            img_urls.extend(matches)

        # Collect from <img> tags
        for img in soup.find_all("img"):
            for attr in ["src", "data-src", "data-original", "data-lazy-src"]:
                if img.get(attr):
                    full_url = urljoin(self.base_url, img[attr])
                    if full_url not in img_urls:
                        img_urls.append(full_url)

        return list(dict.fromkeys(img_urls))

    async def parse_with_playwright(self, url: str) -> List[str]:
        """Parse JavaScript-rendered pages with Playwright."""
        img_urls = []

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context(
                viewport={"width": 1920, "height": 1080},
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            )
            page = await context.new_page()

            try:
                # Capture image responses as they come in
                def handle_response(response):
                    if response.request.resource_type == "image":
                        img_url = response.url
                        if img_url.endswith((".jpg", ".jpeg", ".png", ".webp")):
                            img_urls.append(img_url)

                page.on("response", handle_response)

                # Open the page and wait for network activity to settle
                await page.goto(url, wait_until="networkidle")

                # Scroll several times to trigger lazy loading
                for _ in range(5):
                    await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                    await page.wait_for_timeout(1000)  # Wait for images to load

                await page.wait_for_load_state("networkidle")

                # Extract image URLs from the rendered DOM
                page_img_urls = await page.evaluate(
                    """() => {
                        const urls = new Set();
                        document.querySelectorAll('img').forEach(img => {
                            const src = img.src || img.dataset.src || img.dataset.original;
                            if (src && /\\.(jpg|jpeg|png|webp)$/i.test(src)) {
                                urls.add(src);
                            }
                        });
                        return Array.from(urls);
                    }"""
                )
                img_urls.extend(page_img_urls)
            finally:
                await browser.close()

        return list(dict.fromkeys(img_urls))

    def detect_website_type(self, url: str) -> str:
        """Detect which website a URL belongs to."""
        domain = urlparse(url).netloc.lower()
        if "unsplash" in domain:
            return "unsplash"
        elif "flickr" in domain:
            return "flickr"
        elif "500px" in domain:
            return "500px"
        else:
            return "generic"

    async def parse(self, html: str, url: str, use_playwright: bool = False) -> List[str]:
        """Dispatch to the appropriate parser."""
        website_type = self.detect_website_type(url)

        if use_playwright:
            return await self.parse_with_playwright(url)

        if website_type == "unsplash":
            return self.parse_unsplash(html)
        elif website_type == "flickr":
            return self.parse_flickr(html)
        else:
            return self.parse_generic(html)
```
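Taken together, the downloader and parser form the fetch-then-extract step of the pipeline. A rough standalone sketch of that hand-off follows; the search term is only an example and the imports assume the module layout from section 2.1.

```python
# Sketch: fetch a search page and extract candidate image URLs from it.
import asyncio
from config.settings import config
from crawler.downloader import AsyncImageDownloader
from crawler.parser import SmartImageParser


async def extract_urls(keyword: str):
    search_url = config.SEARCH_URL.format(keyword=keyword)
    parser = SmartImageParser(config.BASE_URL)

    async with AsyncImageDownloader(config) as downloader:
        html = await downloader.fetch(search_url)

    if html is None:
        return []
    # parse() chooses parse_unsplash / parse_flickr / parse_generic based on
    # the domain, or falls back to Playwright when USE_PLAYWRIGHT is enabled.
    return await parser.parse(html.decode("utf-8", errors="ignore"),
                              search_url, config.USE_PLAYWRIGHT)


if __name__ == "__main__":
    urls = asyncio.run(extract_urls("landscape"))
    print(f"Found {len(urls)} candidate image URLs")
```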
### 3.3 Data Manager (crawler/database.py)

```python
import sqlite3
from sqlite3 import Error
from datetime import datetime
from pathlib import Path
import pandas as pd
from typing import Any, List, Dict, Optional


class ImageDatabase:
    """Database of image download records."""

    def __init__(self, db_path: str):
        self.db_path = db_path
        self._init_database()

    def _init_database(self):
        """Initialize database tables."""
        conn = self._create_connection()
        if conn:
            sql_create_images_table = """
                CREATE TABLE IF NOT EXISTS downloaded_images (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    keyword TEXT NOT NULL,
                    image_url TEXT NOT NULL UNIQUE,
                    local_path TEXT NOT NULL,
                    file_size INTEGER,
                    resolution TEXT,
                    download_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    website TEXT,
                    tags TEXT
                );
            """
            sql_create_stats_table = """
                CREATE TABLE IF NOT EXISTS download_stats (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    date DATE NOT NULL,
                    keyword TEXT NOT NULL,
                    total_downloaded INTEGER DEFAULT 0,
                    failed_count INTEGER DEFAULT 0,
                    UNIQUE(date, keyword)
                );
            """
            try:
                c = conn.cursor()
                c.execute(sql_create_images_table)
                c.execute(sql_create_stats_table)
                conn.commit()
            except Error as e:
                print(f"Database error: {e}")
            finally:
                conn.close()

    def _create_connection(self):
        """Create a database connection."""
        try:
            return sqlite3.connect(self.db_path)
        except Error as e:
            print(f"Connection error: {e}")
            return None

    def save_image_record(self, record: Dict[str, Any]):
        """Save an image download record."""
        conn = self._create_connection()
        if conn:
            try:
                sql = """
                    INSERT OR IGNORE INTO downloaded_images
                    (keyword, image_url, local_path, file_size, resolution, website, tags)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                """
                c = conn.cursor()
                c.execute(sql, (
                    record["keyword"],
                    record["image_url"],
                    record["local_path"],
                    record.get("file_size"),
                    record.get("resolution"),
                    record.get("website"),
                    record.get("tags")
                ))
                conn.commit()
                return c.lastrowid
            except Error as e:
                print(f"Save error: {e}")
                return None
            finally:
                conn.close()

    def get_downloaded_urls(self, keyword: str = None) -> List[str]:
        """Return URLs that have already been downloaded."""
        conn = self._create_connection()
        urls = []
        if conn:
            try:
                c = conn.cursor()
                if keyword:
                    sql = "SELECT image_url FROM downloaded_images WHERE keyword = ?"
                    c.execute(sql, (keyword,))
                else:
                    sql = "SELECT image_url FROM downloaded_images"
                    c.execute(sql)
                urls = [row[0] for row in c.fetchall()]
            except Error as e:
                print(f"Query error: {e}")
            finally:
                conn.close()
        return urls

    def export_to_csv(self, output_path: str = "download_stats.csv"):
        """Export records to CSV."""
        conn = self._create_connection()
        if conn:
            try:
                # Load records into a DataFrame
                df = pd.read_sql_query("SELECT * FROM downloaded_images", conn)

                # Aggregate per-keyword statistics
                stats = df.groupby("keyword").agg({
                    "id": "count",
                    "file_size": "sum"
                }).rename(columns={"id": "image_count", "file_size": "total_size"})

                # Write both files next to output_path
                df.to_csv(Path(output_path).with_name("image_details.csv"), index=False)
                stats.to_csv(Path(output_path).with_name("keyword_stats.csv"))
                print(f"Data exported to {output_path}")
                return True
            except Error as e:
                print(f"Export error: {e}")
                return False
            finally:
                conn.close()
```
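A quick sanity check of the database layer on its own might look like the following; the record values are made up purely for illustration.

```python
# Sketch: exercising ImageDatabase outside the crawler.
from crawler.database import ImageDatabase

db = ImageDatabase("data/database.db")

# INSERT OR IGNORE plus the UNIQUE constraint on image_url makes repeated
# saves of the same URL a no-op.
db.save_image_record({
    "keyword": "landscape",
    "image_url": "https://example.com/photo1.jpg",   # placeholder URL
    "local_path": "data/images/landscape/landscape_0000.jpg",
    "file_size": 204800,
    "resolution": "1920x1080",
    "website": "https://unsplash.com",
    "tags": "landscape",
})

already_have = set(db.get_downloaded_urls("landscape"))
print(f"{len(already_have)} landscape images recorded")
db.export_to_csv("download_stats.csv")
```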
## 4. Main Program

### 4.1 Main Crawler Class (crawler/main.py)

```python
import asyncio
import sys
from pathlib import Path
from typing import List, Optional
import argparse
from rich.console import Console
from rich.table import Table
from rich.progress import Progress, SpinnerColumn, TextColumn

from .downloader import AsyncImageDownloader
from .parser import SmartImageParser
from .database import ImageDatabase
from config.settings import config

console = Console()


class PhotographyCrawler:
    """Main crawler class for photography websites."""

    def __init__(self):
        self.downloader = None
        self.parser = SmartImageParser(config.BASE_URL)
        self.db = ImageDatabase(config.DB_PATH)
        self.console = console

    async def crawl_keyword(self, keyword: str, max_pages: int = 3) -> int:
        """Crawl images for a given keyword."""
        # URLs we already have, to avoid re-downloading
        downloaded_urls = set(self.db.get_downloaded_urls(keyword))
        all_img_urls = []

        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=self.console
        ) as progress:
            task = progress.add_task(f"[cyan]Crawling {keyword}...", total=max_pages)

            for page in range(1, max_pages + 1):
                # Build the search URL
                search_url = config.SEARCH_URL.format(keyword=keyword)
                if page > 1:
                    search_url = f"{search_url}?page={page}"

                # Fetch the page HTML
                async with AsyncImageDownloader(config) as downloader:
                    html = await downloader.fetch(search_url)

                if html:
                    # Extract image URLs
                    img_urls = await self.parser.parse(
                        html.decode("utf-8", errors="ignore"),
                        search_url,
                        config.USE_PLAYWRIGHT
                    )

                    # Drop already-downloaded and invalid URLs
                    new_urls = [
                        url for url in img_urls
                        if url not in downloaded_urls and url.startswith("http")
                    ]
                    all_img_urls.extend(new_urls)
                    downloaded_urls.update(new_urls)

                    self.console.print(f"Page {page}: Found {len(new_urls)} new images")

                progress.update(task, advance=1)
                await asyncio.sleep(1)  # Delay between pages

        # Download the images
        if all_img_urls:
            self.console.print(f"\n[green]Total unique images found: {len(all_img_urls)}[/green]")

            async with AsyncImageDownloader(config) as downloader:
                success_count = await downloader.batch_download(all_img_urls, keyword)

            # Save records to the database
            for i, img_url in enumerate(all_img_urls[:success_count]):
                record = {
                    "keyword": keyword,
                    "image_url": img_url,
                    "local_path": str(Path(config.SAVE_DIR) / keyword / f"{keyword}_{i:04d}.jpg"),
                    "website": config.BASE_URL,
                    "tags": keyword
                }
                self.db.save_image_record(record)

            return success_count
        return 0

    def display_stats(self):
        """Display download statistics."""
        conn = self.db._create_connection()
        if conn:
            try:
                # Overall statistics
                c = conn.cursor()
                c.execute("SELECT COUNT(*) FROM downloaded_images")
                total_images = c.fetchone()[0]

                c.execute("SELECT COUNT(DISTINCT keyword) FROM downloaded_images")
                total_keywords = c.fetchone()[0]

                c.execute("""SELECT keyword, COUNT(*) as count FROM downloaded_images
                             GROUP BY keyword ORDER BY count DESC LIMIT 5""")
                top_keywords = c.fetchall()

                # Summary table
                table = Table(title="Download Statistics", show_header=True,
                              header_style="bold magenta")
                table.add_column("Metric", style="cyan")
                table.add_column("Value", style="green")
                table.add_row("Total Images", str(total_images))
                table.add_row("Total Keywords", str(total_keywords))
                table.add_row("Storage Directory", config.SAVE_DIR)
                self.console.print(table)

                # Top keywords
                if top_keywords:
                    kw_table = Table(title="Top Keywords", show_header=True)
                    kw_table.add_column("Keyword", style="cyan")
                    kw_table.add_column("Image Count", style="green")
                    for keyword, count in top_keywords:
                        kw_table.add_row(keyword, str(count))
                    self.console.print(kw_table)
            finally:
                conn.close()


async def main():
    """Entry point."""
    parser = argparse.ArgumentParser(description="Photography website image crawler")
    parser.add_argument("keywords", nargs="+", help="Search keywords")
    parser.add_argument("--pages", type=int, default=3, help="Pages to crawl per keyword")
    parser.add_argument("--max-images", type=int, default=100, help="Maximum images to download")
    parser.add_argument("--use-playwright", action="store_true", help="Render pages with Playwright")
    parser.add_argument("--export", action="store_true", help="Export records to CSV")
    args = parser.parse_args()

    # Apply command-line overrides to the config
    config.MAX_IMAGES = args.max_images
    config.USE_PLAYWRIGHT = args.use_playwright

    # Create the crawler
    crawler = PhotographyCrawler()

    # Crawl each keyword
    total_downloaded = 0
    for keyword in args.keywords:
        console.print(f"\n[bold yellow]Crawling keyword: {keyword}[/bold yellow]")
        downloaded = await crawler.crawl_keyword(keyword, args.pages)
        total_downloaded += downloaded
        console.print(f"[green]✓ {keyword}: downloaded {downloaded} images[/green]")

    # Show statistics
    crawler.display_stats()

    # Export data
    if args.export:
        crawler.db.export_to_csv()

    console.print(f"\n[bold green]✅ Done: {total_downloaded} images downloaded in total[/bold green]")


if __name__ == "__main__":
    asyncio.run(main())
```

### 4.2 Usage Examples

```bash
# Example 1: basic usage
# Crawl images for the "landscape" and "portrait" keywords
python main.py landscape portrait --pages 2 --max-images 50

# Example 2: render JavaScript pages with Playwright
python main.py nature --use-playwright --pages 5

# Example 3: export records to CSV
python main.py cityscape --export --max-images 200
```

## 5. Advanced Features and Optimizations

### 5.1 Proxy IP Support

```python
import aiohttp
from typing import List, Optional


class ProxyManager:
    """Rotating proxy manager."""

    def __init__(self, proxy_list: List[str]):
        self.proxy_list = proxy_list
        self.current_index = 0

    def get_proxy(self) -> Optional[str]:
        """Return the next proxy in the rotation."""
        if not self.proxy_list:
            return None
        proxy = self.proxy_list[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.proxy_list)
        return proxy

    async def test_proxy(self, proxy: str) -> bool:
        """Check whether a proxy is usable."""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    "http://httpbin.org/ip",
                    proxy=f"http://{proxy}",
                    timeout=5
                ) as response:
                    return response.status == 200
        except Exception:
            return False
```
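The article does not show how `ProxyManager` plugs into the request path. One way to use it, sketched under the assumption that a proxy pool is available, is to pass a rotated proxy to each aiohttp request; the proxy addresses below are examples only.

```python
# Sketch: using ProxyManager for a single proxied request (illustrative only).
import asyncio
from typing import Optional
import aiohttp


async def fetch_via_proxy(url: str, manager: ProxyManager) -> Optional[bytes]:
    proxy = manager.get_proxy()
    async with aiohttp.ClientSession() as session:
        async with session.get(
            url,
            proxy=f"http://{proxy}" if proxy else None,  # aiohttp accepts an http:// proxy URL
            timeout=aiohttp.ClientTimeout(total=10),
        ) as response:
            if response.status == 200:
                return await response.read()
    return None


if __name__ == "__main__":
    manager = ProxyManager(["203.0.113.10:8080", "203.0.113.11:8080"])  # example proxies
    data = asyncio.run(fetch_via_proxy("https://example.com/photo.jpg", manager))
    print("ok" if data else "failed")
```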
### 5.2 Distributed Scaling

```python
import asyncio
import pickle
from datetime import datetime
from typing import List

import redis


class DistributedCrawler:
    """Distributed crawl coordinator."""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.task_queue = "crawler:tasks"
        self.result_queue = "crawler:results"

    def enqueue_task(self, keyword: str, pages: int = 3):
        """Push a crawl task onto the queue."""
        task = {
            "keyword": keyword,
            "pages": pages,
            "timestamp": datetime.now().isoformat()
        }
        self.redis.lpush(self.task_queue, pickle.dumps(task))

    async def process_results(self):
        """Consume and report download results."""
        while True:
            result = self.redis.rpop(self.result_queue)
            if result:
                data = pickle.loads(result)
                # Handle a completed download result
                print(f"Processed {data['keyword']}: {data['downloaded']} images")
            await asyncio.sleep(1)
```

### 5.3 Image Post-Processing

```python
from pathlib import Path
from PIL import Image
import hashlib


class ImageProcessor:
    """Image post-processor."""

    @staticmethod
    async def compress_image(input_path: Path, output_path: Path,
                             quality: int = 85, max_size: tuple = (1920, 1080)):
        """Compress an image."""
        with Image.open(input_path) as img:
            # Resize within max_size while preserving the aspect ratio
            img.thumbnail(max_size, Image.Resampling.LANCZOS)
            # Save the compressed copy
            img.save(output_path, "JPEG", quality=quality, optimize=True)

    @staticmethod
    def calculate_hash(file_path: Path) -> str:
        """Compute an MD5 hash for duplicate detection."""
        with open(file_path, "rb") as f:
            return hashlib.md5(f.read()).hexdigest()
```

## 6. Ethical and Legal Considerations

### 6.1 Respecting robots.txt

```python
import urllib.robotparser
from urllib.parse import urlparse


class RobotsChecker:
    """robots.txt checker."""

    def __init__(self):
        self.rp = urllib.robotparser.RobotFileParser()

    async def can_fetch(self, url: str, user_agent: str = "*") -> bool:
        """Check whether crawling a URL is allowed."""
        parsed = urlparse(url)
        base_url = f"{parsed.scheme}://{parsed.netloc}"
        self.rp.set_url(f"{base_url}/robots.txt")
        try:
            self.rp.read()
            return self.rp.can_fetch(user_agent, url)
        except Exception:
            return True  # If robots.txt cannot be read, default to allowing the fetch
```

### 6.2 Usage Recommendations

- Respect copyright: only download images that are licensed for free use.
- Limit request rate: add reasonable delays and avoid putting pressure on the server.
- Check the terms: comply with each site's Terms of Service.
- Commercial use: obtain formal authorization before any commercial use.
- Personal use: keep downloads to personal study and research.

## 7. Summary

This article has walked through a complete image crawler for photography websites with the following characteristics:

- High-performance async architecture: asyncio-based concurrent downloads
- Smart anti-bot strategy: multiple countermeasures plus browser simulation
- Multi-site support: parsers for several photography site structures
- Data management: complete download records and statistics
- Extensibility: proxy rotation and distributed operation supported
- User friendliness: progress display and detailed logging