How to Automate Daily Tasks with Python Scripts: A Complete Guide to Real-World Automation
In today's fast-paced digital world, automation has become essential for productivity and efficiency. Python, with its simple syntax and powerful libraries, stands out as the perfect tool for automating repetitive daily tasks. Whether you're a developer, data analyst, content creator, or business professional, learning to automate routine tasks can save hours of manual work and reduce human error.
This comprehensive guide will walk you through practical automation examples that you can implement immediately. We'll cover file management, email automation, web scraping, and task scheduling – four fundamental areas where automation can make a significant impact on your daily workflow.
Why Python for Automation?
Python's popularity in automation stems from several key advantages:
Simplicity and Readability: Python's clean syntax makes it accessible to beginners while remaining powerful enough for complex tasks.
Rich Ecosystem: Extensive libraries like os, shutil, smtplib, requests, BeautifulSoup, and schedule provide ready-made solutions for common automation needs.
Cross-Platform Compatibility: Python scripts run seamlessly across Windows, macOS, and Linux systems.
Strong Community Support: Abundant documentation, tutorials, and community-contributed solutions make problem-solving easier.
Integration Capabilities: Python easily integrates with APIs, databases, and other systems, making it ideal for comprehensive automation workflows.
Setting Up Your Python Environment
Before diving into automation examples, ensure you have Python installed and the necessary libraries. Here's a quick setup guide:
```bash
# Check Python installation
python --version

# Install required libraries
pip install requests beautifulsoup4 schedule python-dotenv
```

For email automation, you'll also need to configure an app-specific password for Gmail or your email provider's SMTP settings.
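Since hard-coding an app password into a script is risky, it helps to wire in python-dotenv (installed above) from the start. Here's a minimal sketch — the `SMTP_EMAIL` and `SMTP_APP_PASSWORD` variable names are placeholders of my choosing, not a convention from any library; it falls back to plain environment variables if python-dotenv isn't available:

```python
import os

def load_credentials():
    """Read SMTP credentials from the environment.

    If python-dotenv is installed, variables from a local .env file are
    loaded first; otherwise plain environment variables are used as-is.
    The variable names below are illustrative placeholders.
    """
    try:
        from dotenv import load_dotenv  # optional dependency
        load_dotenv()
    except ImportError:
        pass
    return (
        os.environ.get("SMTP_EMAIL", ""),
        os.environ.get("SMTP_APP_PASSWORD", ""),
    )
```

The examples later in this guide pass the email and password directly; in practice you would call something like `load_credentials()` and hand the result to your email client instead of embedding secrets in source files.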
1. File Management Automation: Renaming and Organizing Files
File management is one of the most common automation needs. Let's explore various scenarios from basic renaming to complex organization systems.
Basic File Renaming
Here's a simple script to rename files with a consistent pattern:
```python
import os
import glob

def rename_files_basic(directory, old_pattern, new_pattern):
    """Rename files matching a pattern in a directory"""
    os.chdir(directory)
    files = glob.glob(old_pattern)
    for i, filename in enumerate(files, 1):
        file_extension = os.path.splitext(filename)[1]
        new_name = f"{new_pattern}_{i:03d}{file_extension}"
        try:
            os.rename(filename, new_name)
            print(f"Renamed: {filename} → {new_name}")
        except OSError as e:
            print(f"Error renaming {filename}: {e}")

# Example usage
rename_files_basic("/path/to/photos", "*.jpg", "vacation_photo")
```

Advanced File Organization System
For more complex file management, here's a comprehensive script that organizes files by type, date, and size:
```python
import os
import shutil
from datetime import datetime
from pathlib import Path

class FileOrganizer:
    def __init__(self, source_directory):
        self.source_dir = Path(source_directory)
        self.file_types = {
            'images': ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.svg'],
            'documents': ['.pdf', '.doc', '.docx', '.txt', '.rtf', '.odt'],
            'spreadsheets': ['.xls', '.xlsx', '.csv', '.ods'],
            'presentations': ['.ppt', '.pptx', '.odp'],
            'videos': ['.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv'],
            'audio': ['.mp3', '.wav', '.flac', '.aac', '.ogg'],
            'archives': ['.zip', '.rar', '.7z', '.tar', '.gz'],
            'code': ['.py', '.js', '.html', '.css', '.cpp', '.java']
        }

    def get_file_category(self, file_path):
        """Determine file category based on extension"""
        extension = file_path.suffix.lower()
        for category, extensions in self.file_types.items():
            if extension in extensions:
                return category
        return 'miscellaneous'

    def get_file_date(self, file_path):
        """Get file creation date"""
        timestamp = os.path.getctime(file_path)
        return datetime.fromtimestamp(timestamp)

    def organize_by_type_and_date(self):
        """Organize files by type and creation date"""
        if not self.source_dir.exists():
            print(f"Source directory {self.source_dir} does not exist")
            return
        organized_count = 0
        for file_path in self.source_dir.iterdir():
            if file_path.is_file():
                # Get file category and date
                category = self.get_file_category(file_path)
                file_date = self.get_file_date(file_path)
                # Create destination directory structure
                dest_dir = self.source_dir / 'organized' / category / f"{file_date.year}" / f"{file_date.month:02d}"
                dest_dir.mkdir(parents=True, exist_ok=True)
                # Move file, handling duplicate names
                dest_path = dest_dir / file_path.name
                counter = 1
                while dest_path.exists():
                    dest_path = dest_dir / f"{file_path.stem}_{counter}{file_path.suffix}"
                    counter += 1
                try:
                    shutil.move(str(file_path), str(dest_path))
                    print(f"Moved: {file_path.name} → {dest_path.relative_to(self.source_dir)}")
                    organized_count += 1
                except Exception as e:
                    print(f"Error moving {file_path.name}: {e}")
        print(f"\nOrganization complete! {organized_count} files organized.")

    def clean_empty_directories(self):
        """Remove empty directories"""
        for root, dirs, files in os.walk(self.source_dir, topdown=False):
            for directory in dirs:
                dir_path = os.path.join(root, directory)
                try:
                    if not os.listdir(dir_path):
                        os.rmdir(dir_path)
                        print(f"Removed empty directory: {dir_path}")
                except OSError:
                    pass

# Usage example
organizer = FileOrganizer("/path/to/messy/directory")
organizer.organize_by_type_and_date()
organizer.clean_empty_directories()
```

Batch File Processing with Metadata
Here's a script that processes multiple files while preserving and updating metadata:
```python
import os
import json
from datetime import datetime
from PIL import Image
from PIL.ExifTags import TAGS

class BatchFileProcessor:
    def __init__(self, directory):
        self.directory = directory
        self.processed_files = []

    def extract_image_metadata(self, image_path):
        """Extract metadata from image files"""
        try:
            image = Image.open(image_path)
            exifdata = image.getexif()
            metadata = {}
            for tag_id in exifdata:
                tag = TAGS.get(tag_id, tag_id)
                data = exifdata.get(tag_id)
                # Convert bytes to string if necessary
                if isinstance(data, bytes):
                    data = data.decode()
                metadata[tag] = data
            return metadata
        except Exception as e:
            print(f"Error extracting metadata from {image_path}: {e}")
            return {}

    def rename_with_timestamp(self, file_path):
        """Rename file with timestamp prefix"""
        try:
            # Get file modification time
            mtime = os.path.getmtime(file_path)
            timestamp = datetime.fromtimestamp(mtime).strftime("%Y%m%d_%H%M%S")
            # Create new filename
            directory = os.path.dirname(file_path)
            filename = os.path.basename(file_path)
            name, extension = os.path.splitext(filename)
            new_filename = f"{timestamp}_{name}{extension}"
            new_path = os.path.join(directory, new_filename)
            # Rename file
            os.rename(file_path, new_path)
            return new_path
        except Exception as e:
            print(f"Error renaming {file_path}: {e}")
            return file_path

    def process_images(self):
        """Process all images in directory"""
        image_extensions = ['.jpg', '.jpeg', '.png', '.tiff', '.bmp']
        for filename in os.listdir(self.directory):
            if any(filename.lower().endswith(ext) for ext in image_extensions):
                file_path = os.path.join(self.directory, filename)
                # Extract metadata
                metadata = self.extract_image_metadata(file_path)
                # Rename with timestamp
                new_path = self.rename_with_timestamp(file_path)
                # Store processing info
                self.processed_files.append({
                    'original_name': filename,
                    'new_name': os.path.basename(new_path),
                    'metadata': metadata,
                    'processed_at': datetime.now().isoformat()
                })
                print(f"Processed: {filename} → {os.path.basename(new_path)}")

    def save_processing_log(self):
        """Save processing log to JSON file"""
        log_path = os.path.join(self.directory, 'processing_log.json')
        with open(log_path, 'w') as f:
            # default=str handles EXIF values that aren't JSON-serializable
            json.dump(self.processed_files, f, indent=2, default=str)
        print(f"Processing log saved to {log_path}")

# Usage
processor = BatchFileProcessor("/path/to/images")
processor.process_images()
processor.save_processing_log()
```

2. Email Automation: Sending Automated Emails
Email automation can handle everything from simple notifications to complex newsletter systems. Let's explore various email automation scenarios.
Basic Email Sending
Here's a foundation for sending emails with Python:
```python
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
import os

class EmailAutomation:
    def __init__(self, smtp_server, smtp_port, email, password):
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port
        self.email = email
        self.password = password

    def send_simple_email(self, to_email, subject, message):
        """Send a simple text email"""
        try:
            # Create message
            msg = MIMEText(message)
            msg['From'] = self.email
            msg['To'] = to_email
            msg['Subject'] = subject
            # Send email
            with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
                server.starttls()
                server.login(self.email, self.password)
                server.send_message(msg)
            print(f"Email sent successfully to {to_email}")
            return True
        except Exception as e:
            print(f"Error sending email: {e}")
            return False

    def send_html_email(self, to_email, subject, html_content, text_content=None):
        """Send HTML email with optional text fallback"""
        try:
            msg = MIMEMultipart('alternative')
            msg['From'] = self.email
            msg['To'] = to_email
            msg['Subject'] = subject
            # Add text version if provided
            if text_content:
                msg.attach(MIMEText(text_content, 'plain'))
            # Add HTML version
            msg.attach(MIMEText(html_content, 'html'))
            # Send email
            with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
                server.starttls()
                server.login(self.email, self.password)
                server.send_message(msg)
            print(f"HTML email sent successfully to {to_email}")
            return True
        except Exception as e:
            print(f"Error sending HTML email: {e}")
            return False

    def send_email_with_attachment(self, to_email, subject, message, attachment_path):
        """Send email with file attachment"""
        try:
            msg = MIMEMultipart()
            msg['From'] = self.email
            msg['To'] = to_email
            msg['Subject'] = subject
            # Add message body
            msg.attach(MIMEText(message, 'plain'))
            # Add attachment
            if os.path.exists(attachment_path):
                with open(attachment_path, "rb") as attachment:
                    part = MIMEBase('application', 'octet-stream')
                    part.set_payload(attachment.read())
                encoders.encode_base64(part)
                part.add_header(
                    'Content-Disposition',
                    f'attachment; filename={os.path.basename(attachment_path)}'
                )
                msg.attach(part)
            else:
                print(f"Attachment not found: {attachment_path}")
                return False
            # Send email
            with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
                server.starttls()
                server.login(self.email, self.password)
                server.send_message(msg)
            print(f"Email with attachment sent successfully to {to_email}")
            return True
        except Exception as e:
            print(f"Error sending email with attachment: {e}")
            return False

# Gmail configuration example
email_client = EmailAutomation(
    smtp_server="smtp.gmail.com",
    smtp_port=587,
    email="your_email@gmail.com",
    password="your_app_password"
)
```

Advanced Email Campaign System
Here's a more sophisticated system for managing email campaigns:
```python
import csv
import json
import time
from datetime import datetime
import random
from dataclasses import dataclass
from typing import List
import logging

@dataclass
class Contact:
    email: str
    name: str
    tags: List[str]
    last_contacted: datetime = None

class EmailCampaignManager:
    def __init__(self, email_client):
        self.email_client = email_client
        self.contacts = []
        self.templates = {}
        self.campaign_log = []
        # Setup logging
        logging.basicConfig(
            filename='email_campaign.log',
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )

    def load_contacts_from_csv(self, csv_file):
        """Load contacts from CSV file"""
        try:
            with open(csv_file, 'r', newline='', encoding='utf-8') as file:
                reader = csv.DictReader(file)
                for row in reader:
                    contact = Contact(
                        email=row['email'],
                        name=row['name'],
                        tags=row.get('tags', '').split(',') if row.get('tags') else []
                    )
                    self.contacts.append(contact)
            print(f"Loaded {len(self.contacts)} contacts from {csv_file}")
        except Exception as e:
            print(f"Error loading contacts: {e}")

    def load_email_template(self, template_name, template_file):
        """Load email template from HTML file"""
        try:
            with open(template_file, 'r', encoding='utf-8') as file:
                self.templates[template_name] = file.read()
            print(f"Loaded template: {template_name}")
        except Exception as e:
            print(f"Error loading template {template_name}: {e}")

    def personalize_template(self, template, contact):
        """Personalize email template for specific contact"""
        personalized = template.replace('{name}', contact.name)
        personalized = personalized.replace('{email}', contact.email)
        # Add more personalization logic as needed
        return personalized

    def send_campaign(self, template_name, subject, tag_filter=None, delay_range=(1, 3)):
        """Send email campaign to filtered contacts"""
        if template_name not in self.templates:
            print(f"Template {template_name} not found")
            return
        # Filter contacts
        target_contacts = self.contacts
        if tag_filter:
            target_contacts = [c for c in self.contacts if tag_filter in c.tags]
        print(f"Sending campaign to {len(target_contacts)} contacts")
        success_count = 0
        error_count = 0
        for contact in target_contacts:
            try:
                # Personalize content
                personalized_content = self.personalize_template(
                    self.templates[template_name], contact
                )
                # Send email
                success = self.email_client.send_html_email(
                    contact.email, subject, personalized_content
                )
                if success:
                    success_count += 1
                    contact.last_contacted = datetime.now()
                    # Log success
                    self.campaign_log.append({
                        'contact': contact.email,
                        'template': template_name,
                        'status': 'sent',
                        'timestamp': datetime.now().isoformat()
                    })
                    logging.info(f"Campaign email sent to {contact.email}")
                else:
                    error_count += 1
                    logging.error(f"Failed to send campaign email to {contact.email}")
                # Random delay to avoid being flagged as spam
                delay = random.uniform(delay_range[0], delay_range[1])
                time.sleep(delay)
            except Exception as e:
                error_count += 1
                logging.error(f"Error sending to {contact.email}: {e}")
        print(f"Campaign complete: {success_count} sent, {error_count} failed")

    def save_campaign_report(self, filename):
        """Save campaign report to JSON file"""
        with open(filename, 'w') as f:
            json.dump(self.campaign_log, f, indent=2)
        print(f"Campaign report saved to {filename}")

# Usage example
campaign_manager = EmailCampaignManager(email_client)
campaign_manager.load_contacts_from_csv('contacts.csv')
campaign_manager.load_email_template('newsletter', 'newsletter_template.html')
campaign_manager.send_campaign('newsletter', 'Monthly Newsletter', tag_filter='subscribers')
campaign_manager.save_campaign_report('campaign_report.json')
```

Email Template Example
Here's a sample HTML email template (newsletter_template.html):
```html
<html>
  <body>
    <h1>Monthly Newsletter</h1>
    <p>Hello {name}!</p>
    <p>Welcome to our monthly newsletter. Here are the latest updates:</p>
    <h2>🚀 New Features</h2>
    <ul>
      <li>Enhanced automation tools</li>
      <li>Improved user interface</li>
      <li>Better performance</li>
    </ul>
    <h2>📊 This Month's Stats</h2>
    <p>Our community has grown by 25% this month! Thank you for being part of our journey.</p>
    <h2>💡 Tips &amp; Tricks</h2>
    <p>Did you know you can automate your daily tasks with Python? Check out our latest blog post for practical examples.</p>
  </body>
</html>
```

The {name} token is the personalization placeholder that gets substituted for each contact before sending.

3. Web Scraping Automation: Extracting Data from Websites
Web scraping automates data collection from websites, enabling you to gather information for analysis, monitoring, or content creation.
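Respectful scraping starts with honoring a site's robots.txt. The standard library's urllib.robotparser can answer "may I fetch this URL?" before you ever issue a request. A minimal sketch — the rules shown are illustrative, not from any real site:

```python
from urllib.robotparser import RobotFileParser

def is_allowed(robots_lines, url, user_agent="*"):
    """Check a URL against robots.txt rules (passed in as lines of text).

    In a real scraper you would fetch https://site/robots.txt once,
    split it into lines, and reuse the parser for every URL.
    """
    parser = RobotFileParser()
    parser.parse(robots_lines)
    return parser.can_fetch(user_agent, url)

# Hypothetical rules: everything under /private/ is off-limits
rules = ["User-agent: *", "Disallow: /private/"]
print(is_allowed(rules, "https://example.com/news/story"))    # → True
print(is_allowed(rules, "https://example.com/private/page"))  # → False
```

Calling a check like this at the top of each fetch, alongside the request delays shown below, keeps your scraper a good citizen.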
Basic Web Scraping Setup
Here's a foundation for web scraping with error handling and respectful practices:
```python
import requests
from bs4 import BeautifulSoup
import time
import random
from urllib.parse import urljoin
import csv
import json
from datetime import datetime
import logging

class WebScraper:
    def __init__(self, base_url, delay_range=(1, 3)):
        self.base_url = base_url
        self.delay_range = delay_range
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })
        # Setup logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def get_page(self, url, retries=3):
        """Get page content with error handling and retries"""
        for attempt in range(retries):
            try:
                response = self.session.get(url, timeout=10)
                response.raise_for_status()
                # Random delay to be respectful
                delay = random.uniform(self.delay_range[0], self.delay_range[1])
                time.sleep(delay)
                return response
            except requests.exceptions.RequestException as e:
                self.logger.warning(f"Attempt {attempt + 1} failed for {url}: {e}")
                if attempt == retries - 1:
                    self.logger.error(f"Failed to get {url} after {retries} attempts")
                    return None
                time.sleep(2 ** attempt)  # Exponential backoff
        return None

    def parse_page(self, html_content):
        """Parse HTML content with BeautifulSoup"""
        return BeautifulSoup(html_content, 'html.parser')
```
News Article Scraper
Here's a practical example that scrapes news articles:
```python
class NewsArticleScraper(WebScraper):
    def __init__(self, base_url):
        super().__init__(base_url)
        self.articles = []

    def scrape_article_links(self, page_url):
        """Extract article links from a news page"""
        response = self.get_page(page_url)
        if not response:
            return []
        soup = self.parse_page(response.content)
        links = []
        # Adjust selectors based on the website structure
        article_links = soup.find_all('a', href=True)
        for link in article_links:
            href = link.get('href')
            if href:
                full_url = urljoin(self.base_url, href)
                # Filter for article URLs (adjust pattern as needed)
                if '/article/' in href or '/news/' in href:
                    links.append(full_url)
        return list(set(links))  # Remove duplicates

    def scrape_article_content(self, article_url):
        """Extract content from a single article"""
        response = self.get_page(article_url)
        if not response:
            return None
        soup = self.parse_page(response.content)
        # Extract article data (adjust selectors for target website)
        article_data = {
            'url': article_url,
            'title': self.extract_title(soup),
            'author': self.extract_author(soup),
            'date': self.extract_date(soup),
            'content': self.extract_content(soup),
            'tags': self.extract_tags(soup),
            'scraped_at': datetime.now().isoformat()
        }
        return article_data

    def extract_title(self, soup):
        """Extract article title"""
        title_selectors = ['h1', '.article-title', '.post-title', 'title']
        for selector in title_selectors:
            title_elem = soup.select_one(selector)
            if title_elem:
                return title_elem.get_text().strip()
        return "No title found"

    def extract_author(self, soup):
        """Extract article author"""
        author_selectors = ['.author', '.byline', '[rel="author"]', '.post-author']
        for selector in author_selectors:
            author_elem = soup.select_one(selector)
            if author_elem:
                return author_elem.get_text().strip()
        return "Unknown author"

    def extract_date(self, soup):
        """Extract publication date"""
        date_selectors = ['time', '.date', '.published', '.post-date']
        for selector in date_selectors:
            date_elem = soup.select_one(selector)
            if date_elem:
                # Try to get datetime attribute first
                return date_elem.get('datetime') or date_elem.get_text().strip()
        return "No date found"

    def extract_content(self, soup):
        """Extract article content"""
        content_selectors = ['.article-content', '.post-content', '.entry-content', 'article p']
        for selector in content_selectors:
            content_elems = soup.select(selector)
            if content_elems:
                content = ' '.join([elem.get_text().strip() for elem in content_elems])
                return content[:1000] + '...' if len(content) > 1000 else content
        return "No content found"

    def extract_tags(self, soup):
        """Extract article tags/categories"""
        tag_selectors = ['.tags a', '.categories a', '.post-tags a']
        tags = []
        for selector in tag_selectors:
            tag_elems = soup.select(selector)
            tags.extend([tag.get_text().strip() for tag in tag_elems])
        return tags

    def scrape_multiple_articles(self, start_page, max_articles=50):
        """Scrape multiple articles from a news website"""
        self.logger.info(f"Starting to scrape articles from {start_page}")
        # Get article links
        article_links = self.scrape_article_links(start_page)
        self.logger.info(f"Found {len(article_links)} article links")
        # Limit number of articles
        article_links = article_links[:max_articles]
        # Scrape each article
        for i, link in enumerate(article_links, 1):
            self.logger.info(f"Scraping article {i}/{len(article_links)}: {link}")
            article_data = self.scrape_article_content(link)
            if article_data:
                self.articles.append(article_data)
                self.logger.info(f"Successfully scraped: {article_data['title']}")
            else:
                self.logger.warning(f"Failed to scrape: {link}")
        self.logger.info(f"Scraping complete. Total articles: {len(self.articles)}")

    def save_to_csv(self, filename):
        """Save scraped articles to CSV file"""
        if not self.articles:
            self.logger.warning("No articles to save")
            return
        fieldnames = ['title', 'author', 'date', 'url', 'content', 'tags', 'scraped_at']
        with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for article in self.articles:
                # Convert tags list to string
                article_copy = article.copy()
                article_copy['tags'] = ', '.join(article['tags'])
                writer.writerow(article_copy)
        self.logger.info(f"Articles saved to {filename}")

    def save_to_json(self, filename):
        """Save scraped articles to JSON file"""
        with open(filename, 'w', encoding='utf-8') as jsonfile:
            json.dump(self.articles, jsonfile, indent=2, ensure_ascii=False)
        self.logger.info(f"Articles saved to {filename}")

# Usage example
scraper = NewsArticleScraper("https://example-news-site.com")
scraper.scrape_multiple_articles("https://example-news-site.com/latest", max_articles=20)
scraper.save_to_csv("scraped_articles.csv")
scraper.save_to_json("scraped_articles.json")
```

E-commerce Price Monitor
Here's a specialized scraper for monitoring product prices:
```python
import re

class PriceMonitor(WebScraper):
    def __init__(self):
        super().__init__("")
        self.products = []
        self.price_history = []

    def add_product(self, name, url, price_selector, title_selector=None):
        """Add a product to monitor"""
        product = {
            'name': name,
            'url': url,
            'price_selector': price_selector,
            'title_selector': title_selector,
            'price_history': []
        }
        self.products.append(product)
        self.logger.info(f"Added product to monitor: {name}")

    def extract_price(self, soup, selector):
        """Extract price from page using CSS selector"""
        price_elem = soup.select_one(selector)
        if price_elem:
            price_text = price_elem.get_text().strip()
            # Extract numeric price (currency symbols and thousands separators removed)
            price_match = re.search(r'\d+\.?\d*', price_text.replace(',', ''))
            if price_match:
                return float(price_match.group())
        return None

    def check_product_price(self, product):
        """Check current price for a single product"""
        response = self.get_page(product['url'])
        if not response:
            return None
        soup = self.parse_page(response.content)
        # Extract current price
        current_price = self.extract_price(soup, product['price_selector'])
        if current_price:
            price_data = {
                'product_name': product['name'],
                'price': current_price,
                'url': product['url'],
                'timestamp': datetime.now().isoformat()
            }
            # Add to product history
            product['price_history'].append(price_data)
            # Add to global price history
            self.price_history.append(price_data)
            self.logger.info(f"Price check: {product['name']} - ${current_price}")
            return price_data
        self.logger.warning(f"Could not extract price for {product['name']}")
        return None

    def monitor_all_products(self):
        """Check prices for all monitored products"""
        self.logger.info("Starting price monitoring for all products")
        current_prices = []
        for product in self.products:
            price_data = self.check_product_price(product)
            if price_data:
                current_prices.append(price_data)
        return current_prices

    def check_price_alerts(self, alert_threshold=0.1):
        """Check for significant price changes"""
        alerts = []
        for product in self.products:
            if len(product['price_history']) >= 2:
                recent_prices = product['price_history'][-2:]
                old_price = recent_prices[0]['price']
                new_price = recent_prices[1]['price']
                # Calculate percentage change
                price_change = (new_price - old_price) / old_price
                if abs(price_change) >= alert_threshold:
                    alert = {
                        'product_name': product['name'],
                        'old_price': old_price,
                        'new_price': new_price,
                        'change_percent': price_change * 100,
                        'url': product['url'],
                        'timestamp': datetime.now().isoformat()
                    }
                    alerts.append(alert)
                    self.logger.info(f"Price alert: {product['name']} changed by {price_change*100:.1f}%")
        return alerts

    def save_price_history(self, filename):
        """Save price history to CSV"""
        if not self.price_history:
            return
        fieldnames = ['product_name', 'price', 'url', 'timestamp']
        with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(self.price_history)
        self.logger.info(f"Price history saved to {filename}")

# Usage example
monitor = PriceMonitor()
monitor.add_product(
    "Sample Product",
    "https://example-store.com/product/123",
    ".price",
    ".product-title"
)

# Monitor prices
current_prices = monitor.monitor_all_products()
alerts = monitor.check_price_alerts(alert_threshold=0.05)  # 5% threshold
monitor.save_price_history("price_history.csv")
```

4. Task Scheduling: Automating Script Execution
Task scheduling ensures your automation scripts run at the right time without manual intervention.
Using Python's Schedule Library
Here's a comprehensive scheduling system:
```python
import schedule
import time
import threading
from datetime import datetime
import logging
import json
import os

class TaskScheduler:
    def __init__(self):
        self.scheduled_tasks = []
        self.running = False
        self.scheduler_thread = None
        # Setup logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('scheduler.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def add_daily_task(self, time_str, task_func, task_name, *args, **kwargs):
        """Add a task to run daily at specified time"""
        job = schedule.every().day.at(time_str).do(
            self._run_task, task_func, task_name, *args, **kwargs
        )
        self.scheduled_tasks.append({
            'name': task_name,
            'schedule': f"Daily at {time_str}",
            'function': task_func.__name__,
            'next_run': job.next_run.isoformat() if job.next_run else None
        })
        self.logger.info(f"Scheduled daily task: {task_name} at {time_str}")
        return job

    def add_weekly_task(self, day, time_str, task_func, task_name, *args, **kwargs):
        """Add a task to run weekly on specified day and time"""
        job = getattr(schedule.every(), day.lower()).at(time_str).do(
            self._run_task, task_func, task_name, *args, **kwargs
        )
        self.scheduled_tasks.append({
            'name': task_name,
            'schedule': f"Weekly on {day} at {time_str}",
            'function': task_func.__name__,
            'next_run': job.next_run.isoformat() if job.next_run else None
        })
        self.logger.info(f"Scheduled weekly task: {task_name} on {day} at {time_str}")
        return job

    def add_interval_task(self, interval_minutes, task_func, task_name, *args, **kwargs):
        """Add a task to run at regular intervals"""
        job = schedule.every(interval_minutes).minutes.do(
            self._run_task, task_func, task_name, *args, **kwargs
        )
        self.scheduled_tasks.append({
            'name': task_name,
            'schedule': f"Every {interval_minutes} minutes",
            'function': task_func.__name__,
            'next_run': job.next_run.isoformat() if job.next_run else None
        })
        self.logger.info(f"Scheduled interval task: {task_name} every {interval_minutes} minutes")
        return job

    def _run_task(self, task_func, task_name, *args, **kwargs):
        """Execute a scheduled task with error handling"""
        try:
            self.logger.info(f"Starting task: {task_name}")
            start_time = datetime.now()
            # Run the task
            result = task_func(*args, **kwargs)
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            self.logger.info(f"Task completed: {task_name} (Duration: {duration:.2f}s)")
            # Log task execution
            self._log_task_execution(task_name, True, duration, None)
            return result
        except Exception as e:
            self.logger.error(f"Task failed: {task_name} - Error: {e}")
            self._log_task_execution(task_name, False, 0, str(e))

    def _log_task_execution(self, task_name, success, duration, error):
        """Log task execution details"""
        log_entry = {
            'task_name': task_name,
            'timestamp': datetime.now().isoformat(),
            'success': success,
            'duration_seconds': duration,
            'error': error
        }
        # Append to execution log file
        log_file = 'task_execution.json'
        try:
            if os.path.exists(log_file):
                with open(log_file, 'r') as f:
                    logs = json.load(f)
            else:
                logs = []
            logs.append(log_entry)
            # Keep only last 1000 entries
            if len(logs) > 1000:
                logs = logs[-1000:]
            with open(log_file, 'w') as f:
                json.dump(logs, f, indent=2)
        except Exception as e:
            self.logger.error(f"Failed to log task execution: {e}")

    def start_scheduler(self):
        """Start the scheduler in a separate thread"""
        if self.running:
            self.logger.warning("Scheduler is already running")
            return
        self.running = True
        self.scheduler_thread = threading.Thread(target=self._run_scheduler, daemon=True)
        self.scheduler_thread.start()
        self.logger.info("Scheduler started")

    def _run_scheduler(self):
        """Main scheduler loop"""
        while self.running:
            schedule.run_pending()
            time.sleep(1)

    def stop_scheduler(self):
        """Stop the scheduler"""
        self.running = False
        if self.scheduler_thread:
            self.scheduler_thread.join()
        self.logger.info("Scheduler stopped")

    def list_scheduled_tasks(self):
        """List all scheduled tasks"""
        self.logger.info("Scheduled tasks:")
        for task in self.scheduled_tasks:
            self.logger.info(f" - {task['name']}: {task['schedule']}")
        return self.scheduled_tasks

    def get_next_runs(self):
        """Get next run times for all scheduled jobs"""
        next_runs = []
        for job in schedule.jobs:
            next_runs.append({
                'job': str(job),
                'next_run': job.next_run.isoformat() if job.next_run else None
            })
        return next_runs

# Example automation tasks
def backup_files():
    """Example backup task"""
    import shutil
    source_dir = "/path/to/important/files"
    backup_dir = f"/path/to/backup/{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    try:
        shutil.copytree(source_dir, backup_dir)
        print(f"Backup completed: {backup_dir}")
        return True
    except Exception as e:
        print(f"Backup failed: {e}")
        return False

def send_daily_report():
    """Example daily report task"""
    # This would integrate with your email automation
    report_content = f"Daily report generated at {datetime.now()}"
    print(f"Sending daily report: {report_content}")
    return True

def monitor_system_health():
    """Example system monitoring task"""
    import psutil
    # Get system metrics
    cpu_percent = psutil.cpu_percent()
    memory_percent = psutil.virtual_memory().percent
    disk_percent = psutil.disk_usage('/').percent
    print(f"System Health - CPU: {cpu_percent}%, Memory: {memory_percent}%, Disk: {disk_percent}%")
    # Alert if any metric is too high
    if cpu_percent > 80 or memory_percent > 80 or disk_percent > 90:
        print("WARNING: System resources running high!")
        return False
    return True

# Usage example
def main():
    scheduler = TaskScheduler()
    # Schedule various tasks
    scheduler.add_daily_task("02:00", backup_files, "Daily Backup")
    scheduler.add_daily_task("09:00", send_daily_report, "Morning Report")
    scheduler.add_interval_task(30, monitor_system_health, "System Health Check")
    scheduler.add_weekly_task("monday", "10:00", backup_files, "Weekly Full Backup")
    # List scheduled tasks
    scheduler.list_scheduled_tasks()
    # Start the scheduler
    scheduler.start_scheduler()
    try:
        # Keep the main thread alive
        while True:
            time.sleep(60)
            # Print next runs every minute
            next_runs = scheduler.get_next_runs()
            print(f"Next scheduled runs: {len(next_runs)} jobs pending")
    except KeyboardInterrupt:
        print("Shutting down scheduler...")
        scheduler.stop_scheduler()

if __name__ == "__main__":
    main()
```
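If you'd rather not pull in the third-party schedule package for a simple script, the standard library's sched module can drive repeating jobs on its own. A minimal sketch (my own helper, not an API from the article — blocking and single-threaded, which is fine for small scripts):

```python
import sched
import time

def run_repeatedly(task, interval_seconds, iterations):
    """Run `task` every `interval_seconds` seconds, `iterations` times,
    using only the stdlib sched module. Blocks until all runs finish."""
    scheduler = sched.scheduler(time.monotonic, time.sleep)
    for i in range(iterations):
        # Schedule each run at its offset from now; all share priority 1
        scheduler.enter(interval_seconds * i, 1, task)
    scheduler.run()

# Example: print a heartbeat three times, one second apart
run_repeatedly(lambda: print("heartbeat"), 1, 3)
```

For truly long-running or unbounded schedules, the TaskScheduler above (or the system's own cron/Task Scheduler) is the better fit; sched shines for short, self-contained jobs.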
Advanced Scheduling with Cron-like Syntax
For more complex scheduling needs, here's an enhanced scheduler:
`python
import croniter
from datetime import datetime
import threading
import time
class CronScheduler: def __init__(self): self.jobs = [] self.running = False self.scheduler_thread = None def add_cron_job(self, cron_expression, task_func, task_name, args, *kwargs): """Add a job using cron expression""" job = { 'cron_expression': cron_expression, 'task_func': task_func, 'task_name': task_name, 'args': args, 'kwargs': kwargs, 'last_run': None, 'next_run': self._get_next_run(cron_expression) } self.jobs.append(job) print(f"Added cron job: {task_name} with schedule '{cron_expression}'") def _get_next_run(self, cron_expression): """Calculate next run time for cron expression""" cron = croniter.croniter(cron_expression, datetime.now()) return cron.get_next(datetime) def start(self): """Start the cron scheduler""" if self.running: return self.running = True self.scheduler_thread = threading.Thread(target=self._run_scheduler, daemon=True) self.scheduler_thread.start() print("Cron scheduler started") def _run_scheduler(self): """Main scheduler loop""" while self.running: current_time = datetime.now() for job in self.jobs: if current_time >= job['next_run']: # Run the job try: print(f"Running job: {job['task_name']}") job['task_func'](job['args'], *job['kwargs']) job['last_run'] = current_time except Exception as e: print(f"Job failed: {job['task_name']} - {e}") # Calculate next run time job['next_run'] = self._get_next_run(job['cron_expression']) time.sleep(30) # Check every 30 seconds def stop(self): """Stop the scheduler""" self.running = False if self.scheduler_thread: self.scheduler_thread.join()
# Usage example with cron expressions
cron_scheduler = CronScheduler()

# Run every day at 2:30 AM
cron_scheduler.add_cron_job("30 2 * * *", backup_files, "Daily Backup")

# Run every Monday at 9:00 AM
cron_scheduler.add_cron_job("0 9 * * 1", send_daily_report, "Weekly Report")

# Run every 15 minutes
cron_scheduler.add_cron_job("*/15 * * * *", monitor_system_health, "Health Check")

cron_scheduler.start()
`
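A cron expression has five fields: minute, hour, day of month, month, and day of week. To make that syntax concrete, here is a deliberately tiny matcher that supports only `*`, plain numbers, and `*/n` steps — an illustration of the semantics, not a substitute for croniter:

```python
from datetime import datetime

def cron_matches(cron_expression, dt):
    """Check whether datetime `dt` matches a simplified cron expression.
    Supports '*', plain numbers, and '*/n' steps -- enough for the
    expressions used above, nothing more."""
    fields = cron_expression.split()
    # Field order: minute, hour, day-of-month, month, day-of-week (0 = Sunday)
    values = [dt.minute, dt.hour, dt.day, dt.month, (dt.weekday() + 1) % 7]
    for field, value in zip(fields, values):
        if field == "*":
            continue  # wildcard matches anything
        if field.startswith("*/"):
            if value % int(field[2:]) != 0:
                return False  # not on the step boundary
        elif int(field) != value:
            return False
    return True

print(cron_matches("30 2 * * *", datetime(2024, 1, 1, 2, 30)))
print(cron_matches("*/15 * * * *", datetime(2024, 1, 1, 0, 45)))
```

Real cron syntax also allows ranges (`1-5`), lists (`1,15`), and names (`mon`); croniter handles all of those for you.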
Bringing It All Together: Complete Automation Workflow
Here's how you can combine all these automation techniques into a comprehensive system:
`python
import os
import time
import logging
from datetime import datetime

class AutomationWorkflow:
    def __init__(self):
        self.setup_logging()
        self.file_organizer = None
        self.email_client = None
        self.web_scraper = None
        self.scheduler = TaskScheduler()

    def setup_logging(self):
        """Set up centralized logging"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('automation_workflow.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def daily_maintenance_workflow(self):
        """Complete daily maintenance workflow"""
        self.logger.info("Starting daily maintenance workflow")
        try:
            # 1. Organize files
            if self.file_organizer:
                self.file_organizer.organize_by_type_and_date()
                self.file_organizer.clean_empty_directories()

            # 2. Back up important files
            backup_success = backup_files()

            # 3. Check system health
            health_status = monitor_system_health()

            # 4. Send status email
            if self.email_client:
                status_report = f"""
                Daily Maintenance Report - {datetime.now().strftime('%Y-%m-%d')}

                File Organization: Completed
                Backup Status: {'Success' if backup_success else 'Failed'}
                System Health: {'Good' if health_status else 'Issues Detected'}

                Workflow completed at {datetime.now()}
                """
                self.email_client.send_simple_email(
                    "admin@example.com",
                    "Daily Maintenance Report",
                    status_report
                )

            self.logger.info("Daily maintenance workflow completed successfully")
            return True
        except Exception as e:
            self.logger.error(f"Daily maintenance workflow failed: {e}")
            return False

    def setup_automated_schedule(self):
        """Set up all automated tasks"""
        # Daily maintenance at 2 AM
        self.scheduler.add_daily_task("02:00", self.daily_maintenance_workflow, "Daily Maintenance")

        # Weekly report on Mondays at 9 AM
        self.scheduler.add_weekly_task("monday", "09:00", send_daily_report, "Weekly Report")

        # System health check every hour
        self.scheduler.add_interval_task(60, monitor_system_health, "Hourly Health Check")

        # Start the scheduler
        self.scheduler.start_scheduler()
        self.logger.info("Automated schedule setup complete")

    def run_workflow(self):
        """Run the complete automation workflow"""
        self.logger.info("Starting automation workflow")

        # Set up scheduled tasks
        self.setup_automated_schedule()

        # Keep running
        try:
            while True:
                time.sleep(300)  # Check every 5 minutes
                self.logger.debug("Workflow running...")
        except KeyboardInterrupt:
            self.logger.info("Shutting down automation workflow")
            self.scheduler.stop_scheduler()

# Main execution
if __name__ == "__main__":
    workflow = AutomationWorkflow()
    workflow.run_workflow()
`
Best Practices and Security Considerations
Error Handling and Logging
Always implement comprehensive error handling and logging:
`python
import logging
from functools import wraps
def log_errors(func):
    """Decorator to log function errors"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            logging.error(f"Error in {func.__name__}: {e}")
            raise
    return wrapper
@log_errors
def automated_task():
# Your automation code here
pass
`
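Logging an error is often only half the job: transient failures such as network timeouts or briefly locked files usually deserve a retry. Here is a sketch of a retry decorator with exponential backoff (the names and defaults are illustrative) that composes naturally with `log_errors`:

```python
import time
import logging
from functools import wraps

def retry(max_attempts=3, base_delay=1.0):
    """Retry a failing function with exponential backoff:
    waits base_delay, then 2x, 4x, ... between attempts."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts:
                        raise  # out of attempts: let the error propagate
                    delay = base_delay * 2 ** (attempt - 1)
                    logging.warning(
                        f"{func.__name__} failed ({e}); "
                        f"retry {attempt}/{max_attempts} in {delay}s"
                    )
                    time.sleep(delay)
        return wrapper
    return decorator

@retry(max_attempts=3, base_delay=0.5)
def flaky_task():
    # Replace with real work that may fail transiently
    ...
```

Keep retries for errors that can plausibly resolve themselves; retrying a permission error or a bug three times just delays the failure.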
Security Best Practices
1. Environment Variables: Store sensitive information in environment variables:
`python
import os
from dotenv import load_dotenv
load_dotenv()
EMAIL_PASSWORD = os.getenv('EMAIL_PASSWORD')
API_KEY = os.getenv('API_KEY')
`
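A useful companion to this pattern is a fail-fast check at startup, so a missing variable surfaces immediately rather than deep inside a scheduled job hours later. A minimal sketch (the variable names are the ones used above):

```python
import os

def check_required_env(names):
    """Return the names of required environment variables that are unset or empty."""
    return [name for name in names if not os.getenv(name)]

# Verify configuration before scheduling anything
missing = check_required_env(["EMAIL_PASSWORD", "API_KEY"])
if missing:
    print(f"Warning: missing environment variables: {', '.join(missing)}")
    # In a real script you would abort here, e.g. raise SystemExit(1)
```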
2. Input Validation: Always validate inputs and file paths:
`python
import os.path

def validate_file_path(file_path):
    """Validate a file path for security"""
    # Check that the path exists
    if not os.path.exists(file_path):
        raise ValueError("File does not exist")
    # Prevent directory traversal attacks
    if '..' in file_path:
        raise ValueError("Invalid file path")
    return True
`
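The substring check above catches the obvious `..` case, but a more robust approach is to resolve the path first and then verify it still lies inside an allowed directory; that also catches symlink escapes. A sketch (the directory names are illustrative):

```python
import os

def is_within_directory(file_path, allowed_dir):
    """Check that `file_path` resolves to a location inside `allowed_dir`.
    realpath() normalizes '..' segments and follows symlinks, so the
    comparison happens on the real filesystem locations."""
    resolved = os.path.realpath(file_path)
    allowed = os.path.realpath(allowed_dir)
    # commonpath compares whole path components, so /data/reports-evil
    # is correctly rejected for allowed_dir /data/reports
    return os.path.commonpath([resolved, allowed]) == allowed

print(is_within_directory("/tmp/reports/2024.csv", "/tmp/reports"))
print(is_within_directory("/tmp/reports/../../etc/passwd", "/tmp/reports"))
```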
3. Rate Limiting: Implement rate limiting for web scraping:
`python
import time
from functools import wraps
def rate_limit(calls_per_second=1):
    """Rate limiting decorator"""
    def decorator(func):
        last_called = [0.0]

        @wraps(func)
        def wrapper(*args, **kwargs):
            elapsed = time.time() - last_called[0]
            left_to_wait = 1.0 / calls_per_second - elapsed
            if left_to_wait > 0:
                time.sleep(left_to_wait)
            ret = func(*args, **kwargs)
            last_called[0] = time.time()
            return ret
        return wrapper
    return decorator
@rate_limit(calls_per_second=0.5) # Max 1 call every 2 seconds
def scrape_page(url):
# Scraping code here
pass
`
Conclusion
Python automation can transform your daily workflow by eliminating repetitive tasks and ensuring consistency. The examples in this guide provide a solid foundation for:
- File Management: Automatically organize, rename, and process files
- Email Automation: Send personalized emails and manage campaigns
- Web Scraping: Collect data from websites systematically
- Task Scheduling: Run automation scripts at optimal times
Start with simple scripts and gradually build more complex workflows. Remember to:
1. Test thoroughly before deploying automation scripts
2. Implement proper error handling and logging
3. Follow security best practices
4. Respect website terms of service when scraping
5. Monitor your automated systems regularly
By mastering these automation techniques, you'll save time, reduce errors, and focus on more valuable work. The key is to start small, iterate frequently, and build robust systems that can handle real-world scenarios.
Whether you're managing files, communicating with customers, gathering market data, or maintaining systems, Python automation provides the tools to work smarter, not harder. Begin implementing these examples today and experience the power of automation in your daily workflow.