Lab 06: Large Language Models

Source: Speech & Language Processing by Jurafsky/Martin (3rd ed. draft) – Chapters 7, 9, 10
PDF: resources/ed3book_jan26.pdf

This lab covers the core concepts behind modern Large Language Models:

| Part | Topic | SLP Reference |
|------|-------|---------------|
| 1 | Text Generation Strategies | Ch 7.4 – Greedy, Temperature, Top-k, Top-p |
| 2 | Autoregressive Generation | Ch 7.2–7.4 – Causal LM decoding loop |
| 3 | Masked Language Model (BERT-style) | Ch 9.2 – MLM training objective |
| 4 | Fine-Tuning for Classification | Ch 9.4 – [CLS] token classifier head |
| 5 | Prompting and In-Context Learning | Ch 7.3 – Zero/few-shot, chain-of-thought |
| 6 | RLHF Concepts | Ch 10.2 – Reward models, preference learning |

import numpy as np
from typing import List, Dict, Tuple, Callable, Optional

print("Imports loaded successfully.")

PART 1: Text Generation Strategies

Reference: SLP Ch 7.4 – Generation and Sampling

A language model produces a logit vector \(\mathbf{u}\) of shape \([1 \times |V|]\) giving a raw score for every token in the vocabulary. The softmax converts logits into a probability distribution \(\mathbf{y} = \text{softmax}(\mathbf{u})\). Different decoding strategies then select the next token from this distribution:

| Strategy | Idea |
|----------|------|
| Greedy | Always pick the highest-probability token (argmax). Deterministic but repetitive. |
| Temperature | Divide logits by \(\tau\) before softmax. \(\tau < 1\) sharpens, \(\tau > 1\) flattens. |
| Top-k | Keep only the \(k\) highest-probability tokens, renormalize, then sample. |
| Top-p (nucleus) | Keep the smallest set of tokens whose cumulative probability \(\ge p\), renormalize, then sample. |
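
To make the temperature row concrete, the following sketch computes the softmax distribution and its Shannon entropy for a few values of \(\tau\). It is an illustration rather than code from SLP: the helper names `softmax_with_temperature` and `entropy` and the example logits are assumptions made for this example.

```python
import numpy as np


def softmax_with_temperature(logits: np.ndarray, tau: float) -> np.ndarray:
    """Softmax of logits / tau, shifted by the max for numerical stability."""
    scaled = logits / tau
    exp_z = np.exp(scaled - np.max(scaled))
    return exp_z / exp_z.sum()


def entropy(probs: np.ndarray) -> float:
    """Shannon entropy in nats; zero-probability entries contribute nothing."""
    nonzero = probs[probs > 0]
    return float(-np.sum(nonzero * np.log(nonzero)))


example_logits = np.array([5.0, 3.0, 2.0, 1.0])  # illustrative values
temperature_entropies = {}
for tau in [0.5, 1.0, 2.0]:
    p = softmax_with_temperature(example_logits, tau)
    temperature_entropies[tau] = entropy(p)
    print(f"tau={tau}: probs={np.round(p, 3)}, entropy={temperature_entropies[tau]:.3f}")
```

Lower entropy means the probability mass is concentrated on fewer tokens, so the entropy should rise as \(\tau\) increases.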

def softmax(logits: np.ndarray) -> np.ndarray:
    """Numerically stable softmax (SLP Eq 7.1).
    
    Subtracts the max before exponentiating to avoid overflow.
    """
    shifted = logits - np.max(logits)
    exp_z = np.exp(shifted)
    return exp_z / np.sum(exp_z)


# Quick sanity check
test_logits = np.array([2.0, 1.0, 0.1])
probs = softmax(test_logits)
print(f"Logits:       {test_logits}")
print(f"Probabilities: {probs}")
print(f"Sum:           {probs.sum():.6f}")
assert np.isclose(probs.sum(), 1.0), "Probabilities must sum to 1"


def greedy_decode(logits: np.ndarray) -> int:
    """Greedy decoding: select the token with the highest logit (SLP Eq 7.2).
    
    w_hat = argmax_{w in V} P(w | context)
    
    Deterministic -- always returns the same token for the same logits.
    """
    return int(np.argmax(logits))


# Demo
demo_logits = np.array([5.0, 3.0, 2.0, 1.0])
demo_vocab = ["the", "a", "cat", "dog"]
chosen = greedy_decode(demo_logits)
print(f"Logits: {demo_logits}")
print(f"Greedy choice: index={chosen}, token='{demo_vocab[chosen]}'")


def temperature_sample(logits: np.ndarray, temperature: float = 1.0) -> int:
    """Temperature sampling (SLP Eq 7.4).
    
    y = softmax(u / tau)
    
    - tau < 1  -> sharpens distribution (more deterministic)
    - tau = 1  -> standard softmax sampling
    - tau > 1  -> flattens distribution (more random)
    """
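    # Guard against division by zero and sign flips in the logits
    if temperature <= 0:
        raise ValueError("temperature must be positive")
    adjusted = logits / temperature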
    probs = softmax(adjusted)
    return int(np.random.choice(len(probs), p=probs))


# Demo: low vs high temperature
np.random.seed(42)
demo_logits = np.array([5.0, 3.0, 2.0, 1.0])
demo_vocab = ["the", "a", "cat", "dog"]

for T in [0.1, 0.5, 1.0, 2.0]:
    samples = [demo_vocab[temperature_sample(demo_logits, T)] for _ in range(20)]
    print(f"T={T:<4} -> {samples}")


def top_k_sample(logits: np.ndarray, k: int = 10) -> int:
    """Top-k sampling (SLP Section 8.6.1).
    
    1. Find the top-k logits.
    2. Set all other logits to -inf.
    3. Softmax over the modified logits.
    4. Sample from the resulting distribution.
    
    When k=1 this is identical to greedy decoding.
    """
    k = min(k, len(logits))  # safety
    # Indices of the k largest logits (argsort is ascending, so take the last k)
    top_k_indices = np.argsort(logits)[-k:]
    filtered = np.full_like(logits, -np.inf)
    filtered[top_k_indices] = logits[top_k_indices]
    probs = softmax(filtered)
    return int(np.random.choice(len(probs), p=probs))


# Demo
np.random.seed(42)
demo_logits = np.array([5.0, 3.0, 2.0, 1.0, 0.5, -1.0])
demo_vocab = ["the", "a", "cat", "dog", "sat", "on"]

for k_val in [1, 2, 3, 6]:
    samples = [demo_vocab[top_k_sample(demo_logits, k_val)] for _ in range(15)]
    print(f"k={k_val} -> {samples}")


def top_p_sample(logits: np.ndarray, p: float = 0.9) -> int:
    """Top-p (nucleus) sampling (SLP Section 8.6.2).
    
    The top-p vocabulary V^(p) is the smallest set of tokens such that:
        sum_{w in V^(p)} P(w | context) >= p
    
    Steps:
    1. Compute softmax probabilities.
    2. Sort tokens by probability (descending).
    3. Find the cutoff where cumulative sum >= p.
    4. Zero out tokens below the cutoff.
    5. Renormalize and sample.
    """
    probs = softmax(logits)
    # Sort indices by descending probability
    sorted_indices = np.argsort(probs)[::-1]
    sorted_probs = probs[sorted_indices]
    
    # Find cumulative sum cutoff
    cumsum = np.cumsum(sorted_probs)
    # Keep tokens up to and including the first one that pushes cumsum >= p
    cutoff_idx = np.searchsorted(cumsum, p) + 1  # +1 to include the boundary token
    cutoff_idx = min(cutoff_idx, len(probs))  # safety
    
    # Zero out everything below the cutoff
    allowed_indices = sorted_indices[:cutoff_idx]
    filtered_probs = np.zeros_like(probs)
    filtered_probs[allowed_indices] = probs[allowed_indices]
    
    # Renormalize
    filtered_probs = filtered_probs / filtered_probs.sum()
    
    return int(np.random.choice(len(filtered_probs), p=filtered_probs))


# Demo
np.random.seed(42)
demo_logits = np.array([5.0, 3.0, 2.0, 1.0, 0.5, -1.0])
demo_vocab = ["the", "a", "cat", "dog", "sat", "on"]

for p_val in [0.5, 0.8, 0.95, 1.0]:
    samples = [demo_vocab[top_p_sample(demo_logits, p_val)] for _ in range(15)]
    print(f"p={p_val:<4} -> {samples}")


def compare_sampling_strategies():
    """Compare all strategies on the same logits, 100 runs each."""
    np.random.seed(42)
    logits = np.array([5.0, 3.0, 2.0, 1.0, 0.5, 0.1, -1.0, -2.0, -3.0, -5.0])
    vocab = ["the", "a", "cat", "dog", "sat", "on", "big", "red", "very", "zzz"]
    n_runs = 100

    strategies = {
        "Greedy":          lambda: greedy_decode(logits),
        "Temp=0.5":        lambda: temperature_sample(logits, 0.5),
        "Temp=1.0":        lambda: temperature_sample(logits, 1.0),
        "Temp=2.0":        lambda: temperature_sample(logits, 2.0),
        "Top-k (k=3)":     lambda: top_k_sample(logits, 3),
        "Top-k (k=5)":     lambda: top_k_sample(logits, 5),
        "Top-p (p=0.7)":   lambda: top_p_sample(logits, 0.7),
        "Top-p (p=0.95)":  lambda: top_p_sample(logits, 0.95),
    }

    # Show softmax probabilities first
    probs = softmax(logits)
    print("Logits and softmax probabilities:")
    print(f"  {'Token':<6} {'Logit':>6} {'P':>8}")
    print(f"  {'-'*6} {'-'*6} {'-'*8}")
    for tok, lo, pr in zip(vocab, logits, probs):
        print(f"  {tok:<6} {lo:>6.1f} {pr:>8.4f}")
    print()

    for name, strategy_fn in strategies.items():
        counts: Dict[str, int] = {}
        for _ in range(n_runs):
            token_idx = strategy_fn()
            token = vocab[token_idx]
            counts[token] = counts.get(token, 0) + 1
        # Sort by count descending
        sorted_counts = sorted(counts.items(), key=lambda x: -x[1])
        dist_str = ", ".join(f"{tok}:{cnt}" for tok, cnt in sorted_counts)
        print(f"  {name:<18} -> {dist_str}")


compare_sampling_strategies()
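
One way to see why top-p adapts to the shape of the distribution, while top-k always keeps a fixed number of tokens, is to count how many tokens the nucleus contains. The sketch below does this for two assumed sets of example logits; `nucleus_size` is a hypothetical helper written for this illustration, not a function from SLP or any library.

```python
import numpy as np


def nucleus_size(logits: np.ndarray, p: float) -> int:
    """Number of tokens in the smallest set whose cumulative probability >= p."""
    exp_z = np.exp(logits - np.max(logits))
    probs = exp_z / exp_z.sum()
    sorted_probs = np.sort(probs)[::-1]          # descending order
    cumsum = np.cumsum(sorted_probs)
    # First index where the running total reaches p, converted to a count.
    size = int(np.searchsorted(cumsum, p)) + 1
    return min(size, len(probs))


peaked = np.array([8.0, 1.0, 0.5, 0.0, -1.0])   # one dominant token
flat = np.array([1.0, 0.9, 0.8, 0.7, 0.6])      # close to uniform

peaked_size = nucleus_size(peaked, 0.9)
flat_size = nucleus_size(flat, 0.9)
print(f"peaked distribution: nucleus holds {peaked_size} token(s)")
print(f"flat distribution:   nucleus holds {flat_size} token(s)")
```

With the same \(p\), a confident distribution yields a small nucleus and an uncertain one yields a large nucleus, which is the behavior a fixed \(k\) cannot reproduce.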

PART 2: Autoregressive Generation

Reference: SLP Ch 7.2–7.4

In autoregressive (or causal) generation, the model predicts one token at a time, conditioned on all previous tokens. Each newly generated token is appended to the context, and the process repeats:

\[ w_i \sim p(w_i \mid w_{<i}) \]

Generation stops when a special end-of-sequence token is produced or a maximum length is reached.
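
The same factorization that drives sampling also scores a whole sequence: its log-probability is the sum of the conditional log-probabilities of its tokens. The sketch below illustrates this for a bigram model, where each token is conditioned only on its predecessor. The three-token table `toy_logits` and the helper `sequence_log_prob` are assumptions made for this example.

```python
import numpy as np


def log_softmax(logits: np.ndarray) -> np.ndarray:
    """Log of the softmax, computed stably via the log-sum-exp trick."""
    shifted = logits - np.max(logits)
    return shifted - np.log(np.sum(np.exp(shifted)))


def sequence_log_prob(token_ids, next_token_logits) -> float:
    """Sum of log p(w_i | w_{i-1}) over a sequence, for a bigram model.

    next_token_logits[j] holds the logits over the next token given token j.
    The first token is treated as a given prompt and is not scored.
    """
    total = 0.0
    for prev, cur in zip(token_ids[:-1], token_ids[1:]):
        total += log_softmax(next_token_logits[prev])[cur]
    return float(total)


# Hypothetical bigram logits over a 3-token vocabulary.
toy_logits = np.array([
    [0.0, 2.0, 1.0],
    [1.0, 0.0, 2.0],
    [2.0, 1.0, 0.0],
])

likely = sequence_log_prob([0, 1, 2], toy_logits)    # follows the highest logits
unlikely = sequence_log_prob([0, 0, 0], toy_logits)  # follows the lowest logits
print(f"log p(likely)   = {likely:.4f}")
print(f"log p(unlikely) = {unlikely:.4f}")
```

Working in log space avoids the underflow that multiplying many small probabilities would cause on long sequences.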

def generate_text_autoregressive(
    model_fn: Callable[[List[int]], np.ndarray],
    vocab: List[str],
    prompt_ids: List[int],
    max_new_tokens: int = 20,
    strategy: str = "greedy",
    temperature: float = 1.0,
    top_k: int = 10,
) -> List[int]:
    """Generate text autoregressively.
    
    Args:
        model_fn: function(token_ids) -> logits for next token
        vocab: list of token strings
        prompt_ids: initial token ids
        max_new_tokens: max tokens to generate
        strategy: 'greedy', 'temperature', or 'top_k'
        temperature: temperature parameter (used when strategy='temperature')
        top_k: k parameter (used when strategy='top_k')
    
    Returns:
        Full list of token ids (prompt + generated)
    """
    generated = list(prompt_ids)

    for _ in range(max_new_tokens):
        logits = model_fn(generated)

        if strategy == "greedy":
            next_token = greedy_decode(logits)
        elif strategy == "temperature":
            next_token = temperature_sample(logits, temperature)
        elif strategy == "top_k":
            next_token = top_k_sample(logits, top_k)
        else:
            raise ValueError(f"Unknown strategy: {strategy}")

        generated.append(next_token)

        # Stop at end-of-sequence token (id 0 by convention)
        if next_token == 0:
            break

    return generated


print("generate_text_autoregressive() defined.")


def simple_language_model():
    """Build a bigram language model and generate text with different strategies.
    
    The bigram model uses a transition matrix where row i gives the logits
    for P(next_token | current_token = vocab[i]).
    """
    vocab = ["<eos>", "the", "cat", "sat", "on", "mat", "dog", "ran", "big", "a"]
    V = len(vocab)

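    # Transition logits: transitions[i] = logits for next token given word i.
    # Note: a logit of 0 is not a probability of 0. After softmax every token
    # keeps some probability mass, so sampling can produce unlisted transitions.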
    transitions = np.zeros((V, V))
    #                eos  the  cat  sat  on  mat  dog  ran  big   a
    transitions[1] = [0,   0,   3,   0,  0,   0,   2,   0,   1,  0]   # after "the"
    transitions[2] = [0,   0,   0,   3,  0,   0,   0,   1,   0,  0]   # after "cat"
    transitions[3] = [0,   0,   0,   0,  3,   0,   0,   0,   0,  0]   # after "sat"
    transitions[4] = [0,   2,   0,   0,  0,   2,   0,   0,   0,  1]   # after "on"
    transitions[5] = [3,   0,   0,   0,  0,   0,   0,   0,   0,  0]   # after "mat"
    transitions[6] = [0,   0,   0,   1,  0,   0,   0,   2,   0,  0]   # after "dog"
    transitions[7] = [1,   0,   0,   0,  1,   0,   0,   0,   0,  0]   # after "ran"
    transitions[8] = [0,   0,   2,   0,  0,   0,   2,   0,   0,  0]   # after "big"
    transitions[9] = [0,   0,   2,   0,  0,   1,   1,   0,   1,  0]   # after "a"

    def model_fn(token_ids: List[int]) -> np.ndarray:
        """Return logits for the next token given the last token (bigram)."""
        last_token = token_ids[-1]
        return transitions[last_token]

    # Generate with different strategies from prompt "the" (id=1)
    prompt = [1]
    print("Bigram Language Model Generation")
    print(f"Vocabulary: {vocab}")
    print(f"Prompt: '{vocab[prompt[0]]}'")
    print()

    # Greedy -- deterministic
    ids = generate_text_autoregressive(model_fn, vocab, prompt, max_new_tokens=10,
                                       strategy="greedy")
    text = " ".join(vocab[i] for i in ids)
    print(f"  Greedy:          {text}")

    # Temperature sampling -- several runs
    np.random.seed(0)
    for T in [0.5, 1.0, 2.0]:
        results = []
        for _ in range(5):
            ids = generate_text_autoregressive(model_fn, vocab, prompt,
                                               max_new_tokens=10,
                                               strategy="temperature",
                                               temperature=T)
            text = " ".join(vocab[i] for i in ids)
            results.append(text)
        print(f"  Temp={T:<4}:       {results[0]}")
        for r in results[1:]:
            print(f"                    {r}")
        print()

    # Top-k sampling
    np.random.seed(42)
    for k_val in [2, 3]:
        results = []
        for _ in range(3):
            ids = generate_text_autoregressive(model_fn, vocab, prompt,
                                               max_new_tokens=10,
                                               strategy="top_k", top_k=k_val)
            text = " ".join(vocab[i] for i in ids)
            results.append(text)
        print(f"  Top-k={k_val}:       {results}")


simple_language_model()

PART 3: Masked Language Model (BERT-style)

Reference: SLP Ch 9.2 – Training Bidirectional Encoders

Instead of predicting the next token left-to-right, a Masked Language Model (MLM) masks some tokens and predicts them from bidirectional context.

The BERT masking strategy selects 15% of tokens and then:

  • 80% of the time: replace with [MASK]

  • 10% of the time: replace with a random token from the vocabulary

  • 10% of the time: keep the original token unchanged
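
Combining the two stages gives the share of all input tokens affected by each rule: 0.15 × 0.8 = 12% become [MASK], 0.15 × 0.1 = 1.5% become a random token, and 0.15 × 0.1 = 1.5% are selected but left unchanged. The small simulation below is a sketch that checks these figures empirically; the sample size and seed are arbitrary choices for this example.

```python
import numpy as np

rng = np.random.default_rng(0)   # seed chosen arbitrarily for reproducibility
n_tokens = 200_000               # assumed sample size for the simulation

selected = rng.random(n_tokens) < 0.15      # stage 1: pick ~15% of positions
r = rng.random(n_tokens)                    # stage 2: choose the corruption rule

frac_mask = float(np.mean(selected & (r < 0.8)))
frac_random = float(np.mean(selected & (r >= 0.8) & (r < 0.9)))
frac_keep = float(np.mean(selected & (r >= 0.9)))

print(f"[MASK]:    {frac_mask:.4f}  (theoretical 0.1200)")
print(f"random:    {frac_random:.4f}  (theoretical 0.0150)")
print(f"unchanged: {frac_keep:.4f}  (theoretical 0.0150)")
```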

The loss \(L_{MLM}\) is the cross-entropy computed only at masked positions:

\[ L_{MLM} = -\frac{1}{|M|} \sum_{i \in M} \log P(x_i \mid \mathbf{h}_i^L) \]

where \(M\) is the set of masked token indices.
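
As a sketch of how this formula maps onto array operations, the snippet below computes the loss with NumPy indexing instead of a Python loop. The convention of marking unmasked positions with -1 and the name `mlm_loss_vectorized` are assumptions of this example.

```python
import numpy as np


def mlm_loss_vectorized(logits: np.ndarray, labels: np.ndarray) -> float:
    """Mean cross-entropy over positions where labels != -1.

    logits: (seq_len, vocab_size) raw scores.
    labels: (seq_len,) original token id at masked positions, -1 elsewhere.
    """
    masked_positions = np.flatnonzero(labels != -1)   # the index set M
    if masked_positions.size == 0:
        return 0.0
    rows = logits[masked_positions]
    # Row-wise log-softmax via the log-sum-exp trick.
    shifted = rows - rows.max(axis=1, keepdims=True)
    log_probs = shifted - np.log(np.exp(shifted).sum(axis=1, keepdims=True))
    picked = log_probs[np.arange(masked_positions.size), labels[masked_positions]]
    return float(-picked.mean())


example_logits = np.array([[2.0, 0.5, 0.1],
                           [0.1, 2.0, 0.5],
                           [0.5, 0.1, 2.0]])
example_labels = np.array([0, -1, 2])   # positions 0 and 2 are in M
example_loss = mlm_loss_vectorized(example_logits, example_labels)
print(f"MLM loss: {example_loss:.4f}")
```

Position 1 has label -1, so its logits never enter the average, matching the restriction of the sum to \(i \in M\).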

def create_mlm_input(
    tokens: List[int],
    vocab_size: int,
    mask_token_id: int,
    mask_prob: float = 0.15,
) -> Tuple[List[int], List[int]]:
    """Create MLM training input following the BERT masking strategy.
    
    For each token selected for masking (with probability mask_prob):
      - 80%: replace with [MASK] token
      - 10%: replace with a random token from the vocabulary
      - 10%: keep the original token
    
    Args:
        tokens: original token ids
        vocab_size: total vocabulary size
        mask_token_id: id of the [MASK] special token
        mask_prob: probability of selecting each token for masking
    
    Returns:
        (masked_tokens, labels) where labels[i] = original token if
        position i was selected, else -1 (ignore in loss).
    """
    masked = list(tokens)
    labels = [-1] * len(tokens)

    for i in range(len(tokens)):
        if np.random.random() < mask_prob:
            labels[i] = tokens[i]  # remember original for the loss
            r = np.random.random()
            if r < 0.8:
                # 80%: replace with [MASK]
                masked[i] = mask_token_id
            elif r < 0.9:
                # 10%: replace with random token
                masked[i] = np.random.randint(0, vocab_size)
            # else 10%: keep original (do nothing)

    return masked, labels


print("create_mlm_input() defined.")


def mlm_loss(logits: np.ndarray, labels: np.ndarray) -> float:
    """Compute cross-entropy loss only at masked positions.
    
    Args:
        logits: shape (seq_len, vocab_size) -- raw scores from the model
        labels: shape (seq_len,) -- original token id at masked positions,
                -1 at non-masked positions (ignored)
    
    Returns:
        Average cross-entropy loss over masked positions.
    """
    seq_len = logits.shape[0]
    total_loss = 0.0
    count = 0

    for i in range(seq_len):
        if labels[i] == -1:
            continue  # skip non-masked positions
        # Compute softmax probabilities for this position
        probs = softmax(logits[i])
        # Cross-entropy: -log P(correct_token)
        correct_token = int(labels[i])
        # Clamp to avoid log(0)
        prob = max(probs[correct_token], 1e-12)
        total_loss += -np.log(prob)
        count += 1

    if count == 0:
        return 0.0
    return total_loss / count


# Quick test
test_logits = np.array([[2.0, 0.5, 0.1], [0.1, 2.0, 0.5], [0.5, 0.1, 2.0]])
test_labels = np.array([0, -1, 2])  # masked at positions 0 and 2
loss = mlm_loss(test_logits, test_labels)
print(f"MLM loss (should be small since logits favour correct tokens): {loss:.4f}")


def mlm_demo():
    """Demo: mask a sentence and show original vs masked vs labels."""
    np.random.seed(42)

    # Toy vocabulary
    token_to_id = {
        "[PAD]": 0, "[MASK]": 1, "[CLS]": 2, "[SEP]": 3,
        "the": 4, "cat": 5, "sat": 6, "on": 7, "mat": 8,
        "dog": 9, "ran": 10, "big": 11, "red": 12, "a": 13,
    }
    id_to_token = {v: k for k, v in token_to_id.items()}
    vocab_size = len(token_to_id)
    mask_token_id = token_to_id["[MASK]"]

    # Original sentence
    sentence = ["[CLS]", "the", "big", "cat", "sat", "on", "the", "red", "mat", "[SEP]"]
    token_ids = [token_to_id[w] for w in sentence]

    masked_ids, labels = create_mlm_input(token_ids, vocab_size, mask_token_id,
                                           mask_prob=0.15)

    print("MLM Masking Demo")
    print(f"  Original: {' '.join(sentence)}")
    print(f"  Masked:   {' '.join(id_to_token[i] for i in masked_ids)}")
    print(f"  Labels:   {['_' if l == -1 else id_to_token[l] for l in labels]}")
    print()
    
    # Show positions that were selected for masking
    masked_positions = [i for i, l in enumerate(labels) if l != -1]
    if masked_positions:
        print(f"  Masked positions: {masked_positions}")
        for pos in masked_positions:
            orig = id_to_token[token_ids[pos]]
            new = id_to_token[masked_ids[pos]]
            print(f"    Position {pos}: '{orig}' -> '{new}'  (label='{id_to_token[labels[pos]]}')") 
    else:
        print("  (No tokens were selected for masking this run -- try a different seed)")

    # Compute loss with mock logits
    mock_logits = np.random.randn(len(token_ids), vocab_size)
    loss = mlm_loss(mock_logits, np.array(labels))
    print(f"\n  Mock MLM loss (random logits): {loss:.4f}")
    print(f"  Baseline for uniform predictions: -ln(1/{vocab_size}) = {np.log(vocab_size):.4f}")


mlm_demo()

PART 4: Fine-Tuning for Classification

Reference: SLP Ch 9.4 – Fine-Tuning for Classification

The power of pretrained models lies in transfer learning: the pretrained representations can be adapted to downstream tasks with a small amount of labeled data.

For sequence classification (e.g., sentiment analysis), BERT prepends a special [CLS] token to the input. The output vector \(\mathbf{h}_{CLS}^L\) from the final transformer layer serves as the input to a classifier head:

\[ \mathbf{y} = \text{softmax}(\mathbf{h}_{CLS}^L \, \mathbf{W}_C) \]

where \(\mathbf{W}_C\) is of size \([d \times k]\) (\(d\) = model dimension, \(k\) = number of classes).

def classification_head(
    cls_embedding: np.ndarray,
    W: np.ndarray,
    b: np.ndarray,
) -> np.ndarray:
    """Classification from [CLS] token embedding (SLP Eq 9.11).
    
    Args:
        cls_embedding: shape (d_model,) -- the [CLS] output vector
        W: shape (d_model, n_classes) -- classification weights
        b: shape (n_classes,) -- classification bias
    
    Returns:
        logits: shape (n_classes,)
    """
    return cls_embedding @ W + b


# Quick test
d, k = 8, 3
np.random.seed(0)
test_emb = np.random.randn(d)
test_W = np.random.randn(d, k) * 0.1
test_b = np.zeros(k)
logits = classification_head(test_emb, test_W, test_b)
probs = softmax(logits)
print(f"CLS embedding shape: {test_emb.shape}")
print(f"Logits: {logits}")
print(f"Probabilities: {probs}")
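print(f"Predicted class: {np.argmax(probs)}")


def fine_tuning_pipeline():
    """Simulate fine-tuning a classification head on mock BERT [CLS] embeddings.

    Only the linear head is trained here; the embeddings stay fixed. Full
    fine-tuning may also update the pretrained encoder's weights.
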
    Steps:
    1. Create mock 'pretrained' [CLS] embeddings.
    2. Generate labels correlated with embedding dimensions.
    3. Split into train/test.
    4. Train a linear classification head with gradient descent.
    5. Evaluate accuracy.
    """
    np.random.seed(42)
    d_model = 64
    n_classes = 3
    n_samples = 200

    # --- Step 1: Mock "pretrained" CLS embeddings ---
    cls_embeddings = np.random.randn(n_samples, d_model)

    # --- Step 2: Generate labels correlated with embedding ---
    # Use a hidden linear mapping + noise to create class labels
    true_W = np.random.randn(d_model, n_classes) * 0.1
    true_logits = cls_embeddings @ true_W
    labels = np.argmax(true_logits + np.random.randn(n_samples, n_classes) * 0.3, axis=1)

    # --- Step 3: Train/test split (80/20) ---
    split = int(0.8 * n_samples)
    X_train, X_test = cls_embeddings[:split], cls_embeddings[split:]
    y_train, y_test = labels[:split], labels[split:]

    # --- Step 4: Train a linear head with gradient descent ---
    W = np.random.randn(d_model, n_classes) * 0.01
    b = np.zeros(n_classes)
    lr = 0.01
    n_epochs = 200
    batch_size = 32

    train_losses = []
    for epoch in range(n_epochs):
        # Shuffle training data
        perm = np.random.permutation(split)
        epoch_loss = 0.0

        for start in range(0, split, batch_size):
            idx = perm[start:start + batch_size]
            X_batch = X_train[idx]
            y_batch = y_train[idx]
            bs = len(idx)

            # Forward pass
            logits = X_batch @ W + b  # (bs, n_classes)
            # Softmax (row-wise)
            shifted = logits - logits.max(axis=1, keepdims=True)
            exp_logits = np.exp(shifted)
            probs = exp_logits / exp_logits.sum(axis=1, keepdims=True)

            # Cross-entropy loss
            correct_probs = probs[np.arange(bs), y_batch]
            loss = -np.mean(np.log(np.maximum(correct_probs, 1e-12)))
            epoch_loss += loss * bs

            # Backward pass (gradient of softmax cross-entropy)
            grad_logits = probs.copy()  # (bs, n_classes)
            grad_logits[np.arange(bs), y_batch] -= 1.0
            grad_logits /= bs

            grad_W = X_batch.T @ grad_logits  # (d_model, n_classes)
            grad_b = grad_logits.sum(axis=0)    # (n_classes,)

            # Update
            W -= lr * grad_W
            b -= lr * grad_b

        epoch_loss /= split
        train_losses.append(epoch_loss)

    # --- Step 5: Evaluate ---
    # Train accuracy
    train_logits = X_train @ W + b
    train_preds = np.argmax(train_logits, axis=1)
    train_acc = np.mean(train_preds == y_train)

    # Test accuracy
    test_logits = X_test @ W + b
    test_preds = np.argmax(test_logits, axis=1)
    test_acc = np.mean(test_preds == y_test)

    print("Fine-Tuning Pipeline Results")
    print(f"  Dataset: {n_samples} samples, d_model={d_model}, {n_classes} classes")
    print(f"  Train/Test split: {split}/{n_samples - split}")
    print(f"  Training epochs: {n_epochs}, lr={lr}, batch_size={batch_size}")
    print(f"  Final train loss: {train_losses[-1]:.4f}")
    print(f"  Train accuracy: {train_acc:.2%}")
    print(f"  Test accuracy:  {test_acc:.2%}")
    print(f"  Loss over training (first 5 epochs): {[f'{l:.3f}' for l in train_losses[:5]]}")
    print(f"  Loss over training (last 5 epochs):  {[f'{l:.3f}' for l in train_losses[-5:]]}")


fine_tuning_pipeline()
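
The backward pass in the training loop relies on the identity that the gradient of softmax cross-entropy with respect to the logits is the predicted distribution minus the one-hot target. The finite-difference check below is a sketch for verifying that identity numerically on a single example; the logits, target class, and step size are arbitrary illustrative choices.

```python
import numpy as np


def cross_entropy(logits: np.ndarray, target: int) -> float:
    """Softmax cross-entropy for one example, via log-sum-exp."""
    shifted = logits - np.max(logits)
    return float(np.log(np.sum(np.exp(shifted))) - shifted[target])


def analytic_grad(logits: np.ndarray, target: int) -> np.ndarray:
    """Gradient w.r.t. the logits: softmax(logits) - one_hot(target)."""
    shifted = logits - np.max(logits)
    probs = np.exp(shifted) / np.sum(np.exp(shifted))
    probs[target] -= 1.0
    return probs


def numeric_grad(logits: np.ndarray, target: int, eps: float = 1e-6) -> np.ndarray:
    """Central finite differences, one logit at a time."""
    grad = np.zeros_like(logits)
    for i in range(len(logits)):
        step = np.zeros_like(logits)
        step[i] = eps
        grad[i] = (cross_entropy(logits + step, target)
                   - cross_entropy(logits - step, target)) / (2 * eps)
    return grad


check_logits = np.array([1.5, -0.3, 0.8])   # arbitrary values
check_target = 2
max_abs_diff = float(np.max(np.abs(
    analytic_grad(check_logits, check_target)
    - numeric_grad(check_logits, check_target)
)))
print(f"max |analytic - numeric| = {max_abs_diff:.2e}")
```

A difference near floating-point noise indicates that the analytic expression and the numerical estimate agree.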

PART 5: Prompting and In-Context Learning

Reference: SLP Ch 7.3 – Prompting

Rather than fine-tuning model weights, we can use prompting to steer an LLM toward a task. A prompt is the text string given to the model as context for conditional generation.

| Style | Description |
|-------|-------------|
| Zero-shot | Only the task instruction, no examples. |
| One-shot | One labeled example (demonstration) before the query. |
| Few-shot | Multiple demonstrations before the query. |

This kind of task-specific learning at inference time – without any gradient updates – is called in-context learning (Brown et al., 2020).
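
Since the three styles differ only in how many demonstrations precede the query, a single helper can build all of them. The function below is a minimal sketch; `build_prompt` and its `Review:` / `Sentiment:` template are assumptions chosen for illustration, not a standard API.

```python
from typing import List, Tuple


def build_prompt(task: str, demonstrations: List[Tuple[str, str]], query: str) -> str:
    """Assemble an instruction, zero or more labeled examples, and the query."""
    parts = [task, ""]
    for review, label in demonstrations:
        parts.append(f'Review: "{review}"')
        parts.append(f"Sentiment: {label}")
        parts.append("")
    parts.append(f'Review: "{query}"')
    parts.append("Sentiment:")
    return "\n".join(parts)


task_text = "Classify the sentiment of the following review."
demos = [
    ("I loved every minute of it", "Positive"),
    ("Terrible waste of time", "Negative"),
]

zero_shot_prompt = build_prompt(task_text, [], "This movie was amazing!")
one_shot_prompt = build_prompt(task_text, demos[:1], "This movie was amazing!")
few_shot_prompt = build_prompt(task_text, demos, "This movie was amazing!")
print(few_shot_prompt)
```

Each prompt ends with an open `Sentiment:` slot, so the model's continuation is read as the predicted label.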

def prompting_demo():
    """Construct zero-shot, one-shot, and few-shot prompts for sentiment classification."""

    task = "Classify the sentiment of the following review."

    # --- Zero-shot ---
    zero_shot = (
        f"{task}\n"
        f"\n"
        f'Review: "This movie was amazing!"\n'
        f"Sentiment:"
    )

    # --- One-shot ---
    one_shot = (
        f"{task}\n"
        f"\n"
        f'Review: "I loved every minute of it"\n'
        f"Sentiment: Positive\n"
        f"\n"
        f'Review: "This movie was amazing!"\n'
        f"Sentiment:"
    )

    # --- Few-shot ---
    few_shot = (
        f"{task}\n"
        f"\n"
        f'Review: "I loved every minute of it"\n'
        f"Sentiment: Positive\n"
        f"\n"
        f'Review: "Terrible waste of time"\n'
        f"Sentiment: Negative\n"
        f"\n"
        f'Review: "It was okay, nothing special"\n'
        f"Sentiment: Neutral\n"
        f"\n"
        f'Review: "This movie was amazing!"\n'
        f"Sentiment:"
    )

    print("=" * 60)
    print("ZERO-SHOT PROMPT")
    print("=" * 60)
    print(zero_shot)
    print()

    print("=" * 60)
    print("ONE-SHOT PROMPT")
    print("=" * 60)
    print(one_shot)
    print()

    print("=" * 60)
    print("FEW-SHOT PROMPT")
    print("=" * 60)
    print(few_shot)


prompting_demo()


def chain_of_thought_demo():
    """Demonstrate chain-of-thought prompting.
    
    Chain-of-thought (CoT) prompting encourages the model to show its
    reasoning step-by-step before giving the final answer.  This has been
    shown to improve performance on reasoning tasks.
    """
    # Standard prompt (no CoT)
    standard_prompt = (
        "Q: Roger has 5 tennis balls. He buys 2 more cans of tennis balls. "
        "Each can has 3 tennis balls. How many tennis balls does he have now?\n"
        "A:"
    )

    # Chain-of-thought prompt
    cot_prompt = (
        "Q: Janet has 3 apples. She buys 2 bags of apples. Each bag has 4 "
        "apples. How many apples does she have now?\n"
        "A: Janet starts with 3 apples. She buys 2 bags, each with 4 apples, "
        "so she gets 2 * 4 = 8 more apples. In total she has 3 + 8 = 11 apples. "
        "The answer is 11.\n"
        "\n"
        "Q: Roger has 5 tennis balls. He buys 2 more cans of tennis balls. "
        "Each can has 3 tennis balls. How many tennis balls does he have now?\n"
        "A:"
    )

    print("=" * 60)
    print("STANDARD PROMPT (no chain-of-thought)")
    print("=" * 60)
    print(standard_prompt)
    print("\n(Expected LLM answer: 11)")

    print()
    print("=" * 60)
    print("CHAIN-OF-THOUGHT PROMPT")
    print("=" * 60)
    print(cot_prompt)
    print(
        "\n(Expected LLM answer: Roger starts with 5 tennis balls. "
        "He buys 2 cans, each with 3 balls, "
        "so he gets 2 * 3 = 6 more. In total he has 5 + 6 = 11. "
        "The answer is 11.)"
    )


chain_of_thought_demo()
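
Because a chain-of-thought response mixes reasoning with the result, the final answer usually has to be parsed out before it can be scored. The sketch below assumes the response contains the phrase "The answer is N", as in the demonstration above; `extract_final_answer` is a hypothetical helper, and real model output may not follow this format.

```python
import re
from typing import Optional


def extract_final_answer(response: str) -> Optional[int]:
    """Return the integer following the last 'The answer is', or None."""
    matches = re.findall(r"The answer is\s+(-?\d+)", response)
    return int(matches[-1]) if matches else None


sample_response = (
    "Roger starts with 5 tennis balls. He buys 2 cans, each with 3 balls, "
    "so he gets 2 * 3 = 6 more. In total he has 5 + 6 = 11. "
    "The answer is 11."
)
parsed_answer = extract_final_answer(sample_response)
print(parsed_answer)
```

Taking the last match rather than the first guards against intermediate statements that happen to use the same phrase.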

PART 6: RLHF Concepts

Reference: SLP Ch 10.2 – Learning from Preferences

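Even after instruction tuning, an LLM's outputs can still be harmful or insufficiently helpful. Preference-based learning addresses this by further aligning the model with human preference data. RLHF trains a reward model on the preferences and then optimizes the LLM against it, while DPO optimizes the LLM directly on the preference pairs without a separate reward model.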

The core idea (the Bradley-Terry model):

\[ P(o_i \succ o_j \mid x) = \sigma(z_i - z_j) \]

where \(z_i, z_j\) are scalar reward scores for outputs \(o_i, o_j\) and \(\sigma\) is the logistic sigmoid. The reward model is trained so that the preferred response gets a higher score than the rejected one.
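
As a worked illustration of the formula, the sketch below evaluates the preference probability and the corresponding negative log-likelihood for a pair of made-up reward scores. The function names and the scores are assumptions for this example.

```python
import numpy as np


def preference_probability(z_i: float, z_j: float) -> float:
    """Bradley-Terry probability that output i is preferred over output j."""
    return float(1.0 / (1.0 + np.exp(-(z_i - z_j))))


def preference_loss(z_preferred: float, z_rejected: float) -> float:
    """Negative log-likelihood of the observed preference."""
    return float(-np.log(preference_probability(z_preferred, z_rejected)))


# Made-up reward scores for two candidate outputs.
p_prefer = preference_probability(2.0, 0.5)
p_reverse = preference_probability(0.5, 2.0)
print(f"P(o_i > o_j) = {p_prefer:.4f}")
print(f"P(o_j > o_i) = {p_reverse:.4f}")
print(f"loss when the higher-scored output is preferred: {preference_loss(2.0, 0.5):.4f}")
print(f"loss when the lower-scored output is preferred:  {preference_loss(0.5, 2.0):.4f}")
```

Only the difference between the two scores matters: equal scores give a probability of 0.5, and swapping the pair gives the complementary probability.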

def reward_model_demo():
    """Mock reward model: a linear function of response embeddings.
    
    Score = W_reward . embedding
    
    We construct embeddings so that helpful responses have positive features
    and harmful responses have negative features, mimicking what a trained
    reward model would learn.
    """
    np.random.seed(42)
    d = 16  # embedding dimension

    # Reward model weights (hand-set for this demo; a real reward model
    # would learn them from human preference data)
    W_reward = np.random.randn(d) * 0.1
    # Bias the first few dimensions to represent "helpfulness"
    W_reward[:4] = np.array([0.5, 0.4, 0.3, 0.2])

    # Mock response embeddings with controlled characteristics
    base = np.random.randn(d) * 0.1
    responses = {
        "helpful_response": base + np.concatenate([np.array([1.0, 0.8, 0.7, 0.6]),
                                                    np.zeros(d - 4)]),
        "unhelpful_response": base + np.concatenate([np.array([-0.2, -0.1, 0.0, -0.1]),
                                                      np.zeros(d - 4)]),
        "harmful_response": base + np.concatenate([np.array([-1.0, -0.8, -0.7, -0.6]),
                                                    np.zeros(d - 4)]),
    }

    print("Reward Model Scores")
    print(f"  (Linear reward model with d={d} dimensions)\n")
    for name, emb in responses.items():
        score = float(np.dot(W_reward, emb))
        label = name.replace("_", " ")
        print(f"  {label:<24} reward = {score:>+.4f}")

    # Show that the ordering matches human preferences
    scores = {name: float(np.dot(W_reward, emb)) for name, emb in responses.items()}
    ranked = sorted(scores.items(), key=lambda x: -x[1])
    print(f"\n  Ranking (highest to lowest): {' > '.join(r[0] for r in ranked)}")


reward_model_demo()


def preference_learning_demo():
    """Simple preference learning: train a reward model on (preferred, rejected) pairs.
    
    Uses the Bradley-Terry model:
        P(preferred > rejected) = sigmoid(r_preferred - r_rejected)
    
    Loss = -log sigmoid(r_preferred - r_rejected)
    
    We train the reward model weights via gradient descent so that the
    preferred response gets a higher score.
    """
    np.random.seed(42)
    d = 8  # embedding dimension
    n_pairs = 50  # number of preference pairs

    def sigmoid(x: np.ndarray) -> np.ndarray:
        return 1.0 / (1.0 + np.exp(-np.clip(x, -500, 500)))

    # --- Generate preference pair data ---
    # Preferred responses have positive values in first dimensions;
    # rejected responses have negative values.
    preferred_embs = np.random.randn(n_pairs, d) * 0.5
    preferred_embs[:, :3] += 1.0  # helpful features

    rejected_embs = np.random.randn(n_pairs, d) * 0.5
    rejected_embs[:, :3] -= 0.5  # unhelpful features

    # --- Train reward model ---
    W = np.zeros(d)  # reward model weights
    lr = 0.1
    n_epochs = 100

    losses = []
    for epoch in range(n_epochs):
        # Compute rewards
        r_preferred = preferred_embs @ W  # (n_pairs,)
        r_rejected = rejected_embs @ W    # (n_pairs,)

        # Bradley-Terry loss: -log sigmoid(r_pref - r_rej)
        diff = r_preferred - r_rejected
        prob = sigmoid(diff)
        loss = -np.mean(np.log(np.maximum(prob, 1e-12)))
        losses.append(loss)

        # Gradient: d_loss/d_W
        # d_loss/d_diff = -(1 - sigmoid(diff)) = sigmoid(diff) - 1
        grad_diff = (prob - 1.0) / n_pairs  # (n_pairs,)
        # diff = (preferred - rejected) @ W, so d_diff/d_W = preferred - rejected
        grad_W = (preferred_embs - rejected_embs).T @ grad_diff  # (d,)

        W -= lr * grad_W

    # --- Evaluate ---
    r_preferred_final = preferred_embs @ W
    r_rejected_final = rejected_embs @ W
    accuracy = np.mean(r_preferred_final > r_rejected_final)

    print("Preference Learning Demo (Bradley-Terry Model)")
    print(f"  {n_pairs} preference pairs, embedding dim={d}")
    print(f"  Training: {n_epochs} epochs, lr={lr}")
    print(f"  Initial loss: {losses[0]:.4f}")
    print(f"  Final loss:   {losses[-1]:.4f}")
    print(f"  Accuracy (preferred scored higher): {accuracy:.2%}")
    print()
    print(f"  Learned weights W: {np.round(W, 3)}")
    print(f"  Note: first 3 dims are largest -- these capture 'helpfulness'")
    print()

    # Show a few example pairs
    print("  Sample pairs (reward_preferred vs reward_rejected):")
    for i in range(5):
        rp = r_preferred_final[i]
        rr = r_rejected_final[i]
        marker = "correct" if rp > rr else "WRONG"
        print(f"    Pair {i}: preferred={rp:>+.3f}  rejected={rr:>+.3f}  [{marker}]")


preference_learning_demo()

Summary

This lab covered the key building blocks of Large Language Models, from raw logits to human-aligned outputs:

| Part | What we built | Key insight |
|------|---------------|-------------|
| 1 | Greedy, temperature, top-k, top-p decoding | Different strategies trade off between quality and diversity |
| 2 | Autoregressive generation loop | LLMs generate one token at a time, conditioned on all previous tokens |
| 3 | MLM masking and loss | BERT masks 15% of tokens (80/10/10 split) and predicts from bidirectional context |
| 4 | Classification head + fine-tuning | A simple linear layer on top of [CLS] embeddings can classify text |
| 5 | Zero/few-shot and chain-of-thought prompts | Prompt engineering enables tasks without weight updates |
| 6 | Reward model + preference learning | The Bradley-Terry model learns to score preferred outputs higher |

Key references:

  • Jurafsky & Martin, Speech and Language Processing (3rd ed.), Chapters 7, 9, 10

  • Greedy decoding: SLP Eq 7.2

  • Temperature sampling: SLP Eq 7.4

  • Top-k sampling: SLP Section 8.6.1

  • Top-p / nucleus sampling: SLP Section 8.6.2

  • Masked Language Modeling: SLP Section 9.2

  • Fine-tuning for classification: SLP Section 9.4, Eq 9.11

  • Prompting and in-context learning: SLP Section 7.3

  • Bradley-Terry preference model: SLP Section 10.2.2