摘要:def basic_email_check(email: str) -> bool: """ Perform basic email format validation. Args: email: The email address to Validate R
电子邮件验证很棘手 — 简单的 “@” 检查是不够的,但使用复杂的正则表达式也会导致问题。
让我们从一个简单但有用的验证方法开始:
def basic_email_check(email: str) -> bool:
    """Perform basic email format validation.

    Only structural checks are made: the address must be non-empty after
    trimming, contain exactly one ``@``, have a non-empty local part and
    domain, and respect the length limits below. Character-level validity
    is not checked here.

    Args:
        email: The email address to validate.

    Returns:
        bool: True if email passes basic checks, False otherwise.
    """
    # Remove leading/trailing whitespace (the method must be *called*;
    # binding the bound method itself makes len() below raise TypeError).
    email = email.strip()

    # Reject empty input and anything above the RFC 5321 overall limit.
    if not email:
        return False
    if len(email) > 254:
        return False

    # Exactly one '@' is required, which also makes the split below safe.
    if email.count('@') != 1:
        return False
    local, domain = email.split('@')

    # Both sides of the '@' must be present.
    if not local or not domain:
        return False

    # Per-part length limits (64 for the local part is the RFC 5321 limit).
    if len(local) > 64:
        return False
    if len(domain) > 255:
        return False

    return True


# Example usage
test_emails = [
    'user@example.com',
    'invalid.email@',
    '@invalid.com',
    'no.at.sign',
    'multiple@@signs.com',
    'space in@email.com',
]

for email in test_emails:
    result = basic_email_check(email)
    print(f"{email}: {'Valid' if result else 'Invalid'}")

# This basic validator:
- 检查是否存在一个 “@”
- 验证两个部分都不是空的
- 强制实施 RFC 5321 长度限制
- 处理常见的边缘情况
让我们用模式匹配来增强我们的验证:
import re


def pattern_email_check(email: str) -> tuple[bool, str]:
    """Validate email using pattern matching.

    Runs ``basic_email_check`` first, then checks the characters of the
    local part, the placement of dots in it, and the shape of the domain.

    Args:
        email: Email address to validate.

    Returns:
        tuple: (is_valid, reason), where reason is a human-readable message
        describing the first failed check or confirming validity.
    """
    if not basic_email_check(email):
        return False, "Failed basic format check"

    # basic_email_check trims its own copy of the input; trim here as well
    # so surrounding whitespace is not reported as invalid characters.
    email = email.strip()

    # Pattern for allowed characters in local part
    local_pattern = r'^[a-zA-Z0-9.!#$%&\'*+/=?^_`{|}~-]+$'
    # Pattern for domain: one leading label of letters/digits/hyphens
    # followed by one or more dot-separated alphabetic labels (2+ letters).
    domain_pattern = r'^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z]{2,})+$'

    # The basic check guarantees exactly one '@'.
    local, domain = email.split('@')

    # Check local part
    if not re.match(local_pattern, local):
        return False, "Invalid characters in local part"

    # Check for consecutive dots
    if '..' in local:
        return False, "Consecutive dots not allowed"
    if local[0] == '.' or local[-1] == '.':
        return False, "Local part cannot start or end with dot"

    # Check domain
    if not re.match(domain_pattern, domain):
        return False, "Invalid domain format"

    return True, "Valid email address"


# Example usage with detailed feedback
test_emails = [
    'user.name@example.com',
    'user..name@example.com',
    # The next two entries were fused into one garbled literal by HTML
    # extraction; restored as the leading-dot and trailing-dot cases.
    '.user@example.com',
    'user.@example.com',
    'user@subdomain.example.co.uk',
    'user@invalid',
    'user@.com',
    'user.name@example.',
    'user+filter@example.com',
]

print("Detailed Email Validation Results:")
for email in test_emails:
    is_valid, reason = pattern_email_check(email)
    print(f"\nEmail: {email}")
    print(f"Valid: {is_valid}")
    print(f"Reason: {reason}")

# This validator adds:
- 使用正则表达式进行字符集验证
- 连续点检查
- 正确的域格式验证
- 详细的反馈消息
以下是您可以在生产环境中使用的更完整的解决方案:
import re
from dataclasses import dataclass
from typing import List, Optional

import dns.resolver


@dataclass
class ValidationResult:
    """Store email validation results."""

    is_valid: bool
    errors: List[str]
    warnings: List[str]


class EmailValidator:
    """Validate email addresses by format, length, domain list and DNS.

    The errors and warnings of the most recent ``validate`` call are kept
    on the instance and reset at the start of every call, so a single
    instance should not be used by several callers at the same time.
    """

    def __init__(self, check_dns: bool = False):
        """Create a validator.

        Args:
            check_dns: When True, ``validate`` also looks up MX records
                for the domain.
        """
        self.check_dns = check_dns
        # Two independent lists; they must not alias each other.
        self.errors = []
        self.warnings = []
        # Common disposable email domains
        self.disposable_domains = {
            'tempmail.com',
            'throwaway.com',
            'temporarymail.com'
        }

    def _check_format(self, email: str) -> bool:
        """Check email format using comprehensive pattern.

        Appends "Invalid email format" to ``self.errors`` and returns
        False when the address does not match; returns True otherwise.
        """
        # re.VERBOSE ignores the layout whitespace; '#' is literal here
        # because it sits inside a character class.
        pattern = r"""
            ^[a-zA-Z0-9.!#$%&'*+/=?^_`{|}~-]+
            @
            [a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?
            (?:\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$
        """
        if not re.match(pattern, email, re.VERBOSE):
            self.errors.append("Invalid email format")
            return False

        # The character class above admits '.', so dot placement has to be
        # checked separately: no leading, trailing or consecutive dots.
        local = email.rpartition('@')[0]
        if local.startswith('.') or local.endswith('.') or '..' in local:
            self.errors.append("Invalid email format")
            return False
        return True

    def _check_dns(self, domain: str) -> bool:
        """Verify domain has MX records.

        A definite negative answer records an error and returns False.
        Any other failure is recorded as a warning and treated as a pass,
        so resolver trouble does not reject an address outright.
        """
        try:
            dns.resolver.resolve(domain, 'MX')
            return True
        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer,
                dns.resolver.NoNameservers):
            self.errors.append(f"Domain {domain} has no MX records")
            return False
        except Exception as e:
            self.warnings.append(f"DNS check failed: {str(e)}")
            return True

    def _check_disposable(self, domain: str) -> None:
        """Check if domain is a known disposable email provider.

        A match only adds a warning; it never makes the address invalid.
        """
        if domain in self.disposable_domains:
            self.warnings.append(
                f"Domain {domain} appears to be a disposable email service"
            )

    def validate(self, email: str) -> ValidationResult:
        """Validate an email address.

        Args:
            email: Email address to validate.

        Returns:
            ValidationResult with validation status and messages.
        """
        # Fresh lists per call so earlier results are never mutated.
        self.errors = []
        self.warnings = []

        # Basic input check
        if not email or not isinstance(email, str):
            self.errors.append("Invalid input")
            return ValidationResult(False, self.errors, self.warnings)

        # Remove whitespace
        email = email.strip()

        # Length check
        if len(email) > 254:
            self.errors.append("Email too long")
            return ValidationResult(False, self.errors, self.warnings)

        # Format check
        if not self._check_format(email):
            return ValidationResult(False, self.errors, self.warnings)

        # Split email into parts (the format check guarantees one '@')
        local, domain = email.split('@')

        # Check lengths
        if len(local) > 64:
            self.errors.append("Local part too long")

        # Check for disposable email service
        self._check_disposable(domain)

        # Perform DNS check if enabled
        if self.check_dns and not self._check_dns(domain):
            return ValidationResult(False, self.errors, self.warnings)

        # Return final result
        is_valid = len(self.errors) == 0
        return ValidationResult(is_valid, self.errors, self.warnings)


# Example usage
validator = EmailValidator(check_dns=True)
test_emails = [
    'user@example.com',
    'invalid.email@nonexistent.domain',
    'user@tempmail.com',
    'too..many.dots@example.com',
    'valid.email@gmail.com',
]

print("Validation Results:")
for email in test_emails:
    result = validator.validate(email)
    print(f"\nEmail: {email}")
    print(f"Valid: {result.is_valid}")
    if result.errors:
        print("Errors:", result.errors)
    if result.warnings:
        print("Warnings:", result.warnings)

# This production-ready validator includes:
- DNS MX 记录检查
- 一次性电子邮件检测
- 全面的格式验证
- 详细的错误和警告消息
- 清晰的关注点分离
以下是将电子邮件验证与 Flask Web 应用程序集成的方法:
import asyncio
from typing import Any, Dict
from urllib.parse import quote

import aiohttp
from flask import Flask, request, jsonify


class WebEmailValidator:
    """Combine local email validation with a remote deliverability check."""

    def __init__(self):
        self.validator = EmailValidator(check_dns=True)

    async def check_email_deliverability(self, email: str) -> Dict[str, Any]:
        """Check email deliverability using external API.

        Args:
            email: Address to look up.

        Returns:
            The decoded JSON body on HTTP 200, otherwise a dict with
            ``deliverable`` set to None and an ``error`` message.
        """
        # Note: Replace with your preferred email verification service
        API_KEY = 'your_api_key'
        # The local part may legally contain '/', '?' or '#'; escape it so
        # the address cannot alter the request path or query.
        url = f"https://api.emailverifier.com/check/{quote(email, safe='@')}"

        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers={'ApiKey': API_KEY}) as response:
                if response.status == 200:
                    return await response.json()
                return {'deliverable': None, 'error': 'API check failed'}

    async def validate_email(self, email: str) -> Dict[str, Any]:
        """Complete email validation with deliverability check.

        Args:
            email: Address to validate.

        Returns:
            Dict with ``email``, ``is_valid``, ``errors`` and ``warnings``;
            ``deliverability`` is added only when local validation passes.
        """
        # First, do basic validation
        result = self.validator.validate(email)

        response = {
            'email': email,
            'is_valid': result.is_valid,
            'errors': result.errors,
            'warnings': result.warnings
        }

        # If basic validation passes, check deliverability
        if result.is_valid:
            deliverability = await self.check_email_deliverability(email)
            response['deliverability'] = deliverability

        return response


app = Flask(__name__)
validator = WebEmailValidator()


@app.route('/validate-email', methods=['POST'])
async def validate_email():
    """Validate the ``email`` field of a JSON request body."""
    # silent=True yields None instead of raising on a missing or malformed
    # JSON body, so such requests get the same 400 as a missing email.
    payload = request.get_json(silent=True)
    email = payload.get('email') if isinstance(payload, dict) else None
    if not email:
        return jsonify({'error': 'Email required'}), 400

    result = await validator.validate_email(email)
    return jsonify(result)


# Form validation helper
def validate_signup_form(form_data: Dict[str, str]) -> Dict[str, Any]:
    """Validate signup form with email validation.

    Args:
        form_data: Submitted form fields; only ``email`` is read.

    Returns:
        Dict with ``is_valid`` plus per-field ``errors`` and ``warnings``.
    """
    result = {
        'is_valid': True,
        'errors': {},
        'warnings': {}
    }

    # Validate email
    email = form_data.get('email', '').strip()
    email_validation = validator.validator.validate(email)

    if not email_validation.is_valid:
        result['is_valid'] = False
        result['errors']['email'] = email_validation.errors

    if email_validation.warnings:
        result['warnings']['email'] = email_validation.warnings

    return result


@app.route('/signup', methods=['POST'])
def signup():
    """Reject the signup with HTTP 400 when form validation fails."""
    validation = validate_signup_form(request.form)
    if not validation['is_valid']:
        return jsonify(validation), 400

    # Continue with signup process...

# Here is how to validate multiple emails efficiently:
import copy
import csv
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any, Dict, List


class BatchEmailValidator:
    """Validate many email addresses in parallel, from a list or a CSV file."""

    def __init__(self, max_workers: int = 5):
        """Create a batch validator.

        Args:
            max_workers: Size of the thread pool used by validate_emails.
        """
        self.validator = EmailValidator(check_dns=True)
        self.max_workers = max_workers

    def _validate_one(self, email: str):
        """Validate one address on a private copy of the validator."""
        # The validator stores errors/warnings on the instance, so sharing
        # one instance across worker threads could mix results. A shallow
        # copy gives each task its own state with the same configuration.
        return copy.copy(self.validator).validate(email)

    def validate_emails(self, emails: List[str]) -> List[Dict[str, Any]]:
        """Validate multiple emails in parallel.

        Args:
            emails: Addresses to validate.

        Returns:
            One dict per input, in input order, with ``email``,
            ``is_valid``, ``errors`` and ``warnings``.
        """
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            results = list(executor.map(self._validate_one, emails))

        return [
            {
                'email': email,
                'is_valid': result.is_valid,
                'errors': result.errors,
                'warnings': result.warnings
            }
            for email, result in zip(emails, results)
        ]

    def validate_csv(self, input_path: str, output_path: str,
                     email_column: str = 'email') -> Dict[str, int]:
        """Validate emails from CSV file.

        Args:
            input_path: CSV file with a header row.
            output_path: Destination CSV for the per-address results.
            email_column: Name of the column holding the addresses.

        Returns:
            Counts under the keys ``total``, ``valid`` and ``invalid``.

        Raises:
            KeyError: If a row has no ``email_column`` field.
        """
        stats = {'total': 0, 'valid': 0, 'invalid': 0}

        # Read input CSV (newline='' lets the csv module handle line endings)
        with open(input_path, 'r', newline='') as infile:
            reader = csv.DictReader(infile)
            rows = list(reader)

        # Validate emails
        emails = [row[email_column] for row in rows]
        results = self.validate_emails(emails)

        # Write results
        with open(output_path, 'w', newline='') as outfile:
            fieldnames = ['email', 'is_valid', 'errors', 'warnings']
            writer = csv.DictWriter(outfile, fieldnames=fieldnames)
            writer.writeheader()

            for result in results:
                writer.writerow({
                    'email': result['email'],
                    'is_valid': result['is_valid'],
                    'errors': '; '.join(result['errors']),
                    'warnings': '; '.join(result['warnings'])
                })

                stats['total'] += 1
                if result['is_valid']:
                    stats['valid'] += 1
                else:
                    stats['invalid'] += 1

        return stats


# Example usage
validator = BatchEmailValidator()

# Validate list of emails
emails = [
    'user@example.com',
    'invalid.email',
    'user@nonexistent.domain',
    'valid.user@gmail.com',
]

results = validator.validate_emails(emails)
print("\nBatch Validation Results:")
for result in results:
    print(f"\nEmail: {result['email']}")
    print(f"Valid: {result['is_valid']}")
    if result['errors']:
        print("Errors:", result['errors'])
    if result['warnings']:
        print("Warnings:", result['warnings'])

# Validate CSV file
stats = validator.validate_csv(
    'input_emails.csv',
    'validation_results.csv'
)
print("\nCSV Validation Stats:", stats)

# Here is how to extend the validator with your own rules:
from typing import Callable, List


class CustomEmailValidator(EmailValidator):
    """EmailValidator that also applies caller-supplied rules."""

    def __init__(self, check_dns: bool = False):
        super().__init__(check_dns)
        # Rules run in the order they were added.
        self.custom_rules: List[Callable] = []

    def add_rule(self, rule: Callable[[str], tuple[bool, str]]):
        """Add custom validation rule.

        Args:
            rule: Callable taking the email and returning
                ``(passed, message)``; the message is recorded as an
                error when ``passed`` is False.
        """
        self.custom_rules.append(rule)

    def validate(self, email: str) -> ValidationResult:
        """Run basic validation plus custom rules.

        Custom rules are only evaluated when the base validation passed;
        every failing rule contributes its message to the errors.
        """
        # Run basic validation first
        result = super().validate(email)

        # If basic validation passed, run custom rules
        if result.is_valid:
            for rule in self.custom_rules:
                passed, message = rule(email)
                if not passed:
                    result.is_valid = False
                    result.errors.append(message)

        return result


# Example custom rules
def no_role_accounts(email: str) -> tuple[bool, str]:
    """Reject common role-based email addresses."""
    roles = {'admin', 'support', 'info', 'sales', 'contact'}
    local = email.split('@')[0].lower()
    if local in roles:
        return False, "Role-based email addresses not allowed"
    return True, ""


def required_domain(email: str) -> tuple[bool, str]:
    """Ensure email is from allowed domain."""
    allowed_domains = {'company.com', 'subsidiary.com'}
    domain = email.split('@')[1].lower()
    if domain not in allowed_domains:
        return False, "Email must be from an approved domain"
    return True, ""


# Example usage
validator = CustomEmailValidator()
validator.add_rule(no_role_accounts)
validator.add_rule(required_domain)

test_emails = [
    'user@company.com',
    'admin@company.com',
    'contact@example.com',
    'valid.user@subsidiary.com',
]

print("\nCustom Validation Results:")
for email in test_emails:
    result = validator.validate(email)
    print(f"\nEmail: {email}")
    print(f"Valid: {result.is_valid}")
    if result.errors:
        print("Errors:", result.errors)
    if result.warnings:
        print("Warnings:", result.warnings)

# These examples demonstrate how to:
- 将电子邮件验证与 Web 应用程序集成
- 高效处理多封电子邮件
- 添加自定义验证规则
- 处理实际验证场景
来源:自由坦荡的湖泊AI一点号
免责声明:本站系转载,并不代表本网赞同其观点和对其真实性负责。如涉及作品内容、版权和其它问题,请在30日内与本站联系,我们将在第一时间删除内容!