Python is the Swiss Army knife of DevOps. From log parsing to cloud orchestration, a well-crafted Python script can save hours of manual work. Here are 10 essential automation scripts that every DevOps engineer should have in their toolbox.
1. Log Parser & Analyzer
Parse nginx, Apache, or custom application logs to extract useful metrics:
#!/usr/bin/env python3
# log_analyzer.py — Parse and analyze server logs
import re
from collections import Counter
from datetime import datetime
def parse_nginx_log(logfile, top_n=10):
    """Summarise an access log in nginx/Apache "combined" format.

    Each line that matches the combined-log pattern contributes one count to
    its HTTP status code, its client IP and its request path. Lines that do
    not match (blank lines, malformed requests, other formats) are skipped.

    Args:
        logfile: Path of the access log to read.
        top_n: How many of the most frequent entries to return per category.

    Returns:
        A dict with the keys ``'statuses'``, ``'top_ips'`` and
        ``'top_paths'``; each value is a list of ``(value, count)`` tuples
        ordered from most to least frequent.

    Raises:
        OSError: If ``logfile`` cannot be opened.
    """
    # Compiled once, outside the loop. The body-size field also accepts "-",
    # which Apache writes when no bytes were sent.
    line_re = re.compile(
        r'(\S+) \S+ \S+ \[([^\]]+)\] "(\S+) (\S+) \S+" (\d+) (\d+|-) "([^"]*)" "([^"]*)"'
    )
    status_counts = Counter()
    ip_counts = Counter()
    path_counts = Counter()

    # Access logs routinely contain arbitrary bytes (scanner payloads, odd
    # user agents); replace undecodable bytes instead of aborting the run.
    with open(logfile, encoding='utf-8', errors='replace') as f:
        for line in f:
            match = line_re.match(line)
            if match is None:
                continue
            ip, path, status = match.group(1, 4, 5)
            status_counts[status] += 1
            ip_counts[ip] += 1
            path_counts[path] += 1

    return {
        'statuses': status_counts.most_common(top_n),
        'top_ips': ip_counts.most_common(top_n),
        'top_paths': path_counts.most_common(top_n),
    }
if __name__ == '__main__':
stats = parse_nginx_log('/var/log/nginx/access.log')
for key, val in stats.items():
print(f'\n=== {key.upper()} ===')
for item, count in val:
print(f' {item}: {count}')
2. Server Health Monitor
Continuously monitor CPU, RAM, disk, and send alerts:
#!/usr/bin/env python3
import psutil
import smtplib
from email.message import EmailMessage
# Usage percentages; check_system() reports a resource only when its reading
# is strictly greater than the value listed here.
THRESHOLDS = {
    'cpu': 80,   # CPU utilisation, percent
    'ram': 85,   # virtual memory in use, percent
    'disk': 90,  # root filesystem in use, percent
}
def check_system():
    """Sample CPU, RAM and root-disk usage and report threshold breaches.

    The CPU reading blocks for one second (``interval=1``) while psutil
    measures utilisation; RAM and disk are read afterwards.

    Returns:
        A list of human-readable alert strings, one per resource whose usage
        percentage is strictly greater than its ``THRESHOLDS`` entry. The
        list is empty when nothing is over its limit.
    """
    # (threshold key, label used in the message, current usage percentage)
    readings = (
        ('cpu', 'CPU', psutil.cpu_percent(interval=1)),
        ('ram', 'RAM', psutil.virtual_memory().percent),
        ('disk', 'Disk', psutil.disk_usage('/').percent),
    )
    return [
        f'{label} at {usage}%'
        for key, label, usage in readings
        if usage > THRESHOLDS[key]
    ]
