Lab 05: Neural Network EnsemblesΒΆ
Source: Deep Learning Interviews by Shlomo Kashani β Chapter: NN Ensembles
PDF: resources/2201.00650v2.pdf
This lab explores ensemble methods β techniques that combine multiple models to produce predictions superior to any single model. We cover:
Bagging β Bootstrap Aggregating to reduce variance
Boosting (AdaBoost) β Sequential training focused on errors
Stacking β Using base model predictions as meta-features
Prediction Combining Strategies β Voting, averaging, weighted averaging
Snapshot Ensembles β Cyclic learning rates to harvest multiple minima
Learning Rate Schedules β Step decay, exponential decay, warmup+cosine
import numpy as np
import matplotlib.pyplot as plt
from typing import List, Callable, Tuple
np.random.seed(42)
plt.rcParams["figure.figsize"] = (10, 5)
plt.rcParams["axes.grid"] = True
plt.rcParams["grid.alpha"] = 0.3
print("Imports loaded successfully.")
PART 1: Bagging (Bootstrap Aggregating)ΒΆ
Key idea: Train multiple models on different bootstrap samples (random samples drawn with replacement from the training set), then average their predictions.
Why it works:
Each bootstrap sample is ~63.2% unique data (the rest are duplicates).
Different samples lead to different fitted models, introducing diversity.
Averaging diverse predictions reduces variance without increasing bias.
If each model has variance \(\sigma^2\) and they are uncorrelated, the ensemble variance is \(\sigma^2 / M\) where \(M\) is the number of models.
Bagging is the foundation of Random Forests and many practical ensemble systems.
def bootstrap_sample(X: np.ndarray, y: np.ndarray, seed: int = None) -> Tuple[np.ndarray, np.ndarray]:
"""
Create a bootstrap sample (sample WITH replacement).
Returns (X_boot, y_boot) of the same size as the input.
"""
if seed is not None:
np.random.seed(seed)
n = len(X)
indices = np.random.randint(0, n, size=n)
return X[indices], y[indices]
# Quick demo
X_demo = np.arange(10)
y_demo = np.arange(10)
X_b, y_b = bootstrap_sample(X_demo, y_demo, seed=42)
print(f"Original X : {X_demo}")
print(f"Bootstrap X: {X_b}")
print(f"Unique samples: {len(np.unique(X_b))}/10 (expected ~63.2%)")
class BaggingRegressor:
"""Bagging ensemble using simple linear models."""
def __init__(self, n_estimators: int = 10):
self.n_estimators = n_estimators
self.models: List[np.ndarray] = [] # each model is a weight vector [w, b]
def _fit_linear(self, X: np.ndarray, y: np.ndarray) -> np.ndarray:
"""Fit a simple linear model y = w*x + b via least squares."""
A = np.column_stack([X, np.ones(len(X))])
w, _, _, _ = np.linalg.lstsq(A, y, rcond=None)
return w # shape (2,) -> [slope, intercept]
def fit(self, X: np.ndarray, y: np.ndarray) -> "BaggingRegressor":
"""Train n_estimators on bootstrap samples."""
self.models = []
for i in range(self.n_estimators):
X_boot, y_boot = bootstrap_sample(X, y, seed=i)
w = self._fit_linear(X_boot, y_boot)
self.models.append(w)
return self
def predict(self, X: np.ndarray) -> np.ndarray:
"""Average predictions from all models."""
A = np.column_stack([X, np.ones(len(X))])
predictions = np.array([A @ w for w in self.models]) # (n_estimators, n_samples)
return predictions.mean(axis=0)
def predict_individual(self, X: np.ndarray) -> np.ndarray:
"""Return individual model predictions (for variance analysis)."""
A = np.column_stack([X, np.ones(len(X))])
return np.array([A @ w for w in self.models])
# Demo on noisy sine data
np.random.seed(42)
X_train = np.linspace(0, 5, 50)
y_train = np.sin(X_train) + np.random.randn(50) * 0.3
bag = BaggingRegressor(n_estimators=10).fit(X_train, y_train)
X_test = np.linspace(0, 5, 100)
y_pred = bag.predict(X_test)
plt.figure(figsize=(10, 5))
plt.scatter(X_train, y_train, alpha=0.5, label="Training data (noisy sine)", s=20)
plt.plot(X_test, np.sin(X_test), "g--", label="True sine", linewidth=2)
plt.plot(X_test, y_pred, "r-", label="Bagging ensemble (10 models)", linewidth=2)
plt.xlabel("x")
plt.ylabel("y")
plt.title("Bagging Regressor on Noisy Sine Data")
plt.legend()
plt.tight_layout()
plt.show()
# Show variance reduction: single model vs 10-model ensemble
np.random.seed(42)
X_train = np.linspace(0, 5, 50)
y_train = np.sin(X_train) + np.random.randn(50) * 0.3
X_test = np.linspace(0, 5, 20)
# Train many single models (each on a different bootstrap sample)
n_trials = 50
single_preds = []
for trial in range(n_trials):
X_b, y_b = bootstrap_sample(X_train, y_train, seed=trial * 100)
A = np.column_stack([X_b, np.ones(len(X_b))])
w, _, _, _ = np.linalg.lstsq(A, y_b, rcond=None)
A_test = np.column_stack([X_test, np.ones(len(X_test))])
single_preds.append(A_test @ w)
single_preds = np.array(single_preds)
# Train many 10-model ensembles
ensemble_preds = []
for trial in range(n_trials):
bag = BaggingRegressor(n_estimators=10)
# Shift seeds so each ensemble is different
bag_models = []
for i in range(10):
X_b, y_b = bootstrap_sample(X_train, y_train, seed=trial * 1000 + i)
A = np.column_stack([X_b, np.ones(len(X_b))])
w, _, _, _ = np.linalg.lstsq(A, y_b, rcond=None)
bag_models.append(w)
A_test = np.column_stack([X_test, np.ones(len(X_test))])
preds = np.array([A_test @ w for w in bag_models]).mean(axis=0)
ensemble_preds.append(preds)
ensemble_preds = np.array(ensemble_preds)
# Compare variance at each test point
single_var = single_preds.var(axis=0)
ensemble_var = ensemble_preds.var(axis=0)
plt.figure(figsize=(10, 5))
plt.bar(np.arange(len(X_test)) - 0.2, single_var, width=0.4, label="Single model variance", alpha=0.8)
plt.bar(np.arange(len(X_test)) + 0.2, ensemble_var, width=0.4, label="Ensemble (10) variance", alpha=0.8)
plt.xlabel("Test point index")
plt.ylabel("Prediction variance")
plt.title("Variance Reduction: Single Model vs. 10-Model Bagging Ensemble")
plt.legend()
plt.tight_layout()
plt.show()
print(f"Mean single-model variance: {single_var.mean():.6f}")
print(f"Mean ensemble variance: {ensemble_var.mean():.6f}")
print(f"Variance reduction factor: {single_var.mean() / ensemble_var.mean():.2f}x")
PART 2: Boosting (AdaBoost)ΒΆ
Key idea: Train models sequentially, where each new model focuses on the examples that previous models got wrong.
AdaBoost algorithm:
Initialize sample weights uniformly: \(w_i = 1/N\)
For each round \(t = 1, \ldots, T\):
Fit weak learner \(h_t\) on weighted data
Compute weighted error: \(\epsilon_t = \sum_i w_i \cdot \mathbb{1}[h_t(x_i) \neq y_i]\)
Compute model weight: \(\alpha_t = \frac{1}{2} \ln\left(\frac{1 - \epsilon_t}{\epsilon_t}\right)\)
Update sample weights: \(w_i \leftarrow w_i \cdot \exp(-\alpha_t \cdot y_i \cdot h_t(x_i))\)
Normalize weights so they sum to 1
Final prediction: \(H(x) = \text{sign}\left(\sum_t \alpha_t \cdot h_t(x)\right)\)
Unlike bagging, boosting reduces bias and can overfit if run too long.
class SimpleAdaBoost:
"""Simplified AdaBoost with decision stumps on 1D data."""
def __init__(self, n_estimators: int = 10):
self.n_estimators = n_estimators
self.stumps: List[Tuple[float, int]] = [] # (threshold, direction)
self.alphas: List[float] = []
def _best_stump(self, X: np.ndarray, y: np.ndarray, weights: np.ndarray
) -> Tuple[float, int, float]:
"""
Find the best threshold on 1D data to split into +1 / -1.
Returns (threshold, direction, weighted_error).
direction=+1 means: predict +1 if x >= threshold, else -1.
direction=-1 means: predict -1 if x >= threshold, else +1.
"""
best_err = float("inf")
best_thresh = 0.0
best_dir = 1
thresholds = np.unique(X)
for thresh in thresholds:
for direction in [1, -1]:
preds = np.ones(len(X))
if direction == 1:
preds[X < thresh] = -1
else:
preds[X >= thresh] = -1
err = np.sum(weights * (preds != y))
if err < best_err:
best_err = err
best_thresh = thresh
best_dir = direction
return best_thresh, best_dir, best_err
def _stump_predict(self, X: np.ndarray, threshold: float, direction: int) -> np.ndarray:
"""Predict with a single stump."""
preds = np.ones(len(X))
if direction == 1:
preds[X < threshold] = -1
else:
preds[X >= threshold] = -1
return preds
def fit(self, X: np.ndarray, y: np.ndarray) -> "SimpleAdaBoost":
"""
Full AdaBoost training loop.
X: 1D array of features
y: array of labels in {-1, +1}
"""
N = len(y)
weights = np.ones(N) / N
self.stumps = []
self.alphas = []
for t in range(self.n_estimators):
# 1. Find best weak learner
thresh, direction, err = self._best_stump(X, y, weights)
# Clamp error to avoid log(0) or division by zero
err = np.clip(err, 1e-10, 1.0 - 1e-10)
# 2. Compute model weight
alpha = 0.5 * np.log((1.0 - err) / err)
# 3. Get predictions of this stump
preds = self._stump_predict(X, thresh, direction)
# 4. Update sample weights
weights = weights * np.exp(-alpha * y * preds)
weights = weights / weights.sum() # normalize
# Store
self.stumps.append((thresh, direction))
self.alphas.append(alpha)
return self
def predict(self, X: np.ndarray) -> np.ndarray:
"""Weighted vote of all stumps."""
total = np.zeros(len(X))
for alpha, (thresh, direction) in zip(self.alphas, self.stumps):
total += alpha * self._stump_predict(X, thresh, direction)
return np.sign(total)
print("SimpleAdaBoost class defined.")
# Demo on 1D data with a non-linear decision boundary
# Labels: +1 if |x| > 1, else -1 (two thresholds needed)
np.random.seed(42)
X_ada = np.random.uniform(-3, 3, 200)
y_ada = np.where(np.abs(X_ada) > 1.0, 1.0, -1.0)
# Add some noise: flip 5% of labels
flip_idx = np.random.choice(200, 10, replace=False)
y_ada[flip_idx] *= -1
ada = SimpleAdaBoost(n_estimators=20).fit(X_ada, y_ada)
X_plot = np.linspace(-3, 3, 500)
y_plot = ada.predict(X_plot)
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# Left: data and decision regions
ax = axes[0]
ax.scatter(X_ada[y_ada == 1], np.zeros_like(X_ada[y_ada == 1]), c="blue", alpha=0.5, label="+1", s=30)
ax.scatter(X_ada[y_ada == -1], np.zeros_like(X_ada[y_ada == -1]) + 0.1, c="red", alpha=0.5, label="-1", s=30)
ax.fill_between(X_plot, -0.2, 0.3, where=(y_plot > 0), alpha=0.15, color="blue", label="Predicted +1")
ax.fill_between(X_plot, -0.2, 0.3, where=(y_plot <= 0), alpha=0.15, color="red", label="Predicted -1")
ax.set_xlabel("x")
ax.set_title("AdaBoost Decision Regions (non-linear boundary)")
ax.legend(loc="upper right", fontsize=8)
ax.set_ylim(-0.2, 0.4)
# Right: alpha values over rounds
ax = axes[1]
ax.bar(range(len(ada.alphas)), ada.alphas, color="steelblue")
ax.set_xlabel("Boosting round")
ax.set_ylabel("Alpha (model weight)")
ax.set_title("AdaBoost Model Weights per Round")
plt.tight_layout()
plt.show()
train_acc = np.mean(ada.predict(X_ada) == y_ada)
print(f"Training accuracy: {train_acc:.1%}")
PART 3: StackingΒΆ
Key idea: Use the predictions of base models as input features for a meta-learner (second-level model).
Algorithm:
Train \(M\) diverse base models on the training set.
Generate predictions from each base model on a held-out validation set.
Stack these \(M\) predictions as new features.
Train a meta-learner on these stacked features to predict the true labels.
The meta-learner discovers which base models to trust and how to combine them, often outperforming any individual base model and even simple averaging.
def stacking_demo():
"""
Stacking demo:
- 3 base models (simple threshold classifiers with different thresholds)
- Meta-learner: simple logistic-style linear classifier trained via gradient descent
"""
np.random.seed(42)
N = 300
X = np.random.randn(N, 1)
y = (X[:, 0] > 0).astype(float) # true boundary at x=0
# Split into train and validation
split = 200
X_train, X_val = X[:split], X[split:]
y_train, y_val = y[:split], y[split:]
# --- Base models: threshold classifiers with different thresholds ---
def model_predict(x, threshold):
return (x[:, 0] > threshold).astype(float)
thresholds = [0.2, -0.1, 0.5] # intentionally imperfect
base_names = [f"Threshold @ {t}" for t in thresholds]
# Get base model predictions on validation set
base_preds_val = np.column_stack([
model_predict(X_val, t) for t in thresholds
]) # shape: (100, 3)
# --- Train meta-learner (logistic regression via gradient descent) ---
# Get base model predictions on training set for meta-learner training
base_preds_train = np.column_stack([
model_predict(X_train, t) for t in thresholds
])
# Add bias column
Z_train = np.column_stack([base_preds_train, np.ones(split)])
Z_val = np.column_stack([base_preds_val, np.ones(N - split)])
# Simple logistic regression via gradient descent
def sigmoid(z):
return 1.0 / (1.0 + np.exp(-np.clip(z, -500, 500)))
w_meta = np.zeros(Z_train.shape[1])
lr = 0.1
for _ in range(500):
logits = Z_train @ w_meta
p = sigmoid(logits)
grad = Z_train.T @ (p - y_train) / split
w_meta -= lr * grad
# Meta-learner predictions on validation
meta_probs = sigmoid(Z_val @ w_meta)
meta_preds = (meta_probs > 0.5).astype(float)
# --- Compare accuracies ---
print("=" * 50)
print("Stacking Results on Validation Set")
print("=" * 50)
for name, t in zip(base_names, thresholds):
acc = np.mean(model_predict(X_val, t) == y_val)
print(f" {name:20s} accuracy: {acc:.1%}")
meta_acc = np.mean(meta_preds == y_val)
print(f" {'Stacked meta-learner':20s} accuracy: {meta_acc:.1%}")
print()
print(f"Meta-learner weights: {w_meta[:-1].round(3)} bias: {w_meta[-1]:.3f}")
print("The meta-learner learns to combine base models optimally.")
stacking_demo()
PART 4: Prediction Combining StrategiesΒΆ
Given \(M\) classifiers, three common ways to combine their predictions:
Majority Voting: Each classifier casts a vote; the class with the most votes wins. $\(\hat{y} = \text{mode}\{h_1(x), h_2(x), \ldots, h_M(x)\}\)$
Simple Averaging: Average the predicted probabilities, then threshold. $\(\hat{p}(x) = \frac{1}{M} \sum_{m=1}^M p_m(x), \quad \hat{y} = \mathbb{1}[\hat{p} > 0.5]\)$
Weighted Averaging: Like simple averaging but each classifier gets a weight (e.g., proportional to its validation accuracy). $\(\hat{p}(x) = \sum_{m=1}^M w_m \cdot p_m(x), \quad \sum_m w_m = 1\)$
def combining_strategies():
"""
5 classifiers on 100 examples.
Compare majority voting, simple averaging, and weighted averaging.
"""
np.random.seed(42)
n_classifiers = 5
n_examples = 100
# True labels
y_true = np.random.randint(0, 2, size=n_examples)
# Simulate classifiers with different accuracies (60%-90%)
accuracies = [0.65, 0.70, 0.75, 0.80, 0.90]
predictions = np.zeros((n_classifiers, n_examples), dtype=int)
probabilities = np.zeros((n_classifiers, n_examples))
for i, acc in enumerate(accuracies):
np.random.seed(42 + i)
correct_mask = np.random.rand(n_examples) < acc
predictions[i] = np.where(correct_mask, y_true, 1 - y_true)
# Probabilities: correct class gets high prob, wrong gets low
base_prob = np.random.uniform(0.5, 0.95, n_examples)
probabilities[i] = np.where(
predictions[i] == 1,
base_prob,
1.0 - base_prob
)
# --- Strategy 1: Majority Voting ---
def majority_vote(preds: np.ndarray) -> np.ndarray:
"""Return the class with the most votes for each example."""
vote_sum = preds.sum(axis=0) # count of 1's
return (vote_sum > n_classifiers / 2).astype(int)
# --- Strategy 2: Simple Averaging ---
def simple_average(probs: np.ndarray, threshold: float = 0.5) -> np.ndarray:
"""Average probabilities, then threshold."""
avg_prob = probs.mean(axis=0)
return (avg_prob > threshold).astype(int)
# --- Strategy 3: Weighted Averaging ---
def weighted_average(probs: np.ndarray, weights: np.ndarray,
threshold: float = 0.5) -> np.ndarray:
"""Weighted average of probabilities."""
w = weights / weights.sum() # normalize
weighted_prob = (w[:, None] * probs).sum(axis=0)
return (weighted_prob > threshold).astype(int)
# Weights proportional to accuracy
weights = np.array(accuracies)
# Compute predictions
mv_preds = majority_vote(predictions)
sa_preds = simple_average(probabilities)
wa_preds = weighted_average(probabilities, weights)
# Compute accuracies
print("=" * 55)
print("Prediction Combining Strategies")
print("=" * 55)
print("\nIndividual classifier accuracies:")
for i, acc in enumerate(accuracies):
actual_acc = np.mean(predictions[i] == y_true)
print(f" Classifier {i+1} (target {acc:.0%}): {actual_acc:.1%}")
print("\nCombining strategies:")
print(f" Majority Voting: {np.mean(mv_preds == y_true):.1%}")
print(f" Simple Averaging: {np.mean(sa_preds == y_true):.1%}")
print(f" Weighted Averaging: {np.mean(wa_preds == y_true):.1%}")
# Bar chart
labels = [f"C{i+1}" for i in range(n_classifiers)] + ["Majority\nVote", "Simple\nAvg", "Weighted\nAvg"]
accs = [np.mean(predictions[i] == y_true) for i in range(n_classifiers)]
accs += [np.mean(mv_preds == y_true), np.mean(sa_preds == y_true), np.mean(wa_preds == y_true)]
colors = ["#7fbbda"] * n_classifiers + ["#2a6496", "#d9534f", "#5cb85c"]
plt.figure(figsize=(10, 5))
bars = plt.bar(labels, accs, color=colors, edgecolor="black", linewidth=0.5)
plt.ylabel("Accuracy")
plt.title("Individual Classifiers vs. Combining Strategies")
plt.ylim(0.5, 1.0)
for bar, a in zip(bars, accs):
plt.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.005,
f"{a:.0%}", ha="center", va="bottom", fontsize=9)
plt.tight_layout()
plt.show()
combining_strategies()
PART 5: Snapshot EnsemblesΒΆ
Key insight from Huang et al. (2017): A single training run with a cyclic learning rate can visit multiple local minima. By saving a snapshot of the model at the end of each cycle (when LR is at its minimum), we get an ensemble for free β no extra training cost.
Cosine annealing schedule: $\(\text{lr}(t) = \text{lr}_{\min} + \frac{1}{2}(\text{lr}_{\max} - \text{lr}_{\min})\left(1 + \cos\left(\frac{\pi t}{T}\right)\right)\)$
where \(T\) is the cycle length. At \(t=0\) the LR is at maximum (exploration), at \(t=T\) it drops to minimum (convergence to a local minimum).
def cosine_annealing_lr(epoch: int, T: int, lr_max: float = 0.1,
lr_min: float = 0.001) -> float:
"""
Cosine annealing learning rate schedule.
lr = lr_min + 0.5 * (lr_max - lr_min) * (1 + cos(pi * t / T))
"""
return lr_min + 0.5 * (lr_max - lr_min) * (1 + np.cos(np.pi * epoch / T))
# Verify
print("Cosine annealing LR values:")
for epoch in [0, 25, 50, 75, 100]:
lr = cosine_annealing_lr(epoch, T=100)
print(f" Epoch {epoch:3d}: lr = {lr:.6f}")
# Plot cyclic LR with 3 cycles over 300 epochs
total_epochs = 300
T = 100 # cycle length
lrs = []
for epoch in range(total_epochs):
t = epoch % T
lrs.append(cosine_annealing_lr(t, T, lr_max=0.1, lr_min=0.001))
plt.figure(figsize=(10, 4))
plt.plot(range(total_epochs), lrs, color="steelblue", linewidth=1.5)
# Mark snapshot points (end of each cycle)
snapshot_epochs = [T - 1, 2 * T - 1, 3 * T - 1]
snapshot_lrs = [lrs[e] for e in snapshot_epochs]
plt.scatter(snapshot_epochs, snapshot_lrs, color="red", zorder=5, s=80,
label="Snapshot (save model)")
plt.xlabel("Epoch")
plt.ylabel("Learning Rate")
plt.title("Cosine Annealing with Warm Restarts (3 Cycles)")
plt.legend()
plt.tight_layout()
plt.show()
# Simulate snapshot ensemble on f(x) = x^4 - 3x^2
# This function has local minima near x = -1.22 and x = +1.22
f = lambda x: x**4 - 3 * x**2
f_grad = lambda x: 4 * x**3 - 6 * x
n_cycles = 5
steps_per_cycle = 100
snapshots = []
trajectory = []
x = 3.0 # start far from minima
for cycle in range(n_cycles):
for step in range(steps_per_cycle):
lr = cosine_annealing_lr(step, steps_per_cycle, lr_max=0.05, lr_min=0.001)
grad = f_grad(x)
x = x - lr * grad
trajectory.append(x)
snapshots.append(x)
# Warm restart: perturb x to explore different region
x = x + np.random.randn() * 1.5
# Plot the function, trajectory, and snapshots
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# Left: loss landscape with snapshots
ax = axes[0]
x_range = np.linspace(-3, 3, 300)
ax.plot(x_range, f(x_range), "k-", linewidth=2, label="$f(x) = x^4 - 3x^2$")
for i, snap in enumerate(snapshots):
ax.plot(snap, f(snap), "o", markersize=10, label=f"Snapshot {i+1}: x={snap:.3f}")
ax.set_xlabel("x")
ax.set_ylabel("f(x)")
ax.set_title("Snapshot Ensemble: Finding Multiple Minima")
ax.legend(fontsize=8)
# Right: trajectory over training steps
ax = axes[1]
ax.plot(trajectory, linewidth=0.8, alpha=0.8)
for i in range(n_cycles):
step_idx = (i + 1) * steps_per_cycle - 1
ax.axvline(step_idx, color="red", linestyle="--", alpha=0.5)
ax.plot(step_idx, trajectory[step_idx], "ro", markersize=8)
ax.set_xlabel("Training step")
ax.set_ylabel("x (parameter value)")
ax.set_title("Parameter Trajectory with Cyclic Restarts")
plt.tight_layout()
plt.show()
print("Snapshots found at:")
for i, snap in enumerate(snapshots):
print(f" Snapshot {i+1}: x = {snap:.4f}, f(x) = {f(snap):.4f}")
print(f"\nTrue minima at x = +/-{np.sqrt(1.5):.4f}, f = {f(np.sqrt(1.5)):.4f}")
PART 6: Learning Rate SchedulesΒΆ
The learning rate schedule profoundly affects training dynamics and ensemble quality. Three common schedules:
Step Decay: Drop the LR by a constant factor every \(k\) epochs. $\(\text{lr}(t) = \text{lr}_0 \times \text{drop}^{\lfloor t / k \rfloor}\)$
Exponential Decay: Smooth exponential reduction. $\(\text{lr}(t) = \text{lr}_0 \times \gamma^t\)$
Warmup + Cosine: Linear warmup for the first few epochs, then cosine decay. $\(\text{lr}(t) = \begin{cases} \text{lr}_{\max} \cdot t / T_w & \text{if } t < T_w \\ \frac{\text{lr}_{\max}}{2}\left(1 + \cos\left(\frac{\pi(t - T_w)}{T - T_w}\right)\right) & \text{otherwise} \end{cases}\)$
For ensembles, cyclic and warm-restart schedules are especially useful because they allow snapshot ensembling (Part 5).
def step_decay(epoch: int, initial_lr: float = 0.1, drop: float = 0.5,
epochs_drop: int = 30) -> float:
"""LR drops by factor 'drop' every 'epochs_drop' epochs."""
return initial_lr * (drop ** (epoch // epochs_drop))
def exponential_decay(epoch: int, initial_lr: float = 0.1,
decay_rate: float = 0.95) -> float:
"""LR = initial_lr * decay_rate^epoch."""
return initial_lr * (decay_rate ** epoch)
def warmup_cosine(epoch: int, warmup_epochs: int = 10, total_epochs: int = 200,
lr_max: float = 0.1) -> float:
"""Linear warmup for warmup_epochs, then cosine decay to 0."""
if epoch < warmup_epochs:
# Linear warmup
return lr_max * epoch / warmup_epochs
else:
# Cosine decay
progress = (epoch - warmup_epochs) / (total_epochs - warmup_epochs)
return lr_max * 0.5 * (1 + np.cos(np.pi * progress))
# Quick verification
print("Step Decay:")
for e in [0, 29, 30, 59, 60]:
print(f" Epoch {e:3d}: lr = {step_decay(e):.6f}")
print("\nExponential Decay:")
for e in [0, 10, 50, 100, 199]:
print(f" Epoch {e:3d}: lr = {exponential_decay(e):.6f}")
print("\nWarmup + Cosine:")
for e in [0, 5, 10, 100, 199]:
print(f" Epoch {e:3d}: lr = {warmup_cosine(e):.6f}")
# Plot all 3 schedules for 200 epochs on the same axes
epochs = np.arange(200)
lr_step = [step_decay(e) for e in epochs]
lr_exp = [exponential_decay(e) for e in epochs]
lr_warmup_cos = [warmup_cosine(e, warmup_epochs=10, total_epochs=200) for e in epochs]
plt.figure(figsize=(10, 5))
plt.plot(epochs, lr_step, label="Step Decay (drop=0.5, every 30 epochs)",
linewidth=2, linestyle="-")
plt.plot(epochs, lr_exp, label="Exponential Decay (rate=0.95)",
linewidth=2, linestyle="--")
plt.plot(epochs, lr_warmup_cos, label="Warmup (10 ep) + Cosine Decay",
linewidth=2, linestyle="-.")
plt.xlabel("Epoch")
plt.ylabel("Learning Rate")
plt.title("Comparison of Learning Rate Schedules (200 Epochs)")
plt.legend()
plt.tight_layout()
plt.show()
SummaryΒΆ
Method |
Key Idea |
Reduces |
|---|---|---|
Bagging |
Train on bootstrap samples, average predictions |
Variance |
Boosting |
Sequential training, focus on errors |
Bias |
Stacking |
Use base predictions as meta-features |
Both (via meta-learner) |
Voting/Averaging |
Combine final predictions directly |
Variance |
Snapshot Ensembles |
Cyclic LR + save at minima |
Variance (free ensemble) |
Key takeaways:
Ensembles almost always outperform single models when diversity exists among members.
Bagging and boosting represent two fundamental approaches: parallel vs. sequential.
Stacking adds a meta-learning layer that learns how to combine base models.
Snapshot ensembles provide ensembles at no extra training cost via cyclic learning rates.
The learning rate schedule is critical for both single-model training and ensemble construction.
Reference: Kashani, S. Deep Learning Interviews, Chapter: NN Ensembles.