Skip to content

Observation Functions

Protocol

ObservationFunction

Bases: Protocol

Source code in gyozas/observations/structs.py
@runtime_checkable
class ObservationFunction(Protocol):
    """Structural interface for observation functions.

    ``@runtime_checkable`` allows ``isinstance`` checks against this Protocol.
    Implementations in this package (e.g. ``Pseudocosts``) use ``reset`` to
    clear per-episode state and ``extract`` to produce an observation.
    """

    def reset(self, model: Model) -> None:
        """Called at the start of an episode; clears any per-episode state."""
        ...

    def extract(self, model: Model, done: bool) -> Any:
        """Return an observation for *model*; ``done`` marks episode end."""
        ...

Return Types

BipartiteGraph

BipartiteGraph dataclass

Source code in gyozas/observations/structs.py
@dataclass
class BipartiteGraph:
    """Bipartite graph between LP variable columns and constraint rows."""

    variable_features: np.ndarray  # per-variable (column) feature matrix
    row_features: np.ndarray  # per-constraint (row) feature matrix
    edge_features: EdgeFeatures  # sparse variable-row edge indices and values

EdgeFeatures

EdgeFeatures dataclass

Source code in gyozas/observations/structs.py
@dataclass
class EdgeFeatures:
    """Sparse edge data of a bipartite graph.

    ``indices`` has shape ``(2, nnz)``; ``values`` has shape ``(nnz,)``.
    """

    indices: np.ndarray  # (2, nnz) integer index pairs
    values: np.ndarray  # (nnz,) edge values

NodeBipartite (alias)

gyozas.NodeBipartite is an alias for NodeBipartiteEcole. Prefer using it for code that does not need to distinguish between implementations.

NodeBipartiteEcole

NodeBipartiteEcole

Pure-Python bipartite graph observation with configurable feature extraction.

Parameters:

Name Type Description Default
cache bool

When True, static features and edge structure are computed once (at the root node or first call) and reused; only dynamic features are refreshed on subsequent calls. Mirrors Ecole's NodeBipartite(cache=True).

False
suppress_warnings bool

Suppress shape-change warnings when passing prev_* features.

False
static_col_features tuple[str, ...]

Column features to extract.

_DEFAULT_STATIC_COL
dynamic_col_features tuple[str, ...]

Dynamic column features to extract (the dynamic counterpart of static_col_features).

_DEFAULT_DYNAMIC_COL
static_row_features tuple[str, ...]

Row features to extract.

_DEFAULT_STATIC_ROW
dynamic_row_features tuple[str, ...]

Dynamic row features to extract (the dynamic counterpart of static_row_features).