if __name__ == '__main__':
alerts = check_system()
if alerts:
print('ALERTS:', ', '.join(alerts))
# Send email or webhook here
else:
print('✓ System healthy')
3. Automated Backup Script
Compress and upload backups to cloud storage with retention management:
#!/usr/bin/env python3
import os, shutil, datetime, glob
from pathlib import Path
# Directories the backup is meant to cover.
BACKUP_DIRS = [
    '/var/www',
    '/etc/nginx',
    '/etc/letsencrypt',
]
# Directory that holds the backup_*.tar.gz archives.
BACKUP_DEST = '/backups'
# Archives older than this many days are deleted by create_backup().
RETENTION_DAYS = 30
def create_backup(sources=None, dest=None, retention_days=None):
    """Archive the configured directories and prune expired archives.

    Writes ``backup_<YYYYmmdd_HHMMSS>.tar.gz`` into the destination
    directory, containing every source path that exists, then deletes
    ``backup_*.tar.gz`` files in that directory whose modification time is
    older than the retention period.

    Args:
        sources: Iterable of paths to archive. Defaults to ``BACKUP_DIRS``.
        dest: Directory that receives the archive. Defaults to
            ``BACKUP_DEST``; it is created if missing.
        retention_days: Age in days after which old archives are removed.
            Defaults to ``RETENTION_DAYS``.

    Returns:
        ``pathlib.Path`` of the archive that was written.

    Raises:
        OSError: If the destination cannot be created or written, or a
            source cannot be read.
    """
    # Local import: tarfile is not among this script's top-level imports.
    import tarfile

    if sources is None:
        sources = BACKUP_DIRS
    dest_dir = Path(BACKUP_DEST if dest is None else dest)
    keep_days = RETENTION_DAYS if retention_days is None else retention_days

    dest_dir.mkdir(parents=True, exist_ok=True)
    now = datetime.datetime.now()
    timestamp = now.strftime('%Y%m%d_%H%M%S')
    backup_file = dest_dir / f'backup_{timestamp}.tar.gz'

    # Archive only the requested directories (not the whole filesystem), and
    # write directly to the final name so the reported path really exists.
    with tarfile.open(backup_file, 'w:gz') as archive:
        for source in sources:
            source_path = Path(source)
            if not source_path.exists():
                print(f'Skipping missing path: {source_path}')
                continue
            # tarfile stores absolute paths without their leading slash.
            archive.add(str(source_path))

    # Clean old backups. st_mtime is used because st_ctime is the
    # inode-change time on Unix, not the creation time.
    cutoff = now - datetime.timedelta(days=keep_days)
    for old_backup in dest_dir.glob('backup_*.tar.gz'):
        if old_backup == backup_file:
            continue
        modified = datetime.datetime.fromtimestamp(old_backup.stat().st_mtime)
        if modified < cutoff:
            old_backup.unlink()
            print(f'Removed old backup: {old_backup.name}')

    print(f'Backup created: {backup_file}')
    return backup_file
if __name__ == '__main__':
create_backup()
4. CI/CD Pipeline Health Check
Query GitHub Actions API to report pipeline status across all repos:
#!/usr/bin/env python3
import requests, os, json
# API token read from the environment; None when GITHUB_TOKEN is not set.
GITHUB_TOKEN = os.getenv('GITHUB_TOKEN')
# GitHub organisation whose repositories are inspected.
ORG = 'my-organization'
def get_pipeline_status(max_repos=10, timeout=10):
    """Report the latest GitHub Actions run for repositories in ``ORG``.

    Args:
        max_repos: Maximum number of repositories to inspect. Only the first
            page of the organisation's repository list is read, so at most
            100 repositories are considered.
        timeout: Seconds to wait on each HTTP request before giving up.

    Returns:
        A dict keyed by repository name. Each value is either
        ``{'status', 'conclusion', 'url'}`` for the most recent workflow run
        or ``{'error': <message>}`` if that repository's query failed.
        Repositories without any workflow run are omitted.

    Raises:
        requests.RequestException: If the repository list itself cannot be
            fetched (network failure, bad credentials, unknown org).
    """
    headers = {'Accept': 'application/vnd.github+json'}
    # Send the token only when one is configured; otherwise the header would
    # read "Bearer None" and every request would be rejected.
    if GITHUB_TOKEN:
        headers['Authorization'] = f'Bearer {GITHUB_TOKEN}'

    status_report = {}
    with requests.Session() as session:
        session.headers.update(headers)

        # Get all repos in org
        repo_response = session.get(
            f'https://api.github.com/orgs/{ORG}/repos',
            params={'per_page': max(1, min(max_repos, 100))},
            timeout=timeout,
        )
        # Fail loudly here: an error payload is a dict, not a list of repos.
        repo_response.raise_for_status()
        repos = repo_response.json()

        for repo in repos[:max_repos]:
            name = repo['name']
            try:
                run_response = session.get(
                    f'https://api.github.com/repos/{ORG}/{name}/actions/runs',
                    params={'per_page': 1},
                    timeout=timeout,
                )
                run_response.raise_for_status()
                workflow_runs = run_response.json().get('workflow_runs')
                if workflow_runs:
                    latest = workflow_runs[0]
                    status_report[name] = {
                        'status': latest['status'],
                        'conclusion': latest['conclusion'],
                        'url': latest['html_url'],
                    }
            except (requests.RequestException, ValueError, KeyError) as e:
                # Best effort per repository: record the failure and move on.
                status_report[name] = {'error': str(e)}
    return status_report
