Lab 06: Large Language Models
Source: Speech & Language Processing by Jurafsky & Martin (3rd ed. draft) – Chapters 7, 9, 10
PDF: resources/ed3book_jan26.pdf
This lab covers the core concepts behind modern Large Language Models:
| Part | Topic | SLP Reference |
|---|---|---|
| 1 | Text Generation Strategies | Ch 7.4 – Greedy, Temperature, Top-k, Top-p |
| 2 | Autoregressive Generation | Ch 7.2–7.4 – Causal LM decoding loop |
| 3 | Masked Language Model (BERT-style) | Ch 9.2 – MLM training objective |
| 4 | Fine-Tuning for Classification | Ch 9.4 – [CLS] token classifier head |
| 5 | Prompting and In-Context Learning | Ch 7.3 – Zero/few-shot, chain-of-thought |
| 6 | RLHF Concepts | Ch 10.2 – Reward models, preference learning |
import numpy as np
from typing import List, Dict, Tuple, Callable, Optional
print("Imports loaded successfully.")
PART 1: Text Generation Strategies
Reference: SLP Ch 7.4 – Generation and Sampling
A language model produces a logit vector \(\mathbf{u}\) of shape \([1 \times |V|]\) giving a raw score for every token in the vocabulary. The softmax converts logits into a probability distribution \(\mathbf{y} = \text{softmax}(\mathbf{u})\). Different decoding strategies then select the next token from this distribution:
| Strategy | Idea |
|---|---|
| Greedy | Always pick the highest-probability token (argmax). Deterministic but repetitive. |
| Temperature | Divide logits by \(\tau\) before softmax. \(\tau < 1\) sharpens, \(\tau > 1\) flattens. |
| Top-k | Keep only the \(k\) highest-probability tokens, renormalize, then sample. |
| Top-p (nucleus) | Keep the smallest set of tokens whose cumulative probability is \(\ge p\), renormalize, then sample. |
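Before implementing each strategy below, a quick numeric sketch (toy logits, not from any real model) shows how temperature reshapes the distribution that we then sample from:

```python
import numpy as np

def softmax_t(logits: np.ndarray, tau: float) -> np.ndarray:
    """Temperature-scaled softmax: divide logits by tau before normalizing."""
    z = logits / tau
    z = z - z.max()  # subtract max for numerical stability
    e = np.exp(z)
    return e / e.sum()

logits = np.array([5.0, 3.0, 2.0, 1.0])
for tau in [0.5, 1.0, 2.0]:
    print(f"tau={tau}: {np.round(softmax_t(logits, tau), 3)}")
# Lower tau concentrates mass on the argmax token;
# higher tau flattens the distribution toward uniform.
```

Note that the argmax token is the same at every temperature; only the sampling probabilities change.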
def softmax(logits: np.ndarray) -> np.ndarray:
"""Numerically stable softmax (SLP Eq 7.1).
Subtracts the max before exponentiating to avoid overflow.
"""
shifted = logits - np.max(logits)
exp_z = np.exp(shifted)
return exp_z / np.sum(exp_z)
# Quick sanity check
test_logits = np.array([2.0, 1.0, 0.1])
probs = softmax(test_logits)
print(f"Logits: {test_logits}")
print(f"Probabilities: {probs}")
print(f"Sum: {probs.sum():.6f}")
assert np.isclose(probs.sum(), 1.0), "Probabilities must sum to 1"
def greedy_decode(logits: np.ndarray) -> int:
"""Greedy decoding: select the token with the highest logit (SLP Eq 7.2).
w_hat = argmax_{w in V} P(w | context)
Deterministic -- always returns the same token for the same logits.
"""
return int(np.argmax(logits))
# Demo
demo_logits = np.array([5.0, 3.0, 2.0, 1.0])
demo_vocab = ["the", "a", "cat", "dog"]
chosen = greedy_decode(demo_logits)
print(f"Logits: {demo_logits}")
print(f"Greedy choice: index={chosen}, token='{demo_vocab[chosen]}'")
def temperature_sample(logits: np.ndarray, temperature: float = 1.0) -> int:
"""Temperature sampling (SLP Eq 7.4).
y = softmax(u / tau)
- tau < 1 -> sharpens distribution (more deterministic)
- tau = 1 -> standard softmax sampling
- tau > 1 -> flattens distribution (more random)
"""
adjusted = logits / temperature
probs = softmax(adjusted)
return int(np.random.choice(len(probs), p=probs))
# Demo: low vs high temperature
np.random.seed(42)
demo_logits = np.array([5.0, 3.0, 2.0, 1.0])
demo_vocab = ["the", "a", "cat", "dog"]
for T in [0.1, 0.5, 1.0, 2.0]:
samples = [demo_vocab[temperature_sample(demo_logits, T)] for _ in range(20)]
print(f"T={T:<4} -> {samples}")
def top_k_sample(logits: np.ndarray, k: int = 10) -> int:
"""Top-k sampling (SLP Section 8.6.1).
1. Find the top-k logits.
2. Set all other logits to -inf.
3. Softmax over the modified logits.
4. Sample from the resulting distribution.
When k=1 this is identical to greedy decoding.
"""
k = min(k, len(logits)) # safety
# Find the k-th largest value as the cutoff
top_k_indices = np.argsort(logits)[-k:]
filtered = np.full_like(logits, -np.inf)
filtered[top_k_indices] = logits[top_k_indices]
probs = softmax(filtered)
return int(np.random.choice(len(probs), p=probs))
# Demo
np.random.seed(42)
demo_logits = np.array([5.0, 3.0, 2.0, 1.0, 0.5, -1.0])
demo_vocab = ["the", "a", "cat", "dog", "sat", "on"]
for k_val in [1, 2, 3, 6]:
samples = [demo_vocab[top_k_sample(demo_logits, k_val)] for _ in range(15)]
print(f"k={k_val} -> {samples}")
def top_p_sample(logits: np.ndarray, p: float = 0.9) -> int:
"""Top-p (nucleus) sampling (SLP Section 8.6.2).
The top-p vocabulary V^(p) is the smallest set of tokens such that:
sum_{w in V^(p)} P(w | context) >= p
Steps:
1. Compute softmax probabilities.
2. Sort tokens by probability (descending).
3. Find the cutoff where cumulative sum >= p.
4. Zero out tokens below the cutoff.
5. Renormalize and sample.
"""
probs = softmax(logits)
# Sort indices by descending probability
sorted_indices = np.argsort(probs)[::-1]
sorted_probs = probs[sorted_indices]
# Find cumulative sum cutoff
cumsum = np.cumsum(sorted_probs)
# Keep tokens up to and including the first one that pushes cumsum >= p
cutoff_idx = np.searchsorted(cumsum, p) + 1 # +1 to include the boundary token
cutoff_idx = min(cutoff_idx, len(probs)) # safety
# Zero out everything below the cutoff
allowed_indices = sorted_indices[:cutoff_idx]
filtered_probs = np.zeros_like(probs)
filtered_probs[allowed_indices] = probs[allowed_indices]
# Renormalize
filtered_probs = filtered_probs / filtered_probs.sum()
return int(np.random.choice(len(filtered_probs), p=filtered_probs))
# Demo
np.random.seed(42)
demo_logits = np.array([5.0, 3.0, 2.0, 1.0, 0.5, -1.0])
demo_vocab = ["the", "a", "cat", "dog", "sat", "on"]
for p_val in [0.5, 0.8, 0.95, 1.0]:
samples = [demo_vocab[top_p_sample(demo_logits, p_val)] for _ in range(15)]
print(f"p={p_val:<4} -> {samples}")
def compare_sampling_strategies():
"""Compare all strategies on the same logits, 100 runs each."""
np.random.seed(42)
logits = np.array([5.0, 3.0, 2.0, 1.0, 0.5, 0.1, -1.0, -2.0, -3.0, -5.0])
vocab = ["the", "a", "cat", "dog", "sat", "on", "big", "red", "very", "zzz"]
n_runs = 100
strategies = {
"Greedy": lambda: greedy_decode(logits),
"Temp=0.5": lambda: temperature_sample(logits, 0.5),
"Temp=1.0": lambda: temperature_sample(logits, 1.0),
"Temp=2.0": lambda: temperature_sample(logits, 2.0),
"Top-k (k=3)": lambda: top_k_sample(logits, 3),
"Top-k (k=5)": lambda: top_k_sample(logits, 5),
"Top-p (p=0.7)": lambda: top_p_sample(logits, 0.7),
"Top-p (p=0.95)": lambda: top_p_sample(logits, 0.95),
}
# Show softmax probabilities first
probs = softmax(logits)
print("Logits and softmax probabilities:")
print(f" {'Token':<6} {'Logit':>6} {'P':>8}")
print(f" {'-'*6} {'-'*6} {'-'*8}")
for i, (tok, lo, pr) in enumerate(zip(vocab, logits, probs)):
print(f" {tok:<6} {lo:>6.1f} {pr:>8.4f}")
print()
for name, strategy_fn in strategies.items():
counts: Dict[str, int] = {}
for _ in range(n_runs):
token_idx = strategy_fn()
token = vocab[token_idx]
counts[token] = counts.get(token, 0) + 1
# Sort by count descending
sorted_counts = sorted(counts.items(), key=lambda x: -x[1])
dist_str = ", ".join(f"{tok}:{cnt}" for tok, cnt in sorted_counts)
print(f" {name:<18} -> {dist_str}")
compare_sampling_strategies()
PART 2: Autoregressive Generation
Reference: SLP Ch 7.2–7.4
In autoregressive (or causal) generation, the model predicts one token at a time, conditioned on all previous tokens. Each newly generated token is appended to the context, and the process repeats:
\[\hat{x}_{t+1} \sim P(x \mid x_1 \ldots x_t)\]
Generation stops when a special end-of-sequence token is produced or a maximum length is reached.
def generate_text_autoregressive(
model_fn: Callable[[List[int]], np.ndarray],
vocab: List[str],
prompt_ids: List[int],
max_new_tokens: int = 20,
strategy: str = "greedy",
temperature: float = 1.0,
top_k: int = 10,
) -> List[int]:
"""Generate text autoregressively.
Args:
model_fn: function(token_ids) -> logits for next token
vocab: list of token strings
prompt_ids: initial token ids
max_new_tokens: max tokens to generate
strategy: 'greedy', 'temperature', or 'top_k'
temperature: temperature parameter (used when strategy='temperature')
top_k: k parameter (used when strategy='top_k')
Returns:
Full list of token ids (prompt + generated)
"""
generated = list(prompt_ids)
for _ in range(max_new_tokens):
logits = model_fn(generated)
if strategy == "greedy":
next_token = greedy_decode(logits)
elif strategy == "temperature":
next_token = temperature_sample(logits, temperature)
elif strategy == "top_k":
next_token = top_k_sample(logits, top_k)
else:
raise ValueError(f"Unknown strategy: {strategy}")
generated.append(next_token)
# Stop at end-of-sequence token (id 0 by convention)
if next_token == 0:
break
return generated
print("generate_text_autoregressive() defined.")
def simple_language_model():
"""Build a bigram language model and generate text with different strategies.
The bigram model uses a transition matrix where row i gives the logits
for P(next_token | current_token = vocab[i]).
"""
vocab = ["<eos>", "the", "cat", "sat", "on", "mat", "dog", "ran", "big", "a"]
V = len(vocab)
# Transition logits: transitions[i] = logits for next token given word i
transitions = np.zeros((V, V))
# eos the cat sat on mat dog ran big a
transitions[1] = [0, 0, 3, 0, 0, 0, 2, 0, 1, 0] # after "the"
transitions[2] = [0, 0, 0, 3, 0, 0, 0, 1, 0, 0] # after "cat"
transitions[3] = [0, 0, 0, 0, 3, 0, 0, 0, 0, 0] # after "sat"
transitions[4] = [0, 2, 0, 0, 0, 2, 0, 0, 0, 1] # after "on"
transitions[5] = [3, 0, 0, 0, 0, 0, 0, 0, 0, 0] # after "mat"
transitions[6] = [0, 0, 0, 1, 0, 0, 0, 2, 0, 0] # after "dog"
transitions[7] = [1, 0, 0, 0, 1, 0, 0, 0, 0, 0] # after "ran"
transitions[8] = [0, 0, 2, 0, 0, 0, 2, 0, 0, 0] # after "big"
transitions[9] = [0, 0, 2, 0, 0, 1, 1, 0, 1, 0] # after "a"
def model_fn(token_ids: List[int]) -> np.ndarray:
"""Return logits for the next token given the last token (bigram)."""
last_token = token_ids[-1]
return transitions[last_token]
# Generate with different strategies from prompt "the" (id=1)
prompt = [1]
print("Bigram Language Model Generation")
print(f"Vocabulary: {vocab}")
print(f"Prompt: '{vocab[prompt[0]]}'")
print()
# Greedy -- deterministic
ids = generate_text_autoregressive(model_fn, vocab, prompt, max_new_tokens=10,
strategy="greedy")
text = " ".join(vocab[i] for i in ids)
print(f" Greedy: {text}")
# Temperature sampling -- several runs
np.random.seed(0)
for T in [0.5, 1.0, 2.0]:
results = []
for _ in range(5):
ids = generate_text_autoregressive(model_fn, vocab, prompt,
max_new_tokens=10,
strategy="temperature",
temperature=T)
text = " ".join(vocab[i] for i in ids)
results.append(text)
print(f" Temp={T:<4}: {results[0]}")
for r in results[1:]:
print(f" {r}")
print()
# Top-k sampling
np.random.seed(42)
for k_val in [2, 3]:
results = []
for _ in range(3):
ids = generate_text_autoregressive(model_fn, vocab, prompt,
max_new_tokens=10,
strategy="top_k", top_k=k_val)
text = " ".join(vocab[i] for i in ids)
results.append(text)
print(f" Top-k={k_val}: {results}")
simple_language_model()
PART 3: Masked Language Model (BERT-style)
Reference: SLP Ch 9.2 – Training Bidirectional Encoders
Instead of predicting the next token left-to-right, a Masked Language Model (MLM) masks some tokens and predicts them from bidirectional context.
The BERT masking strategy selects 15% of tokens and then:
- 80% of the time: replace with [MASK]
- 10% of the time: replace with a random token from the vocabulary
- 10% of the time: keep the original token unchanged
The loss \(L_{MLM}\) is the cross-entropy computed only at masked positions:
\[L_{MLM} = -\frac{1}{|M|} \sum_{i \in M} \log P(x_i \mid \tilde{\mathbf{x}})\]
where \(M\) is the set of masked token indices and \(\tilde{\mathbf{x}}\) is the masked input sequence.
def create_mlm_input(
tokens: List[int],
vocab_size: int,
mask_token_id: int,
mask_prob: float = 0.15,
) -> Tuple[List[int], List[int]]:
"""Create MLM training input following the BERT masking strategy.
For each token selected for masking (with probability mask_prob):
- 80%: replace with [MASK] token
- 10%: replace with a random token from the vocabulary
- 10%: keep the original token
Args:
tokens: original token ids
vocab_size: total vocabulary size
mask_token_id: id of the [MASK] special token
mask_prob: probability of selecting each token for masking
Returns:
(masked_tokens, labels) where labels[i] = original token if
position i was selected, else -1 (ignore in loss).
"""
masked = list(tokens)
labels = [-1] * len(tokens)
for i in range(len(tokens)):
if np.random.random() < mask_prob:
labels[i] = tokens[i] # remember original for the loss
r = np.random.random()
if r < 0.8:
# 80%: replace with [MASK]
masked[i] = mask_token_id
elif r < 0.9:
# 10%: replace with random token
masked[i] = np.random.randint(0, vocab_size)
# else 10%: keep original (do nothing)
return masked, labels
print("create_mlm_input() defined.")
def mlm_loss(logits: np.ndarray, labels: np.ndarray) -> float:
"""Compute cross-entropy loss only at masked positions.
Args:
logits: shape (seq_len, vocab_size) -- raw scores from the model
labels: shape (seq_len,) -- original token id at masked positions,
-1 at non-masked positions (ignored)
Returns:
Average cross-entropy loss over masked positions.
"""
seq_len = logits.shape[0]
total_loss = 0.0
count = 0
for i in range(seq_len):
if labels[i] == -1:
continue # skip non-masked positions
# Compute softmax probabilities for this position
probs = softmax(logits[i])
# Cross-entropy: -log P(correct_token)
correct_token = int(labels[i])
# Clamp to avoid log(0)
prob = max(probs[correct_token], 1e-12)
total_loss += -np.log(prob)
count += 1
if count == 0:
return 0.0
return total_loss / count
# Quick test
test_logits = np.array([[2.0, 0.5, 0.1], [0.1, 2.0, 0.5], [0.5, 0.1, 2.0]])
test_labels = np.array([0, -1, 2]) # masked at positions 0 and 2
loss = mlm_loss(test_logits, test_labels)
print(f"MLM loss (should be small since logits favour correct tokens): {loss:.4f}")
def mlm_demo():
"""Demo: mask a sentence and show original vs masked vs labels."""
np.random.seed(42)
# Toy vocabulary
token_to_id = {
"[PAD]": 0, "[MASK]": 1, "[CLS]": 2, "[SEP]": 3,
"the": 4, "cat": 5, "sat": 6, "on": 7, "mat": 8,
"dog": 9, "ran": 10, "big": 11, "red": 12, "a": 13,
}
id_to_token = {v: k for k, v in token_to_id.items()}
vocab_size = len(token_to_id)
mask_token_id = token_to_id["[MASK]"]
# Original sentence
sentence = ["[CLS]", "the", "big", "cat", "sat", "on", "the", "red", "mat", "[SEP]"]
token_ids = [token_to_id[w] for w in sentence]
masked_ids, labels = create_mlm_input(token_ids, vocab_size, mask_token_id,
mask_prob=0.15)
print("MLM Masking Demo")
print(f" Original: {' '.join(sentence)}")
print(f" Masked: {' '.join(id_to_token[i] for i in masked_ids)}")
print(f" Labels: {['_' if l == -1 else id_to_token[l] for l in labels]}")
print()
# Show positions that were selected for masking
masked_positions = [i for i, l in enumerate(labels) if l != -1]
if masked_positions:
print(f" Masked positions: {masked_positions}")
for pos in masked_positions:
orig = id_to_token[token_ids[pos]]
new = id_to_token[masked_ids[pos]]
print(f" Position {pos}: '{orig}' -> '{new}' (label='{id_to_token[labels[pos]]}')")
else:
print(" (No tokens were selected for masking this run -- try a different seed)")
# Compute loss with mock logits
mock_logits = np.random.randn(len(token_ids), vocab_size)
loss = mlm_loss(mock_logits, np.array(labels))
print(f"\n Mock MLM loss (random logits): {loss:.4f}")
print(f" Expected for random: ~-ln(1/{vocab_size}) = {np.log(vocab_size):.4f}")
mlm_demo()
PART 4: Fine-Tuning for Classification
Reference: SLP Ch 9.4 – Fine-Tuning for Classification
The power of pretrained models lies in transfer learning: the pretrained representations can be adapted to downstream tasks with a small amount of labeled data.
For sequence classification (e.g., sentiment analysis), BERT prepends a special [CLS] token to the input. The output vector \(\mathbf{h}_{CLS}^L\) from the final transformer layer serves as the input to a classifier head:
\[\mathbf{y} = \text{softmax}(\mathbf{h}_{CLS}^L \mathbf{W}_C)\]
where \(\mathbf{W}_C\) is of size \([d \times k]\) (\(d\) = model dimension, \(k\) = number of classes).
def classification_head(
cls_embedding: np.ndarray,
W: np.ndarray,
b: np.ndarray,
) -> np.ndarray:
"""Classification from [CLS] token embedding (SLP Eq 9.11).
Args:
cls_embedding: shape (d_model,) -- the [CLS] output vector
W: shape (d_model, n_classes) -- classification weights
b: shape (n_classes,) -- classification bias
Returns:
logits: shape (n_classes,)
"""
return cls_embedding @ W + b
# Quick test
d, k = 8, 3
np.random.seed(0)
test_emb = np.random.randn(d)
test_W = np.random.randn(d, k) * 0.1
test_b = np.zeros(k)
logits = classification_head(test_emb, test_W, test_b)
probs = softmax(logits)
print(f"CLS embedding shape: {test_emb.shape}")
print(f"Logits: {logits}")
print(f"Probabilities: {probs}")
print(f"Predicted class: {np.argmax(probs)}")
def fine_tuning_pipeline():
"""Simulate the BERT fine-tuning process for classification.
Steps:
1. Create mock 'pretrained' [CLS] embeddings.
2. Generate labels correlated with embedding dimensions.
3. Split into train/test.
4. Train a linear classification head with gradient descent.
5. Evaluate accuracy.
"""
np.random.seed(42)
d_model = 64
n_classes = 3
n_samples = 200
# --- Step 1: Mock "pretrained" CLS embeddings ---
cls_embeddings = np.random.randn(n_samples, d_model)
# --- Step 2: Generate labels correlated with embedding ---
# Use a hidden linear mapping + noise to create class labels
true_W = np.random.randn(d_model, n_classes) * 0.1
true_logits = cls_embeddings @ true_W
labels = np.argmax(true_logits + np.random.randn(n_samples, n_classes) * 0.3, axis=1)
# --- Step 3: Train/test split (80/20) ---
split = int(0.8 * n_samples)
X_train, X_test = cls_embeddings[:split], cls_embeddings[split:]
y_train, y_test = labels[:split], labels[split:]
# --- Step 4: Train a linear head with gradient descent ---
W = np.random.randn(d_model, n_classes) * 0.01
b = np.zeros(n_classes)
lr = 0.01
n_epochs = 200
batch_size = 32
train_losses = []
for epoch in range(n_epochs):
# Shuffle training data
perm = np.random.permutation(split)
epoch_loss = 0.0
for start in range(0, split, batch_size):
idx = perm[start:start + batch_size]
X_batch = X_train[idx]
y_batch = y_train[idx]
bs = len(idx)
# Forward pass
logits = X_batch @ W + b # (bs, n_classes)
# Softmax (row-wise)
shifted = logits - logits.max(axis=1, keepdims=True)
exp_logits = np.exp(shifted)
probs = exp_logits / exp_logits.sum(axis=1, keepdims=True)
# Cross-entropy loss
correct_probs = probs[np.arange(bs), y_batch]
loss = -np.mean(np.log(np.maximum(correct_probs, 1e-12)))
epoch_loss += loss * bs
# Backward pass (gradient of softmax cross-entropy)
grad_logits = probs.copy() # (bs, n_classes)
grad_logits[np.arange(bs), y_batch] -= 1.0
grad_logits /= bs
grad_W = X_batch.T @ grad_logits # (d_model, n_classes)
grad_b = grad_logits.sum(axis=0) # (n_classes,)
# Update
W -= lr * grad_W
b -= lr * grad_b
epoch_loss /= split
train_losses.append(epoch_loss)
# --- Step 5: Evaluate ---
# Train accuracy
train_logits = X_train @ W + b
train_preds = np.argmax(train_logits, axis=1)
train_acc = np.mean(train_preds == y_train)
# Test accuracy
test_logits = X_test @ W + b
test_preds = np.argmax(test_logits, axis=1)
test_acc = np.mean(test_preds == y_test)
print("Fine-Tuning Pipeline Results")
print(f" Dataset: {n_samples} samples, d_model={d_model}, {n_classes} classes")
print(f" Train/Test split: {split}/{n_samples - split}")
print(f" Training epochs: {n_epochs}, lr={lr}, batch_size={batch_size}")
print(f" Final train loss: {train_losses[-1]:.4f}")
print(f" Train accuracy: {train_acc:.2%}")
print(f" Test accuracy: {test_acc:.2%}")
print(f" Loss over training (first 5 epochs): {[f'{l:.3f}' for l in train_losses[:5]]}")
print(f" Loss over training (last 5 epochs): {[f'{l:.3f}' for l in train_losses[-5:]]}")
fine_tuning_pipeline()
PART 5: Prompting and In-Context Learning
Reference: SLP Ch 7.3 – Prompting
Rather than fine-tuning model weights, we can use prompting to steer an LLM toward a task. A prompt is the text string given to the model as context for conditional generation.
| Style | Description |
|---|---|
| Zero-shot | Only the task instruction, no examples. |
| One-shot | One labeled example (demonstration) before the query. |
| Few-shot | Multiple demonstrations before the query. |
This kind of task-specific learning at inference time, without any gradient updates, is called in-context learning (Brown et al., 2020).
def prompting_demo():
"""Construct zero-shot, one-shot, and few-shot prompts for sentiment classification."""
task = "Classify the sentiment of the following review."
# --- Zero-shot ---
zero_shot = (
f"{task}\n"
f"\n"
f'Review: "This movie was amazing!"\n'
f"Sentiment:"
)
# --- One-shot ---
one_shot = (
f"{task}\n"
f"\n"
f'Review: "I loved every minute of it"\n'
f"Sentiment: Positive\n"
f"\n"
f'Review: "This movie was amazing!"\n'
f"Sentiment:"
)
# --- Few-shot ---
few_shot = (
f"{task}\n"
f"\n"
f'Review: "I loved every minute of it"\n'
f"Sentiment: Positive\n"
f"\n"
f'Review: "Terrible waste of time"\n'
f"Sentiment: Negative\n"
f"\n"
f'Review: "It was okay, nothing special"\n'
f"Sentiment: Neutral\n"
f"\n"
f'Review: "This movie was amazing!"\n'
f"Sentiment:"
)
print("=" * 60)
print("ZERO-SHOT PROMPT")
print("=" * 60)
print(zero_shot)
print()
print("=" * 60)
print("ONE-SHOT PROMPT")
print("=" * 60)
print(one_shot)
print()
print("=" * 60)
print("FEW-SHOT PROMPT")
print("=" * 60)
print(few_shot)
prompting_demo()
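The hand-written prompts above follow a fixed pattern, so they can also be assembled programmatically. The sketch below uses a hypothetical helper, `build_few_shot_prompt` (not from SLP), that builds a prompt from a task instruction, a list of (review, label) demonstrations, and a query; with an empty demonstration list it degenerates to a zero-shot prompt:

```python
from typing import List, Tuple

def build_few_shot_prompt(task: str,
                          examples: List[Tuple[str, str]],
                          query: str) -> str:
    """Assemble a prompt: instruction, then demonstrations, then the query."""
    parts = [task, ""]
    for review, label in examples:
        parts.append(f'Review: "{review}"')
        parts.append(f"Sentiment: {label}")
        parts.append("")  # blank line between demonstrations
    parts.append(f'Review: "{query}"')
    parts.append("Sentiment:")  # model completes from here
    return "\n".join(parts)

demos = [("I loved every minute of it", "Positive"),
         ("Terrible waste of time", "Negative")]
prompt = build_few_shot_prompt("Classify the sentiment of the following review.",
                               demos, "This movie was amazing!")
print(prompt)
```

Ending the prompt at `Sentiment:` is deliberate: the model's most likely continuation is then the label itself.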
def chain_of_thought_demo():
"""Demonstrate chain-of-thought prompting.
Chain-of-thought (CoT) prompting encourages the model to show its
reasoning step-by-step before giving the final answer. This has been
shown to improve performance on reasoning tasks.
"""
# Standard prompt (no CoT)
standard_prompt = (
"Q: Roger has 5 tennis balls. He buys 2 more cans of tennis balls. "
"Each can has 3 tennis balls. How many tennis balls does he have now?\n"
"A:"
)
# Chain-of-thought prompt
cot_prompt = (
"Q: Janet has 3 apples. She buys 2 bags of apples. Each bag has 4 "
"apples. How many apples does she have now?\n"
"A: Janet starts with 3 apples. She buys 2 bags, each with 4 apples, "
"so she gets 2 * 4 = 8 more apples. In total she has 3 + 8 = 11 apples. "
"The answer is 11.\n"
"\n"
"Q: Roger has 5 tennis balls. He buys 2 more cans of tennis balls. "
"Each can has 3 tennis balls. How many tennis balls does he have now?\n"
"A:"
)
print("=" * 60)
print("STANDARD PROMPT (no chain-of-thought)")
print("=" * 60)
print(standard_prompt)
print("\n(Expected LLM answer: 11)")
print()
print("=" * 60)
print("CHAIN-OF-THOUGHT PROMPT")
print("=" * 60)
print(cot_prompt)
print(
"\n(Expected LLM answer: Roger starts with 5 tennis balls. "
"He buys 2 cans, each with 3 balls, "
"so he gets 2 * 3 = 6 more. In total he has 5 + 6 = 11. "
"The answer is 11.)"
)
chain_of_thought_demo()
PART 6: RLHF Concepts
Reference: SLP Ch 10.2 – Learning from Preferences
Even after instruction tuning, LLMs can still produce harmful outputs or fail to be sufficiently helpful. Preference-based learning (RLHF / DPO) addresses this by training a reward model on human preference data and then using that model to further align the LLM.
The core idea (the Bradley-Terry model):
\[P(o_i \succ o_j) = \sigma(z_i - z_j)\]
where \(z_i, z_j\) are scalar reward scores for outputs \(o_i, o_j\) and \(\sigma\) is the logistic sigmoid. The reward model is trained so that the preferred response gets a higher score than the rejected one.
def reward_model_demo():
"""Mock reward model: a linear function of response embeddings.
Score = W_reward . embedding
We construct embeddings so that helpful responses have positive features
and harmful responses have negative features, mimicking what a trained
reward model would learn.
"""
np.random.seed(42)
d = 16 # embedding dimension
# Reward model weights (learned from human preferences)
W_reward = np.random.randn(d) * 0.1
# Bias the first few dimensions to represent "helpfulness"
W_reward[:4] = np.array([0.5, 0.4, 0.3, 0.2])
# Mock response embeddings with controlled characteristics
base = np.random.randn(d) * 0.1
responses = {
"helpful_response": base + np.concatenate([np.array([1.0, 0.8, 0.7, 0.6]),
np.zeros(d - 4)]),
"unhelpful_response": base + np.concatenate([np.array([-0.2, -0.1, 0.0, -0.1]),
np.zeros(d - 4)]),
"harmful_response": base + np.concatenate([np.array([-1.0, -0.8, -0.7, -0.6]),
np.zeros(d - 4)]),
}
print("Reward Model Scores")
print(f" (Linear reward model with d={d} dimensions)\n")
for name, emb in responses.items():
score = float(np.dot(W_reward, emb))
label = name.replace("_", " ")
print(f" {label:<24} reward = {score:>+.4f}")
# Show that the ordering matches human preferences
scores = {name: float(np.dot(W_reward, emb)) for name, emb in responses.items()}
ranked = sorted(scores.items(), key=lambda x: -x[1])
print(f"\n Ranking (highest to lowest): {' > '.join(r[0] for r in ranked)}")
reward_model_demo()
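One simple way to put a trained reward model to work at inference time is best-of-n sampling: draw several candidate responses, score each with the reward model, and return the highest-scoring one. A minimal sketch, reusing the mock linear reward from the demo above (embeddings and weights here are illustrative values, not learned):

```python
import numpy as np

def best_of_n(candidate_embs: np.ndarray, W_reward: np.ndarray) -> int:
    """Score each candidate embedding with the reward model; return the best index."""
    scores = candidate_embs @ W_reward  # (n,) reward score per candidate
    return int(np.argmax(scores))

np.random.seed(0)
d = 16
W_reward = np.zeros(d)
W_reward[:4] = [0.5, 0.4, 0.3, 0.2]  # mock "helpfulness" dimensions, as above
candidates = np.random.randn(5, d) * 0.1  # five mock candidate responses
candidates[3, :4] += 1.0                  # make candidate 3 clearly the most helpful
best = best_of_n(candidates, W_reward)
print(f"Best candidate: {best}")
```

Best-of-n changes no model weights; it only filters samples, which is why RLHF goes further and updates the policy itself.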
def preference_learning_demo():
"""Simple preference learning: train a reward model on (preferred, rejected) pairs.
Uses the Bradley-Terry model:
P(preferred > rejected) = sigmoid(r_preferred - r_rejected)
Loss = -log sigmoid(r_preferred - r_rejected)
We train the reward model weights via gradient descent so that the
preferred response gets a higher score.
"""
np.random.seed(42)
d = 8 # embedding dimension
n_pairs = 50 # number of preference pairs
def sigmoid(x: np.ndarray) -> np.ndarray:
return 1.0 / (1.0 + np.exp(-np.clip(x, -500, 500)))
# --- Generate preference pair data ---
# Preferred responses have positive values in first dimensions;
# rejected responses have negative values.
preferred_embs = np.random.randn(n_pairs, d) * 0.5
preferred_embs[:, :3] += 1.0 # helpful features
rejected_embs = np.random.randn(n_pairs, d) * 0.5
rejected_embs[:, :3] -= 0.5 # unhelpful features
# --- Train reward model ---
W = np.zeros(d) # reward model weights
lr = 0.1
n_epochs = 100
losses = []
for epoch in range(n_epochs):
# Compute rewards
r_preferred = preferred_embs @ W # (n_pairs,)
r_rejected = rejected_embs @ W # (n_pairs,)
# Bradley-Terry loss: -log sigmoid(r_pref - r_rej)
diff = r_preferred - r_rejected
prob = sigmoid(diff)
loss = -np.mean(np.log(np.maximum(prob, 1e-12)))
losses.append(loss)
# Gradient: d_loss/d_W
# d_loss/d_diff = -(1 - sigmoid(diff)) = sigmoid(diff) - 1
grad_diff = (prob - 1.0) / n_pairs # (n_pairs,)
# diff = (preferred - rejected) @ W, so d_diff/d_W = preferred - rejected
grad_W = (preferred_embs - rejected_embs).T @ grad_diff # (d,)
W -= lr * grad_W
# --- Evaluate ---
r_preferred_final = preferred_embs @ W
r_rejected_final = rejected_embs @ W
accuracy = np.mean(r_preferred_final > r_rejected_final)
print("Preference Learning Demo (Bradley-Terry Model)")
print(f" {n_pairs} preference pairs, embedding dim={d}")
print(f" Training: {n_epochs} epochs, lr={lr}")
print(f" Initial loss: {losses[0]:.4f}")
print(f" Final loss: {losses[-1]:.4f}")
print(f" Accuracy (preferred scored higher): {accuracy:.2%}")
print()
print(f" Learned weights W: {np.round(W, 3)}")
print(f" Note: first 3 dims are largest -- these capture 'helpfulness'")
print()
# Show a few example pairs
print(" Sample pairs (reward_preferred vs reward_rejected):")
for i in range(5):
rp = r_preferred_final[i]
rr = r_rejected_final[i]
marker = "correct" if rp > rr else "WRONG"
print(f" Pair {i}: preferred={rp:>+.3f} rejected={rr:>+.3f} [{marker}]")
preference_learning_demo()
Summary
This lab covered the key building blocks of Large Language Models, from raw logits to human-aligned outputs:
| Part | What we built | Key insight |
|---|---|---|
| 1 | Greedy, temperature, top-k, top-p decoding | Different strategies trade off between quality and diversity |
| 2 | Autoregressive generation loop | LLMs generate one token at a time, conditioned on all previous tokens |
| 3 | MLM masking and loss | BERT masks 15% of tokens (80/10/10 split) and predicts from bidirectional context |
| 4 | Classification head + fine-tuning | A simple linear layer on top of [CLS] embeddings can classify text |
| 5 | Zero/few-shot and chain-of-thought prompts | Prompt engineering enables tasks without weight updates |
| 6 | Reward model + preference learning | The Bradley-Terry model learns to score preferred outputs higher |
Key references:
- Jurafsky & Martin, Speech and Language Processing (3rd ed.), Chapters 7, 9, 10
- Greedy decoding: SLP Eq 7.2
- Temperature sampling: SLP Eq 7.4
- Top-k sampling: SLP Section 8.6.1
- Top-p / nucleus sampling: SLP Section 8.6.2
- Masked Language Modeling: SLP Section 9.2
- Fine-tuning for classification: SLP Section 9.4, Eq 9.11
- Prompting and in-context learning: SLP Section 7.3
- Bradley-Terry preference model: SLP Section 10.2.2