Collection, Buffers, and Evaluation

This guide covers trajectory collection, replay buffers, generalized advantage estimation, and policy evaluation.

Trajectory Collection

Collect gathers agent-environment interactions into structure-of-arrays form for batch processing.

Rollout

Collect.rollout collects a fixed number of transitions. It resets the environment at the start and automatically on episode boundaries:

open Fehu

let () = Nx.Rng.run ~seed:42 @@ fun () ->
  let env = Fehu_envs.Cartpole.make () in

  (* The policy receives an observation and returns
     (action, log_prob option, value_estimate option) *)
  let policy _obs =
    let act = Space.sample (Env.action_space env) in
    (act, None, None)
  in

  let trajectory = Collect.rollout env ~policy ~n_steps:1024 in
  Printf.printf "Collected %d transitions\n" (Collect.length trajectory)

The returned trajectory contains parallel arrays:

let n = Collect.length trajectory  (* 1024 *)
let obs = trajectory.observations       (* 'obs array *)
let acts = trajectory.actions           (* 'act array *)
let rews = trajectory.rewards           (* float array *)
let next_obs = trajectory.next_observations  (* 'obs array *)
let terms = trajectory.terminated       (* bool array *)
let truncs = trajectory.truncated       (* bool array *)
let infos = trajectory.infos            (* Info.t array *)
let log_ps = trajectory.log_probs       (* float array option *)
let vals = trajectory.values            (* float array option *)

When the policy returns Some log_prob or Some value at every step, those are collected into log_probs and values. If the policy returns None for a component at any step, the corresponding field is None for the entire trajectory.
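The reset behaviour described for Collect.rollout can be illustrated with a small stand-alone loop. This is a sketch only, not Fehu's implementation: the toy_env and step_result types are hypothetical stand-ins for the real environment interface, and the result is an array of tuples rather than the structure-of-arrays trajectory the library returns.

```ocaml
(* Hypothetical minimal environment interface, not Fehu's Env type. *)
type 'obs step_result = {
  next_obs : 'obs;
  reward : float;
  terminated : bool;
  truncated : bool;
}

type ('obs, 'act) toy_env = {
  reset : unit -> 'obs;
  step : 'act -> 'obs step_result;
}

(* Collect exactly n_steps transitions, resetting at the start and
   again whenever an episode terminates or is truncated. *)
let rollout env ~policy ~n_steps =
  let obs = ref (env.reset ()) in
  Array.init n_steps (fun _ ->
    let current = !obs in
    let act = policy current in
    let r = env.step act in
    (* Start a fresh episode on a boundary, otherwise keep going. *)
    obs := if r.terminated || r.truncated then env.reset () else r.next_obs;
    (current, act, r))
```

Because the loop counts transitions rather than episodes, a rollout can begin or end in the middle of an episode, which is why the terminated and truncated flags are stored per step.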

Policy Signature

The policy function has the signature:

'obs -> 'act * float option * float option

The three components are:

  1. action: the action to take
  2. log_prob (optional): the log-probability of the action under the current policy, used for importance sampling in PPO
  3. value (optional): the estimated value of the current state, used for GAE computation

For a simple random policy, return None for both:

let random_policy _obs =
  let act = Space.sample (Env.action_space env) in
  (act, None, None)

For a neural network policy with value head:

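(* forward_pass, sample_from_logits, log_prob_of, and model are
   placeholders for your own network code *)
let nn_policy obs =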
  let logits, value = forward_pass model obs in
  let act = sample_from_logits logits in
  let log_prob = log_prob_of logits act in
  (act, Some log_prob, Some value)
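The helpers sample_from_logits and log_prob_of in the example above are left undefined. One possible way to write them for a discrete action space is sketched below, assuming logits arrive as a non-empty plain float array and using Stdlib.Random instead of Nx.Rng. Both choices are assumptions made for illustration; a real policy would work on the tensors its network produces.

```ocaml
(* Numerically stable log-softmax over an array of logits. *)
let log_softmax (logits : float array) : float array =
  let m = Array.fold_left max neg_infinity logits in
  let sum_exp =
    Array.fold_left (fun acc l -> acc +. exp (l -. m)) 0.0 logits in
  let log_z = m +. log sum_exp in
  Array.map (fun l -> l -. log_z) logits

(* Sample an action index by inverting the CDF at a uniform draw u.
   The optional ~u argument exists only to make the sketch testable. *)
let sample_from_logits ?(u = Random.float 1.0) logits =
  let log_p = log_softmax logits in
  let n = Array.length log_p in
  let rec go i acc =
    let acc = acc +. exp log_p.(i) in
    (* Fall back to the last index to absorb rounding error. *)
    if u < acc || i = n - 1 then i else go (i + 1) acc
  in
  go 0 0.0

(* Log-probability of the chosen action under the same distribution. *)
let log_prob_of logits act = (log_softmax logits).(act)
```

Subtracting the maximum logit before exponentiating keeps exp from overflowing when logits are large.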

Episodes

Collect.episodes collects complete episodes, one trajectory per episode:

let episodes = Collect.episodes env
  ~policy ~n_episodes:10
  ~max_steps:500 ()

(* episodes is a ('obs, 'act) Collect.t list *)
let total_rewards = List.map (fun traj ->
  Array.fold_left (+.) 0.0 traj.rewards) episodes

Each episode runs until termination, truncation, or max_steps (default 1000).

Concatenating Trajectories

Collect.concat merges multiple trajectories into one:

let combined = Collect.concat [traj1; traj2; traj3]

Optional fields (log_probs, values) are kept only if present in all inputs.
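The rule for optional fields can be pictured with a small helper. The function concat_optional is hypothetical and only mirrors the behaviour described above; it is not part of Fehu.

```ocaml
(* Concatenate optional arrays: the result is Some only when every
   input carries a value, otherwise the field is dropped. *)
let concat_optional (parts : 'a array option list) : 'a array option =
  if List.for_all Option.is_some parts then
    Some (Array.concat (List.map Option.get parts))
  else None
```

In practice this means that mixing one trajectory collected without value estimates into a batch removes values from the combined result, so a later Option.get on that field would raise.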

Replay Buffers

Buffer provides a fixed-capacity circular buffer for off-policy experience storage. It stores individual transitions and supports uniform random sampling.

Creating and Filling

open Fehu

let buf = Buffer.create ~capacity:10_000

(* Add transitions one at a time *)
Buffer.add buf {
  observation = obs;
  action = act;
  reward = 1.0;
  next_observation = next_obs;
  terminated = false;
  truncated = false;
}

let n = Buffer.size buf           (* number of stored transitions *)
let full = Buffer.is_full buf     (* true when at capacity *)
let cap = Buffer.capacity buf     (* 10000 *)

When the buffer is full, new transitions overwrite the oldest ones.
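The overwrite-oldest behaviour is that of a ring buffer. A minimal stand-alone sketch follows; the ring type and its functions are illustrative names, and Fehu's internal layout may differ.

```ocaml
(* A fixed-capacity ring buffer. *)
type 'a ring = {
  data : 'a option array;
  mutable next : int;   (* index the next write goes to *)
  mutable size : int;   (* number of stored items, at most capacity *)
}

let create capacity =
  if capacity <= 0 then invalid_arg "capacity must be positive";
  { data = Array.make capacity None; next = 0; size = 0 }

let add r x =
  let cap = Array.length r.data in
  r.data.(r.next) <- Some x;        (* replaces the oldest item once full *)
  r.next <- (r.next + 1) mod cap;
  r.size <- min (r.size + 1) cap

(* Contents in oldest-to-newest order. *)
let to_list r =
  let cap = Array.length r.data in
  let start = if r.size < cap then 0 else r.next in
  List.init r.size (fun i -> Option.get r.data.((start + i) mod cap))
```

With capacity 3, adding 1 through 5 leaves 3, 4, 5 in the buffer: the write index wraps around and the two oldest entries are replaced.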

Sampling

Draw a batch of transitions uniformly at random (with replacement):

let batch = Nx.Rng.run ~seed:0 @@ fun () ->
  Buffer.sample buf ~batch_size:64

(* batch is a transition array *)
let _obs_0 = batch.(0).observation
let _rew_0 = batch.(0).reward

For structure-of-arrays form (more convenient for training):

let (observations, actions, rewards,
     next_observations, terminated, truncated) =
  Nx.Rng.run ~seed:0 @@ fun () ->
    Buffer.sample_arrays buf ~batch_size:64
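Sampling with replacement means the same transition can appear more than once in a batch, and batch_size may exceed the number of stored transitions. A sketch of the idea over a plain array is shown below. It uses Stdlib.Random rather than Nx.Rng, and both function names are hypothetical.

```ocaml
(* Draw batch_size items uniformly at random, with replacement. *)
let sample_with_replacement ~batch_size (items : 'a array) : 'a array =
  let n = Array.length items in
  if n = 0 then invalid_arg "cannot sample from an empty buffer";
  Array.init batch_size (fun _ -> items.(Random.int n))

(* Split an array of (observation, reward) pairs into parallel arrays,
   the same idea as the structure-of-arrays form. *)
let to_arrays (batch : ('obs * float) array) : 'obs array * float array =
  (Array.map fst batch, Array.map snd batch)
```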

Clearing

Buffer.clear buf  (* removes all transitions, keeps storage allocated *)

Generalized Advantage Estimation

Gae computes advantages and returns for policy gradient methods. It treats terminated and truncated episodes differently:

  • Terminated: the episode ended naturally (e.g., pole fell). Bootstrap value is zero.
  • Truncated: the episode was cut short (e.g., time limit). Bootstrap value comes from next_values.
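In the standard GAE formulation, the two cases above correspond to the recursion below. The symbols are introduced here for illustration: d^term_t and d^trunc_t are the terminated and truncated flags read as 0 or 1, V(s_{t+1}) is the entry of next_values for step t, and the advantage after the last step is taken to be zero. Whether Gae.compute cuts the recursion at truncation in exactly this way is an assumption.

```latex
\begin{aligned}
\delta_t &= r_t + \gamma \,(1 - d^{\text{term}}_t)\, V(s_{t+1}) - V(s_t) \\
\hat{A}_t &= \delta_t + \gamma \lambda \,(1 - d^{\text{end}}_t)\, \hat{A}_{t+1},
  \qquad d^{\text{end}}_t = \max\!\left(d^{\text{term}}_t,\, d^{\text{trunc}}_t\right) \\
\hat{R}_t &= \hat{A}_t + V(s_t)
\end{aligned}
```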

Computing Advantages

open Fehu

(* From a trajectory with value estimates *)
let advantages, returns = Gae.compute
  ~rewards:trajectory.rewards
  ~values:(Option.get trajectory.values)
  ~terminated:trajectory.terminated
  ~truncated:trajectory.truncated
  ~next_values    (* a float array in scope: V(s_{t+1}) for each step *)
  ~gamma:0.99     (* discount factor *)
  ~lambda:0.95    (* GAE smoothing parameter *)
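For reference, a stand-alone version of the computation over plain float arrays is sketched below. It follows the standard GAE recursion with the terminated and truncated handling described above. It is not Fehu's source, and details such as how the recursion is cut at truncation are assumptions.

```ocaml
(* Returns (advantages, returns), both of the same length as rewards. *)
let gae ~rewards ~values ~next_values ~terminated ~truncated ~gamma ~lambda =
  let n = Array.length rewards in
  let advantages = Array.make n 0.0 in
  let running = ref 0.0 in
  for t = n - 1 downto 0 do
    (* A terminated step has no future value; a truncated one still
       bootstraps from next_values. *)
    let bootstrap = if terminated.(t) then 0.0 else next_values.(t) in
    let delta = rewards.(t) +. (gamma *. bootstrap) -. values.(t) in
    (* Do not carry advantages across an episode boundary. *)
    let carry = if terminated.(t) || truncated.(t) then 0.0 else !running in
    running := delta +. (gamma *. lambda *. carry);
    advantages.(t) <- !running
  done;
  let returns = Array.mapi (fun t a -> a +. values.(t)) advantages in
  (advantages, returns)
```

The backward loop makes the cost linear in the trajectory length, since each advantage reuses the one computed for the following step.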

When you have values from a value network and the last value estimate, compute_from_values builds next_values for you:

let advantages, returns = Gae.compute_from_values
  ~rewards:trajectory.rewards
  ~values:(Option.get trajectory.values)
  ~terminated:trajectory.terminated
  ~truncated:trajectory.truncated
  ~last_value:0.0   (* V(s_T) for the final state *)
  ~gamma:0.99
  ~lambda:0.95
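A plausible way to build next_values from values and last_value is to shift the value array left by one step and append the final estimate. The helper shift_values below is hypothetical and only illustrates that idea; the library may construct the array differently.

```ocaml
(* next_values.(t) = values.(t + 1), with last_value filling the end. *)
let shift_values ~values ~last_value =
  let n = Array.length values in
  Array.init n (fun t -> if t = n - 1 then last_value else values.(t + 1))
```

Under this construction, a step truncated in the middle of a rollout would bootstrap from the value of the first state of the next episode. If that matters for your environment, passing explicit next_values to Gae.compute avoids the issue.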

Monte Carlo Returns

For simpler algorithms that do not need advantages, Gae.returns computes discounted Monte Carlo returns:

let rets = Gae.returns
  ~rewards:trajectory.rewards
  ~terminated:trajectory.terminated
  ~truncated:trajectory.truncated
  ~gamma:0.99
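The computation behind discounted returns can be sketched over plain arrays as follows. The function discounted_returns is an illustrative stand-in, and it assumes the running sum restarts at every terminated or truncated step.

```ocaml
(* G_t = r_t + gamma * G_{t+1}, restarting at each episode boundary. *)
let discounted_returns ~rewards ~terminated ~truncated ~gamma =
  let n = Array.length rewards in
  let out = Array.make n 0.0 in
  let running = ref 0.0 in
  for t = n - 1 downto 0 do
    (* Rewards from a later episode must not leak into this one. *)
    if terminated.(t) || truncated.(t) then running := 0.0;
    running := rewards.(t) +. (gamma *. !running);
    out.(t) <- !running
  done;
  out
```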

Normalizing Advantages

Normalize to zero mean and unit variance for training stability:

let normalized = Gae.normalize advantages
(* or with custom epsilon *)
let normalized = Gae.normalize ~eps:1e-6 advantages
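Normalization of this kind is usually a subtract-mean, divide-by-standard-deviation step with eps guarding against division by zero. The sketch below assumes a population variance, a default eps of 1e-8, and eps added to the standard deviation. None of these details are confirmed by the text above, and the input is assumed to be non-empty.

```ocaml
(* Rescale xs to roughly zero mean and unit variance. *)
let normalize ?(eps = 1e-8) (xs : float array) : float array =
  let n = float_of_int (Array.length xs) in
  let mean = Array.fold_left ( +. ) 0.0 xs /. n in
  let var =
    Array.fold_left (fun acc x -> acc +. ((x -. mean) ** 2.0)) 0.0 xs /. n in
  let std = sqrt var in
  Array.map (fun x -> (x -. mean) /. (std +. eps)) xs
```

When all advantages are equal the standard deviation is zero, and eps is what keeps the result at zero rather than NaN.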

Policy Evaluation

Eval.run runs a deterministic or stochastic policy over multiple episodes and reports summary statistics:

open Fehu

let () = Nx.Rng.run ~seed:42 @@ fun () ->
  let env = Fehu_envs.Cartpole.make () in

  (* Evaluate a random policy *)
  let stats = Eval.run env
    ~policy:(fun _obs -> Space.sample (Env.action_space env))
    ~n_episodes:100
    ~max_steps:500
    ()
  in
  Printf.printf
    "Episodes: %d, Mean reward: %.1f +/- %.1f, Mean length: %.0f\n"
    stats.n_episodes
    stats.mean_reward
    stats.std_reward
    stats.mean_length

The evaluation policy has a simpler signature than the collection policy. It returns only an action, with no log-probability or value estimate:

'obs -> 'act

Eval.run resets the environment between episodes. Default n_episodes is 10 and default max_steps is 1000.
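The summary statistics can be pictured as a reduction over per-episode totals. The sketch below reuses the field names from the example above, but the stats record and summarize function are written here for illustration, and the use of a population standard deviation is an assumption.

```ocaml
type stats = {
  n_episodes : int;
  mean_reward : float;
  std_reward : float;
  mean_length : float;
}

(* episodes: one (total_reward, length) pair per finished episode. *)
let summarize (episodes : (float * int) list) : stats =
  let n = List.length episodes in
  if n = 0 then invalid_arg "summarize: no episodes";
  let nf = float_of_int n in
  let mean_reward =
    List.fold_left (fun acc (r, _) -> acc +. r) 0.0 episodes /. nf in
  let var =
    List.fold_left
      (fun acc (r, _) -> acc +. ((r -. mean_reward) ** 2.0)) 0.0 episodes
    /. nf in
  let mean_length =
    List.fold_left (fun acc (_, l) -> acc +. float_of_int l) 0.0 episodes
    /. nf in
  { n_episodes = n; mean_reward; std_reward = sqrt var; mean_length }
```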

Putting It Together

A typical PPO-style training iteration using these utilities:

open Fehu

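(* 1. Collect rollout. nn_policy already returns
   (action, Some log_prob, Some value), so pass it directly.
   env, model, and nn_policy are assumed to be in scope. *)
let trajectory = Collect.rollout env ~policy:nn_policy ~n_steps:2048

(* 2. Compute advantages, bootstrapping from the state that
   follows the last collected transition.
   estimate_value is a placeholder for your value network. *)
let last_obs =
  trajectory.next_observations.(Collect.length trajectory - 1)
let last_value = estimate_value model last_obs
let advantages, returns = Gae.compute_from_values
  ~rewards:trajectory.rewards
  ~values:(Option.get trajectory.values)
  ~terminated:trajectory.terminated
  ~truncated:trajectory.truncated
  ~last_value
  ~gamma:0.99 ~lambda:0.95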

let advantages = Gae.normalize advantages

(* 3. Update policy using trajectory data + advantages *)
(* ... your PPO update here ... *)

(* 4. Evaluate *)
let stats = Eval.run env
  ~policy:(fun obs -> greedy_action model obs)
  ~n_episodes:10 ()
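Step 3 above is left open. As one illustration of what the update optimizes, the PPO clipped surrogate objective can be evaluated from the stored log-probabilities, fresh log-probabilities from the current policy, and the normalized advantages. The function below is a sketch over plain float arrays. Its name, the default clip of 0.2, and the trailing unit argument are choices made here, and a real update would compute this inside an autodiff framework so it can be maximized by gradient ascent.

```ocaml
(* Mean over steps of min(ratio * A, clip(ratio, 1 - c, 1 + c) * A). *)
let ppo_clip_objective ?(clip = 0.2)
    ~old_log_probs ~new_log_probs ~advantages () =
  let n = Array.length advantages in
  let total = ref 0.0 in
  for t = 0 to n - 1 do
    (* Importance ratio between the current and the collecting policy. *)
    let ratio = exp (new_log_probs.(t) -. old_log_probs.(t)) in
    let clipped = Float.min (Float.max ratio (1.0 -. clip)) (1.0 +. clip) in
    let a = advantages.(t) in
    total := !total +. Float.min (ratio *. a) (clipped *. a)
  done;
  !total /. float_of_int n
```

This is where trajectory.log_probs is needed: it supplies old_log_probs, which is why the collection policy returns a log-probability alongside each action.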

Next Steps