🎁 New User? Get 20% off your first purchase with code NEWUSER20 Register Now →
Menu

Categories

Python Automation: 10 Scripts to Replace Your Manual Tasks Today

Python Automation: 10 Scripts to Replace Your Manual Tasks Today

Python has become the go-to language for system administrators who want to automate their daily tasks. Unlike complex shell scripts that quickly become unmaintainable, Python scripts are readable, portable, and easy to extend. Here are 10 practical scripts you can start using immediately.

Each script is self-contained and tested on Python 3.10+. Copy them, modify them for your environment, and start saving time today.

1. Automated Log File Analyzer

Parse log files and extract meaningful patterns — failed logins, error rates, and suspicious activity:

import re
from collections import Counter
from pathlib import Path

def analyze_auth_log(log_path="/var/log/auth.log"):
    """Summarize an SSH auth log.

    Counts "Failed password" attempts per source IP and collects
    (user, ip) pairs for "Accepted" logins, then prints the top-10
    offender IPs and the last five successful logins.

    Args:
        log_path: Path to the auth log to parse.

    Returns:
        (failed_ips, successful_logins): a Counter mapping IP -> failed
        attempt count, and a list of (user, ip) tuples in file order.

    Raises:
        OSError: if the log file cannot be opened.
    """
    # Compile once instead of re-scanning the pattern on every line.
    failed_re = re.compile(r"from (\d+\.\d+\.\d+\.\d+)")
    accepted_re = re.compile(r"for (\w+) from (\d+\.\d+\.\d+\.\d+)")

    failed_ips = Counter()
    successful_logins = []

    # errors="replace": auth logs occasionally contain raw client bytes;
    # don't let one undecodable line abort the whole analysis.
    with open(log_path, "r", errors="replace") as f:
        for line in f:
            if "Failed password" in line:
                match = failed_re.search(line)
                if match:
                    failed_ips[match.group(1)] += 1
            elif "Accepted" in line:
                match = accepted_re.search(line)
                if match:
                    successful_logins.append((match.group(1), match.group(2)))

    print("Top 10 Failed Login IPs:")
    for ip, count in failed_ips.most_common(10):
        print(f"  {ip}: {count} attempts")

    print(f"\nSuccessful Logins: {len(successful_logins)}")
    for user, ip in successful_logins[-5:]:
        print(f"  {user} from {ip}")

    # Return the data so callers can alert/report on it programmatically.
    return failed_ips, successful_logins

# Demo: summarize the default system auth log (reading /var/log/auth.log
# usually requires root).
analyze_auth_log()

2. Disk Space Monitor with Email Alerts

Monitor disk usage and print alerts before you run out of space — an email hook is stubbed in the script for you to wire up:

import shutil
import smtplib
from email.mime.text import MIMEText

def check_disk_space(threshold=80, partitions=None):
    """Report partitions whose usage exceeds *threshold* percent.

    Args:
        threshold: Usage percentage above which a partition is flagged.
        partitions: Mount points to check; defaults to a typical Linux
            layout ("/", "/var", "/home", "/tmp"). Mount points that do
            not exist on this host are silently skipped.

    Returns:
        A list of human-readable alert strings (empty when all checked
        partitions are under the threshold).
    """
    if partitions is None:
        partitions = ["/", "/var", "/home", "/tmp"]

    alerts = []
    for partition in partitions:
        try:
            usage = shutil.disk_usage(partition)
        except FileNotFoundError:
            # Mount point not present on this host — skip it.
            continue

        percent_used = (usage.used / usage.total) * 100
        free_gb = usage.free / (1024**3)

        if percent_used > threshold:
            alerts.append(
                f"{partition}: {percent_used:.1f}% used, "
                f"{free_gb:.1f}GB free"
            )

    if alerts:
        message = "Disk Space Alerts:\n\n" + "\n".join(alerts)
        print(message)
        # Add email sending logic here

    return alerts

# Demo: check the default partitions against the 80% threshold.
check_disk_space()

3. Bulk DNS Lookup Tool

Resolve multiple domains and check DNS records efficiently:

import socket
import concurrent.futures