_DEFAULT_DYNAMIC_ROW
Source code in gyozas/observations/node_bipartite_ecole.py
class NodeBipartiteEcole:
    """Pure-Python bipartite graph observation with configurable feature extraction.

    Parameters
    ----------
    cache : bool
        When ``True``, static features and edge structure are computed once (at
        the root node or first call) and reused; only dynamic features are
        refreshed on subsequent calls. Mirrors Ecole's ``NodeBipartite(cache=True)``.
    suppress_warnings : bool
        Suppress shape-change warnings when passing ``prev_*`` features.
    static_col_features, dynamic_col_features : tuple[str, ...]
        Column features to extract.
    static_row_features, dynamic_row_features : tuple[str, ...]
        Row features to extract.
    """

    def __init__(
        self,
        cache: bool = False,
        suppress_warnings: bool = False,
        static_col_features: tuple = _DEFAULT_STATIC_COL,
        dynamic_col_features: tuple = _DEFAULT_DYNAMIC_COL,
        static_row_features: tuple = _DEFAULT_STATIC_ROW,
        dynamic_row_features: tuple = _DEFAULT_DYNAMIC_ROW,
    ) -> None:
        self.cache = cache
        self.suppress_warnings = suppress_warnings

        # Static features come first so their positions stay stable when only
        # the dynamic part is refreshed from cache.
        col_feats = tuple(static_col_features) + tuple(dynamic_col_features)
        row_feats = tuple(static_row_features) + tuple(dynamic_row_features)

        # Feature name -> column index in the extracted feature matrices.
        self._col_fm = {f: i for i, f in enumerate(col_feats)}
        self._row_fm = {f: i for i, f in enumerate(row_feats)}

        self._cached_var_features = None
        self._cached_row_features = None
        self._cached_edge_features = None
        self._cache_computed = False

    def reset(self, model: Model) -> None:
        """Drop all cached features at the start of a new episode."""
        self._cached_var_features = None
        self._cached_row_features = None
        self._cached_edge_features = None
        self._cache_computed = False

    def _extract_tuple(
        self, model: Model, done: bool, prev_var_features=None, prev_row_features=None, prev_edge_features=None
    ) -> tuple | None:
        """Extract the bipartite graph observation.

        Returns
        -------
        tuple or None
            ``(variable_features, row_features, (edge_indices, edge_vals))``
            during solving, or ``None`` when done or outside the solving stage.
            ``edge_indices`` has shape ``(2, nnz)``; ``edge_vals`` has shape ``(nnz,)``.
        """
        if done or model.getStage() != PY_SCIP_STAGE.SOLVING:
            return None

        if self.cache:
            # Full recompute at the root node (or on the very first call);
            # afterwards only the dynamic features are refreshed.
            current = model.getCurrentNode()
            on_root = current is not None and current.getDepth() == 0
            if on_root or not self._cache_computed:
                vf, rf, ef = _extract(model, self._col_fm, self._row_fm, suppress_warnings=self.suppress_warnings)
                self._cached_var_features = vf
                self._cached_row_features = rf
                self._cached_edge_features = ef
                self._cache_computed = True
                return vf, rf, ef
            assert self._cached_var_features is not None
            assert self._cached_row_features is not None
            assert self._cached_edge_features is not None
            # Copy the feature matrices so callers may mutate them without
            # corrupting the cache; the edge structure is reused as-is.
            vf = self._cached_var_features.copy()
            rf = self._cached_row_features.copy()
            _update_dynamic_only(model, vf, rf, self._col_fm, self._row_fm)
            return vf, rf, self._cached_edge_features

        # Uncached path: full extraction, optionally reusing prev_* buffers.
        return _extract(
            model,
            self._col_fm,
            self._row_fm,
            prev_var_features=prev_var_features,
            prev_row_features=prev_row_features,
            prev_edge_features=prev_edge_features,
            suppress_warnings=self.suppress_warnings,
        )

    def extract(
        self, model: Model, done: bool, prev_var_features=None, prev_row_features=None, prev_edge_features=None
    ) -> BipartiteGraph | None:
        """Extract the bipartite graph observation.

        Returns
        -------
        BipartiteGraph or None
            A :class:`BipartiteGraph` dataclass during solving,
            or ``None`` when done or outside the solving stage.
        """
        result = self._extract_tuple(model, done, prev_var_features, prev_row_features, prev_edge_features)
        if result is None:
            return None
        variable_features, row_features, (edge_indices, edge_values) = result
        return BipartiteGraph(
            variable_features=variable_features,
            row_features=row_features,
            edge_features=EdgeFeatures(indices=edge_indices, values=edge_values),
        )

    @classmethod
    def getBipartiteGraphRepresentation(
        cls,
        model: Model,
        static_only: bool = False,
        static_col_features: tuple | None = None,
        dynamic_col_features: tuple | None = None,
        static_row_features: tuple | None = None,
        dynamic_row_features: tuple | None = None,
        prev_col_features=None,
        prev_row_features=None,
        prev_edge_features=None,
        suppress_warnings: bool = False,
    ) -> tuple:
        """Stateless extraction returning a 4-tuple.

        Raises ``RuntimeError`` when the model is not in SOLVING stage.

        Returns
        -------
        tuple
            ``(col_features, (edge_indices, edge_values), row_features, {})``
            where ``edge_indices`` has shape ``(2, nnz)`` and
            ``edge_values`` has shape ``(nnz,)``.
        """
        if model.getStage() != PY_SCIP_STAGE.SOLVING:
            raise RuntimeError(
                f"Model must be in SOLVING stage to extract observations, got stage {model.getStage()!r}."
            )
        kwargs: dict = dict(suppress_warnings=suppress_warnings)
        # ``static_only`` blanks the dynamic features; explicit dynamic_*
        # arguments below override it (last assignment wins).
        if static_only:
            kwargs["dynamic_col_features"] = ()
            kwargs["dynamic_row_features"] = ()
        if static_col_features is not None:
            kwargs["static_col_features"] = static_col_features
        if dynamic_col_features is not None:
            kwargs["dynamic_col_features"] = dynamic_col_features
        if static_row_features is not None:
            kwargs["static_row_features"] = static_row_features
        if dynamic_row_features is not None:
            kwargs["dynamic_row_features"] = dynamic_row_features

        obs = cls(**kwargs)
        result = obs._extract_tuple(
            model,
            done=False,
            prev_var_features=prev_col_features,
            prev_row_features=prev_row_features,
            prev_edge_features=prev_edge_features,
        )
        if result is None:
            # Defensive: _extract_tuple re-checks the stage; with done=False
            # and the stage verified above this should be unreachable.
            raise RuntimeError("Model is not in SOLVING stage.")
        variable_features, row_features, (edge_indices, edge_values) = result
        return (variable_features, (edge_indices, edge_values), row_features, {})

