Python JSON File Handling Examples for Developers: Complete Guide with Practical Code Samples
Introduction to Python JSON File Handling
JSON (JavaScript Object Notation) has become the de facto standard for data interchange in modern web development and API communication. For Python developers, mastering JSON file handling is essential for building robust applications that can efficiently process, store, and exchange data. This comprehensive guide provides practical examples and best practices for handling JSON files in Python, covering everything from basic operations to advanced techniques.
Understanding JSON Structure and Python Integration
JSON's lightweight, text-based format makes it ideal for data serialization and transmission. Python's built-in json module provides seamless integration for working with JSON data, allowing developers to easily convert between Python objects and JSON strings or files.
Basic JSON Structure Elements
JSON supports several data types that map directly to Python equivalents:
- Objects (dictionaries in Python)
- Arrays (lists in Python)
- Strings
- Numbers (integers and floats)
- Booleans (true/false in JSON, True/False in Python)
- null (None in Python)
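
To make the mapping concrete, here is a minimal round trip through json.dumps() and json.loads() showing how each Python type is serialized (the sample values are purely illustrative):

```python
import json

# One value of each JSON-serializable Python type
sample = {"text": "hello", "integer": 42, "floating": 3.14,
          "flag": True, "nothing": None, "items": [1, 2, 3]}

encoded = json.dumps(sample)
print(encoded)            # True becomes true, None becomes null
decoded = json.loads(encoded)
print(decoded == sample)  # True: the round trip preserves the data
```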
Essential Python JSON Module Methods
The Python json module offers four primary methods for JSON handling:
json.dumps() - Converting Python Objects to JSON Strings
```python
import json

# Converting a dictionary to a JSON string
data = {
    "name": "John Doe",
    "age": 30,
    "city": "New York",
    "is_employee": True,
    "skills": ["Python", "JavaScript", "SQL"]
}

json_string = json.dumps(data, indent=4)
print(json_string)
```
json.loads() - Converting JSON Strings to Python Objects
```python
import json
json_string = '{"name": "Jane Smith", "age": 25, "city": "Los Angeles"}'
python_dict = json.loads(json_string)
print(python_dict["name"]) # Output: Jane Smith
```
json.dump() - Writing Python Objects to JSON Files
```python
import json

employee_data = {
    "employees": [
        {"id": 1, "name": "Alice Johnson", "department": "Engineering"},
        {"id": 2, "name": "Bob Wilson", "department": "Marketing"},
        {"id": 3, "name": "Carol Davis", "department": "Sales"}
    ]
}

with open('employees.json', 'w') as json_file:
    json.dump(employee_data, json_file, indent=4)
```
json.load() - Reading JSON Files into Python Objects
```python
import json

with open('employees.json', 'r') as json_file:
    data = json.load(json_file)

for employee in data['employees']:
    print(f"Name: {employee['name']}, Department: {employee['department']}")
```
Reading JSON Files in Python: Comprehensive Examples
Basic JSON File Reading
```python
import json
import os

def read_json_file(filename):
    """Read a JSON file and return the parsed data."""
    try:
        if os.path.exists(filename):
            with open(filename, 'r', encoding='utf-8') as file:
                return json.load(file)
        else:
            print(f"File {filename} not found")
            return None
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON: {e}")
        return None
    except Exception as e:
        print(f"Error reading file: {e}")
        return None

# Usage example
config_data = read_json_file('config.json')
if config_data:
    print("Configuration loaded successfully")
```
Reading Large JSON Files Efficiently
```python
import json
from typing import Generator, Dict, Any

def read_large_json_file(filename: str) -> Generator[Dict[Any, Any], None, None]:
    """
    Generator for reading large JSON files line by line.
    Useful for the JSONL (JSON Lines) format.
    """
    with open(filename, 'r', encoding='utf-8') as file:
        for line in file:
            try:
                yield json.loads(line.strip())
            except json.JSONDecodeError:
                continue

# Example usage for processing large datasets
def process_large_dataset(filename: str):
    record_count = 0
    for record in read_large_json_file(filename):
        # Process each record individually
        record_count += 1
        if record_count % 1000 == 0:
            print(f"Processed {record_count} records")
```
Reading Nested JSON Structures
```python
import json

def extract_nested_data(json_file_path):
    """Extract data from complex nested JSON structures."""
    with open(json_file_path, 'r') as file:
        data = json.load(file)

    # Example: extracting user information from a nested structure
    users = []
    if 'response' in data and 'users' in data['response']:
        for user in data['response']['users']:
            user_info = {
                'id': user.get('id'),
                'name': user.get('profile', {}).get('name'),
                'email': user.get('contact', {}).get('email'),
                'address': {
                    'street': user.get('address', {}).get('street'),
                    'city': user.get('address', {}).get('city'),
                    'zipcode': user.get('address', {}).get('zipcode')
                }
            }
            users.append(user_info)
    return users

# Example nested JSON structure
nested_json_example = {
    "response": {
        "status": "success",
        "users": [
            {
                "id": 1,
                "profile": {"name": "John Doe", "age": 30},
                "contact": {"email": "john@example.com", "phone": "123-456-7890"},
                "address": {"street": "123 Main St", "city": "New York", "zipcode": "10001"}
            }
        ]
    }
}
```
Writing JSON Files in Python: Best Practices and Examples
Basic JSON File Writing
```python
import json
from datetime import datetime

def write_json_file(data, filename, indent=4):
    """Write a Python data structure to a JSON file."""
    try:
        with open(filename, 'w', encoding='utf-8') as file:
            json.dump(data, file, indent=indent, ensure_ascii=False)
        return True
    except Exception as e:
        print(f"Error writing JSON file: {e}")
        return False

# Example usage
user_data = {
    "user_id": 12345,
    "username": "developer123",
    "profile": {
        "first_name": "Sarah",
        "last_name": "Connor",
        "email": "sarah.connor@example.com",
        "created_at": datetime.now().isoformat(),
        "preferences": {
            "theme": "dark",
            "language": "en-US",
            "notifications": True
        }
    }
}

success = write_json_file(user_data, 'user_profile.json')
```
Appending Data to JSON Files
```python
import json
import os
from datetime import datetime

def append_to_json_file(new_data, filename, list_key=None):
    """Append new data to an existing JSON file."""
    # Read existing data
    if os.path.exists(filename):
        with open(filename, 'r', encoding='utf-8') as file:
            try:
                existing_data = json.load(file)
            except json.JSONDecodeError:
                existing_data = {}
    else:
        existing_data = {}

    # Append new data
    if list_key:
        if list_key not in existing_data:
            existing_data[list_key] = []
        existing_data[list_key].append(new_data)
    else:
        existing_data.update(new_data)

    # Write back to file
    with open(filename, 'w', encoding='utf-8') as file:
        json.dump(existing_data, file, indent=4, ensure_ascii=False)

# Example: adding new log entries
log_entry = {
    "timestamp": datetime.now().isoformat(),
    "level": "INFO",
    "message": "User logged in successfully",
    "user_id": 12345
}

append_to_json_file(log_entry, 'application_logs.json', 'logs')
```
Writing Pretty-Formatted JSON Files
```python
import json

def write_pretty_json(data, filename):
    """Write a JSON file with custom formatting for better readability."""
    with open(filename, 'w', encoding='utf-8') as file:
        json.dump(
            data,
            file,
            indent=4,
            separators=(',', ': '),
            sort_keys=True,
            ensure_ascii=False
        )

# Example with a complex data structure
complex_data = {
    "api_config": {
        "base_url": "https://api.example.com/v1",
        "endpoints": {
            "users": "/users",
            "posts": "/posts",
            "comments": "/comments"
        },
        "authentication": {
            "type": "bearer_token",
            "token_expiry": 3600
        }
    },
    "database_config": {
        "host": "localhost",
        "port": 5432,
        "database": "myapp",
        "ssl_enabled": True
    }
}

write_pretty_json(complex_data, 'application_config.json')
```
Advanced JSON File Operations
JSON Schema Validation
```python
import json
import jsonschema
from jsonschema import validate

def validate_json_data(json_data, schema):
    """Validate JSON data against a schema."""
    try:
        validate(instance=json_data, schema=schema)
        return True, "Valid"
    except jsonschema.exceptions.ValidationError as e:
        return False, str(e)

# Example schema
user_schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string", "minLength": 1},
        "age": {"type": "number", "minimum": 0},
        "email": {"type": "string", "format": "email"},
        "skills": {
            "type": "array",
            "items": {"type": "string"}
        }
    },
    "required": ["name", "age", "email"]
}

# Validate user data
user_data = {
    "name": "John Doe",
    "age": 30,
    "email": "john@example.com",
    "skills": ["Python", "JavaScript"]
}

is_valid, message = validate_json_data(user_data, user_schema)
print(f"Validation result: {is_valid}, Message: {message}")
```
JSON File Backup and Versioning
```python
import json
import shutil
import os
from datetime import datetime

class JSONFileManager:
    def __init__(self, filename):
        self.filename = filename
        self.backup_dir = 'backups'

    def create_backup(self):
        """Create a timestamped backup of the JSON file."""
        if not os.path.exists(self.backup_dir):
            os.makedirs(self.backup_dir)
        if os.path.exists(self.filename):
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_filename = f"{self.backup_dir}/{os.path.splitext(self.filename)[0]}_{timestamp}.json"
            shutil.copy2(self.filename, backup_filename)
            return backup_filename
        return None

    def safe_write(self, data):
        """Write JSON data with an automatic backup."""
        backup_file = self.create_backup()
        try:
            with open(self.filename, 'w', encoding='utf-8') as file:
                json.dump(data, file, indent=4, ensure_ascii=False)
            return True, f"Data written successfully. Backup: {backup_file}"
        except Exception as e:
            return False, f"Error writing file: {e}"

    def read_with_fallback(self):
        """Read the JSON file, falling back to the latest backup."""
        try:
            with open(self.filename, 'r', encoding='utf-8') as file:
                return json.load(file)
        except Exception as e:
            print(f"Error reading main file: {e}")
            # Try to read from the latest backup
            return self._read_latest_backup()

    def _read_latest_backup(self):
        """Read from the most recent backup file."""
        if not os.path.exists(self.backup_dir):
            return None
        backup_files = [f for f in os.listdir(self.backup_dir) if f.endswith('.json')]
        if not backup_files:
            return None
        latest_backup = max(backup_files,
                            key=lambda x: os.path.getctime(os.path.join(self.backup_dir, x)))
        try:
            with open(os.path.join(self.backup_dir, latest_backup), 'r', encoding='utf-8') as file:
                return json.load(file)
        except Exception as e:
            print(f"Error reading backup file: {e}")
            return None

# Usage example
json_manager = JSONFileManager('important_data.json')
sample_data = {"version": "1.0", "data": [1, 2, 3, 4, 5]}
success, message = json_manager.safe_write(sample_data)
print(message)
```
Error Handling in JSON File Operations
Comprehensive Error Handling
```python
import json
import logging
from typing import Optional, Dict, Any

# Configure logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class JSONFileHandler:
    """Robust JSON file handler with comprehensive error handling."""

    @staticmethod
    def read_json_file(filename: str) -> Optional[Dict[Any, Any]]:
        """Read a JSON file with comprehensive error handling."""
        try:
            with open(filename, 'r', encoding='utf-8') as file:
                data = json.load(file)
            logger.info(f"Successfully read JSON file: {filename}")
            return data
        except FileNotFoundError:
            logger.error(f"File not found: {filename}")
            return None
        except PermissionError:
            logger.error(f"Permission denied accessing file: {filename}")
            return None
        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON format in {filename}: {e}")
            return None
        except UnicodeDecodeError as e:
            logger.error(f"Encoding error reading {filename}: {e}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error reading {filename}: {e}")
            return None

    @staticmethod
    def write_json_file(data: Dict[Any, Any], filename: str, **kwargs) -> bool:
        """Write a JSON file with comprehensive error handling."""
        try:
            with open(filename, 'w', encoding='utf-8') as file:
                json.dump(data, file, ensure_ascii=False, **kwargs)
            logger.info(f"Successfully wrote JSON file: {filename}")
            return True
        except PermissionError:
            logger.error(f"Permission denied writing to file: {filename}")
            return False
        except TypeError as e:
            logger.error(f"Data serialization error for {filename}: {e}")
            return False
        except OSError as e:
            logger.error(f"OS error writing {filename}: {e}")
            return False
        except Exception as e:
            logger.error(f"Unexpected error writing {filename}: {e}")
            return False

    @staticmethod
    def validate_and_read(filename: str, required_keys: list = None) -> Optional[Dict[Any, Any]]:
        """Read a JSON file and validate its structure."""
        data = JSONFileHandler.read_json_file(filename)
        if data is None:
            return None
        if required_keys:
            missing_keys = [key for key in required_keys if key not in data]
            if missing_keys:
                logger.error(f"Missing required keys in {filename}: {missing_keys}")
                return None
        return data

# Usage examples
handler = JSONFileHandler()

# Reading with validation
config_data = handler.validate_and_read('config.json',
                                        ['database', 'api_key', 'debug_mode'])

# Safe writing
user_preferences = {
    "theme": "dark",
    "language": "en-US",
    "auto_save": True
}
success = handler.write_json_file(user_preferences, 'user_preferences.json', indent=4)
```
Working with JSON APIs and HTTP Requests
Fetching and Processing JSON from APIs
```python
import json
import requests
from typing import Optional, Dict, Any

class JSONAPIHandler:
    """Handle JSON data from API endpoints."""

    def __init__(self, base_url: str, headers: Dict[str, str] = None):
        self.base_url = base_url
        self.headers = headers or {}
        self.session = requests.Session()
        self.session.headers.update(self.headers)

    def fetch_json_data(self, endpoint: str,
                        params: Dict[str, Any] = None) -> Optional[Dict[Any, Any]]:
        """Fetch JSON data from an API endpoint."""
        try:
            url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
            response = self.session.get(url, params=params)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Request error: {e}")
            return None
        except json.JSONDecodeError as e:
            logger.error(f"JSON decode error: {e}")
            return None

    def post_json_data(self, endpoint: str,
                       data: Dict[Any, Any]) -> Optional[Dict[Any, Any]]:
        """Send JSON data to an API endpoint."""
        try:
            url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
            response = self.session.post(url, json=data)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Request error: {e}")
            return None

    def save_api_response(self, endpoint: str, filename: str,
                          params: Dict[str, Any] = None) -> bool:
        """Fetch API data and save it to a JSON file."""
        data = self.fetch_json_data(endpoint, params)
        if data:
            return JSONFileHandler.write_json_file(data, filename, indent=4)
        return False

# Example usage
api_handler = JSONAPIHandler(
    base_url="https://jsonplaceholder.typicode.com",
    headers={"User-Agent": "Python JSON Handler"}
)

# Fetch and save user data
users_data = api_handler.fetch_json_data("users")
if users_data:
    JSONFileHandler.write_json_file(users_data, "users_data.json", indent=4)

# Fetch posts for a specific user
posts_data = api_handler.fetch_json_data("posts", params={"userId": 1})
```
JSON Performance Optimization Techniques
Streaming JSON Processing for Large Files
```python
import json
import ijson
from typing import Iterator, Dict, Any

class JSONStreamProcessor:
    """Process large JSON files using streaming techniques."""

    @staticmethod
    def stream_json_array(filename: str,
                          array_key: str = None) -> Iterator[Dict[Any, Any]]:
        """Stream-process JSON array elements."""
        with open(filename, 'rb') as file:
            if array_key:
                parser = ijson.items(file, f'{array_key}.item')
            else:
                parser = ijson.items(file, 'item')
            for item in parser:
                yield item

    @staticmethod
    def filter_and_save(input_filename: str, output_filename: str,
                        filter_func: callable, array_key: str = None):
        """Filter a large JSON file and save the filtered results."""
        filtered_items = []
        for item in JSONStreamProcessor.stream_json_array(input_filename, array_key):
            if filter_func(item):
                filtered_items.append(item)
        with open(output_filename, 'w', encoding='utf-8') as file:
            json.dump(filtered_items, file, indent=4, ensure_ascii=False)
        return len(filtered_items)

    @staticmethod
    def aggregate_data(filename: str, aggregate_func: callable,
                       array_key: str = None):
        """Aggregate data from a large JSON file."""
        result = {}
        for item in JSONStreamProcessor.stream_json_array(filename, array_key):
            result = aggregate_func(result, item)
        return result

# Example usage
def filter_active_users(user):
    """Filter function for active users."""
    return user.get('status') == 'active'

def aggregate_by_department(result, employee):
    """Aggregate employees by department."""
    dept = employee.get('department', 'Unknown')
    if dept not in result:
        result[dept] = 0
    result[dept] += 1
    return result

# Process a large employee dataset
processor = JSONStreamProcessor()

# Filter active users
active_count = processor.filter_and_save(
    'large_employee_data.json',
    'active_employees.json',
    filter_active_users,
    'employees'
)

# Aggregate by department
department_stats = processor.aggregate_data(
    'large_employee_data.json',
    aggregate_by_department,
    'employees'
)
```
Memory-Efficient JSON Operations
```python
import json
import gc
from contextlib import contextmanager

@contextmanager
def memory_efficient_json_processing():
    """Context manager for memory-efficient JSON processing."""
    try:
        yield
    finally:
        gc.collect()  # Force garbage collection

def process_large_json_in_chunks(filename: str, chunk_size: int = 1000):
    """Process large JSON files in chunks to manage memory usage."""
    with memory_efficient_json_processing():
        with open(filename, 'r', encoding='utf-8') as file:
            data = json.load(file)
        if isinstance(data, list):
            for i in range(0, len(data), chunk_size):
                chunk = data[i:i + chunk_size]
                yield chunk
                del chunk  # Explicitly delete the chunk to free memory
        else:
            yield data

# Example: processing a large dataset in chunks
def analyze_large_dataset(filename: str):
    """Analyze a large JSON dataset using chunked processing."""
    total_records = 0
    statistics = {'total': 0, 'categories': {}}
    for chunk in process_large_json_in_chunks(filename, chunk_size=500):
        for record in chunk:
            total_records += 1
            category = record.get('category', 'Unknown')
            if category not in statistics['categories']:
                statistics['categories'][category] = 0
            statistics['categories'][category] += 1
    statistics['total'] = total_records
    return statistics
```
JSON Configuration Management
Configuration File Handler
```python
import json
import os
from typing import Any, Dict, Optional
from pathlib import Path

class ConfigManager:
    """Manage application configuration using JSON files."""

    def __init__(self, config_file: str = 'config.json',
                 default_config: Dict[str, Any] = None):
        self.config_file = Path(config_file)
        self.default_config = default_config or {}
        self._config = {}
        self.load_config()

    def load_config(self) -> bool:
        """Load configuration from the JSON file."""
        try:
            if self.config_file.exists():
                with open(self.config_file, 'r', encoding='utf-8') as file:
                    self._config = json.load(file)
                logger.info(f"Configuration loaded from {self.config_file}")
            else:
                self._config = self.default_config.copy()
                self.save_config()
                logger.info(f"Created default configuration file: {self.config_file}")
            return True
        except Exception as e:
            logger.error(f"Error loading configuration: {e}")
            self._config = self.default_config.copy()
            return False

    def save_config(self) -> bool:
        """Save the current configuration to the JSON file."""
        try:
            # Create the directory if it doesn't exist
            self.config_file.parent.mkdir(parents=True, exist_ok=True)
            with open(self.config_file, 'w', encoding='utf-8') as file:
                json.dump(self._config, file, indent=4, ensure_ascii=False)
            logger.info(f"Configuration saved to {self.config_file}")
            return True
        except Exception as e:
            logger.error(f"Error saving configuration: {e}")
            return False

    def get(self, key: str, default: Any = None) -> Any:
        """Get a configuration value with dot-notation support."""
        keys = key.split('.')
        value = self._config
        for k in keys:
            if isinstance(value, dict) and k in value:
                value = value[k]
            else:
                return default
        return value

    def set(self, key: str, value: Any) -> bool:
        """Set a configuration value with dot-notation support."""
        keys = key.split('.')
        config = self._config
        # Navigate to the parent of the target key
        for k in keys[:-1]:
            if k not in config or not isinstance(config[k], dict):
                config[k] = {}
            config = config[k]
        # Set the value
        config[keys[-1]] = value
        return self.save_config()

    def update(self, updates: Dict[str, Any]) -> bool:
        """Update multiple configuration values."""
        def deep_update(base_dict, update_dict):
            for key, value in update_dict.items():
                if (isinstance(value, dict) and key in base_dict
                        and isinstance(base_dict[key], dict)):
                    deep_update(base_dict[key], value)
                else:
                    base_dict[key] = value
        deep_update(self._config, updates)
        return self.save_config()

    def reset_to_default(self) -> bool:
        """Reset the configuration to its default values."""
        self._config = self.default_config.copy()
        return self.save_config()

# Example usage
default_app_config = {
    "app": {
        "name": "MyApplication",
        "version": "1.0.0",
        "debug": False
    },
    "database": {
        "host": "localhost",
        "port": 5432,
        "name": "myapp_db"
    },
    "api": {
        "base_url": "https://api.example.com",
        "timeout": 30,
        "retries": 3
    }
}

config = ConfigManager('app_config.json', default_app_config)

# Get configuration values
app_name = config.get('app.name')
db_host = config.get('database.host')
api_timeout = config.get('api.timeout', 60)  # Default fallback

# Set configuration values
config.set('app.debug', True)
config.set('database.port', 5433)

# Update multiple values
config.update({
    'api': {
        'timeout': 45,
        'retries': 5
    },
    'logging': {
        'level': 'INFO',
        'file': 'app.log'
    }
})
```
JSON Data Transformation and Manipulation
Data Transformation Utilities
```python
import json
from typing import Any, Dict, Callable

class JSONTransformer:
    """Transform and manipulate JSON data structures."""

    @staticmethod
    def flatten_json(data: Dict[str, Any], separator: str = '.') -> Dict[str, Any]:
        """Flatten a nested JSON structure."""
        def _flatten(obj, parent_key=''):
            items = []
            if isinstance(obj, dict):
                for key, value in obj.items():
                    new_key = f"{parent_key}{separator}{key}" if parent_key else key
                    items.extend(_flatten(value, new_key).items())
            elif isinstance(obj, list):
                for i, value in enumerate(obj):
                    new_key = f"{parent_key}{separator}{i}" if parent_key else str(i)
                    items.extend(_flatten(value, new_key).items())
            else:
                return {parent_key: obj}
            return dict(items)
        return _flatten(data)

    @staticmethod
    def unflatten_json(data: Dict[str, Any], separator: str = '.') -> Dict[str, Any]:
        """Unflatten a flattened JSON structure, rebuilding lists from numeric keys."""
        def ensure_child(container, key, next_is_index):
            # Create the child container (list or dict) if it doesn't exist yet
            new_child = [] if next_is_index else {}
            if isinstance(container, list):
                while len(container) <= key:
                    container.append(None)
                if container[key] is None:
                    container[key] = new_child
                return container[key]
            if key not in container:
                container[key] = new_child
            return container[key]

        result = {}
        for flat_key, value in data.items():
            keys = flat_key.split(separator)
            current = result
            for i, k in enumerate(keys[:-1]):
                k = int(k) if k.isdigit() else k
                current = ensure_child(current, k, keys[i + 1].isdigit())
            final_key = keys[-1]
            final_key = int(final_key) if final_key.isdigit() else final_key
            if isinstance(current, list):
                while len(current) <= final_key:
                    current.append(None)
            current[final_key] = value
        return result

    @staticmethod
    def filter_json(data: Any, filter_func: Callable) -> Any:
        """Filter JSON data based on a custom function."""
        if isinstance(data, dict):
            return {k: JSONTransformer.filter_json(v, filter_func)
                    for k, v in data.items() if filter_func(k, v)}
        elif isinstance(data, list):
            return [JSONTransformer.filter_json(item, filter_func)
                    for item in data if filter_func(None, item)]
        else:
            return data

    @staticmethod
    def transform_values(data: Any, transform_func: Callable) -> Any:
        """Transform the values in a JSON structure."""
        if isinstance(data, dict):
            return {k: JSONTransformer.transform_values(v, transform_func)
                    for k, v in data.items()}
        elif isinstance(data, list):
            return [JSONTransformer.transform_values(item, transform_func)
                    for item in data]
        else:
            return transform_func(data)

    @staticmethod
    def merge_json(base: Dict[str, Any], *others: Dict[str, Any]) -> Dict[str, Any]:
        """Deep-merge multiple JSON objects."""
        def deep_merge(base_dict, merge_dict):
            for key, value in merge_dict.items():
                if (key in base_dict and isinstance(base_dict[key], dict)
                        and isinstance(value, dict)):
                    deep_merge(base_dict[key], value)
                else:
                    base_dict[key] = value
        result = base.copy()
        for other in others:
            deep_merge(result, other)
        return result

# Example transformations
sample_data = {
    "user": {
        "name": "John Doe",
        "age": 30,
        "contacts": {
            "email": "john@example.com",
            "phone": "123-456-7890"
        }
    },
    "orders": [
        {"id": 1, "amount": 100.50, "date": "2024-01-15"},
        {"id": 2, "amount": 75.25, "date": "2024-01-20"}
    ]
}

transformer = JSONTransformer()

# Flatten the structure
flattened = transformer.flatten_json(sample_data)
print("Flattened:", json.dumps(flattened, indent=2))

# Transform string values to uppercase
def uppercase_strings(value):
    return value.upper() if isinstance(value, str) else value

transformed = transformer.transform_values(sample_data, uppercase_strings)

# Filter out sensitive information
def filter_sensitive(key, value):
    sensitive_keys = ['phone', 'email']
    return key not in sensitive_keys if key else True

filtered = transformer.filter_json(sample_data, filter_sensitive)
```
JSON File Monitoring and Synchronization
File Monitoring System
```python
import json
import time
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from typing import Callable

class JSONFileMonitor(FileSystemEventHandler):
    """Monitor JSON files for changes and trigger callbacks."""

    def __init__(self, callback: Callable = None):
        self.callback = callback
        self.last_modified = {}

    def on_modified(self, event):
        if not event.is_directory and event.src_path.endswith('.json'):
            # Debounce rapid file changes
            current_time = time.time()
            if (event.src_path not in self.last_modified
                    or current_time - self.last_modified[event.src_path] > 1):
                self.last_modified[event.src_path] = current_time
                if self.callback:
                    self.callback(event.src_path, 'modified')

class JSONSyncManager:
    """Synchronize JSON files across different locations."""

    def __init__(self):
        self.sync_pairs = []
        self.observers = []

    def add_sync_pair(self, source: str, target: str):
        """Add a file synchronization pair."""
        self.sync_pairs.append((source, target))

    def sync_file(self, source: str, target: str):
        """Synchronize a single file pair."""
        try:
            if os.path.exists(source):
                with open(source, 'r', encoding='utf-8') as src_file:
                    data = json.load(src_file)
                with open(target, 'w', encoding='utf-8') as tgt_file:
                    json.dump(data, tgt_file, indent=4, ensure_ascii=False)
                logger.info(f"Synchronized {source} -> {target}")
                return True
        except Exception as e:
            logger.error(f"Sync error {source} -> {target}: {e}")
            return False

    def start_monitoring(self):
        """Start monitoring all source files for changes."""
        def sync_callback(file_path, event_type):
            for source, target in self.sync_pairs:
                if os.path.abspath(file_path) == os.path.abspath(source):
                    self.sync_file(source, target)
                    break

        for source, target in self.sync_pairs:
            # Fall back to the current directory for bare filenames
            source_dir = os.path.dirname(source) or '.'
            if os.path.exists(source_dir):
                observer = Observer()
                handler = JSONFileMonitor(sync_callback)
                observer.schedule(handler, source_dir, recursive=False)
                observer.start()
                self.observers.append(observer)
        logger.info(f"Started monitoring {len(self.sync_pairs)} file pairs")

    def stop_monitoring(self):
        """Stop all file monitoring."""
        for observer in self.observers:
            observer.stop()
            observer.join()
        self.observers.clear()
        logger.info("Stopped file monitoring")

# Example usage
sync_manager = JSONSyncManager()
sync_manager.add_sync_pair('config.json', 'backup/config.json')
sync_manager.add_sync_pair('data.json', 'mirror/data.json')

# Start monitoring (runs in the background)
sync_manager.start_monitoring()

# Keep the script running
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    sync_manager.stop_monitoring()
```
Testing JSON File Operations
Unit Testing Framework
```python
import json
import unittest
import tempfile
import os
from unittest.mock import patch, mock_open

class TestJSONFileOperations(unittest.TestCase):
    """Unit tests for JSON file operations."""

    def setUp(self):
        """Set up test fixtures."""
        self.test_data = {
            "name": "Test User",
            "age": 25,
            "skills": ["Python", "JSON", "Testing"]
        }
        self.temp_dir = tempfile.mkdtemp()
        self.test_file = os.path.join(self.temp_dir, 'test.json')

    def tearDown(self):
        """Clean up test fixtures."""
        if os.path.exists(self.test_file):
            os.remove(self.test_file)
        os.rmdir(self.temp_dir)

    def test_write_json_file(self):
        """Test writing JSON data to a file."""
        success = JSONFileHandler.write_json_file(self.test_data, self.test_file)
        self.assertTrue(success)
        self.assertTrue(os.path.exists(self.test_file))
        # Verify the file contents
        with open(self.test_file, 'r') as file:
            loaded_data = json.load(file)
        self.assertEqual(loaded_data, self.test_data)

    def test_read_json_file(self):
        """Test reading JSON data from a file."""
        # First write test data
        with open(self.test_file, 'w') as file:
            json.dump(self.test_data, file)
        # Then read it back
        loaded_data = JSONFileHandler.read_json_file(self.test_file)
        self.assertIsNotNone(loaded_data)
        self.assertEqual(loaded_data, self.test_data)

    def test_read_nonexistent_file(self):
        """Test reading from a nonexistent file."""
        result = JSONFileHandler.read_json_file('nonexistent.json')
        self.assertIsNone(result)

    def test_read_invalid_json(self):
        """Test reading invalid JSON content."""
        with open(self.test_file, 'w') as file:
            file.write('{"invalid": json content}')
        result = JSONFileHandler.read_json_file(self.test_file)
        self.assertIsNone(result)

    def test_config_manager(self):
        """Test configuration manager functionality."""
        default_config = {"debug": False, "timeout": 30}
        config = ConfigManager(self.test_file, default_config)
        # Test default config creation
        self.assertEqual(config.get('debug'), False)
        self.assertEqual(config.get('timeout'), 30)
        # Test setting values
        config.set('debug', True)
        self.assertEqual(config.get('debug'), True)
        # Test nested key setting
        config.set('database.host', 'localhost')
        self.assertEqual(config.get('database.host'), 'localhost')

    @patch('builtins.open', mock_open(read_data='{"test": "data"}'))
    def test_mocked_file_read(self):
        """Test with mocked file operations."""
        result = JSONFileHandler.read_json_file('mocked_file.json')
        self.assertEqual(result, {"test": "data"})

if __name__ == '__main__':
    # Run the test suite with verbose output
    unittest.main(verbosity=2)
```
Best Practices and Security Considerations
Security Best Practices
```python
import json
import hashlib
import hmac
from cryptography.fernet import Fernet
from typing import Dict, Any, Optional

class SecureJSONHandler:
    """Secure JSON file handling with encryption and integrity checks."""

    def __init__(self, encryption_key: bytes = None):
        self.encryption_key = encryption_key or Fernet.generate_key()
        self.cipher = Fernet(self.encryption_key)

    def write_encrypted_json(self, data: Dict[str, Any], filename: str) -> bool:
        """Write JSON data with encryption."""
        try:
            json_string = json.dumps(data, ensure_ascii=False)
            encrypted_data = self.cipher.encrypt(json_string.encode('utf-8'))
            with open(filename, 'wb') as file:
                file.write(encrypted_data)
            return True
        except Exception as e:
            logger.error(f"Error writing encrypted JSON: {e}")
            return False

    def read_encrypted_json(self, filename: str) -> Optional[Dict[str, Any]]:
        """Read and decrypt JSON data."""
        try:
            with open(filename, 'rb') as file:
                encrypted_data = file.read()
            decrypted_data = self.cipher.decrypt(encrypted_data)
            json_string = decrypted_data.decode('utf-8')
            return json.loads(json_string)
        except Exception as e:
            logger.error(f"Error reading encrypted JSON: {e}")
            return None

    def write_with_integrity_check(self, data: Dict[str, Any], filename: str,
                                   secret_key: str) -> bool:
        """Write JSON with an HMAC integrity check."""
        try:
            json_string = json.dumps(data, ensure_ascii=False, sort_keys=True)
            # Generate the HMAC
            hmac_hash = hmac.new(
                secret_key.encode('utf-8'),
                json_string.encode('utf-8'),
                hashlib.sha256
            ).hexdigest()
            # Combine the data with its HMAC
            secure_data = {
                'data': data,
                'hmac': hmac_hash
            }
            with open(filename, 'w', encoding='utf-8') as file:
                json.dump(secure_data, file, indent=4, ensure_ascii=False)
            return True
        except Exception as e:
            logger.error(f"Error writing JSON with integrity check: {e}")
            return False

    def read_with_integrity_check(self, filename: str,
                                  secret_key: str) -> Optional[Dict[str, Any]]:
        """Read JSON and verify its integrity."""
        try:
            with open(filename, 'r', encoding='utf-8') as file:
                secure_data = json.load(file)
            if 'data' not in secure_data or 'hmac' not in secure_data:
                logger.error("Invalid secure JSON format")
                return None
            # Verify the HMAC
            json_string = json.dumps(secure_data['data'],
                                     ensure_ascii=False, sort_keys=True)
            expected_hmac = hmac.new(
                secret_key.encode('utf-8'),
                json_string.encode('utf-8'),
                hashlib.sha256
            ).hexdigest()
            if not hmac.compare_digest(expected_hmac, secure_data['hmac']):
                logger.error("HMAC verification failed - data may be tampered with")
                return None
            return secure_data['data']
        except Exception as e:
            logger.error(f"Error reading JSON with integrity check: {e}")
            return None

# Input validation and sanitization
class JSONValidator:
    """Validate and sanitize JSON input data."""

    @staticmethod
    def sanitize_json_keys(data: Any) -> Any:
        """Sanitize JSON keys to prevent injection attacks."""
        import re

        def clean_key(key: str) -> str:
            # Remove or replace potentially dangerous characters
            return re.sub(r'[^\w\-_.]', '', str(key))

        if isinstance(data, dict):
            return {clean_key(k): JSONValidator.sanitize_json_keys(v)
                    for k, v in data.items()}
        elif isinstance(data, list):
            return [JSONValidator.sanitize_json_keys(item) for item in data]
        else:
            return data

    @staticmethod
    def validate_json_size(data: Dict[str, Any], max_size_mb: int = 10) -> bool:
        """Validate JSON data size to prevent DoS attacks."""
        json_string = json.dumps(data)
        size_mb = len(json_string.encode('utf-8')) / (1024 * 1024)
        return size_mb <= max_size_mb

    @staticmethod
    def validate_json_depth(data: Any, max_depth: int = 10,
                            current_depth: int = 0) -> bool:
        """Validate JSON nesting depth to prevent stack overflow."""
        if current_depth > max_depth:
            return False
        if isinstance(data, dict):
            return all(JSONValidator.validate_json_depth(v, max_depth, current_depth + 1)
                       for v in data.values())
        elif isinstance(data, list):
            return all(JSONValidator.validate_json_depth(item, max_depth, current_depth + 1)
                       for item in data)
        else:
            return True

# Example usage of secure JSON handling
secure_handler = SecureJSONHandler()
validator = JSONValidator()

# Secure data handling
sensitive_data = {
    "user_id": 12345,
    "api_key": "secret_api_key_here",
    "permissions": ["read", "write", "admin"]
}

# Validate before processing
if (validator.validate_json_size(sensitive_data)
        and validator.validate_json_depth(sensitive_data)):
    # Sanitize keys
    clean_data = validator.sanitize_json_keys(sensitive_data)
    # Write with encryption
    secure_handler.write_encrypted_json(clean_data, 'secure_data.enc')
    # Write with integrity check
    secret_key = "your_secret_key_here"
    secure_handler.write_with_integrity_check(clean_data, 'secure_data.json', secret_key)
```
Conclusion
This comprehensive guide has covered the essential aspects of Python JSON file handling, from basic operations to advanced security considerations. By implementing these patterns and best practices, developers can build robust applications that efficiently handle JSON data while maintaining security and performance standards.
Key takeaways include:
1. Error Handling: Always implement comprehensive error handling for file operations
2. Performance: Use streaming techniques and deliberate memory management for large files
3. Security: Validate input data and consider encryption for sensitive information
4. Configuration Management: Implement robust configuration systems with fallback mechanisms
5. Testing: Write thorough unit tests for all JSON operations
6. Monitoring: Implement file monitoring for real-time synchronization needs
The examples provided serve as a foundation for building more complex JSON handling systems tailored to specific application requirements. Remember to adapt these patterns to your specific use case and always consider the security implications of handling JSON data in production environments.
Whether you're building APIs, configuration systems, or data processing pipelines, these JSON file handling techniques will help you create more reliable and maintainable Python applications.