Lab 05: Neural Network Ensembles

Source: Deep Learning Interviews by Shlomo Kashani – Chapter: NN Ensembles

PDF: resources/2201.00650v2.pdf

This lab explores ensemble methods – techniques that combine multiple models to produce predictions superior to any single model. We cover:

  1. Bagging – Bootstrap Aggregating to reduce variance

  2. Boosting (AdaBoost) – Sequential training focused on errors

  3. Stacking – Using base model predictions as meta-features

  4. Prediction Combining Strategies – Voting, averaging, weighted averaging

  5. Snapshot Ensembles – Cyclic learning rates to harvest multiple minima

  6. Learning Rate Schedules – Step decay, exponential decay, warmup+cosine

from typing import Callable, List, Optional, Tuple

import numpy as np
import matplotlib.pyplot as plt

# Seed the global NumPy RNG once so the demos below are reproducible end-to-end.
np.random.seed(42)
# Shared matplotlib defaults for every figure in this lab.
plt.rcParams["figure.figsize"] = (10, 5)
plt.rcParams["axes.grid"] = True
plt.rcParams["grid.alpha"] = 0.3
print("Imports loaded successfully.")

PART 1: Bagging (Bootstrap Aggregating)

Key idea: Train multiple models on different bootstrap samples (random samples drawn with replacement from the training set), then average their predictions.

Why it works:

  • Each bootstrap sample is ~63.2% unique data (the rest are duplicates).

  • Different samples lead to different fitted models, introducing diversity.

  • Averaging diverse predictions reduces variance without increasing bias.

  • If each model has variance \(\sigma^2\) and they are uncorrelated, the ensemble variance is \(\sigma^2 / M\) where \(M\) is the number of models.

Bagging is the foundation of Random Forests and many practical ensemble systems.

def bootstrap_sample(X: np.ndarray, y: np.ndarray, seed: Optional[int] = None) -> Tuple[np.ndarray, np.ndarray]:
    """
    Draw a bootstrap sample (sampling WITH replacement) from (X, y).

    Parameters
    ----------
    X, y : aligned arrays of equal length.
    seed : optional int. When given, the *global* NumPy RNG is re-seeded so
        the draw is reproducible (the demos below rely on this side effect).

    Returns
    -------
    (X_boot, y_boot) : arrays of the same length as the input, with rows
        drawn uniformly at random with replacement.
    """
    if seed is not None:
        np.random.seed(seed)
    n = len(X)
    # Drawing n indices with replacement leaves ~63.2% unique rows on average.
    indices = np.random.randint(0, n, size=n)
    return X[indices], y[indices]


# Quick demo: one bootstrap draw from the integers 0..9, then count duplicates.
X_demo = np.arange(10)
y_demo = np.arange(10)
X_b, y_b = bootstrap_sample(X_demo, y_demo, seed=42)
n_unique = len(np.unique(X_b))
print(f"Original X : {X_demo}")
print(f"Bootstrap X: {X_b}")
print(f"Unique samples: {n_unique}/10  (expected ~63.2%)")
class BaggingRegressor:
    """Bagging ensemble of simple 1-D linear models (y = w*x + b).

    Each estimator is fit on an independent bootstrap sample; the ensemble
    prediction is the unweighted mean of the individual estimators.
    """

    def __init__(self, n_estimators: int = 10):
        # Number of bootstrap models to train.
        self.n_estimators = n_estimators
        self.models: List[np.ndarray] = []  # each model is a weight vector [slope, intercept]

    @staticmethod
    def _design_matrix(X: np.ndarray) -> np.ndarray:
        """Return the (n, 2) design matrix [x, 1] shared by fit and predict."""
        return np.column_stack([X, np.ones(len(X))])

    def _fit_linear(self, X: np.ndarray, y: np.ndarray) -> np.ndarray:
        """Fit y = w*x + b via least squares; returns [slope, intercept]."""
        A = self._design_matrix(X)
        w, _, _, _ = np.linalg.lstsq(A, y, rcond=None)
        return w  # shape (2,)

    def fit(self, X: np.ndarray, y: np.ndarray) -> "BaggingRegressor":
        """Train n_estimators models, each on its own bootstrap sample."""
        self.models = []
        for i in range(self.n_estimators):
            # Deterministic per-model seed keeps the demos reproducible.
            X_boot, y_boot = bootstrap_sample(X, y, seed=i)
            self.models.append(self._fit_linear(X_boot, y_boot))
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Average the predictions of all trained models."""
        return self.predict_individual(X).mean(axis=0)

    def predict_individual(self, X: np.ndarray) -> np.ndarray:
        """Return every model's predictions, shape (n_estimators, n_samples)."""
        A = self._design_matrix(X)
        return np.array([A @ w for w in self.models])