extract(model, done, prev_var_features=None, prev_row_features=None, prev_edge_features=None)

Extract the bipartite graph observation.

Returns:

Type Description
BipartiteGraph or None

A :class:BipartiteGraph dataclass during solving, or None when done or outside the solving stage.

Source code in gyozas/observations/node_bipartite_ecole.py
def extract(
    self, model: Model, done: bool, prev_var_features=None, prev_row_features=None, prev_edge_features=None
) -> BipartiteGraph | None:
    """Build the bipartite graph observation for the current solver state.

    Returns
    -------
    BipartiteGraph or None
        A :class:`BipartiteGraph` dataclass while the model is solving;
        ``None`` when ``done`` is set or outside the solving stage.
    """
    raw = self._extract_tuple(model, done, prev_var_features, prev_row_features, prev_edge_features)
    if raw is None:
        return None
    var_feats, row_feats, (edge_idx, edge_vals) = raw
    return BipartiteGraph(
        variable_features=var_feats,
        row_features=row_feats,
        edge_features=EdgeFeatures(indices=edge_idx, values=edge_vals),
    )

getBipartiteGraphRepresentation(model, static_only=False, static_col_features=None, dynamic_col_features=None, static_row_features=None, dynamic_row_features=None, prev_col_features=None, prev_row_features=None, prev_edge_features=None, suppress_warnings=False) classmethod

Stateless extraction returning a 4-tuple.

Raises RuntimeError when the model is not in SOLVING stage.

Returns:

Type Description
tuple

(col_features, (edge_indices, edge_values), row_features, {}) where edge_indices has shape (2, nnz) and edge_values has shape (nnz,).

Source code in gyozas/observations/node_bipartite_ecole.py
@classmethod
def getBipartiteGraphRepresentation(
    cls,
    model: Model,
    static_only: bool = False,
    static_col_features: tuple | None = None,
    dynamic_col_features: tuple | None = None,
    static_row_features: tuple | None = None,
    dynamic_row_features: tuple | None = None,
    prev_col_features=None,
    prev_row_features=None,
    prev_edge_features=None,
    suppress_warnings: bool = False,
) -> tuple:
    """Stateless extraction returning a 4-tuple.

    Raises ``RuntimeError`` when the model is not in SOLVING stage.

    Returns
    -------
    tuple
        ``(col_features, (edge_indices, edge_values), row_features, {})``
        where ``edge_indices`` has shape ``(2, nnz)`` and
        ``edge_values`` has shape ``(nnz,)``.
    """
    if model.getStage() != PY_SCIP_STAGE.SOLVING:
        raise RuntimeError(
            f"Model must be in SOLVING stage to extract observations, got stage {model.getStage()!r}."
        )

    # Assemble constructor arguments: static_only blanks the dynamic
    # features first, then explicit overrides win.
    ctor_kwargs: dict = {"suppress_warnings": suppress_warnings}
    if static_only:
        ctor_kwargs.update(dynamic_col_features=(), dynamic_row_features=())
    overrides = {
        "static_col_features": static_col_features,
        "dynamic_col_features": dynamic_col_features,
        "static_row_features": static_row_features,
        "dynamic_row_features": dynamic_row_features,
    }
    ctor_kwargs.update({name: value for name, value in overrides.items() if value is not None})

    extractor = cls(**ctor_kwargs)
    out = extractor._extract_tuple(
        model,
        done=False,
        prev_var_features=prev_col_features,
        prev_row_features=prev_row_features,
        prev_edge_features=prev_edge_features,
    )
    if out is None:
        raise RuntimeError("Model is not in SOLVING stage.")
    var_feats, row_feats, (edge_idx, edge_vals) = out
    return (var_feats, (edge_idx, edge_vals), row_feats, {})