if __name__ == '__main__':
report = get_pipeline_status()
print(json.dumps(report, indent=2))
5. SSL Certificate Expiry Checker
#!/usr/bin/env python3
import ssl, socket, datetime
# Hostnames whose TLS certificates are checked when run as a script.
DOMAINS = [
    'pravidhisolutions.in',
    'example.com',
]
def check_ssl(domain, port=443, timeout=10):
    """Fetch a host's TLS certificate and report when it expires.

    Args:
        domain: Hostname to connect to; also used for SNI and verification.
        port: TCP port of the TLS service.
        timeout: Seconds to wait for the TCP connection.

    Returns:
        A dict with ``'domain'``, ``'expires'`` (the certificate's notAfter
        time in UTC, formatted ``YYYY-MM-DD HH:MM:SS``) and ``'days_left'``
        (whole days until expiry; negative once expired).

    Raises:
        OSError: If the connection fails or times out.
        ssl.SSLError: If the TLS handshake or certificate verification fails.
    """
    ctx = ssl.create_default_context()
    with socket.create_connection((domain, port), timeout=timeout) as sock:
        with ctx.wrap_socket(sock, server_hostname=domain) as ssock:
            cert = ssock.getpeercert()

    # notAfter is expressed in GMT. Convert it with the ssl helper (which is
    # locale-independent) and compare against the current time in UTC, so the
    # result does not drift by the machine's local UTC offset.
    utc = datetime.timezone.utc
    expiry = datetime.datetime.fromtimestamp(
        ssl.cert_time_to_seconds(cert['notAfter']), tz=utc
    )
    remaining = (expiry - datetime.datetime.now(utc)).days
    return {
        'domain': domain,
        # Offset dropped to keep the original "YYYY-MM-DD HH:MM:SS" format.
        'expires': str(expiry.replace(tzinfo=None)),
        'days_left': remaining,
    }
if __name__ == '__main__':
for domain in DOMAINS:
result = check_ssl(domain)
print(f'{result["domain"]}: {result["days_left"]} days remaining')
6-10. More Essential Scripts
Here's a quick reference for more automation ideas:
| # | Script | Use Case |
|---|---|---|
| 6 | Docker Cleanup | Remove unused containers, images, and volumes |
| 7 | DNS Propagation Checker | Verify DNS records across global resolvers |
| 8 | Cloud Cost Analyzer | Fetch and summarize AWS/Azure billing |
| 9 | Webhook Forwarder | Receive and re-route webhooks between services |
| 10 | Incident Response Bot | Auto-create tickets from alerts via API |
Pro Tip: Run these scripts as cron jobs or systemd timers for continuous automation. Pair them with a monitoring tool like Prometheus for full observability.
Running Scripts in Production
For production deployments, consider:
- Use `systemd timers` instead of cron for better logging
- Containerize scripts with Docker for consistency
- Add proper error handling and logging with Python's `logging` module
- Store credentials in environment variables or a vault
- Test scripts in a staging environment first
Want more? Check out our free DevOps tools or custom development services.