# Demo on noisy sine data
# NOTE: the global RNG is re-seeded here so the noise draw is reproducible;
# the seeding and sampling calls must stay in this order.
np.random.seed(42)
X_train = np.linspace(0, 5, 50)
y_train = np.sin(X_train) + np.random.randn(50) * 0.3

# Fit a 10-model bagging ensemble, then predict on a denser grid.
bag = BaggingRegressor(n_estimators=10).fit(X_train, y_train)
X_test = np.linspace(0, 5, 100)
y_pred = bag.predict(X_test)

# Plot noisy training points, the noiseless target, and the ensemble fit.
plt.figure(figsize=(10, 5))
plt.scatter(X_train, y_train, alpha=0.5, label="Training data (noisy sine)", s=20)
plt.plot(X_test, np.sin(X_test), "g--", label="True sine", linewidth=2)
plt.plot(X_test, y_pred, "r-", label="Bagging ensemble (10 models)", linewidth=2)
plt.xlabel("x")
plt.ylabel("y")
plt.title("Bagging Regressor on Noisy Sine Data")
plt.legend()
plt.tight_layout()
plt.show()
# Show variance reduction: single model vs 10-model ensemble.
# For each of n_trials repetitions we fit (a) one linear model on a bootstrap
# sample and (b) a 10-model bagged ensemble, then compare the across-trial
# variance of their predictions at 20 fixed test points.
np.random.seed(42)
X_train = np.linspace(0, 5, 50)
y_train = np.sin(X_train) + np.random.randn(50) * 0.3
X_test = np.linspace(0, 5, 20)

# The test design matrix does not depend on the trial — build it once.
A_test = np.column_stack([X_test, np.ones(len(X_test))])

# Train many single models (each on a different bootstrap sample)
n_trials = 50
single_preds = []
for trial in range(n_trials):
    X_b, y_b = bootstrap_sample(X_train, y_train, seed=trial * 100)
    A = np.column_stack([X_b, np.ones(len(X_b))])
    w, _, _, _ = np.linalg.lstsq(A, y_b, rcond=None)
    single_preds.append(A_test @ w)
single_preds = np.array(single_preds)

# Train many 10-model ensembles (seeds shifted so each ensemble differs)
ensemble_preds = []
for trial in range(n_trials):
    bag_models = []
    for i in range(10):
        X_b, y_b = bootstrap_sample(X_train, y_train, seed=trial * 1000 + i)
        A = np.column_stack([X_b, np.ones(len(X_b))])
        w, _, _, _ = np.linalg.lstsq(A, y_b, rcond=None)
        bag_models.append(w)
    preds = np.array([A_test @ w for w in bag_models]).mean(axis=0)
    ensemble_preds.append(preds)
ensemble_preds = np.array(ensemble_preds)

# Compare variance at each test point
single_var = single_preds.var(axis=0)
ensemble_var = ensemble_preds.var(axis=0)

plt.figure(figsize=(10, 5))
plt.bar(np.arange(len(X_test)) - 0.2, single_var, width=0.4, label="Single model variance", alpha=0.8)
plt.bar(np.arange(len(X_test)) + 0.2, ensemble_var, width=0.4, label="Ensemble (10) variance", alpha=0.8)
plt.xlabel("Test point index")
plt.ylabel("Prediction variance")
plt.title("Variance Reduction: Single Model vs. 10-Model Bagging Ensemble")
plt.legend()
plt.tight_layout()
plt.show()

print(f"Mean single-model variance:   {single_var.mean():.6f}")
print(f"Mean ensemble variance:        {ensemble_var.mean():.6f}")
print(f"Variance reduction factor:     {single_var.mean() / ensemble_var.mean():.2f}x")