NodeBipartiteSCIP

NodeBipartiteSCIP

Bipartite graph observation using PySCIPOpt's built-in C implementation.

Returns the LP relaxation as a bipartite graph between constraint rows and variable columns, following Gasse et al. (NeurIPS 2019).

Source code in gyozas/observations/node_bipartite_scip.py
class NodeBipartiteSCIP:
    """Bipartite graph observation using PySCIPOpt's built-in C implementation.

    Returns the LP relaxation as a bipartite graph between constraint rows and
    variable columns, following Gasse et al. (NeurIPS 2019).
    """

    def __init__(self) -> None:
        pass

    def reset(self, model: Model) -> None:
        # Stateless: nothing to reset between episodes.
        pass

    def extract(self, model: Model, done: bool) -> BipartiteGraph:
        """Convert PySCIPOpt's native representation to a :class:`BipartiteGraph`.

        ``obs[1]`` holds per-edge triples ``x`` where ``x[0]`` and ``x[1]``
        are swapped into ``(x[1], x[0])`` index pairs and ``x[2]`` is the
        edge value — presumably (row, col, coefficient); confirm against the
        PySCIPOpt ``getBipartiteGraphRepresentation`` docs.
        """
        obs = model.getBipartiteGraphRepresentation()
        variable_features = np.array(obs[0], dtype=np.float64)
        row_features = np.array(obs[2], dtype=np.float64)

        edges = obs[1]
        if len(edges):
            edge_indices = np.array([[x[1], x[0]] for x in edges], dtype=np.int32).T
            edge_values = np.array([x[2] for x in edges], dtype=np.float64)
        else:
            # Guard: np.array([]).T would have shape (0,), not the (2, 0)
            # layout consumers of EdgeFeatures.indices expect.
            edge_indices = np.empty((2, 0), dtype=np.int32)
            edge_values = np.empty(0, dtype=np.float64)

        return BipartiteGraph(
            variable_features=variable_features,
            row_features=row_features,
            edge_features=EdgeFeatures(indices=edge_indices, values=edge_values),
        )

Pseudocosts

Pseudocosts

Pseudocost scores for LP branching candidates.

Mirrors ecole.observation.Pseudocosts, implemented in pure pyscipopt.

Pseudocosts are estimated incrementally from the branching history observed during solving. For each branching decision recorded via node.getParentBranchings(), the per-variable up/down pseudocosts are updated as::

pseudocost[dir] = Σ obj_delta_k / Σ |frac_delta_k|

where obj_delta is the LP bound improvement at the child node and frac_delta is the LP fractionality consumed by the branching.

Returns a 1-D array of shape (n_vars,) with the branch score for each LP candidate (NaN for non-candidates), or None outside the solving stage.

Note

Only branchings observed while extract is called are tracked. Branchings that SCIP performs between two calls (e.g. at nodes not visited by the agent) are missed; those variables fall back to the _INIT_PSEUDOCOST prior.

