Python: From Scripting to AI/ML Engineering
A comprehensive technical guide covering Python 3.12+ features, automation scripting, FastAPI and Flask web frameworks, data science with pandas, AI/ML with PyTorch and LoRA fine-tuning, async programming, testing, web scraping, and modern package management.
Table of Contents
- Python 3.12+ Features
- Automation and Scripting
- FastAPI and Flask Web Frameworks
- Data Science (pandas, numpy, matplotlib)
- AI/ML (PyTorch, Unsloth, LoRA/QLoRA, Hugging Face)
- Async Programming (asyncio, aiohttp)
- Testing (pytest, coverage)
- Web Scraping (BeautifulSoup, Scrapy)
- Package Management (pip, poetry, uv, venv)
- Type Hints (mypy, pyright)
- Python 3.13 and 3.14: Free-Threading and T-Strings
1. Python 3.12+ Features
Modern Syntax and Performance
Python 3.12 introduced improved error messages, a per-interpreter GIL (experimental, PEP 684), the type statement for type aliases (PEP 695), and f-string grammar improvements (PEP 701). Python 3.13 adds an experimental JIT compiler and the experimental free-threaded build. These releases focus on performance and developer experience.
# Python 3.12+: the type statement for cleaner type aliases
from collections.abc import Callable

type Vector = list[float]
type Matrix = list[Vector]
type Transformer[T] = Callable[[T], T]  # generic alias with 3.12 type-parameter syntax
type JSONValue = str | int | float | bool | None | list[JSONValue] | dict[str, JSONValue]  # type aliases are lazy, so recursion needs no quotes
# Improved f-strings (3.12): any expression allowed, including quotes
data = {"name": "Jose", "weight_kg": 78.5}
msg = f"User {data['name']} weighs {data['weight_kg']:.1f}kg"
# Structural pattern matching (3.10+)
def process_event(event: dict) -> str:
match event:
case {"type": "weight", "value": float(v)} if v > 0:
return f"Weight: {v}kg"
case {"type": "body_fat", "value": float(v)}:
return f"Body fat: {v}%"
case {"type": str(t), **rest}:
return f"Unknown metric: {t}"
case _:
return "Invalid event"
# Exception groups and except* (3.11+)
async def fetch_all(urls: list[str]) -> list[str]:
    tasks: list[asyncio.Task[str]] = []  # bind before try so the except* paths can see it
    try:
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(fetch(url)) for url in urls]
    except* ConnectionError as eg:
        for exc in eg.exceptions:
            logger.warning(f"Connection failed: {exc}")
    except* TimeoutError as eg:
        for exc in eg.exceptions:
            logger.warning(f"Timeout: {exc}")
    # Collect results only from tasks that completed successfully;
    # calling .result() on a failed task would re-raise its exception
    return [t.result() for t in tasks
            if t.done() and not t.cancelled() and t.exception() is None]
2. Automation and Scripting
System Automation and Daemons
Python excels at system automation: file processing, API integration, data pipelines, and long-running daemons. The standard library provides subprocess, pathlib, asyncio, and json for most tasks.
#!/usr/bin/env python3
"""Xiaomi health data export daemon.
Automates data export from Xiaomi Health app via headless AVD,
processes exported body composition data, and syncs to the
health dashboard API.
"""
import asyncio
import subprocess
import json
import logging
from datetime import datetime, timezone
from pathlib import Path
logger = logging.getLogger(__name__)
AVD_NAME = "xiaomi_health"
EXPORT_PATH = Path("/tmp/xiaomi_export/health_data.json")
async def trigger_export() -> dict:
"""Use ADB + uiautomator to export data from Xiaomi Health app."""
    # Wake the headless AVD and launch the app (check=True surfaces adb failures
    # to the caller instead of silently continuing)
    subprocess.run(["adb", "shell", "input", "keyevent", "KEYCODE_WAKEUP"], check=True)
    subprocess.run(["adb", "shell", "am", "start", "-n",
                    "com.mi.health/.MainAct"], check=True)
    await asyncio.sleep(3)
    # Navigate to export via uiautomator
    subprocess.run(["adb", "shell", "uiautomator", "dump"], check=True)
    subprocess.run(["adb", "shell", "input", "tap", "540", "1200"], check=True)
    await asyncio.sleep(2)
    # Pull exported data
    subprocess.run(["adb", "pull", "/sdcard/health_export.json",
                    str(EXPORT_PATH)], check=True)
with open(EXPORT_PATH) as f:
return json.load(f)
async def sync_loop(interval: int = 3600) -> None:
"""Main daemon loop: export data, process, sync to API."""
while True:
try:
data = await trigger_export()
processed = process_body_composition(data)
await push_to_api(processed)
logger.info(f"Synced: {processed['weight_kg']}kg")
except Exception as e:
logger.error(f"Sync failed: {e}")
await asyncio.sleep(interval)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(sync_loop())
CLI Tools with Click, Typer, and Rich
Click provides decorator-based CLI argument parsing. Typer builds on Click with type-hint-driven interfaces. Rich adds formatted output with tables, progress bars, and syntax highlighting. Together they make professional-grade command-line tools.
import click
from pathlib import Path  # used below for directory globbing
from rich.console import Console
from rich.table import Table
from rich.progress import track

console = Console()
@click.group()
@click.option("--verbose", "-v", is_flag=True)
@click.pass_context
def cli(ctx, verbose):
ctx.ensure_object(dict)
ctx.obj["verbose"] = verbose
@cli.command()
@click.argument("directory", type=click.Path(exists=True))
@click.option("--output-format", "-f", type=click.Choice(["json", "csv"]), default="json")
def process(directory, output_format):
"""Process all data files in DIRECTORY."""
files = list(Path(directory).glob("*.dat"))
table = Table(title=f"Processing {len(files)} files")
table.add_column("File", style="cyan")
table.add_column("Records", justify="right")
table.add_column("Status", style="green")
for f in track(files, description="Processing..."):
records = parse_data_file(f)
table.add_row(f.name, str(len(records)), "OK")
console.print(table)
# Typer: type-hint-driven CLI (built on Click)
import typer
from pathlib import Path
from enum import Enum
app = typer.Typer(help="Health data CLI tool.")
class OutputFormat(str, Enum):
json = "json"
csv = "csv"
@app.command()
def export(
directory: Path = typer.Argument(..., exists=True, help="Data directory"),
output_format: OutputFormat = typer.Option(OutputFormat.json, "--format", "-f"),
days: int = typer.Option(30, "--days", "-d", min=1, max=365),
):
"""Export health metrics from DIRECTORY."""
console.print(f"[bold]Exporting {days} days in {output_format.value}[/bold]")
data = load_metrics(directory, days=days)
write_output(data, fmt=output_format.value)
if __name__ == "__main__":
app()
3. FastAPI and Flask Web Frameworks
FastAPI: Modern Async API Framework
FastAPI (v0.135.3 as of April 2026) leverages Python type hints for automatic request validation, serialization, and OpenAPI documentation. Built on Starlette and Pydantic v2, it supports dependency injection, async endpoints, and generates interactive API docs out of the box. Requires Python 3.10+. Latest features include streaming JSON Lines and strict content-type checking.
from fastapi import FastAPI, Depends, HTTPException, status
from pydantic import BaseModel, Field
from datetime import datetime, timedelta, timezone
from typing import Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
app = FastAPI(title="Health API", version="2.0.0")
class BodyMetrics(BaseModel):
weight_kg: float = Field(..., gt=0, lt=500, description="Weight in kilograms")
body_fat_pct: Optional[float] = Field(None, ge=0, le=100)
muscle_mass_kg: Optional[float] = Field(None, gt=0)
recorded_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
model_config = {
"json_schema_extra": {
"example": {"weight_kg": 78.5, "body_fat_pct": 15.2, "muscle_mass_kg": 35.1}
}
}
class MetricsResponse(BaseModel):
id: int
metrics: BodyMetrics
trend: str # "up", "down", "stable"
# Dependency injection: reusable auth + DB session
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
payload = decode_jwt(token)
user = await user_repo.get(payload["sub"])
if not user:
raise HTTPException(status_code=401, detail="Invalid credentials")
return user
@app.post("/api/metrics", response_model=MetricsResponse, status_code=status.HTTP_201_CREATED)
async def create_metrics(
metrics: BodyMetrics,
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
"""Record new body composition metrics."""
record = MetricsRecord(user_id=user.id, **metrics.model_dump())
db.add(record)
await db.commit()
await db.refresh(record)
trend = await calculate_trend(db, user.id)
return MetricsResponse(id=record.id, metrics=metrics, trend=trend)
@app.get("/api/metrics", response_model=list[MetricsResponse])
async def list_metrics(
days: int = 30,
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
"""List metrics for the last N days."""
since = datetime.now(timezone.utc) - timedelta(days=days)
result = await db.execute(
select(MetricsRecord)
.where(MetricsRecord.user_id == user.id, MetricsRecord.recorded_at >= since)
.order_by(MetricsRecord.recorded_at.desc())
)
return result.scalars().all()
Flask: Lightweight and Flexible
Flask is a micro-framework with a simple core and rich ecosystem of extensions. It is ideal for smaller APIs, webhooks, and services where full async support is not required.
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from functools import wraps
import jwt
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///health.db"
app.config["SECRET_KEY"] = "change-me"  # load from an env var in production
db = SQLAlchemy(app)
def require_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
token = request.headers.get("Authorization", "").replace("Bearer ", "")
try:
payload = jwt.decode(token, app.config["SECRET_KEY"], algorithms=["HS256"])
request.user_id = payload["user_id"]
except jwt.InvalidTokenError:
return jsonify({"error": "Invalid token"}), 401
return f(*args, **kwargs)
return decorated
@app.route("/api/webhook/scale", methods=["POST"])
@require_auth
def scale_webhook():
"""Receive scale data from IoT device."""
data = request.get_json()
record = Measurement(user_id=request.user_id, **data)
db.session.add(record)
db.session.commit()
return jsonify({"id": record.id, "status": "recorded"}), 201
4. Data Science (pandas, numpy, matplotlib)
pandas Data Analysis
pandas provides high-performance DataFrames for data manipulation, cleaning, aggregation, and analysis. Combined with numpy for numerical operations, it forms the foundation of Python data science.
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
# Load and clean health data
df = pd.read_csv("health_metrics.csv", parse_dates=["recorded_at"])
df = df.dropna(subset=["weight_kg"])
df = df[df["weight_kg"].between(40, 200)] # remove outliers
# Rolling averages for trend analysis
df["weight_7d_avg"] = df["weight_kg"].rolling(window=7, min_periods=3).mean()
df["body_fat_7d_avg"] = df["body_fat_pct"].rolling(window=7, min_periods=3).mean()
# Weekly aggregation
weekly = df.resample("W", on="recorded_at").agg({
"weight_kg": ["mean", "min", "max", "std"],
"body_fat_pct": "mean",
"muscle_mass_kg": "mean",
})
# Correlation analysis
correlations = df[["weight_kg", "body_fat_pct", "muscle_mass_kg",
"sleep_hours", "calories_in"]].corr()
# Export for dashboard
dashboard_data = df.tail(90).to_json(orient="records", date_format="iso")
# Pivot table: average metrics by day of week
pivot = df.pivot_table(
values=["weight_kg", "calories_in"],
index=df["recorded_at"].dt.day_name(),
aggfunc={"weight_kg": "mean", "calories_in": ["mean", "sum"]},
)
Visualization with matplotlib
matplotlib is the foundational plotting library for Python. Combined with pandas, it produces publication-quality charts for trend analysis, distributions, and dashboards.
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import numpy as np
fig, axes = plt.subplots(2, 1, figsize=(12, 8), sharex=True)
# Weight trend with rolling average
ax1 = axes[0]
ax1.scatter(df["recorded_at"], df["weight_kg"], alpha=0.3, s=10, label="Daily")
ax1.plot(df["recorded_at"], df["weight_7d_avg"], color="tab:blue", linewidth=2, label="7-day avg")
ax1.fill_between(df["recorded_at"], df["weight_7d_avg"] - df["weight_kg"].std(),
df["weight_7d_avg"] + df["weight_kg"].std(), alpha=0.1)
ax1.set_ylabel("Weight (kg)")
ax1.legend()
ax1.grid(True, alpha=0.3)
# Body fat percentage
ax2 = axes[1]
ax2.plot(df["recorded_at"], df["body_fat_7d_avg"], color="tab:orange", linewidth=2)
ax2.set_ylabel("Body Fat (%)")
ax2.set_xlabel("Date")
ax2.xaxis.set_major_formatter(mdates.DateFormatter("%b %Y"))
ax2.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig("health_trends.png", dpi=150, bbox_inches="tight")
5. AI/ML (PyTorch, Unsloth, LoRA/QLoRA, Hugging Face)
LoRA/QLoRA Fine-Tuning with Unsloth
LoRA (Low-Rank Adaptation) fine-tunes large language models by training small adapter matrices instead of all parameters. QLoRA adds 4-bit quantization, making it possible to fine-tune 70B+ models on consumer GPUs. Unsloth optimizes the training loop for 2x speed.
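The parameter savings are easy to quantify: for a weight matrix W of shape (d, k), LoRA trains two factors B (d x r) and A (r x k) with r much smaller than d and k. A back-of-the-envelope sketch (pure Python; the helper name and shapes are illustrative):

```python
def lora_param_counts(d: int, k: int, r: int) -> tuple[int, int, float]:
    """Trainable parameters for one (d, k) matrix: full fine-tune vs LoRA adapter."""
    full = d * k         # every weight is trainable
    lora = r * (d + k)   # only B (d x r) and A (r x k) are trained
    return full, lora, lora / full

# A 4096x4096 attention projection with rank r=64:
full, lora, ratio = lora_param_counts(4096, 4096, 64)
print(f"full={full:,} lora={lora:,} ({ratio:.2%} of full)")
```

At rank 64 the adapter is about 3% of the matrix's parameters; at the rank-16 settings common for smaller models it drops below 1%, which is why adapter checkpoints are megabytes rather than gigabytes.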
from unsloth import FastLanguageModel
from trl import SFTTrainer
from transformers import TrainingArguments
from datasets import load_dataset
# Load base model with 4-bit quantization (QLoRA)
# Note: a 340B model still needs multiple data-center GPUs even in 4-bit;
# substitute an 8B-class model to fit a single consumer GPU.
model, tokenizer = FastLanguageModel.from_pretrained(
    model_name="nvidia/Nemotron-4-340B-Instruct",
    max_seq_length=4096,
    dtype=None,  # auto-detect
    load_in_4bit=True,
    device_map="auto",
)
# Add LoRA adapters
model = FastLanguageModel.get_peft_model(
model,
r=64, # LoRA rank
target_modules=["q_proj", "k_proj", "v_proj", "o_proj",
"gate_proj", "up_proj", "down_proj"],
lora_alpha=128,
lora_dropout=0.05,
bias="none",
use_gradient_checkpointing="unsloth", # 60% less VRAM
)
# Prepare dataset
dataset = load_dataset("json", data_files="training_data.jsonl", split="train")
def format_prompt(example):
return {
"text": f"<|system|>You are a helpful health advisor.<|end|>\n"
f"<|user|>{example['question']}<|end|>\n"
f"<|assistant|>{example['answer']}<|end|>"
}
dataset = dataset.map(format_prompt)
# Training configuration
trainer = SFTTrainer(
model=model,
tokenizer=tokenizer,
train_dataset=dataset,
dataset_text_field="text",
max_seq_length=4096,
args=TrainingArguments(
per_device_train_batch_size=4,
gradient_accumulation_steps=8,
warmup_steps=100,
num_train_epochs=3,
learning_rate=2e-5,
fp16=True,
logging_steps=10,
output_dir="./output",
optim="adamw_8bit",
seed=42,
),
)
trainer.train()
# Save LoRA adapter (only ~100MB vs 680GB full model)
model.save_pretrained("./health-advisor-lora")
tokenizer.save_pretrained("./health-advisor-lora")
Hugging Face Ecosystem
Hugging Face provides the transformers library for model loading, datasets for data processing, peft for parameter-efficient fine-tuning, and the Hub for model sharing. It is the standard ecosystem for modern NLP and multimodal AI.
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from peft import PeftModel, LoraConfig, get_peft_model
from datasets import load_dataset, DatasetDict
from huggingface_hub import HfApi
# Load and merge LoRA adapter with base model
base_model = AutoModelForCausalLM.from_pretrained(
    "meta-llama/Llama-3.1-8B-Instruct",
    torch_dtype="auto",
    device_map="auto",
)
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3.1-8B-Instruct")
model = PeftModel.from_pretrained(base_model, "./health-advisor-lora")
model = model.merge_and_unload()  # merge LoRA weights into the base model
# Push merged model to Hugging Face Hub
model.push_to_hub("josenobile/health-advisor-merged")
tokenizer.push_to_hub("josenobile/health-advisor-merged")
# Dataset processing pipeline
dataset = load_dataset("csv", data_files="health_qa.csv")
dataset = dataset["train"].train_test_split(test_size=0.1, seed=42)
def tokenize(examples):
return tokenizer(
examples["text"],
truncation=True,
max_length=2048,
padding="max_length",
)
tokenized = dataset.map(tokenize, batched=True, num_proc=4)
# Quick inference with pipeline API
pipe = pipeline("text-generation", model=model, tokenizer=tokenizer)
result = pipe("What exercises help reduce body fat?", max_new_tokens=256)
Model Inference and Serving
Serve fine-tuned models via FastAPI with batched inference, streaming responses, and GPU memory management. vLLM provides optimized serving with PagedAttention for high throughput.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from unsloth import FastLanguageModel
import torch
import asyncio
app = FastAPI()
model, tokenizer = FastLanguageModel.from_pretrained(
"./health-advisor-lora",
max_seq_length=4096,
load_in_4bit=True,
)
FastLanguageModel.for_inference(model)
@app.post("/api/chat")
async def chat(request: ChatRequest):
prompt = format_chat_prompt(request.messages)
inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
with torch.no_grad():
outputs = model.generate(
**inputs,
max_new_tokens=1024,
temperature=request.temperature or 0.7,
top_p=0.9,
do_sample=True,
repetition_penalty=1.15,
)
response = tokenizer.decode(outputs[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True)
return {"response": response, "model": "health-advisor-lora"}
6. Async Programming (asyncio, aiohttp)
asyncio Patterns
asyncio is Python's built-in framework for concurrent I/O-bound code. TaskGroup (3.11+) replaces gather for structured concurrency with proper error handling. Combine it with asyncio.Queue for producer-consumer patterns.
import asyncio
from asyncio import TaskGroup, Queue
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
# Structured concurrency with TaskGroup (3.11+)
async def fetch_all_metrics(user_ids: list[int]) -> list[dict]:
    async with TaskGroup() as tg:
        tasks = [tg.create_task(fetch_user_metrics(uid)) for uid in user_ids]
    # The TaskGroup block only exits once every task has finished
    return [t.result() for t in tasks]
# Producer-consumer with a bounded queue (None is the shutdown sentinel,
# so the queue's item type is dict | None)
async def producer(queue: Queue[dict | None], source: AsyncIterator[dict]) -> None:
    async for item in source:
        await queue.put(item)
    await queue.put(None)  # sentinel: tells the consumer to stop
async def consumer(queue: Queue[dict | None], batch_size: int = 50) -> None:
batch: list[dict] = []
while True:
item = await queue.get()
if item is None:
break
batch.append(item)
if len(batch) >= batch_size:
await flush_batch(batch)
batch.clear()
if batch:
await flush_batch(batch)
# Async context manager for resource cleanup
@asynccontextmanager
async def managed_connection(url: str):
conn = await create_connection(url)
try:
yield conn
finally:
await conn.close()
aiohttp: Async HTTP Client and Server
aiohttp provides both an async HTTP client for making concurrent requests and an async web server. Its connection pooling and session reuse make it efficient for API integrations and web scraping at scale.
import aiohttp
import asyncio
from aiohttp import ClientTimeout
async def fetch_health_apis(endpoints: list[str]) -> list[dict]:
"""Fetch data from multiple health APIs concurrently."""
timeout = ClientTimeout(total=30, connect=5)
results: list[dict] = []
async with aiohttp.ClientSession(timeout=timeout) as session:
async def fetch_one(url: str) -> dict:
async with session.get(url) as resp:
resp.raise_for_status()
return await resp.json()
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_one(ep)) for ep in endpoints]
results = [t.result() for t in tasks]
return results
# Rate-limited concurrent fetcher
async def rate_limited_fetch(
urls: list[str],
max_concurrent: int = 10,
delay: float = 0.1,
) -> list[dict]:
semaphore = asyncio.Semaphore(max_concurrent)
async with aiohttp.ClientSession() as session:
async def fetch(url: str) -> dict:
async with semaphore:
await asyncio.sleep(delay)
async with session.get(url) as resp:
return await resp.json()
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch(u)) for u in urls]
return [t.result() for t in tasks]
7. Testing (pytest, coverage)
pytest: Modern Testing Framework
pytest is the standard Python testing framework, offering fixtures for setup and teardown, parametrize for data-driven tests, and a rich plugin ecosystem. Use pytest-asyncio for testing async code and pytest-cov for coverage reports.
import pytest
from httpx import AsyncClient, ASGITransport
from unittest.mock import AsyncMock, patch
from health_api.main import app
from health_api.models import BodyMetrics
# Async fixture for test client
@pytest.fixture
async def client():
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
yield ac
# Parametrized test: multiple inputs, one test function
@pytest.mark.parametrize("weight,expected_bmi", [
(70.0, 23.1),
(85.0, 28.1),
(55.0, 18.2),
])
def test_bmi_calculation(weight: float, expected_bmi: float):
bmi = calculate_bmi(weight, height_m=1.74)
assert bmi == pytest.approx(expected_bmi, abs=0.1)
# Async test with the auth dependency overridden
# (mock.patch does not intercept FastAPI's Depends resolution; use
#  app.dependency_overrides, importing get_current_user from health_api.deps)
@pytest.mark.asyncio
async def test_create_metrics(client: AsyncClient):
    payload = {"weight_kg": 78.5, "body_fat_pct": 15.2}
    app.dependency_overrides[get_current_user] = lambda: mock_user
    try:
        resp = await client.post("/api/metrics", json=payload)
    finally:
        app.dependency_overrides.clear()
assert resp.status_code == 201
data = resp.json()
assert data["metrics"]["weight_kg"] == 78.5
assert data["trend"] in ("up", "down", "stable")
# Fixture with database teardown
@pytest.fixture
async def db_session():
async with async_session_factory() as session:
yield session
await session.rollback()
# Custom marker for slow integration tests
@pytest.mark.slow
@pytest.mark.asyncio
async def test_full_sync_pipeline(db_session):
"""End-to-end test: BLE read, transform, store, verify."""
raw = generate_mock_ble_data(weight=80.0, impedance=500)
metrics = parse_ble_data(raw)
record = await store_metrics(db_session, metrics)
assert record.id is not None
assert record.weight_kg == pytest.approx(80.0, abs=0.1)
Coverage and CI Configuration
Use pytest-cov for coverage measurement and enforce minimum thresholds in CI. Combine with ruff for linting and mypy for type checking in a single test pipeline.
# pyproject.toml - pytest and coverage configuration
[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"
markers = [
    "slow: marks tests as slow (deselect with: -m 'not slow')",
]
addopts = "--strict-markers -ra --cov=health_api --cov-report=term-missing"
[tool.coverage.run]
source = ["health_api"]
branch = true
omit = ["*/tests/*", "*/migrations/*"]
[tool.coverage.report]
fail_under = 85
show_missing = true
exclude_lines = [
"pragma: no cover",
"if TYPE_CHECKING:",
"if __name__ == .__main__.",
]
# CI pipeline (GitHub Actions snippet)
# - run: |
# poetry run ruff check .
# poetry run mypy .
# poetry run pytest --cov-fail-under=85
8. Web Scraping (BeautifulSoup, Scrapy)
BeautifulSoup: Quick Parsing
BeautifulSoup combined with requests or aiohttp is the simplest approach for scraping structured HTML. It excels at one-off scripts and smaller crawls where a full framework is overkill.
import requests
from bs4 import BeautifulSoup
from dataclasses import dataclass
@dataclass
class NutritionInfo:
name: str
calories: int
protein_g: float
carbs_g: float
fat_g: float
def scrape_nutrition(food: str) -> NutritionInfo | None:
"""Scrape nutritional data for a food item."""
resp = requests.get(
"https://example.com/nutrition/search",
params={"q": food},
headers={"User-Agent": "HealthBot/1.0"},
timeout=10,
)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
result = soup.select_one(".nutrition-result")
if not result:
return None
return NutritionInfo(
name=result.select_one("h2").get_text(strip=True),
calories=int(result.select_one("[data-nutrient='calories']").text),
protein_g=float(result.select_one("[data-nutrient='protein']").text),
carbs_g=float(result.select_one("[data-nutrient='carbs']").text),
fat_g=float(result.select_one("[data-nutrient='fat']").text),
)
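The selectors in scrape_nutrition can be exercised without a network call by parsing a static snippet; the markup below is made up to match the selectors used above:

```python
from bs4 import BeautifulSoup

html = """
<div class="nutrition-result">
  <h2> Oatmeal </h2>
  <span data-nutrient="calories">389</span>
  <span data-nutrient="protein">16.9</span>
  <span data-nutrient="carbs">66.3</span>
  <span data-nutrient="fat">6.9</span>
</div>
"""

soup = BeautifulSoup(html, "html.parser")
result = soup.select_one(".nutrition-result")
name = result.select_one("h2").get_text(strip=True)  # strip=True trims surrounding whitespace
calories = int(result.select_one("[data-nutrient='calories']").text)
print(name, calories)
```

Parsing canned HTML like this also makes a good unit test: scrapers break when markup changes, and a fixture pinned to the expected structure catches selector drift early.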
Scrapy: Industrial-Strength Crawling
Scrapy is a full-featured crawling framework with built-in request scheduling, middleware pipelines, auto-throttling, and export to JSON/CSV/databases. Use it for large-scale, multi-page crawls with structured data extraction.
import scrapy
from scrapy.loader import ItemLoader
from itemloaders.processors import TakeFirst, MapCompose
class ExerciseItem(scrapy.Item):
name = scrapy.Field()
muscle_group = scrapy.Field()
difficulty = scrapy.Field()
description = scrapy.Field()
class ExerciseSpider(scrapy.Spider):
name = "exercises"
start_urls = ["https://example.com/exercises/"]
custom_settings = {
"CONCURRENT_REQUESTS": 8,
"DOWNLOAD_DELAY": 0.5,
"AUTOTHROTTLE_ENABLED": True,
"FEEDS": {"exercises.json": {"format": "json", "overwrite": True}},
}
def parse(self, response):
for card in response.css(".exercise-card"):
loader = ItemLoader(item=ExerciseItem(), selector=card)
loader.default_output_processor = TakeFirst()
loader.add_css("name", "h3::text")
loader.add_css("muscle_group", ".muscle-tag::text")
loader.add_css("difficulty", ".difficulty::attr(data-level)")
loader.add_css("description", ".desc::text")
yield loader.load_item()
next_page = response.css("a.next-page::attr(href)").get()
if next_page:
yield response.follow(next_page, self.parse)
9. Package Management (pip, poetry, uv, venv)
Poetry: Modern Dependency Management
Poetry provides deterministic builds with a lockfile, virtual environment management, dependency resolution, and package publishing. The pyproject.toml file is the single source of truth for project configuration.
# pyproject.toml
[tool.poetry]
name = "health-api"
version = "2.1.0"
description = "Health metrics API and data pipeline"
authors = ["Jose Nobile <jose@josenobile.co>"]
[tool.poetry.dependencies]
python = "^3.12"
fastapi = "^0.115.0"
uvicorn = {extras = ["standard"], version = "^0.34.0"}
sqlalchemy = {extras = ["asyncio"], version = "^2.0.36"}
pydantic = "^2.10.0"
pandas = "^2.2.0"
numpy = "^2.1.0"
[tool.poetry.group.dev.dependencies]
pytest = "^8.3.0"
pytest-asyncio = "^0.24.0"
pytest-cov = "^6.0.0"
mypy = "^1.13.0"
pyright = "^1.1.390"
ruff = "^0.8.0"
[tool.poetry.group.ml.dependencies]
torch = "^2.5.0"
unsloth = {extras = ["colab-new"], version = "^2024.12"}
transformers = "^4.46.0"
datasets = "^3.1.0"
trl = "^0.12.0"
[tool.poetry.scripts]
api = "health_api.main:start"
sync = "health_api.sync:main"
# Commands
# poetry install -> install all deps
# poetry install --with ml -> include ML deps
# poetry lock -> regenerate lockfile
# poetry run api -> run the API
# poetry build -> build package
uv: Fast Python Package Manager
uv (v0.11.6 as of April 2026) is a Rust-based Python package installer and resolver that is 10-100x faster than pip. It supports pyproject.toml, lockfiles, virtual environments, and Python version management in a single tool. It has become the de facto standard for new Python projects in 2026.
# Install uv
curl -LsSf https://astral.sh/uv/install.sh | sh
# Create project with uv
uv init health-api
cd health-api
# Add dependencies (resolves and locks automatically)
uv add fastapi "uvicorn[standard]" "sqlalchemy[asyncio]" pydantic  # quote extras so the shell doesn't glob the brackets
uv add --dev pytest pytest-asyncio mypy ruff
# Sync environment from lockfile (fast, deterministic)
uv sync
# Run commands inside the managed environment
uv run uvicorn health_api.main:app --reload
uv run pytest --cov
# Pin Python version for the project
uv python pin 3.12
# Compile requirements.txt from pyproject.toml (for Docker)
uv pip compile pyproject.toml -o requirements.txt
# Install into current venv (pip-compatible mode)
uv pip install -r requirements.txt
Virtual Environments and pip
For simpler projects, venv + pip + requirements.txt remains the most portable approach. Pin exact versions with pip freeze and separate dev from production dependencies.
# Create and activate virtual environment
python3.12 -m venv .venv
source .venv/bin/activate
# Install with constraints
pip install -r requirements.txt -c constraints.txt
# requirements.txt (pinned)
fastapi==0.115.6
uvicorn[standard]==0.34.0
pydantic==2.10.3
sqlalchemy[asyncio]==2.0.36
# requirements-dev.txt
-r requirements.txt
pytest==8.3.4
pytest-cov==6.0.0
mypy==1.13.0
ruff==0.8.3
# Dockerfile multi-stage build
FROM python:3.12-slim AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt
FROM python:3.12-slim
COPY --from=builder /install /usr/local
COPY . /app
WORKDIR /app
CMD ["uvicorn", "health_api.main:app", "--host", "0.0.0.0", "--port", "8000"]
10. Type Hints (mypy, pyright)
Advanced Type Annotations
Python's type system supports generics, protocols (structural typing), literal types, overloads, and TypeGuard. Type annotations are validated by mypy and pyright at development time, catching bugs before runtime.
from typing import Protocol, TypeVar, Generic, overload, Literal
from collections.abc import Sequence, Callable, AsyncIterator
T = TypeVar("T")
T_co = TypeVar("T_co", covariant=True)
# Protocol (structural typing - like Go interfaces)
class Measurable(Protocol):
@property
def value(self) -> float: ...
@property
def unit(self) -> str: ...
def to_base_unit(self) -> float: ...
# Generic repository
class Repository(Generic[T]):
async def get(self, id: int) -> T | None: ...
async def list(self, limit: int = 100, offset: int = 0) -> Sequence[T]: ...
async def create(self, entity: T) -> T: ...
async def update(self, id: int, entity: T) -> T: ...
async def delete(self, id: int) -> bool: ...
# Overloaded function signatures
@overload
def parse_metric(raw: str, kind: Literal["weight"]) -> WeightMetric: ...
@overload
def parse_metric(raw: str, kind: Literal["body_fat"]) -> BodyFatMetric: ...
def parse_metric(raw: str, kind: str) -> WeightMetric | BodyFatMetric:
if kind == "weight":
return WeightMetric.from_raw(raw)
return BodyFatMetric.from_raw(raw)
# Callback types with ParamSpec (async-aware retry decorator: the wrapper
# awaits func, so the accepted callables must return Awaitable[T])
from typing import ParamSpec
from collections.abc import Awaitable
P = ParamSpec("P")
def retry(
    max_attempts: int = 3,
) -> Callable[[Callable[P, Awaitable[T]]], Callable[P, Awaitable[T]]]:
    def decorator(func: Callable[P, Awaitable[T]]) -> Callable[P, Awaitable[T]]:
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts - 1:
                        raise
            raise RuntimeError("unreachable")
        return wrapper
    return decorator
mypy and pyright Configuration
Strict mypy and pyright configuration catches type errors early. Integrate both into CI/CD to prevent type regressions. Per-module overrides handle third-party libraries without type stubs.
# pyproject.toml - mypy configuration
[tool.mypy]
python_version = "3.12"
strict = true
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
disallow_any_generics = true
check_untyped_defs = true
no_implicit_reexport = true
warn_redundant_casts = true
warn_unused_ignores = true
show_error_codes = true
pretty = true
[[tool.mypy.overrides]]
module = ["unsloth.*", "trl.*", "bleak.*"]
ignore_missing_imports = true
# pyright configuration
[tool.pyright]
pythonVersion = "3.12"
typeCheckingMode = "strict"
reportMissingTypeStubs = "warning"
reportUnusedImport = "error"
reportUnusedVariable = "error"
# ruff configuration (linter + formatter)
[tool.ruff]
target-version = "py312"
line-length = 100
[tool.ruff.lint]
select = ["E", "F", "I", "N", "W", "UP", "B", "A", "SIM", "TCH"]
ignore = ["E501"]
11. Python 3.13 and 3.14: Free-Threading and T-Strings
Python 3.13 (October 2024)
Python 3.13 is a landmark release with two experimental features that signal the future of the language: a free-threading build (python3.13t) that disables the GIL for true multi-core parallelism, and an experimental JIT compiler for improved runtime performance. It also ships an improved interactive REPL with multi-line editing and syntax highlighting, and initial Tier 3 support for iOS and Android platforms.
# Python 3.13: Free-threading experimental build
# Install the free-threaded build: python3.13t
import threading
import time
def cpu_bound(n: int) -> int:
"""CPU-intensive work that benefits from true parallelism."""
return sum(i * i for i in range(n))
# With GIL (python3.13): threads run one at a time for CPU work
# With free-threading (python3.13t): threads run on separate cores
threads = [threading.Thread(target=cpu_bound, args=(10_000_000,)) for _ in range(4)]
start = time.perf_counter()
for t in threads: t.start()
for t in threads: t.join()
print(f"Elapsed: {time.perf_counter() - start:.2f}s")
# python3.13: ~4.0s (serial due to GIL)
# python3.13t: ~1.1s (parallel on 4 cores)
Python 3.14 (October 2025)
Python 3.14 introduces template string literals (t-strings, PEP 750), deferred evaluation of annotations (PEP 649/749), and significant improvements to free-threading, which is now officially supported rather than experimental. T-strings are the most visible new feature: they look like f-strings but produce Template objects instead of strings, enabling safe interpolation for SQL, HTML, and other contexts where injection prevention matters. The latest maintenance release is Python 3.14.4 (April 7, 2026) with 337 bugfixes; Python 3.15.0a8 is in alpha (feature freeze May 5, 2026; first release candidate July 28, 2026).
# Python 3.14: Template strings (t-strings, PEP 750)
from string.templatelib import Template
name = "Jose"
greeting: Template = t"Hello, {name}!"
# greeting is a Template object, NOT a string
# Libraries can inspect the structure for safe interpolation
# Use case: SQL injection prevention
user_input = "'; DROP TABLE users; --"
query = t"SELECT * FROM users WHERE name = {user_input}"
# A SQL library processes the Template safely, parameterizing the value
# Deferred annotations (PEP 649/749) - annotations evaluated lazily
from __future__ import annotations # no longer needed in 3.14
class Tree:
left: Tree | None # works without quotes -- evaluated lazily
right: Tree | None
value: int
Free-Threading Deep Dive: The GIL Removal
The removal of the Global Interpreter Lock (GIL) is the biggest change to CPython in decades. In Python 3.13, free-threading is experimental with a 10-15% single-thread performance penalty. In Python 3.14, this penalty drops to 5-10% thanks to a thread-safe incremental garbage collector and per-object locking on built-in types. Free-threading could become the default by Python 3.15 (2026). To use it: install the free-threaded build (python3.14t), and ensure your C extensions are thread-safe. Most pure Python code works unchanged.
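To check which build you are running on, a defensive sketch that works on any interpreter version (sys._is_gil_enabled only exists on 3.13+; Py_GIL_DISABLED is the build-config flag for free-threaded builds):

```python
import sys
import sysconfig

def gil_status() -> str:
    """Report GIL status; safe to call on any Python version."""
    # Py_GIL_DISABLED is 1 when the interpreter was built free-threaded
    built_ft = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))
    checker = getattr(sys, "_is_gil_enabled", None)  # added in 3.13
    if checker is None:
        return "GIL enabled (pre-3.13 interpreter)"
    if checker():
        # A free-threaded build can still re-enable the GIL (e.g. PYTHON_GIL=1)
        suffix = " (free-threaded build, GIL re-enabled)" if built_ft else ""
        return "GIL enabled" + suffix
    return "free-threading active"

print(gil_status())
```

This distinction matters in practice: a free-threaded build silently re-enables the GIL when an incompatible C extension is imported, so checking at runtime beats assuming from the binary name.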
Python 3.15 Alpha: JIT Compiler Finally Delivers Speed Gains
Python 3.15 (currently in alpha; feature freeze May 5, 2026) marks the first time the experimental JIT compiler consistently outperforms the interpreter. On macOS AArch64 the JIT is 11-12% faster than the tail-calling interpreter; on x86_64 Linux it delivers 5-6% speedups over the standard interpreter. The JIT's copy-and-patch compilation strategy landed in 3.13-3.14 as mostly infrastructure work; in 3.15, a larger contributor base and better code generation turn it into real gains. The specializing adaptive interpreter is now fully enabled in free-threaded builds, nearly closing the single-threaded performance gap between GIL and free-threaded modes.
# Free-threading: real multi-core parallelism in Python 3.14t
import threading
from concurrent.futures import ThreadPoolExecutor
# Per-object locks on built-in types ensure thread safety
shared_dict: dict[str, int] = {}
lock = threading.Lock() # still recommended for compound operations
def process_batch(batch_id: int, data: list[float]) -> dict:
"""Process a batch of data on a dedicated core."""
result = heavy_computation(data) # truly parallel with python3.14t
with lock:
shared_dict[f"batch_{batch_id}"] = len(result)
return result
# ThreadPoolExecutor with free-threading uses real OS threads on separate cores
with ThreadPoolExecutor(max_workers=8) as pool:
futures = [pool.submit(process_batch, i, chunk) for i, chunk in enumerate(chunks)]
results = [f.result() for f in futures]
# Detect mode at runtime: sys._is_gil_enabled() (3.13+) returns False when free-threading is active