Automate Daily Tasks with Python Scripts: Complete Guide

Learn to automate repetitive tasks using Python. Complete guide covering file management, email automation, web scraping, and scheduling workflows.


In today's fast-paced digital world, automation has become essential for productivity and efficiency. Python, with its simple syntax and powerful libraries, stands out as the perfect tool for automating repetitive daily tasks. Whether you're a developer, data analyst, content creator, or business professional, learning to automate routine tasks can save hours of manual work and reduce human error.

This comprehensive guide will walk you through practical automation examples that you can implement immediately. We'll cover file management, email automation, web scraping, and task scheduling – four fundamental areas where automation can make a significant impact on your daily workflow.

Why Python for Automation?

Python's popularity in automation stems from several key advantages:

Simplicity and Readability: Python's clean syntax makes it accessible to beginners while remaining powerful enough for complex tasks.

Rich Ecosystem: Extensive libraries like os, shutil, smtplib, requests, BeautifulSoup, and schedule provide ready-made solutions for common automation needs.

Cross-Platform Compatibility: Python scripts run seamlessly across Windows, macOS, and Linux systems.

Strong Community Support: Abundant documentation, tutorials, and community-contributed solutions make problem-solving easier.

Integration Capabilities: Python easily integrates with APIs, databases, and other systems, making it ideal for comprehensive automation workflows.

Setting Up Your Python Environment

Before diving into the automation examples, ensure you have Python 3 installed along with the libraries used throughout this guide. Here's a quick setup:

```bash
# Check Python installation
python --version

# Install required libraries
pip install requests beautifulsoup4 schedule python-dotenv croniter psutil Pillow
```

For email automation, you'll also need to configure app-specific passwords for Gmail or your email provider's SMTP settings.
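Rather than hardcoding credentials in scripts, you can keep them in a .env file and load them with python-dotenv (installed above). Here's a minimal sketch; the variable names are placeholders you'd match to your own .env file:

```python
import os
from dotenv import load_dotenv

# Reads key=value pairs from a .env file into the process environment
load_dotenv()

# Hypothetical variable names -- use whatever keys your .env defines
SMTP_SERVER = os.getenv("SMTP_SERVER", "smtp.gmail.com")
SMTP_PORT = int(os.getenv("SMTP_PORT", "587"))
EMAIL_ADDRESS = os.getenv("EMAIL_ADDRESS")
EMAIL_APP_PASSWORD = os.getenv("EMAIL_APP_PASSWORD")
```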

1. File Management Automation: Renaming and Organizing Files

File management is one of the most common automation needs. Let's explore various scenarios from basic renaming to complex organization systems.

Basic File Renaming

Here's a simple script to rename files with a consistent pattern:

```python
import os
import glob

def rename_files_basic(directory, old_pattern, new_pattern):
    """Rename files matching a pattern in a directory"""
    os.chdir(directory)
    # Sort so numbering is deterministic across runs
    files = sorted(glob.glob(old_pattern))
    for i, filename in enumerate(files, 1):
        file_extension = os.path.splitext(filename)[1]
        new_name = f"{new_pattern}_{i:03d}{file_extension}"
        try:
            os.rename(filename, new_name)
            print(f"Renamed: {filename} → {new_name}")
        except OSError as e:
            print(f"Error renaming {filename}: {e}")

# Example usage
rename_files_basic("/path/to/photos", "*.jpg", "vacation_photo")
```
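Because os.rename can't be undone, it's worth previewing the plan before committing. The dry_run flag below is our own addition, not part of the script above:

```python
import os
import glob

def rename_files_preview(directory, old_pattern, new_pattern, dry_run=True):
    """Like rename_files_basic, but only prints the plan unless dry_run=False."""
    os.chdir(directory)
    for i, filename in enumerate(sorted(glob.glob(old_pattern)), 1):
        extension = os.path.splitext(filename)[1]
        new_name = f"{new_pattern}_{i:03d}{extension}"
        if dry_run:
            print(f"Would rename: {filename} → {new_name}")
        else:
            os.rename(filename, new_name)

# Preview first, then apply
rename_files_preview("/path/to/photos", "*.jpg", "vacation_photo")
rename_files_preview("/path/to/photos", "*.jpg", "vacation_photo", dry_run=False)
```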

Advanced File Organization System

For more complex file management, here's a comprehensive script that organizes files by type, date, and size:

```python
import os
import shutil
from datetime import datetime
from pathlib import Path

class FileOrganizer:
    def __init__(self, source_directory):
        self.source_dir = Path(source_directory)
        self.file_types = {
            'images': ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.svg'],
            'documents': ['.pdf', '.doc', '.docx', '.txt', '.rtf', '.odt'],
            'spreadsheets': ['.xls', '.xlsx', '.csv', '.ods'],
            'presentations': ['.ppt', '.pptx', '.odp'],
            'videos': ['.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv'],
            'audio': ['.mp3', '.wav', '.flac', '.aac', '.ogg'],
            'archives': ['.zip', '.rar', '.7z', '.tar', '.gz'],
            'code': ['.py', '.js', '.html', '.css', '.cpp', '.java']
        }

    def get_file_category(self, file_path):
        """Determine file category based on extension"""
        extension = file_path.suffix.lower()
        for category, extensions in self.file_types.items():
            if extension in extensions:
                return category
        return 'miscellaneous'

    def get_file_date(self, file_path):
        """Get file creation date (metadata-change time on Linux)"""
        timestamp = os.path.getctime(file_path)
        return datetime.fromtimestamp(timestamp)

    def organize_by_type_and_date(self):
        """Organize files by type and creation date"""
        if not self.source_dir.exists():
            print(f"Source directory {self.source_dir} does not exist")
            return
        organized_count = 0
        for file_path in self.source_dir.iterdir():
            if file_path.is_file():
                # Get file category and date
                category = self.get_file_category(file_path)
                file_date = self.get_file_date(file_path)
                # Create destination directory structure
                dest_dir = (self.source_dir / 'organized' / category /
                            f"{file_date.year}" / f"{file_date.month:02d}")
                dest_dir.mkdir(parents=True, exist_ok=True)
                dest_path = dest_dir / file_path.name
                # Handle duplicate names (always suffix the original stem,
                # so counters don't compound as file_1_2, file_1_2_3, ...)
                stem, suffix = file_path.stem, file_path.suffix
                counter = 1
                while dest_path.exists():
                    dest_path = dest_dir / f"{stem}_{counter}{suffix}"
                    counter += 1
                try:
                    shutil.move(str(file_path), str(dest_path))
                    print(f"Moved: {file_path.name} → {dest_path.relative_to(self.source_dir)}")
                    organized_count += 1
                except Exception as e:
                    print(f"Error moving {file_path.name}: {e}")
        print(f"\nOrganization complete! {organized_count} files organized.")

    def clean_empty_directories(self):
        """Remove empty directories"""
        for root, dirs, files in os.walk(self.source_dir, topdown=False):
            for directory in dirs:
                dir_path = os.path.join(root, directory)
                try:
                    if not os.listdir(dir_path):
                        os.rmdir(dir_path)
                        print(f"Removed empty directory: {dir_path}")
                except OSError:
                    pass

# Usage example
organizer = FileOrganizer("/path/to/messy/directory")
organizer.organize_by_type_and_date()
organizer.clean_empty_directories()
```

Batch File Processing with Metadata

Here's a script that processes multiple files while preserving and updating metadata:

```python
import os
import json
from datetime import datetime
from PIL import Image
from PIL.ExifTags import TAGS

class BatchFileProcessor:
    def __init__(self, directory):
        self.directory = directory
        self.processed_files = []

    def extract_image_metadata(self, image_path):
        """Extract EXIF metadata from image files"""
        try:
            image = Image.open(image_path)
            exifdata = image.getexif()
            metadata = {}
            for tag_id in exifdata:
                tag = TAGS.get(tag_id, tag_id)
                data = exifdata.get(tag_id)
                # Convert bytes to string if necessary
                if isinstance(data, bytes):
                    data = data.decode(errors='replace')
                metadata[tag] = data
            return metadata
        except Exception as e:
            print(f"Error extracting metadata from {image_path}: {e}")
            return {}

    def rename_with_timestamp(self, file_path):
        """Rename file with timestamp prefix"""
        try:
            # Get file modification time
            mtime = os.path.getmtime(file_path)
            timestamp = datetime.fromtimestamp(mtime).strftime("%Y%m%d_%H%M%S")
            # Create new filename
            directory = os.path.dirname(file_path)
            name, extension = os.path.splitext(os.path.basename(file_path))
            new_path = os.path.join(directory, f"{timestamp}_{name}{extension}")
            os.rename(file_path, new_path)
            return new_path
        except Exception as e:
            print(f"Error renaming {file_path}: {e}")
            return file_path

    def process_images(self):
        """Process all images in directory"""
        image_extensions = ['.jpg', '.jpeg', '.png', '.tiff', '.bmp']
        for filename in os.listdir(self.directory):
            if any(filename.lower().endswith(ext) for ext in image_extensions):
                file_path = os.path.join(self.directory, filename)
                # Extract metadata, then rename with timestamp
                metadata = self.extract_image_metadata(file_path)
                new_path = self.rename_with_timestamp(file_path)
                # Store processing info
                self.processed_files.append({
                    'original_name': filename,
                    'new_name': os.path.basename(new_path),
                    'metadata': metadata,
                    'processed_at': datetime.now().isoformat()
                })
                print(f"Processed: {filename} → {os.path.basename(new_path)}")

    def save_processing_log(self):
        """Save processing log to JSON file"""
        log_path = os.path.join(self.directory, 'processing_log.json')
        with open(log_path, 'w') as f:
            # default=str handles EXIF values that aren't JSON-serializable
            json.dump(self.processed_files, f, indent=2, default=str)
        print(f"Processing log saved to {log_path}")

# Usage
processor = BatchFileProcessor("/path/to/images")
processor.process_images()
processor.save_processing_log()
```

2. Email Automation: Sending Automated Emails

Email automation can handle everything from simple notifications to complex newsletter systems. Let's explore various email automation scenarios.

Basic Email Sending

Here's a foundation for sending emails with Python:

```python
import os
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders

class EmailAutomation:
    def __init__(self, smtp_server, smtp_port, email, password):
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port
        self.email = email
        self.password = password

    def send_simple_email(self, to_email, subject, message):
        """Send a simple text email"""
        try:
            # Create message
            msg = MIMEText(message)
            msg['From'] = self.email
            msg['To'] = to_email
            msg['Subject'] = subject
            # Send email
            with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
                server.starttls()
                server.login(self.email, self.password)
                server.send_message(msg)
            print(f"Email sent successfully to {to_email}")
            return True
        except Exception as e:
            print(f"Error sending email: {e}")
            return False

    def send_html_email(self, to_email, subject, html_content, text_content=None):
        """Send HTML email with optional text fallback"""
        try:
            msg = MIMEMultipart('alternative')
            msg['From'] = self.email
            msg['To'] = to_email
            msg['Subject'] = subject
            # Add text version first so clients prefer the HTML part
            if text_content:
                msg.attach(MIMEText(text_content, 'plain'))
            msg.attach(MIMEText(html_content, 'html'))
            with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
                server.starttls()
                server.login(self.email, self.password)
                server.send_message(msg)
            print(f"HTML email sent successfully to {to_email}")
            return True
        except Exception as e:
            print(f"Error sending HTML email: {e}")
            return False

    def send_email_with_attachment(self, to_email, subject, message, attachment_path):
        """Send email with file attachment"""
        try:
            msg = MIMEMultipart()
            msg['From'] = self.email
            msg['To'] = to_email
            msg['Subject'] = subject
            # Add message body
            msg.attach(MIMEText(message, 'plain'))
            # Add attachment
            if not os.path.exists(attachment_path):
                print(f"Attachment not found: {attachment_path}")
                return False
            with open(attachment_path, "rb") as attachment:
                part = MIMEBase('application', 'octet-stream')
                part.set_payload(attachment.read())
            encoders.encode_base64(part)
            part.add_header(
                'Content-Disposition',
                f'attachment; filename={os.path.basename(attachment_path)}'
            )
            msg.attach(part)
            # Send email
            with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
                server.starttls()
                server.login(self.email, self.password)
                server.send_message(msg)
            print(f"Email with attachment sent successfully to {to_email}")
            return True
        except Exception as e:
            print(f"Error sending email with attachment: {e}")
            return False

# Gmail configuration example
email_client = EmailAutomation(
    smtp_server="smtp.gmail.com",
    smtp_port=587,
    email="your_email@gmail.com",
    password="your_app_password"
)
```
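With the client configured, sending a message is a single call (the recipient here is a placeholder):

```python
email_client.send_simple_email(
    "recipient@example.com",  # placeholder address
    "Automation test",
    "Hello! This message was sent by a Python script."
)
```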

Advanced Email Campaign System

Here's a more sophisticated system for managing email campaigns:

```python
import csv
import json
import time
import random
import logging
from datetime import datetime
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Contact:
    email: str
    name: str
    tags: List[str]
    last_contacted: Optional[datetime] = None

class EmailCampaignManager:
    def __init__(self, email_client):
        self.email_client = email_client
        self.contacts = []
        self.templates = {}
        self.campaign_log = []
        # Setup logging
        logging.basicConfig(
            filename='email_campaign.log',
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )

    def load_contacts_from_csv(self, csv_file):
        """Load contacts from CSV file"""
        try:
            with open(csv_file, 'r', newline='', encoding='utf-8') as file:
                reader = csv.DictReader(file)
                for row in reader:
                    contact = Contact(
                        email=row['email'],
                        name=row['name'],
                        tags=row.get('tags', '').split(',') if row.get('tags') else []
                    )
                    self.contacts.append(contact)
            print(f"Loaded {len(self.contacts)} contacts from {csv_file}")
        except Exception as e:
            print(f"Error loading contacts: {e}")

    def load_email_template(self, template_name, template_file):
        """Load email template from HTML file"""
        try:
            with open(template_file, 'r', encoding='utf-8') as file:
                self.templates[template_name] = file.read()
            print(f"Loaded template: {template_name}")
        except Exception as e:
            print(f"Error loading template {template_name}: {e}")

    def personalize_template(self, template, contact):
        """Personalize email template for a specific contact"""
        # {{name}} / {{email}} are this guide's placeholder tokens
        personalized = template.replace('{{name}}', contact.name)
        personalized = personalized.replace('{{email}}', contact.email)
        # Add more personalization logic as needed
        return personalized

    def send_campaign(self, template_name, subject, tag_filter=None, delay_range=(1, 3)):
        """Send email campaign to filtered contacts"""
        if template_name not in self.templates:
            print(f"Template {template_name} not found")
            return
        # Filter contacts
        target_contacts = self.contacts
        if tag_filter:
            target_contacts = [c for c in self.contacts if tag_filter in c.tags]
        print(f"Sending campaign to {len(target_contacts)} contacts")
        success_count = 0
        error_count = 0
        for contact in target_contacts:
            try:
                # Personalize content
                personalized_content = self.personalize_template(
                    self.templates[template_name], contact
                )
                # Send email
                success = self.email_client.send_html_email(
                    contact.email, subject, personalized_content
                )
                if success:
                    success_count += 1
                    contact.last_contacted = datetime.now()
                    # Log success
                    self.campaign_log.append({
                        'contact': contact.email,
                        'template': template_name,
                        'status': 'sent',
                        'timestamp': datetime.now().isoformat()
                    })
                    logging.info(f"Campaign email sent to {contact.email}")
                else:
                    error_count += 1
                    logging.error(f"Failed to send campaign email to {contact.email}")
                # Random delay to avoid being flagged as spam
                time.sleep(random.uniform(*delay_range))
            except Exception as e:
                error_count += 1
                logging.error(f"Error sending to {contact.email}: {e}")
        print(f"Campaign complete: {success_count} sent, {error_count} failed")

    def save_campaign_report(self, filename):
        """Save campaign report to JSON file"""
        with open(filename, 'w') as f:
            json.dump(self.campaign_log, f, indent=2)
        print(f"Campaign report saved to {filename}")

# Usage example
campaign_manager = EmailCampaignManager(email_client)
campaign_manager.load_contacts_from_csv('contacts.csv')
campaign_manager.load_email_template('newsletter', 'newsletter_template.html')
campaign_manager.send_campaign('newsletter', 'Monthly Newsletter', tag_filter='subscribers')
campaign_manager.save_campaign_report('campaign_report.json')
```

Email Template Example

Here's a sample HTML email template (newsletter_template.html). Note the {{name}} placeholder, which personalize_template fills in:

```html
<!DOCTYPE html>
<html>
<head>
  <meta charset="utf-8">
  <title>Monthly Newsletter</title>
</head>
<body>
  <h1>Monthly Newsletter</h1>
  <p>Hello {{name}}!</p>
  <p>Welcome to our monthly newsletter. Here are the latest updates:</p>

  <h2>🚀 New Features</h2>
  <ul>
    <li>Enhanced automation tools</li>
    <li>Improved user interface</li>
    <li>Better performance</li>
  </ul>

  <h2>📊 This Month's Stats</h2>
  <p>Our community has grown by 25% this month! Thank you for being part of our journey.</p>

  <h2>💡 Tips &amp; Tricks</h2>
  <p>Did you know you can automate your daily tasks with Python? Check out our latest blog post for practical examples.</p>
</body>
</html>
```

3. Web Scraping Automation: Extracting Data from Websites

Web scraping automates data collection from websites, enabling you to gather information for analysis, monitoring, or content creation.

Basic Web Scraping Setup

Here's a foundation for web scraping with error handling and respectful practices:

```python
import requests
from bs4 import BeautifulSoup
import time
import random
from urllib.parse import urljoin, urlparse
import csv
import json
from datetime import datetime
import logging

class WebScraper:
    def __init__(self, base_url, delay_range=(1, 3)):
        self.base_url = base_url
        self.delay_range = delay_range
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                           'AppleWebKit/537.36 (KHTML, like Gecko) '
                           'Chrome/91.0.4472.124 Safari/537.36')
        })
        # Setup logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def get_page(self, url, retries=3):
        """Get page content with error handling and retries"""
        for attempt in range(retries):
            try:
                response = self.session.get(url, timeout=10)
                response.raise_for_status()
                # Random delay to be respectful
                time.sleep(random.uniform(*self.delay_range))
                return response
            except requests.exceptions.RequestException as e:
                self.logger.warning(f"Attempt {attempt + 1} failed for {url}: {e}")
                if attempt == retries - 1:
                    self.logger.error(f"Failed to get {url} after {retries} attempts")
                    return None
                time.sleep(2 ** attempt)  # Exponential backoff

    def parse_page(self, html_content):
        """Parse HTML content with BeautifulSoup"""
        return BeautifulSoup(html_content, 'html.parser')
```
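The randomized delays above keep request rates polite, but respectful scraping also means honoring robots.txt. Here's a small sketch using the standard library's urllib.robotparser; you could call it from get_page before fetching:

```python
from urllib.robotparser import RobotFileParser
from urllib.parse import urljoin

def is_allowed(base_url, target_url, user_agent="*"):
    """Return True if the site's robots.txt permits fetching target_url."""
    parser = RobotFileParser()
    parser.set_url(urljoin(base_url, "/robots.txt"))
    try:
        parser.read()  # Fetch and parse robots.txt
    except OSError:
        return True  # No readable robots.txt: proceed, but cautiously
    return parser.can_fetch(user_agent, target_url)

# Example: check before scraping
if is_allowed("https://example.com", "https://example.com/news/article-1"):
    print("OK to fetch")
```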

News Article Scraper

Here's a practical example that scrapes news articles:

```python
class NewsArticleScraper(WebScraper):
    def __init__(self, base_url):
        super().__init__(base_url)
        self.articles = []

    def scrape_article_links(self, page_url):
        """Extract article links from a news page"""
        response = self.get_page(page_url)
        if not response:
            return []
        soup = self.parse_page(response.content)
        links = []
        # Adjust selectors based on the website structure
        for link in soup.find_all('a', href=True):
            href = link.get('href')
            # Filter for article URLs (adjust pattern as needed)
            if href and ('/article/' in href or '/news/' in href):
                links.append(urljoin(self.base_url, href))
        return list(set(links))  # Remove duplicates

    def scrape_article_content(self, article_url):
        """Extract content from a single article"""
        response = self.get_page(article_url)
        if not response:
            return None
        soup = self.parse_page(response.content)
        # Extract article data (adjust selectors for target website)
        return {
            'url': article_url,
            'title': self.extract_title(soup),
            'author': self.extract_author(soup),
            'date': self.extract_date(soup),
            'content': self.extract_content(soup),
            'tags': self.extract_tags(soup),
            'scraped_at': datetime.now().isoformat()
        }

    def extract_title(self, soup):
        """Extract article title"""
        for selector in ['h1', '.article-title', '.post-title', 'title']:
            elem = soup.select_one(selector)
            if elem:
                return elem.get_text().strip()
        return "No title found"

    def extract_author(self, soup):
        """Extract article author"""
        for selector in ['.author', '.byline', '[rel="author"]', '.post-author']:
            elem = soup.select_one(selector)
            if elem:
                return elem.get_text().strip()
        return "Unknown author"

    def extract_date(self, soup):
        """Extract publication date"""
        for selector in ['time', '.date', '.published', '.post-date']:
            elem = soup.select_one(selector)
            if elem:
                # Try the datetime attribute first, fall back to text
                return elem.get('datetime') or elem.get_text().strip()
        return "No date found"

    def extract_content(self, soup):
        """Extract article content"""
        for selector in ['.article-content', '.post-content', '.entry-content', 'article p']:
            elems = soup.select(selector)
            if elems:
                content = ' '.join(elem.get_text().strip() for elem in elems)
                return content[:1000] + '...' if len(content) > 1000 else content
        return "No content found"

    def extract_tags(self, soup):
        """Extract article tags/categories"""
        tags = []
        for selector in ['.tags a', '.categories a', '.post-tags a']:
            tags.extend(tag.get_text().strip() for tag in soup.select(selector))
        return tags

    def scrape_multiple_articles(self, start_page, max_articles=50):
        """Scrape multiple articles from a news website"""
        self.logger.info(f"Starting to scrape articles from {start_page}")
        # Get article links, capped at max_articles
        article_links = self.scrape_article_links(start_page)
        self.logger.info(f"Found {len(article_links)} article links")
        article_links = article_links[:max_articles]
        # Scrape each article
        for i, link in enumerate(article_links, 1):
            self.logger.info(f"Scraping article {i}/{len(article_links)}: {link}")
            article_data = self.scrape_article_content(link)
            if article_data:
                self.articles.append(article_data)
                self.logger.info(f"Successfully scraped: {article_data['title']}")
            else:
                self.logger.warning(f"Failed to scrape: {link}")
        self.logger.info(f"Scraping complete. Total articles: {len(self.articles)}")

    def save_to_csv(self, filename):
        """Save scraped articles to CSV file"""
        if not self.articles:
            self.logger.warning("No articles to save")
            return
        fieldnames = ['title', 'author', 'date', 'url', 'content', 'tags', 'scraped_at']
        with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for article in self.articles:
                row = article.copy()
                row['tags'] = ', '.join(article['tags'])  # Convert tags list to string
                writer.writerow(row)
        self.logger.info(f"Articles saved to {filename}")

    def save_to_json(self, filename):
        """Save scraped articles to JSON file"""
        with open(filename, 'w', encoding='utf-8') as jsonfile:
            json.dump(self.articles, jsonfile, indent=2, ensure_ascii=False)
        self.logger.info(f"Articles saved to {filename}")

# Usage example
scraper = NewsArticleScraper("https://example-news-site.com")
scraper.scrape_multiple_articles("https://example-news-site.com/latest", max_articles=20)
scraper.save_to_csv("scraped_articles.csv")
scraper.save_to_json("scraped_articles.json")
```

E-commerce Price Monitor

Here's a specialized scraper for monitoring product prices:

```python
import re

class PriceMonitor(WebScraper):
    def __init__(self):
        super().__init__("")
        self.products = []
        self.price_history = []

    def add_product(self, name, url, price_selector, title_selector=None):
        """Add a product to monitor"""
        self.products.append({
            'name': name,
            'url': url,
            'price_selector': price_selector,
            'title_selector': title_selector,
            'price_history': []
        })
        self.logger.info(f"Added product to monitor: {name}")

    def extract_price(self, soup, selector):
        """Extract a numeric price from the page using a CSS selector"""
        price_elem = soup.select_one(selector)
        if price_elem:
            price_text = price_elem.get_text().strip()
            # Strip currency symbols and thousands separators
            price_match = re.search(r'\d+\.?\d*', price_text.replace(',', ''))
            if price_match:
                return float(price_match.group())
        return None

    def check_product_price(self, product):
        """Check current price for a single product"""
        response = self.get_page(product['url'])
        if not response:
            return None
        soup = self.parse_page(response.content)
        current_price = self.extract_price(soup, product['price_selector'])
        if current_price:
            price_data = {
                'product_name': product['name'],
                'price': current_price,
                'url': product['url'],
                'timestamp': datetime.now().isoformat()
            }
            # Record in both the per-product and global histories
            product['price_history'].append(price_data)
            self.price_history.append(price_data)
            self.logger.info(f"Price check: {product['name']} - ${current_price}")
            return price_data
        self.logger.warning(f"Could not extract price for {product['name']}")
        return None

    def monitor_all_products(self):
        """Check prices for all monitored products"""
        self.logger.info("Starting price monitoring for all products")
        current_prices = []
        for product in self.products:
            price_data = self.check_product_price(product)
            if price_data:
                current_prices.append(price_data)
        return current_prices

    def check_price_alerts(self, alert_threshold=0.1):
        """Check for significant price changes"""
        alerts = []
        for product in self.products:
            if len(product['price_history']) >= 2:
                old_price = product['price_history'][-2]['price']
                new_price = product['price_history'][-1]['price']
                # Calculate percentage change
                price_change = (new_price - old_price) / old_price
                if abs(price_change) >= alert_threshold:
                    alerts.append({
                        'product_name': product['name'],
                        'old_price': old_price,
                        'new_price': new_price,
                        'change_percent': price_change * 100,
                        'url': product['url'],
                        'timestamp': datetime.now().isoformat()
                    })
                    self.logger.info(
                        f"Price alert: {product['name']} changed by {price_change * 100:.1f}%"
                    )
        return alerts

    def save_price_history(self, filename):
        """Save price history to CSV"""
        if not self.price_history:
            return
        fieldnames = ['product_name', 'price', 'url', 'timestamp']
        with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(self.price_history)
        self.logger.info(f"Price history saved to {filename}")

# Usage example
monitor = PriceMonitor()
monitor.add_product(
    "Sample Product",
    "https://example-store.com/product/123",
    ".price",
    ".product-title"
)

# Monitor prices
current_prices = monitor.monitor_all_products()
alerts = monitor.check_price_alerts(alert_threshold=0.05)  # 5% threshold
monitor.save_price_history("price_history.csv")
```

4. Task Scheduling: Automating Script Execution

Task scheduling ensures your automation scripts run at the right time without manual intervention.

Using Python's Schedule Library

Here's a comprehensive scheduling system:

```python
import schedule
import time
import threading
from datetime import datetime
import logging
import json
import os

class TaskScheduler:
    def __init__(self):
        self.scheduled_tasks = []
        self.running = False
        self.scheduler_thread = None
        # Setup logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('scheduler.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def add_daily_task(self, time_str, task_func, task_name, *args, **kwargs):
        """Add a task to run daily at specified time"""
        job = schedule.every().day.at(time_str).do(
            self._run_task, task_func, task_name, *args, **kwargs
        )
        self.scheduled_tasks.append({
            'name': task_name,
            'schedule': f"Daily at {time_str}",
            'function': task_func.__name__,
            'next_run': job.next_run.isoformat() if job.next_run else None
        })
        self.logger.info(f"Scheduled daily task: {task_name} at {time_str}")
        return job

    def add_weekly_task(self, day, time_str, task_func, task_name, *args, **kwargs):
        """Add a task to run weekly on specified day and time"""
        job = getattr(schedule.every(), day.lower()).at(time_str).do(
            self._run_task, task_func, task_name, *args, **kwargs
        )
        self.scheduled_tasks.append({
            'name': task_name,
            'schedule': f"Weekly on {day} at {time_str}",
            'function': task_func.__name__,
            'next_run': job.next_run.isoformat() if job.next_run else None
        })
        self.logger.info(f"Scheduled weekly task: {task_name} on {day} at {time_str}")
        return job

    def add_interval_task(self, interval_minutes, task_func, task_name, *args, **kwargs):
        """Add a task to run at regular intervals"""
        job = schedule.every(interval_minutes).minutes.do(
            self._run_task, task_func, task_name, *args, **kwargs
        )
        self.scheduled_tasks.append({
            'name': task_name,
            'schedule': f"Every {interval_minutes} minutes",
            'function': task_func.__name__,
            'next_run': job.next_run.isoformat() if job.next_run else None
        })
        self.logger.info(f"Scheduled interval task: {task_name} every {interval_minutes} minutes")
        return job

    def _run_task(self, task_func, task_name, *args, **kwargs):
        """Execute a scheduled task with error handling"""
        try:
            self.logger.info(f"Starting task: {task_name}")
            start_time = datetime.now()
            # Run the task
            result = task_func(*args, **kwargs)
            duration = (datetime.now() - start_time).total_seconds()
            self.logger.info(f"Task completed: {task_name} (Duration: {duration:.2f}s)")
            # Log task execution
            self._log_task_execution(task_name, True, duration, None)
            return result
        except Exception as e:
            self.logger.error(f"Task failed: {task_name} - Error: {e}")
            self._log_task_execution(task_name, False, 0, str(e))

    def _log_task_execution(self, task_name, success, duration, error):
        """Log task execution details"""
        log_entry = {
            'task_name': task_name,
            'timestamp': datetime.now().isoformat(),
            'success': success,
            'duration_seconds': duration,
            'error': error
        }
        # Append to execution log file
        log_file = 'task_execution.json'
        try:
            if os.path.exists(log_file):
                with open(log_file, 'r') as f:
                    logs = json.load(f)
            else:
                logs = []
            logs.append(log_entry)
            # Keep only last 1000 entries
            logs = logs[-1000:]
            with open(log_file, 'w') as f:
                json.dump(logs, f, indent=2)
        except Exception as e:
            self.logger.error(f"Failed to log task execution: {e}")

    def start_scheduler(self):
        """Start the scheduler in a separate thread"""
        if self.running:
            self.logger.warning("Scheduler is already running")
            return
        self.running = True
        self.scheduler_thread = threading.Thread(target=self._run_scheduler, daemon=True)
        self.scheduler_thread.start()
        self.logger.info("Scheduler started")

    def _run_scheduler(self):
        """Main scheduler loop"""
        while self.running:
            schedule.run_pending()
            time.sleep(1)

    def stop_scheduler(self):
        """Stop the scheduler"""
        self.running = False
        if self.scheduler_thread:
            self.scheduler_thread.join()
        self.logger.info("Scheduler stopped")

    def list_scheduled_tasks(self):
        """List all scheduled tasks"""
        self.logger.info("Scheduled tasks:")
        for task in self.scheduled_tasks:
            self.logger.info(f" - {task['name']}: {task['schedule']}")
        return self.scheduled_tasks

    def get_next_runs(self):
        """Get next run times for all scheduled jobs"""
        return [
            {
                'job': str(job),
                'next_run': job.next_run.isoformat() if job.next_run else None
            }
            for job in schedule.jobs
        ]

# Example automation tasks
def backup_files():
    """Example backup task"""
    import shutil
    source_dir = "/path/to/important/files"
    backup_dir = f"/path/to/backup/{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    try:
        shutil.copytree(source_dir, backup_dir)
        print(f"Backup completed: {backup_dir}")
        return True
    except Exception as e:
        print(f"Backup failed: {e}")
        return False

def send_daily_report():
    """Example daily report task"""
    # This would integrate with your email automation
    report_content = f"Daily report generated at {datetime.now()}"
    print(f"Sending daily report: {report_content}")
    return True

def monitor_system_health():
    """Example system monitoring task"""
    import psutil
    # Get system metrics
    cpu_percent = psutil.cpu_percent()
    memory_percent = psutil.virtual_memory().percent
    disk_percent = psutil.disk_usage('/').percent
    print(f"System Health - CPU: {cpu_percent}%, Memory: {memory_percent}%, Disk: {disk_percent}%")
    # Alert if any metric is too high
    if cpu_percent > 80 or memory_percent > 80 or disk_percent > 90:
        print("WARNING: System resources running high!")
        return False
    return True

# Usage example
def main():
    scheduler = TaskScheduler()
    # Schedule various tasks
    scheduler.add_daily_task("02:00", backup_files, "Daily Backup")
    scheduler.add_daily_task("09:00", send_daily_report, "Morning Report")
    scheduler.add_interval_task(30, monitor_system_health, "System Health Check")
    scheduler.add_weekly_task("monday", "10:00", backup_files, "Weekly Full Backup")
    # List scheduled tasks, then start the scheduler
    scheduler.list_scheduled_tasks()
    scheduler.start_scheduler()
    try:
        # Keep the main thread alive
        while True:
            time.sleep(60)
            # Print next runs every minute
            next_runs = scheduler.get_next_runs()
            print(f"Next scheduled runs: {len(next_runs)} jobs pending")
    except KeyboardInterrupt:
        print("Shutting down scheduler...")
        scheduler.stop_scheduler()

if __name__ == "__main__":
    main()
```
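If the full TaskScheduler class is more than you need, the schedule library's core pattern fits in a few lines (a minimal sketch):

```python
import schedule
import time

def say_hello():
    print("Hello from a scheduled task!")

# Register jobs, then poll for due work in a loop
schedule.every().day.at("07:30").do(say_hello)
schedule.every(10).minutes.do(say_hello)

while True:
    schedule.run_pending()
    time.sleep(1)
```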

Advanced Scheduling with Cron-like Syntax

For more complex scheduling needs, here's an enhanced scheduler built on the croniter package. Cron expressions have five fields: minute, hour, day of month, month, and day of week:

```python
import croniter
from datetime import datetime
import threading
import time

class CronScheduler:
    def __init__(self):
        self.jobs = []
        self.running = False
        self.scheduler_thread = None

    def add_cron_job(self, cron_expression, task_func, task_name, *args, **kwargs):
        """Add a job using a cron expression"""
        job = {
            'cron_expression': cron_expression,
            'task_func': task_func,
            'task_name': task_name,
            'args': args,
            'kwargs': kwargs,
            'last_run': None,
            'next_run': self._get_next_run(cron_expression)
        }
        self.jobs.append(job)
        print(f"Added cron job: {task_name} with schedule '{cron_expression}'")

    def _get_next_run(self, cron_expression):
        """Calculate next run time for a cron expression"""
        cron = croniter.croniter(cron_expression, datetime.now())
        return cron.get_next(datetime)

    def start(self):
        """Start the cron scheduler"""
        if self.running:
            return
        self.running = True
        self.scheduler_thread = threading.Thread(target=self._run_scheduler, daemon=True)
        self.scheduler_thread.start()
        print("Cron scheduler started")

    def _run_scheduler(self):
        """Main scheduler loop"""
        while self.running:
            current_time = datetime.now()
            for job in self.jobs:
                if current_time >= job['next_run']:
                    # Run the job
                    try:
                        print(f"Running job: {job['task_name']}")
                        job['task_func'](*job['args'], **job['kwargs'])
                        job['last_run'] = current_time
                    except Exception as e:
                        print(f"Job failed: {job['task_name']} - {e}")
                    # Calculate next run time
                    job['next_run'] = self._get_next_run(job['cron_expression'])
            time.sleep(30)  # Check every 30 seconds

    def stop(self):
        """Stop the scheduler"""
        self.running = False
        if self.scheduler_thread:
            self.scheduler_thread.join()

# Usage example with cron expressions
cron_scheduler = CronScheduler()

# Run every day at 2:30 AM
cron_scheduler.add_cron_job("30 2 * * *", backup_files, "Daily Backup")

# Run every Monday at 9:00 AM
cron_scheduler.add_cron_job("0 9 * * 1", send_daily_report, "Weekly Report")

# Run every 15 minutes
cron_scheduler.add_cron_job("*/15 * * * *", monitor_system_health, "Health Check")

cron_scheduler.start()
```

Bringing It All Together: Complete Automation Workflow

Here's how you can combine all these automation techniques into a comprehensive system:

```python
import time
from datetime import datetime
import logging

# Assumes TaskScheduler, backup_files, send_daily_report, and
# monitor_system_health from the scheduling section are in scope

class AutomationWorkflow:
    def __init__(self):
        self.setup_logging()
        self.file_organizer = None
        self.email_client = None
        self.web_scraper = None
        self.scheduler = TaskScheduler()

    def setup_logging(self):
        """Setup centralized logging"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('automation_workflow.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def daily_maintenance_workflow(self):
        """Complete daily maintenance workflow"""
        self.logger.info("Starting daily maintenance workflow")
        try:
            # 1. Organize files
            if self.file_organizer:
                self.file_organizer.organize_by_type_and_date()
                self.file_organizer.clean_empty_directories()

            # 2. Backup important files
            backup_success = backup_files()

            # 3. Check system health
            health_status = monitor_system_health()

            # 4. Send status email
            if self.email_client:
                status_report = f"""
Daily Maintenance Report - {datetime.now().strftime('%Y-%m-%d')}

File Organization: Completed
Backup Status: {'Success' if backup_success else 'Failed'}
System Health: {'Good' if health_status else 'Issues Detected'}

Workflow completed at {datetime.now()}
"""
                self.email_client.send_simple_email(
                    "admin@example.com",
                    "Daily Maintenance Report",
                    status_report
                )
            self.logger.info("Daily maintenance workflow completed successfully")
            return True
        except Exception as e:
            self.logger.error(f"Daily maintenance workflow failed: {e}")
            return False

    def setup_automated_schedule(self):
        """Setup all automated tasks"""
        # Daily maintenance at 2 AM
        self.scheduler.add_daily_task("02:00", self.daily_maintenance_workflow, "Daily Maintenance")
        # Weekly report on Mondays at 9 AM
        self.scheduler.add_weekly_task("monday", "09:00", send_daily_report, "Weekly Report")
        # System health check every hour
        self.scheduler.add_interval_task(60, monitor_system_health, "Hourly Health Check")
        # Start the scheduler
        self.scheduler.start_scheduler()
        self.logger.info("Automated schedule setup complete")

    def run_workflow(self):
        """Run the complete automation workflow"""
        self.logger.info("Starting automation workflow")
        self.setup_automated_schedule()
        try:
            while True:
                time.sleep(300)  # Check every 5 minutes
                self.logger.debug("Workflow running...")
        except KeyboardInterrupt:
            self.logger.info("Shutting down automation workflow")
            self.scheduler.stop_scheduler()

# Main execution
if __name__ == "__main__":
    workflow = AutomationWorkflow()
    workflow.run_workflow()
```

Best Practices and Security Considerations

Error Handling and Logging

Always implement comprehensive error handling and logging:

```python
import logging
from functools import wraps

def log_errors(func):
    """Decorator to log function errors"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            logging.error(f"Error in {func.__name__}: {e}")
            raise
    return wrapper

@log_errors
def automated_task():
    # Your automation code here
    pass
```

Security Best Practices

1. Environment Variables: Store sensitive information in environment variables:

```python
import os
from dotenv import load_dotenv

# Read variables from a .env file into the environment
load_dotenv()

EMAIL_PASSWORD = os.getenv('EMAIL_PASSWORD')
API_KEY = os.getenv('API_KEY')
```

2. Input Validation: Always validate inputs and file paths:

```python
import os.path

def validate_file_path(file_path):
    """Validate file path for security"""
    # Check if path exists
    if not os.path.exists(file_path):
        raise ValueError("File does not exist")
    # Prevent directory traversal attacks
    if '..' in file_path:
        raise ValueError("Invalid file path")
    return True
```
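The '..' check above is a first line of defense, but it can be bypassed (symlinks, absolute paths). A stronger approach, sketched below with a hypothetical allowed_base directory, resolves the path and verifies containment:

```python
from pathlib import Path

def is_within_base(file_path, allowed_base):
    """Return True only if file_path resolves inside allowed_base."""
    base = Path(allowed_base).resolve()
    target = Path(file_path).resolve()  # Follows symlinks, collapses '..'
    return base == target or base in target.parents

# Example with a hypothetical sandbox directory
print(is_within_base("/srv/data/reports/q1.csv", "/srv/data"))  # True
print(is_within_base("/srv/data/../etc/passwd", "/srv/data"))   # False
```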

3. Rate Limiting: Implement rate limiting for web scraping:

```python
import time
from functools import wraps

def rate_limit(calls_per_second=1):
    """Rate limiting decorator"""
    def decorator(func):
        # Mutable cell so the wrapper can update the last-call time
        last_called = [0.0]

        @wraps(func)
        def wrapper(*args, **kwargs):
            elapsed = time.time() - last_called[0]
            left_to_wait = 1.0 / calls_per_second - elapsed
            if left_to_wait > 0:
                time.sleep(left_to_wait)
            ret = func(*args, **kwargs)
            last_called[0] = time.time()
            return ret
        return wrapper
    return decorator

@rate_limit(calls_per_second=0.5)  # Max 1 call every 2 seconds
def scrape_page(url):
    # Scraping code here
    pass
```

Conclusion

Python automation can transform your daily workflow by eliminating repetitive tasks and ensuring consistency. The examples in this guide provide a solid foundation for:

- File Management: Automatically organize, rename, and process files
- Email Automation: Send personalized emails and manage campaigns
- Web Scraping: Collect data from websites systematically
- Task Scheduling: Run automation scripts at optimal times

Start with simple scripts and gradually build more complex workflows. Remember to:

1. Test thoroughly before deploying automation scripts
2. Implement proper error handling and logging
3. Follow security best practices
4. Respect website terms of service when scraping
5. Monitor your automated systems regularly

By mastering these automation techniques, you'll save time, reduce errors, and focus on more valuable work. The key is to start small, iterate frequently, and build robust systems that can handle real-world scenarios.

Whether you're managing files, communicating with customers, gathering market data, or maintaining systems, Python automation provides the tools to work smarter, not harder. Begin implementing these examples today and experience the power of automation in your daily workflow.

Tags

  • Automation
  • Productivity
  • File management
  • Scripting
  • Web scraping