Source code in gyozas/observations/pseudo_cost.py
class Pseudocosts:
    """Pseudocost scores for LP branching candidates.

    Mirrors ``ecole.observation.Pseudocosts``, implemented in pure pyscipopt.

    Pseudocosts are estimated incrementally from the branching history observed
    during solving.  For each branching decision recorded via
    ``node.getParentBranchings()``, the per-variable up/down pseudocosts are
    updated as::

        pseudocost[dir] = Σ obj_delta_k / Σ |frac_delta_k|

    where *obj_delta* is the LP bound improvement at the child node and
    *frac_delta* is the LP fractionality consumed by the branching.

    Returns a 1-D array of shape ``(n_vars,)`` with the branch score for each
    LP candidate (``NaN`` for non-candidates), or ``None`` outside the solving
    stage.

    Note
    ----
    Only branchings observed while ``extract`` is called are tracked.  Branchings
    that SCIP performs between two calls (e.g. at nodes not visited by the agent)
    are missed; those variables fall back to the ``_INIT_PSEUDOCOST`` prior.
    """

    def __init__(self) -> None:
        self._pseudo_down: dict[int, list[float]] = {}  # var_idx -> [sum_obj, sum_frac]
        self._pseudo_up: dict[int, list[float]] = {}
        self._node_lp_vals: dict[int, dict[int, float]] = {}  # node_num -> {var_idx: lp_val}

    def reset(self, _model: Model) -> None:
        """Clear all accumulated pseudocost statistics for a new episode."""
        self._pseudo_down.clear()
        self._pseudo_up.clear()
        self._node_lp_vals.clear()

    def extract(self, model: Model, done: bool) -> np.ndarray | None:
        if done or model.getStage() != PY_SCIP_STAGE.SOLVING:
            return None

        self._update_from_current_node(model)

        cands, lp_vals, _, n_cands, _, _ = model.getLPBranchCands()
        cands = cands[:n_cands]
        lp_vals = lp_vals[:n_cands]

        # No fractional candidates: return an empty score vector instead of
        # raising ValueError in max() below (consistent with
        # StrongBranchingScores.extract).
        if not cands:
            return np.empty(0, dtype=np.float64)

        # Cache LP solution values for this node so children can look them up.
        node = model.getCurrentNode()
        if node is not None:
            self._node_lp_vals[node.getNumber()] = {
                var.getCol().getLPPos(): lp_val for var, lp_val in zip(cands, lp_vals, strict=False)
            }

        # Scores are indexed by LP position; non-candidate positions stay NaN.
        max_idx = max(var.getCol().getLPPos() for var in cands) + 1
        scores = np.full(max_idx, np.nan, dtype=np.float64)
        for var, lp_val in zip(cands, lp_vals, strict=False):
            scores[var.getCol().getLPPos()] = self._score(model, var, lp_val)

        return scores

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _update_from_current_node(self, model: Model) -> None:
        """Update pseudocost estimates using the branching that created the current node."""
        node = model.getCurrentNode()
        if node is None or node.getNParentBranchings() == 0:
            return
        branching = node.getParentBranchings()
        if branching is None:
            return

        parent = node.getParent()
        if parent is None:
            return
        # Parent LP values must have been cached by a prior extract() call;
        # otherwise this branching cannot be attributed and is skipped.
        parent_lp_vals = self._node_lp_vals.get(parent.getNumber(), {})
        if not parent_lp_vals:
            return

        parent_lb = parent.getLowerbound()
        child_lp_obj = model.getLPObjVal()
        obj_delta = max(0.0, child_lp_obj - parent_lb)

        vars_, _, btypes = branching
        for var, btype in zip(vars_, btypes, strict=False):
            idx = var.getCol().getLPPos()
            parent_lp_val = parent_lp_vals.get(idx)
            if parent_lp_val is None:
                continue
            frac = model.feasFrac(parent_lp_val)
            if btype == _BOUNDTYPE_UPPER:  # upper bound set → branch down
                val_delta = max(frac, _EPS)
                entry = self._pseudo_down.setdefault(idx, [0.0, 0.0])
            else:  # lower bound set → branch up
                val_delta = max(1.0 - frac, _EPS)
                entry = self._pseudo_up.setdefault(idx, [0.0, 0.0])
            entry[0] += obj_delta
            entry[1] += val_delta

    def _pseudocost(self, idx: int, down: bool) -> float:
        """Return the running pseudocost for one direction, or the prior."""
        data = self._pseudo_down.get(idx) if down else self._pseudo_up.get(idx)
        if data is None or data[1] < _EPS:
            return _INIT_PSEUDOCOST
        return data[0] / data[1]

    def _score(self, model: Model, var, lp_val: float) -> float:
        """Combine down/up pseudocost gains into SCIP's branch score."""
        frac = model.feasFrac(lp_val)
        idx = var.getCol().getLPPos()
        downgain = self._pseudocost(idx, down=True) * frac
        upgain = self._pseudocost(idx, down=False) * (1.0 - frac)
        return model.getBranchScoreMultiple(var, [downgain, upgain])