def dns_lookup(domain):
    """Resolve *domain* and gather its primary IP, FQDN, and aliases.

    Returns a dict with "domain" and "status" keys; on success, "status"
    is "resolved" and "ip"/"fqdn"/"aliases" are filled in, otherwise
    "status" is "failed: <reason>".
    """
    info = {"domain": domain}
    try:
        info["ip"] = socket.gethostbyname(domain)
        info["fqdn"] = socket.getfqdn(domain)
        info["aliases"] = socket.gethostbyname_ex(domain)[1]
    except socket.gaierror as e:
        info["status"] = f"failed: {e}"
    else:
        info["status"] = "resolved"
    return info

# Demo: resolve a batch of domains concurrently and print one line each.
domains = [
    "google.com", "github.com", "stackoverflow.com",
    "python.org", "linux.org", "invalid-domain-test.xyz"
]

# A small thread pool overlaps the blocking DNS round-trips.
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    results = executor.map(dns_lookup, domains)

# executor.map preserves input order, so output follows the list above.
for result in results:
    if result["status"] == "resolved":
        print(f"{result['domain']} -> {result['ip']}")
    else:
        print(f"{result['domain']} -> {result['status']}")

4. Server Uptime Checker

Monitor the availability of your web services:

import urllib.request
import time
import json
from datetime import datetime

def check_endpoints(endpoints):
    """Probe each URL and measure its response time.

    Args:
        endpoints: Iterable of URLs to request (10-second timeout each).

    Returns:
        A list of dicts, one per URL, in input order. Reachable URLs get
        {"url", "status", "response_ms", "ok": True}; failures get
        {"url", "error", "ok": False}. Never raises for a bad endpoint.
    """
    results = []
    for url in endpoints:
        start = time.time()
        try:
            # Use the response as a context manager so the underlying
            # connection is closed instead of leaking a socket per probe.
            with urllib.request.urlopen(url, timeout=10) as response:
                status = response.status
            elapsed = (time.time() - start) * 1000
            results.append({
                "url": url,
                "status": status,
                "response_ms": round(elapsed, 1),
                "ok": True
            })
        except Exception as e:
            # Broad on purpose: any failure just marks the endpoint down.
            results.append({
                "url": url,
                "error": str(e),
                "ok": False
            })
    return results

# Demo: endpoints to watch — replace with your own services.
urls = [
    "https://your-site.com",
    "https://api.your-site.com/health",
    "https://monitoring.your-site.com"
]

# Print a timestamped, one-line status per endpoint.
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"Health Check - {timestamp}")
for result in check_endpoints(urls):
    if result["ok"]:
        print(f"  OK  {result['url']} ({result['response_ms']}ms)")
    else:
        print(f"  FAIL {result['url']} - {result['error']}")

5. CSV to JSON Converter with Data Validation

Convert CSV files to JSON with automatic type detection and validation:

import csv
import json
from pathlib import Path