PART 2: Boosting (AdaBoost)

Key idea: Train models sequentially, where each new model focuses on the examples that previous models got wrong.

AdaBoost algorithm:

  1. Initialize sample weights uniformly: \(w_i = 1/N\)

  2. For each round \(t = 1, \ldots, T\):

    • Fit weak learner \(h_t\) on weighted data

    • Compute weighted error: \(\epsilon_t = \sum_i w_i \cdot \mathbb{1}[h_t(x_i) \neq y_i]\)

    • Compute model weight: \(\alpha_t = \frac{1}{2} \ln\left(\frac{1 - \epsilon_t}{\epsilon_t}\right)\)

    • Update sample weights: \(w_i \leftarrow w_i \cdot \exp(-\alpha_t \cdot y_i \cdot h_t(x_i))\)

    • Normalize weights so they sum to 1

  3. Final prediction: \(H(x) = \text{sign}\left(\sum_t \alpha_t \cdot h_t(x)\right)\)

Unlike bagging, boosting reduces bias and can overfit if run too long.

class SimpleAdaBoost:
    """Simplified AdaBoost over 1-D inputs, using decision stumps as weak learners."""

    def __init__(self, n_estimators: int = 10):
        self.n_estimators = n_estimators
        self.stumps: List[Tuple[float, int]] = []   # (threshold, direction)
        self.alphas: List[float] = []

    def _best_stump(self, X: np.ndarray, y: np.ndarray, weights: np.ndarray
                    ) -> Tuple[float, int, float]:
        """
        Exhaustively search every unique data value as a threshold, in both
        orientations, for the stump with the smallest weighted error.
        Returns (threshold, direction, weighted_error).
        direction=+1: predict +1 for x >= threshold, else -1.
        direction=-1: predict -1 for x >= threshold, else +1.
        """
        best = (0.0, 1, float("inf"))  # (threshold, direction, error)
        for candidate in np.unique(X):
            for orientation in (1, -1):
                guesses = self._stump_predict(X, candidate, orientation)
                weighted_err = np.sum(weights * (guesses != y))
                if weighted_err < best[2]:
                    best = (candidate, orientation, weighted_err)
        return best

    def _stump_predict(self, X: np.ndarray, threshold: float, direction: int) -> np.ndarray:
        """Evaluate one stump on X, returning labels in {-1, +1}."""
        if direction == 1:
            return np.where(X >= threshold, 1.0, -1.0)
        return np.where(X >= threshold, -1.0, 1.0)

    def fit(self, X: np.ndarray, y: np.ndarray) -> "SimpleAdaBoost":
        """
        Run the AdaBoost training loop.
        X: 1-D array of features; y: labels in {-1, +1}.
        """
        n_samples = len(y)
        weights = np.full(n_samples, 1.0 / n_samples)  # uniform start
        self.stumps = []
        self.alphas = []

        for _ in range(self.n_estimators):
            # 1. Fit the best weak learner under the current weighting.
            thresh, direction, err = self._best_stump(X, y, weights)

            # Keep err strictly inside (0, 1) so alpha stays finite.
            err = np.clip(err, 1e-10, 1.0 - 1e-10)

            # 2. Model weight: confident (low-error) stumps get larger alpha.
            alpha = 0.5 * np.log((1.0 - err) / err)

            # 3. This stump's predictions on the training data.
            preds = self._stump_predict(X, thresh, direction)

            # 4. Up-weight misclassified samples, down-weight correct ones,
            #    then renormalize to a probability distribution.
            weights = weights * np.exp(-alpha * y * preds)
            weights = weights / weights.sum()

            self.stumps.append((thresh, direction))
            self.alphas.append(alpha)

        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Sign of the alpha-weighted sum of all stump votes."""
        scores = np.zeros(len(X))
        for alpha, (thresh, direction) in zip(self.alphas, self.stumps):
            scores += alpha * self._stump_predict(X, thresh, direction)
        return np.sign(scores)


print("SimpleAdaBoost class defined.")
# Demo on 1D data with a non-linear decision boundary
# Labels: +1 if |x| > 1, else -1  (two thresholds needed)
# A single stump cannot represent this boundary, so boosting must combine
# several; the seeded RNG keeps the sample and the label flips reproducible.
np.random.seed(42)
X_ada = np.random.uniform(-3, 3, 200)
y_ada = np.where(np.abs(X_ada) > 1.0, 1.0, -1.0)
# Add some noise: flip 5% of labels
flip_idx = np.random.choice(200, 10, replace=False)
y_ada[flip_idx] *= -1

# Train a 20-round boosted ensemble and evaluate it on a dense grid.
ada = SimpleAdaBoost(n_estimators=20).fit(X_ada, y_ada)

X_plot = np.linspace(-3, 3, 500)
y_plot = ada.predict(X_plot)

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Left: data and decision regions (shaded bands = predicted class on the grid)
ax = axes[0]
ax.scatter(X_ada[y_ada == 1], np.zeros_like(X_ada[y_ada == 1]), c="blue", alpha=0.5, label="+1", s=30)
ax.scatter(X_ada[y_ada == -1], np.zeros_like(X_ada[y_ada == -1]) + 0.1, c="red", alpha=0.5, label="-1", s=30)
ax.fill_between(X_plot, -0.2, 0.3, where=(y_plot > 0), alpha=0.15, color="blue", label="Predicted +1")
ax.fill_between(X_plot, -0.2, 0.3, where=(y_plot <= 0), alpha=0.15, color="red", label="Predicted -1")
ax.set_xlabel("x")
ax.set_title("AdaBoost Decision Regions (non-linear boundary)")
ax.legend(loc="upper right", fontsize=8)
ax.set_ylim(-0.2, 0.4)

# Right: alpha values over rounds
ax = axes[1]
ax.bar(range(len(ada.alphas)), ada.alphas, color="steelblue")
ax.set_xlabel("Boosting round")
ax.set_ylabel("Alpha (model weight)")
ax.set_title("AdaBoost Model Weights per Round")

plt.tight_layout()
plt.show()

# Accuracy on the (noisy) training data itself.
train_acc = np.mean(ada.predict(X_ada) == y_ada)
print(f"Training accuracy: {train_acc:.1%}")

PART 3: Stacking

Key idea: Use the predictions of base models as input features for a meta-learner (second-level model).

Algorithm:

  1. Train \(M\) diverse base models on the training set.

  2. Generate predictions from each base model on a held-out validation set.

  3. Stack these \(M\) predictions as new features.

  4. Train a meta-learner on these stacked features to predict the true labels.

The meta-learner discovers which base models to trust and how to combine them, often outperforming any individual base model and even simple averaging.

def stacking_demo():
    """
    Stacking demo on synthetic 1-D data (true boundary at x = 0):

    - 3 base models: threshold classifiers with deliberately imperfect thresholds
    - meta-learner: logistic regression over the base-model predictions,
      trained with plain full-batch gradient descent

    Prints a comparison table and returns the results so they can also be
    inspected programmatically:
        {"base_accuracies": [float, float, float],
         "meta_accuracy": float,
         "meta_weights": np.ndarray of shape (4,)  # 3 weights + bias}
    """
    np.random.seed(42)
    N = 300
    X = np.random.randn(N, 1)
    y = (X[:, 0] > 0).astype(float)  # true boundary at x=0

    # Hold out the last 100 points for validation.
    split = 200
    X_train, X_val = X[:split], X[split:]
    y_train, y_val = y[:split], y[split:]

    # --- Base models: threshold classifiers with different thresholds ---
    def model_predict(x, threshold):
        return (x[:, 0] > threshold).astype(float)

    thresholds = [0.2, -0.1, 0.5]  # intentionally imperfect
    base_names = [f"Threshold @ {t}" for t in thresholds]

    # Base-model predictions on the validation set -> meta-features, shape (100, 3).
    base_preds_val = np.column_stack([
        model_predict(X_val, t) for t in thresholds
    ])

    # Base-model predictions on the training set (meta-learner training input).
    base_preds_train = np.column_stack([
        model_predict(X_train, t) for t in thresholds
    ])

    # Add bias column
    Z_train = np.column_stack([base_preds_train, np.ones(split)])
    Z_val = np.column_stack([base_preds_val, np.ones(N - split)])

    def sigmoid(z):
        # Clip to avoid overflow in exp for extreme logits.
        return 1.0 / (1.0 + np.exp(-np.clip(z, -500, 500)))

    # --- Train meta-learner (logistic regression via gradient descent) ---
    w_meta = np.zeros(Z_train.shape[1])
    lr = 0.1
    for _ in range(500):
        p = sigmoid(Z_train @ w_meta)
        grad = Z_train.T @ (p - y_train) / split
        w_meta -= lr * grad

    # Meta-learner predictions on validation
    meta_probs = sigmoid(Z_val @ w_meta)
    meta_preds = (meta_probs > 0.5).astype(float)

    # --- Compare accuracies ---
    print("=" * 50)
    print("Stacking Results on Validation Set")
    print("=" * 50)
    base_accs = []
    for name, t in zip(base_names, thresholds):
        acc = np.mean(model_predict(X_val, t) == y_val)
        base_accs.append(float(acc))
        print(f"  {name:20s} accuracy: {acc:.1%}")
    meta_acc = np.mean(meta_preds == y_val)
    print(f"  {'Stacked meta-learner':20s} accuracy: {meta_acc:.1%}")
    print()
    print(f"Meta-learner weights: {w_meta[:-1].round(3)}  bias: {w_meta[-1]:.3f}")
    print("The meta-learner learns to combine base models optimally.")

    # Returning the numbers (previously None) makes the demo verifiable.
    return {
        "base_accuracies": base_accs,
        "meta_accuracy": float(meta_acc),
        "meta_weights": w_meta,
    }


stacking_demo()

PART 4: Prediction Combining Strategies

Given \(M\) classifiers, three common ways to combine their predictions:

  1. Majority Voting: Each classifier casts a vote; the class with the most votes wins. $$\hat{y} = \text{mode}\{h_1(x), h_2(x), \ldots, h_M(x)\}$$

  2. Simple Averaging: Average the predicted probabilities, then threshold. $$\hat{p}(x) = \frac{1}{M} \sum_{m=1}^M p_m(x), \quad \hat{y} = \mathbb{1}[\hat{p}(x) > 0.5]$$

  3. Weighted Averaging: Like simple averaging but each classifier gets a weight (e.g., proportional to its validation accuracy). $$\hat{p}(x) = \sum_{m=1}^M w_m \cdot p_m(x), \quad \sum_m w_m = 1$$

def combining_strategies():
    """
    Compare three ways of combining ensemble outputs.

    Simulates 5 classifiers of differing quality on 100 binary-labelled
    examples, then evaluates majority voting, simple probability averaging,
    and accuracy-weighted probability averaging. Prints a comparison table
    and draws a bar chart of all accuracies.

    NOTE: the per-classifier re-seeding of the global RNG below makes each
    simulated classifier reproducible; keep the seeding/sampling order intact.
    """
    np.random.seed(42)
    n_classifiers = 5
    n_examples = 100

    # True labels
    y_true = np.random.randint(0, 2, size=n_examples)

    # Simulate classifiers with different accuracies (60%-90%)
    accuracies = [0.65, 0.70, 0.75, 0.80, 0.90]
    predictions = np.zeros((n_classifiers, n_examples), dtype=int)
    probabilities = np.zeros((n_classifiers, n_examples))

    for i, acc in enumerate(accuracies):
        np.random.seed(42 + i)
        # Each example is predicted correctly with probability `acc`.
        correct_mask = np.random.rand(n_examples) < acc
        predictions[i] = np.where(correct_mask, y_true, 1 - y_true)
        # Probabilities: correct class gets high prob, wrong gets low
        base_prob = np.random.uniform(0.5, 0.95, n_examples)
        probabilities[i] = np.where(
            predictions[i] == 1,
            base_prob,
            1.0 - base_prob
        )

    # --- Strategy 1: Majority Voting ---
    def majority_vote(preds: np.ndarray) -> np.ndarray:
        """Return the class with the most votes for each example."""
        vote_sum = preds.sum(axis=0)  # count of 1's
        # With 5 voters and 2 classes a tie is impossible.
        return (vote_sum > n_classifiers / 2).astype(int)

    # --- Strategy 2: Simple Averaging ---
    def simple_average(probs: np.ndarray, threshold: float = 0.5) -> np.ndarray:
        """Average probabilities, then threshold."""
        avg_prob = probs.mean(axis=0)
        return (avg_prob > threshold).astype(int)

    # --- Strategy 3: Weighted Averaging ---
    def weighted_average(probs: np.ndarray, weights: np.ndarray,
                         threshold: float = 0.5) -> np.ndarray:
        """Weighted average of probabilities."""
        w = weights / weights.sum()  # normalize
        weighted_prob = (w[:, None] * probs).sum(axis=0)
        return (weighted_prob > threshold).astype(int)

    # Weights proportional to accuracy
    weights = np.array(accuracies)

    # Compute predictions
    mv_preds = majority_vote(predictions)
    sa_preds = simple_average(probabilities)
    wa_preds = weighted_average(probabilities, weights)

    # Compute accuracies
    print("=" * 55)
    print("Prediction Combining Strategies")
    print("=" * 55)
    print("\nIndividual classifier accuracies:")
    for i, acc in enumerate(accuracies):
        actual_acc = np.mean(predictions[i] == y_true)
        print(f"  Classifier {i+1} (target {acc:.0%}): {actual_acc:.1%}")

    print("\nCombining strategies:")
    print(f"  Majority Voting:    {np.mean(mv_preds == y_true):.1%}")
    print(f"  Simple Averaging:   {np.mean(sa_preds == y_true):.1%}")
    print(f"  Weighted Averaging: {np.mean(wa_preds == y_true):.1%}")

    # Bar chart: individual classifiers first, then the three combiners.
    labels = [f"C{i+1}" for i in range(n_classifiers)] + ["Majority\nVote", "Simple\nAvg", "Weighted\nAvg"]
    accs = [np.mean(predictions[i] == y_true) for i in range(n_classifiers)]
    accs += [np.mean(mv_preds == y_true), np.mean(sa_preds == y_true), np.mean(wa_preds == y_true)]
    colors = ["#7fbbda"] * n_classifiers + ["#2a6496", "#d9534f", "#5cb85c"]

    plt.figure(figsize=(10, 5))
    bars = plt.bar(labels, accs, color=colors, edgecolor="black", linewidth=0.5)
    plt.ylabel("Accuracy")
    plt.title("Individual Classifiers vs. Combining Strategies")
    plt.ylim(0.5, 1.0)
    # Annotate each bar with its accuracy.
    for bar, a in zip(bars, accs):
        plt.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.005,
                 f"{a:.0%}", ha="center", va="bottom", fontsize=9)
    plt.tight_layout()
    plt.show()


combining_strategies()

PART 5: Snapshot Ensembles

Key insight from Huang et al. (2017): A single training run with a cyclic learning rate can visit multiple local minima. By saving a snapshot of the model at the end of each cycle (when LR is at its minimum), we get an ensemble for free – no extra training cost.

Cosine annealing schedule: $$\text{lr}(t) = \text{lr}_{\min} + \frac{1}{2}(\text{lr}_{\max} - \text{lr}_{\min})\left(1 + \cos\left(\frac{\pi t}{T}\right)\right)$$

where \(T\) is the cycle length. At \(t=0\) the LR is at maximum (exploration), at \(t=T\) it drops to minimum (convergence to a local minimum).

def cosine_annealing_lr(epoch: int, T: int, lr_max: float = 0.1,
                         lr_min: float = 0.001) -> float:
    """
    Cosine annealing schedule over one cycle of length T.

    Starts at lr_max (epoch 0) and decays smoothly to lr_min (epoch T)
    along half a cosine wave:
        lr = lr_min + 0.5 * (lr_max - lr_min) * (1 + cos(pi * epoch / T))
    """
    cosine_term = 1 + np.cos(np.pi * epoch / T)
    return lr_min + 0.5 * (lr_max - lr_min) * cosine_term


# Sanity-check the schedule at a few epochs across one full cycle.
print("Cosine annealing LR values:")
for checkpoint in (0, 25, 50, 75, 100):
    print(f"  Epoch {checkpoint:3d}: lr = {cosine_annealing_lr(checkpoint, T=100):.6f}")
# Plot cyclic LR with 3 cycles over 300 epochs
total_epochs = 300
T = 100  # cycle length

# The epoch index wraps around modulo T, restarting the cosine each cycle.
lrs = [cosine_annealing_lr(epoch % T, T, lr_max=0.1, lr_min=0.001)
       for epoch in range(total_epochs)]

plt.figure(figsize=(10, 4))
plt.plot(range(total_epochs), lrs, color="steelblue", linewidth=1.5)

# Snapshots are taken at the end of each cycle, where the LR bottoms out.
snapshot_epochs = [cycle * T - 1 for cycle in (1, 2, 3)]
snapshot_lrs = [lrs[e] for e in snapshot_epochs]
plt.scatter(snapshot_epochs, snapshot_lrs, color="red", zorder=5, s=80,
            label="Snapshot (save model)")

plt.xlabel("Epoch")
plt.ylabel("Learning Rate")
plt.title("Cosine Annealing with Warm Restarts (3 Cycles)")
plt.legend()
plt.tight_layout()
plt.show()
# Simulate snapshot ensemble on f(x) = x^4 - 3x^2
# This function has local minima near x = -1.22 and x = +1.22
# Seed locally: the warm-restart perturbation below previously consumed
# whatever global RNG state was left over from earlier cells, making the
# snapshots irreproducible. Every other cell in this lab seeds explicitly.
np.random.seed(42)

f = lambda x: x**4 - 3 * x**2
f_grad = lambda x: 4 * x**3 - 6 * x

n_cycles = 5
steps_per_cycle = 100
snapshots = []    # parameter value at the end of each cycle (LR minimum)
trajectory = []   # full step-by-step parameter path, for plotting

x = 3.0  # start far from minima
for cycle in range(n_cycles):
    for step in range(steps_per_cycle):
        # Gradient descent with the LR annealed to near-zero by cycle end.
        lr = cosine_annealing_lr(step, steps_per_cycle, lr_max=0.05, lr_min=0.001)
        grad = f_grad(x)
        x = x - lr * grad
        trajectory.append(x)
    snapshots.append(x)
    # Warm restart: perturb x to explore a different region of the landscape.
    x = x + np.random.randn() * 1.5

# Plot the function, trajectory, and snapshots
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Left: loss landscape with snapshots
ax = axes[0]
x_range = np.linspace(-3, 3, 300)
ax.plot(x_range, f(x_range), "k-", linewidth=2, label="$f(x) = x^4 - 3x^2$")
for i, snap in enumerate(snapshots):
    ax.plot(snap, f(snap), "o", markersize=10, label=f"Snapshot {i+1}: x={snap:.3f}")
ax.set_xlabel("x")
ax.set_ylabel("f(x)")
ax.set_title("Snapshot Ensemble: Finding Multiple Minima")
ax.legend(fontsize=8)

# Right: trajectory over training steps (red markers = snapshot points)
ax = axes[1]
ax.plot(trajectory, linewidth=0.8, alpha=0.8)
for i in range(n_cycles):
    step_idx = (i + 1) * steps_per_cycle - 1
    ax.axvline(step_idx, color="red", linestyle="--", alpha=0.5)
    ax.plot(step_idx, trajectory[step_idx], "ro", markersize=8)
ax.set_xlabel("Training step")
ax.set_ylabel("x (parameter value)")
ax.set_title("Parameter Trajectory with Cyclic Restarts")

plt.tight_layout()
plt.show()

print("Snapshots found at:")
for i, snap in enumerate(snapshots):
    print(f"  Snapshot {i+1}: x = {snap:.4f}, f(x) = {f(snap):.4f}")
print(f"\nTrue minima at x = +/-{np.sqrt(1.5):.4f}, f = {f(np.sqrt(1.5)):.4f}")

PART 6: Learning Rate Schedules

The learning rate schedule profoundly affects training dynamics and ensemble quality. Three common schedules:

  1. Step Decay: Drop the LR by a constant factor every \(k\) epochs. $$\text{lr}(t) = \text{lr}_0 \times \text{drop}^{\lfloor t / k \rfloor}$$

  2. Exponential Decay: Smooth exponential reduction. $$\text{lr}(t) = \text{lr}_0 \times \gamma^t$$

  3. Warmup + Cosine: Linear warmup for the first few epochs, then cosine decay. $$\text{lr}(t) = \begin{cases} \text{lr}_{\max} \cdot t / T_w & \text{if } t < T_w \\ \frac{\text{lr}_{\max}}{2}\left(1 + \cos\left(\frac{\pi(t - T_w)}{T - T_w}\right)\right) & \text{otherwise} \end{cases}$$

For ensembles, cyclic and warm-restart schedules are especially useful because they allow snapshot ensembling (Part 5).

def step_decay(epoch: int, initial_lr: float = 0.1, drop: float = 0.5,
               epochs_drop: int = 30) -> float:
    """Piecewise-constant schedule: multiply the LR by `drop` once every `epochs_drop` epochs."""
    n_drops = epoch // epochs_drop
    return initial_lr * drop ** n_drops


def exponential_decay(epoch: int, initial_lr: float = 0.1,
                       decay_rate: float = 0.95) -> float:
    """Smooth geometric decay: lr(t) = initial_lr * decay_rate**t."""
    factor = decay_rate ** epoch
    return initial_lr * factor


def warmup_cosine(epoch: int, warmup_epochs: int = 10, total_epochs: int = 200,
                   lr_max: float = 0.1) -> float:
    """Linear warmup to lr_max over warmup_epochs, then cosine decay to 0 at total_epochs."""
    if epoch < warmup_epochs:
        return lr_max * epoch / warmup_epochs
    # Fraction of the post-warmup phase completed, in [0, 1] within the run.
    frac = (epoch - warmup_epochs) / (total_epochs - warmup_epochs)
    return 0.5 * lr_max * (1 + np.cos(np.pi * frac))


# Spot-check each schedule at a handful of representative epochs.
schedule_checks = [
    ("Step Decay:", step_decay, (0, 29, 30, 59, 60)),
    ("\nExponential Decay:", exponential_decay, (0, 10, 50, 100, 199)),
    ("\nWarmup + Cosine:", warmup_cosine, (0, 5, 10, 100, 199)),
]
for header, schedule, epochs_to_check in schedule_checks:
    print(header)
    for e in epochs_to_check:
        print(f"  Epoch {e:3d}: lr = {schedule(e):.6f}")
# Plot all 3 schedules for 200 epochs on the same axes
epochs = np.arange(200)

# (lr values, legend label, line style) for each schedule.
schedule_curves = [
    ([step_decay(e) for e in epochs],
     "Step Decay (drop=0.5, every 30 epochs)", "-"),
    ([exponential_decay(e) for e in epochs],
     "Exponential Decay (rate=0.95)", "--"),
    ([warmup_cosine(e, warmup_epochs=10, total_epochs=200) for e in epochs],
     "Warmup (10 ep) + Cosine Decay", "-."),
]

plt.figure(figsize=(10, 5))
for lr_values, curve_label, line_style in schedule_curves:
    plt.plot(epochs, lr_values, label=curve_label, linewidth=2, linestyle=line_style)
plt.xlabel("Epoch")
plt.ylabel("Learning Rate")
plt.title("Comparison of Learning Rate Schedules (200 Epochs)")
plt.legend()
plt.tight_layout()
plt.show()

Summary

| Method             | Key Idea                                        | Reduces                  |
|--------------------|-------------------------------------------------|--------------------------|
| Bagging            | Train on bootstrap samples, average predictions | Variance                 |
| Boosting           | Sequential training, focus on errors            | Bias                     |
| Stacking           | Use base predictions as meta-features           | Both (via meta-learner)  |
| Voting/Averaging   | Combine final predictions directly              | Variance                 |
| Snapshot Ensembles | Cyclic LR + save at minima                      | Variance (free ensemble) |

Key takeaways:

  • Ensembles almost always outperform single models when diversity exists among members.

  • Bagging and boosting represent two fundamental approaches: parallel vs. sequential.

  • Stacking adds a meta-learning layer that learns how to combine base models.

  • Snapshot ensembles provide ensembles at no extra training cost via cyclic learning rates.

  • The learning rate schedule is critical for both single-model training and ensemble construction.

Reference: Kashani, S. Deep Learning Interviews, Chapter: NN Ensembles.