StrongBranchingScores

StrongBranchingScores

Full strong branching scores for LP branching candidates.

Mirrors ecole.observation.StrongBranchingScores, implemented in pure pyscipopt.

For each LP branching candidate the observation temporarily enters probing mode, solves the down-branch LP (var <= floor(lp_val)) and the up-branch LP (var >= ceil(lp_val)), then combines the bound improvements into a branch score via model.getBranchScoreMultiple.

The probing LPs are solved idempotently (no side-effects on SCIP state).

Returns a 1-D array of shape (n_vars,) with the score for each LP candidate (NaN for non-candidates), or None outside the solving stage.

Parameters:

Name Type Description Default
pseudo_candidates bool

If False (default), score LP branching candidates (fractional vars). If True, score pseudo-branching candidates (all non-fixed discrete vars).

False
itlim int

LP iteration limit for each strong-branching solve. -1 = no limit.

-1
Source code in gyozas/observations/strong_branching_scores.py
class StrongBranchingScores:
    """Full strong branching scores for LP branching candidates.

    Mirrors ``ecole.observation.StrongBranchingScores``, implemented in pure pyscipopt.

    For each LP branching candidate the observation temporarily enters probing
    mode, solves the down-branch LP (``var <= floor(lp_val)``) and the up-branch
    LP (``var >= ceil(lp_val)``), then combines the bound improvements into a
    branch score via ``model.getBranchScoreMultiple``.

    The probing LPs are solved idempotently (no side-effects on SCIP state).

    Returns a 1-D array of shape ``(n_vars,)`` with the score for each LP
    candidate (``NaN`` for non-candidates), or ``None`` outside the solving stage.

    Parameters
    ----------
    pseudo_candidates
        If ``False`` (default), score LP branching candidates (fractional vars).
        If ``True``, score pseudo-branching candidates (all non-fixed discrete vars).
    itlim
        LP iteration limit for each strong-branching solve. -1 = no limit.
    """

    def __init__(self, pseudo_candidates: bool = False, itlim: int = -1) -> None:
        self.pseudo_candidates = pseudo_candidates
        self.itlim = itlim

    def reset(self, _model: Model) -> None:
        # No per-episode state to reset.
        pass

    def _probe_bound(self, model: Model, var, lp_obj: float, inf: float, down: bool) -> float:
        """Probe one branch direction and return the bound improvement (gain).

        Manages its own probing session so it can be called independently.
        Returns ``float('nan')`` on LP error, ``inf`` on cutoff, otherwise
        ``max(0, new_lp_obj - lp_obj)``.

        NOTE(review): ``extract`` below does not call this helper — it inlines
        the same probing logic inside a single probing session instead.
        Confirm this method is used elsewhere before removing it.
        """
        lp_val = var.getLPSol()
        model.startProbing()
        try:
            model.newProbingNode()
            if down:
                # Down branch: cap the variable below its fractional LP value.
                model.chgVarUbProbing(var, math.floor(lp_val))
            else:
                # Up branch: raise the variable above its fractional LP value.
                model.chgVarLbProbing(var, math.ceil(lp_val))
            model.constructLP()
            lperror, cutoff = model.solveProbingLP(self.itlim)
            if lperror:
                return float("nan")
            elif cutoff:
                return inf
            else:
                return max(0.0, model.getLPObjVal() - lp_obj)
        finally:
            # Always leave probing mode, even on exceptions mid-probe.
            model.endProbing()

    def extract(self, model: Model, done: bool) -> np.ndarray | None:
        if done or model.getStage() != PY_SCIP_STAGE.SOLVING:
            return None

        if self.pseudo_candidates:
            cands, n_cands, _ = model.getPseudoBranchCands()
            cands = cands[:n_cands]
            lp_vals = [var.getLPSol() for var in cands]
        else:
            cands, lp_vals, _, n_cands, _, _ = model.getLPBranchCands()
            cands = cands[:n_cands]
            lp_vals = lp_vals[:n_cands]
            # Drop fixed-domain variables (filter applied only in this branch;
            # pseudo candidates above are taken as-is).
            lp_vals = [val for val, var in zip(lp_vals, cands, strict=False) if not is_fixed_domain(var)]
            cands = [var for var in cands if not is_fixed_domain(var)]

        if not cands:
            return np.empty(0, dtype=np.float64)

        # Scores are indexed by LP position; non-candidate positions stay NaN.
        max_idx = max(var.getCol().getLPPos() for var in cands) + 1
        scores = np.full(max_idx, np.nan, dtype=np.float64)

        lp_obj = model.getLPObjVal()
        inf = model.infinity()

        # One probing session for all candidates; each direction is undone via
        # backtrackProbing(0) so every solve starts from the probing root.
        model.startProbing()
        try:
            for var, lp_val in zip(cands, lp_vals, strict=False):
                # Down branch
                model.newProbingNode()
                model.chgVarUbProbing(var, math.floor(lp_val))
                model.constructLP()
                lperror, cutoff = model.solveProbingLP(self.itlim)
                if lperror:
                    downgain = float("nan")
                elif cutoff:
                    downgain = inf
                else:
                    downgain = max(0.0, model.getLPObjVal() - lp_obj)
                model.backtrackProbing(0)

                # Up branch
                model.newProbingNode()
                model.chgVarLbProbing(var, math.ceil(lp_val))
                model.constructLP()
                lperror, cutoff = model.solveProbingLP(self.itlim)
                if lperror:
                    upgain = float("nan")
                elif cutoff:
                    upgain = inf
                else:
                    upgain = max(0.0, model.getLPObjVal() - lp_obj)
                model.backtrackProbing(0)

                # Skip the score when either direction hit an LP error.
                if not (math.isnan(downgain) or math.isnan(upgain)):
                    scores[var.getCol().getLPPos()] = model.getBranchScoreMultiple(var, [downgain, upgain])
        finally:
            model.endProbing()

        return scores