def csv_to_json(csv_path, json_path=None):
    """Convert a CSV file to a JSON array of records with typed values.

    Header names are normalized (lowercased, spaces -> underscores).
    Values that parse as integers become ints, values that parse as
    floats become floats, everything else is kept as a stripped string.

    Args:
        csv_path: Path of the source CSV file.
        json_path: Destination path; defaults to *csv_path* with a
            ".json" suffix.

    Returns:
        The list of record dicts that was written to *json_path*.
    """
    if json_path is None:
        json_path = Path(csv_path).with_suffix(".json")

    def coerce(value):
        # DictReader fills missing trailing fields with None — the
        # original str methods would have crashed on them.
        if value is None:
            return None
        try:
            # int() handles negatives and "+5", which str.isdigit() —
            # used by the original — wrongly demoted to floats.
            return int(value)
        except ValueError:
            pass
        try:
            return float(value)
        except ValueError:
            return value.strip()

    records = []
    with open(csv_path, "r", newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            records.append({
                key.strip().lower().replace(" ", "_"): coerce(value)
                for key, value in row.items()
            })

    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(records, f, indent=2)

    print(f"Converted {len(records)} records to {json_path}")
    return records

6. Automated Backup Script

Create compressed backups with rotation and logging:

import tarfile
import os
from datetime import datetime, timedelta
from pathlib import Path

def create_backup(source_dirs, backup_dir, keep_days=30):
    """Create a timestamped .tar.gz of *source_dirs* inside *backup_dir*.

    Sources that do not exist are silently skipped. After the archive is
    written, backups in *backup_dir* older than *keep_days* days (by
    mtime) are deleted.

    Args:
        source_dirs: Iterable of directories/files to include.
        backup_dir: Destination directory (created if missing).
        keep_days: Retention window in days for old backup_*.tar.gz files.

    Returns:
        The filesystem path of the newly created archive.
    """
    os.makedirs(backup_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_name = f"backup_{timestamp}.tar.gz"
    backup_path = os.path.join(backup_dir, backup_name)

    with tarfile.open(backup_path, "w:gz") as tar:
        for source in source_dirs:
            if os.path.exists(source):
                # Store each tree under its basename so restores are relative.
                tar.add(source, arcname=os.path.basename(source))
                print(f"  Added: {source}")

    size_mb = os.path.getsize(backup_path) / (1024 * 1024)
    print(f"Backup created: {backup_name} ({size_mb:.1f} MB)")

    # Rotation: only files matching the backup naming pattern are touched,
    # and the archive we just wrote has a fresh mtime, so it survives.
    cutoff = datetime.now() - timedelta(days=keep_days)
    for f in Path(backup_dir).glob("backup_*.tar.gz"):
        if datetime.fromtimestamp(f.stat().st_mtime) < cutoff:
            f.unlink()
            print(f"  Removed old backup: {f.name}")

    return backup_path

# Demo: nightly backup of key configuration and web content; archives
# older than the default 30-day retention window are pruned.
create_backup(
    source_dirs=["/etc/nginx", "/var/www/html", "/etc/ssh"],
    backup_dir="/backup/daily"
)

7. Process Monitor and Auto-Restart

Watch critical processes and restart them if they crash:

import subprocess
import time
import logging

# Write INFO-and-above records, timestamped, to a dedicated log file so
# unattended (cron/systemd) runs leave an audit trail.
logging.basicConfig(
    filename="/var/log/process_monitor.log",
    format="%(asctime)s - %(message)s",
    level=logging.INFO,
)

def is_running(process_name):
    """Return True when at least one process matches *process_name*.

    Matching is delegated to ``pgrep -f``, i.e. against the full
    command line of each process.
    """
    # pgrep exits with status 0 only when it found a match; output is
    # captured so nothing leaks to stdout.
    completed = subprocess.run(
        ["pgrep", "-f", process_name],
        capture_output=True,
    )
    return completed.returncode == 0

def restart_service(service_name):
    """Restart the systemd unit *service_name* and log a warning."""
    command = ["systemctl", "restart", service_name]
    subprocess.run(command)
    logging.warning(f"Restarted service: {service_name}")

# Map: process pattern (fed to pgrep) -> systemd unit to restart.
services = {
    "nginx": "nginx",
    "php-fpm": "php8.3-fpm",
    "redis-server": "redis"
}

for process, service in services.items():
    if is_running(process):
        logging.info(f"Process {process} is running normally")
    else:
        # Record the outage first, then attempt recovery.
        logging.error(f"Process {process} is not running!")
        restart_service(service)

8. SSL Certificate Expiry Checker

import ssl
import socket
from datetime import datetime

def check_ssl_expiry(hostname, port=443):
    """Fetch the TLS certificate for *hostname* and report time to expiry.

    Args:
        hostname: Server to connect to (SNI is sent).
        port: TLS port, default 443.

    Returns:
        {"hostname": str, "expiry": datetime (local time),
         "days_left": int (whole days until expiry)}.

    Raises:
        OSError / ssl.SSLError on connection or handshake failure.
    """
    context = ssl.create_default_context()
    with socket.create_connection((hostname, port), timeout=10) as sock:
        with context.wrap_socket(sock, server_hostname=hostname) as ssock:
            cert = ssock.getpeercert()

    # The certificate's notAfter field is always expressed in GMT. The
    # original strptime("... %Z") produced a NAIVE datetime that was then
    # compared against local datetime.now(), skewing days_left by the
    # local UTC offset. ssl.cert_time_to_seconds parses the GMT string
    # into an epoch value; fromtimestamp converts it to local time, which
    # is directly comparable with datetime.now().
    expiry = datetime.fromtimestamp(ssl.cert_time_to_seconds(cert["notAfter"]))
    days_left = (expiry - datetime.now()).days
    return {"hostname": hostname, "expiry": expiry, "days_left": days_left}

# Demo: warn when a certificate has 30 days or less left.
domains = ["google.com", "github.com", "your-domain.com"]
for domain in domains:
    try:
        info = check_ssl_expiry(domain)
    except Exception as e:
        print(f"[ERROR] {domain}: {e}")
    else:
        status = "OK" if info["days_left"] > 30 else "WARNING"
        print(f"[{status}] {domain}: {info['days_left']} days until expiry")

9. System Information Reporter

import platform
import os
import psutil

def system_report():
    """Print a quick host summary: OS, Python, CPU, memory, and disks.

    Requires the third-party psutil package (imported at module level).
    """
    GB = 1024 ** 3

    print("=== System Report ===")
    print(f"Hostname: {platform.node()}")
    print(f"OS: {platform.system()} {platform.release()}")
    print(f"Architecture: {platform.machine()}")
    print(f"Python: {platform.python_version()}")

    # cpu_percent(interval=1) samples usage over one second.
    cpu_count = psutil.cpu_count()
    cpu_percent = psutil.cpu_percent(interval=1)
    print(f"\nCPU: {cpu_count} cores, {cpu_percent}% usage")

    mem = psutil.virtual_memory()
    print(f"Memory: {mem.total // GB}GB total, "
          f"{mem.percent}% used")

    # One line per mounted partition.
    for part in psutil.disk_partitions():
        usage = psutil.disk_usage(part.mountpoint)
        print(f"Disk {part.mountpoint}: "
              f"{usage.total // GB}GB total, "
              f"{usage.percent}% used")

# Demo: print the report for the current host (needs psutil installed).
system_report()

10. Automated File Organizer

import os
import shutil
from pathlib import Path
from datetime import datetime

def organize_files(source_dir, target_dir):
    """Sort the files in *source_dir* into category folders under *target_dir*.

    The category is chosen by file extension (documents, images, logs,
    archives, scripts); anything unrecognized lands in "other".
    Subdirectories of *source_dir* are left untouched. Name collisions in
    the destination are resolved by appending "_1", "_2", ... — the
    original shutil.move call silently overwrote existing files on POSIX.

    Args:
        source_dir: Directory whose files will be moved.
        target_dir: Root under which category folders are created.

    Returns:
        The number of files moved.
    """
    extensions = {
        "documents": [".pdf", ".doc", ".docx", ".txt", ".xlsx", ".csv"],
        "images": [".jpg", ".jpeg", ".png", ".gif", ".webp", ".svg"],
        "logs": [".log", ".out", ".err"],
        "archives": [".zip", ".tar", ".gz", ".bz2", ".7z"],
        "scripts": [".sh", ".py", ".ps1", ".bat", ".rb"]
    }

    moved = 0
    for file in Path(source_dir).iterdir():
        if not file.is_file():
            continue

        # First category whose extension list matches, else "other".
        category = next(
            (cat for cat, exts in extensions.items()
             if file.suffix.lower() in exts),
            "other",
        )

        dest_dir = Path(target_dir) / category
        dest_dir.mkdir(parents=True, exist_ok=True)

        # Collision-safe destination: a.txt -> a_1.txt -> a_2.txt ...
        dest = dest_dir / file.name
        counter = 1
        while dest.exists():
            dest = dest_dir / f"{file.stem}_{counter}{file.suffix}"
            counter += 1

        shutil.move(str(file), str(dest))
        moved += 1

    print(f"Organized {moved} files into {target_dir}")
    return moved

# Demo: tidy a Downloads folder into categorized subfolders.
organize_files("/home/user/Downloads", "/home/user/Organized")

Getting Started

Save any of these scripts as a .py file, make it executable, and add it to cron or a systemd timer for automated execution. Start with the scripts that address your most time-consuming tasks and customize them for your environment.

Recommended Reading

Level up your Python automation skills with these Dargslan guides:

Share this article:

Stay Updated

Subscribe to our newsletter for the latest tutorials, tips, and exclusive offers.