MetaObservation

MetaObservation

Combines multiple observation functions into a single composite observation.

Parameters:

Name Type Description Default
observations list | dict | tuple

A list, tuple, or dict of observation functions. If a dict is provided, extract() returns a dict keyed by the same names. Otherwise, it returns a container of the same type as the input.

required
Source code in gyozas/observations/meta_observation.py
class MetaObservation:
    """Combines multiple observation functions into a single composite observation.

    Parameters
    ----------
    observations
        A list, tuple, or dict of observation functions. If a dict is provided,
        ``extract()`` returns a dict keyed by the same names. Otherwise, it returns
        a container of the same type as the input.
    """

    def __init__(self, observations: list | dict | tuple) -> None:
        self.observations = observations

    def reset(self, model: Model) -> None:
        """Forward ``reset`` to every wrapped observation function."""
        # For dicts, iterate the values directly — the keys are not needed.
        children = self.observations.values() if isinstance(self.observations, dict) else self.observations
        for obs in children:
            obs.reset(model)

    def extract(self, model: Model, done: bool) -> list | dict | tuple:
        """Extract from every child; the result container mirrors ``observations``."""
        if isinstance(self.observations, dict):
            return {name: obs.extract(model, done) for name, obs in self.observations.items()}
        # Rebuild with the same container type (list or tuple) as the input.
        type_ = type(self.observations)
        return type_(obs.extract(model, done) for obs in self.observations)