Neural Inverse is Open Source →
Guides › Data Migration Script
Guides › Cookbook › Data Migration Script
This is a Jupyter notebook

Migrating Data Between Neural Inverse Projects (Python SDK v4)

This notebook migrates data from one Neural Inverse project to another using the Neural Inverse Python SDK v4.

Migrating data into a Neural Inverse project creates new ingestion events in the destination project. This migration therefore incurs Neural Inverse ingestion charges based on the number of observations transferred.

Common use cases:

  • Migrating between cloud regions (US ↔ EU ↔ JP ↔ HIPAA)
  • Migrating from a self-hosted Neural Inverse to Neural Inverse Cloud

What gets migrated

  1. Score Configs
  2. Custom Model Definitions
  3. Prompts (all versions)
  4. Observations (traces + nested observations, via the OTLP endpoint so original start_time / end_time are preserved)
  5. Scores
  6. Datasets (items + run items, linked to migrated traces/observations)

What is not migrated

The Neural Inverse public API does not currently support programmatic creation of: LLM-as-a-Judge evaluator configurations, custom dashboards, users / RBAC / SSO, and project/organization settings. These must be recreated manually or copied through UI-level exports.

0. Setup

%pip install --quiet "langfuse>=4.0.0" "opentelemetry-sdk>=1.25.0" "opentelemetry-exporter-otlp-proto-http>=1.25.0"
import os

# --- SOURCE project credentials ---
# Replace the placeholder keys below with the real keys of the project you
# are migrating *from*.
os.environ["LANGFUSE_SOURCE_PUBLIC_KEY"] = "pk-lf-.."
os.environ["LANGFUSE_SOURCE_SECRET_KEY"] = "sk-lf-..."
os.environ["LANGFUSE_SOURCE_HOST"] = "https://cloud.langfuse.com"  # e.g. EU cloud

# --- DESTINATION project credentials ---
# Must use a different public key than the source (checked further down).
os.environ["LANGFUSE_DEST_PUBLIC_KEY"] = "pk-lf-.."
os.environ["LANGFUSE_DEST_SECRET_KEY"] = "sk-lf-..."
os.environ["LANGFUSE_DEST_HOST"] = "https://hipaa.cloud.langfuse.com"  # e.g. HIPAA cloud

# --- Optional time filter for the observations migration (ISO 8601) ---
# Leave empty to migrate all traces.
# The same window is also passed to the scores query in the Scores section.
# A trailing "Z" is accepted; values without an offset are treated as UTC.
os.environ["LANGFUSE_MIGRATE_FROM_TIMESTAMP"] = "2026-03-01T00:00:00Z"  # e.g. "2025-01-01T00:00:00Z"
os.environ["LANGFUSE_MIGRATE_TO_TIMESTAMP"] = "2026-04-20T00:00:00Z"    # e.g. "2025-02-01T00:00:00Z"
import base64
import datetime as dt
import json
import re
import time
from hashlib import sha256
from typing import Any, Dict, List, Optional

# The client class exported by the `langfuse` package is `Langfuse`; a name
# containing a space is not a valid Python identifier.
from langfuse import Langfuse

# Connection settings for each project, read from the environment variables
# set in the previous cell. Trailing slashes are stripped from the host so
# URL paths can be appended with a single "/".
SOURCE_CFG = {
    "public_key": os.environ["LANGFUSE_SOURCE_PUBLIC_KEY"],
    "secret_key": os.environ["LANGFUSE_SOURCE_SECRET_KEY"],
    "base_url": os.environ.get("LANGFUSE_SOURCE_HOST", "https://cloud.langfuse.com").rstrip("/"),
}
DEST_CFG = {
    "public_key": os.environ["LANGFUSE_DEST_PUBLIC_KEY"],
    "secret_key": os.environ["LANGFUSE_DEST_SECRET_KEY"],
    "base_url": os.environ.get("LANGFUSE_DEST_HOST", "https://cloud.langfuse.com").rstrip("/"),
}

# Sanity: source and destination must use different public keys. The Neural Inverse v4
# resource manager is a singleton keyed by public_key, so reusing a key would
# make the second client silently share the first client's base_url/secret_key.
assert SOURCE_CFG["public_key"] != DEST_CFG["public_key"], (
    "Source and destination must use different public keys. The Neural Inverse v4 "
    "resource manager is a singleton keyed by public_key; using the same key "
    "for both projects will cause the second client to reuse the first client's "
    "host and secret key."
)

# Defensively clear any leaked Neural Inverse env vars that would otherwise be picked
# up by the SDK constructor. In particular, `LANGFUSE_BASE_URL` takes priority
# over the `host=` argument; `LANGFUSE_PUBLIC_KEY` / `LANGFUSE_SECRET_KEY` are
# fallbacks. We always want the explicit per-project values above to win.
for _leak in (
    "LANGFUSE_BASE_URL",
    "LANGFUSE_HOST",
    "LANGFUSE_PUBLIC_KEY",
    "LANGFUSE_SECRET_KEY",
    "LANGFUSE_TRACING_ENVIRONMENT",
    "LANGFUSE_RELEASE",
):
    os.environ.pop(_leak, None)

# Two independent Neural Inverse clients. `tracing_enabled=False` disables the
# background OTel exporter on each client so they only act as REST + helper
# wrappers. The observations section sets up its own dedicated OTLP pipeline.
# Using `base_url=` (not `host=`) because `base_url` has the highest precedence
# in the Neural Inverse constructor and cannot be overridden by env vars.
# `timeout=60` overrides the SDK default of 5s — trace.get() on large traces
# with many nested observations routinely needs more than 5s to respond.
src = Langfuse(**SOURCE_CFG, tracing_enabled=False, timeout=60)
dst = Langfuse(**DEST_CFG, tracing_enabled=False, timeout=60)

# Fail fast on bad credentials before any data is read or written.
assert src.auth_check(), "Source credentials invalid"
assert dst.auth_check(), "Destination credentials invalid"
print(f"Source:      {SOURCE_CFG['base_url']}")
print(f"Destination: {DEST_CFG['base_url']}")
# Shared utilities used across all sections.

# 32 lowercase hex characters — the trace-ID length used by the helpers below.
# NOTE: with `.match()`, the `$` anchor also accepts one trailing newline.
_HEX32 = re.compile(r"^[0-9a-f]{32}$")
# 16 lowercase hex characters — the span-ID length used by the helpers below.
_HEX16 = re.compile(r"^[0-9a-f]{16}$")


def dumps(obj: Any) -> str:
    """Serialize `obj` to a JSON string, leaving non-ASCII text unescaped.

    Values the JSON encoder cannot handle natively are converted with `str()`.
    """
    encode_options = {"ensure_ascii": False, "default": str}
    return json.dumps(obj, **encode_options)


def to_otel_trace_id(source_trace_id: str) -> str:
    """Return a valid 32-hex-char Neural Inverse/OTel trace ID for a source ID.

    - If the source ID (lower-cased) is exactly 32 hex chars, return it as-is
      so the original ID is preserved in the destination project.
    - Otherwise, deterministically derive one from the lower-cased source ID
      (first 32 hex chars of its sha256) so re-runs always produce the same
      destination ID.

    Args:
        source_trace_id: Trace ID as stored in the source project.

    Returns:
        A 32-character lowercase hex string.
    """
    sid = source_trace_id.lower()
    # `fullmatch` requires the whole string to be hex. A `^...$` pattern used
    # with `.match()` would also accept an ID followed by a newline and hand
    # it back unchanged, which is not a valid trace ID.
    if re.fullmatch(r"[0-9a-f]{32}", sid):
        return sid
    return sha256(sid.encode("utf-8")).hexdigest()[:32]


def to_otel_span_id(source_obs_id: str) -> str:
    """Return a valid 16-hex-char OTel span ID for a source observation ID.

    A source ID that (lower-cased) is exactly 16 hex chars is returned as-is;
    anything else is replaced by the first 16 hex chars of the sha256 of the
    lower-cased ID, so the result is deterministic across runs.

    Args:
        source_obs_id: Observation ID as stored in the source project.

    Returns:
        A 16-character lowercase hex string.
    """
    sid = source_obs_id.lower()
    # `fullmatch` rejects an otherwise-valid ID that carries a trailing
    # newline, which a `^...$` pattern used with `.match()` would let through.
    if re.fullmatch(r"[0-9a-f]{16}", sid):
        return sid
    return sha256(sid.encode("utf-8")).hexdigest()[:16]


import httpx

# HTTP status codes that `with_retries` treats as transient (rate limit and
# server-side errors); matched against an exception's `status_code` attribute.
_RETRIABLE_CODES = {429, 500, 502, 503, 504}
# Transient network errors we want to retry on (no status_code attribute).
_RETRIABLE_EXCEPTIONS = (
    httpx.TimeoutException,          # includes ReadTimeout / ConnectTimeout / WriteTimeout / PoolTimeout
    httpx.RemoteProtocolError,       # server closed connection mid-response
    httpx.ConnectError,              # DNS/TCP level failures
    httpx.NetworkError,              # umbrella catch for socket-level errors
    TimeoutError,                    # stdlib asyncio / socket timeouts
    ConnectionError,                 # stdlib connection resets
)


def with_retries(fn, *, max_retries: int = 5, base_sleep: float = 0.5):
    """Call `fn()` with exponential-backoff retry on transient failures.

    Retries on (a) Fern-generated HTTP errors whose `status_code` attribute is
    in `_RETRIABLE_CODES`, or (b) the transient-network exception classes
    listed in `_RETRIABLE_EXCEPTIONS`. Everything else is raised immediately.

    Args:
        fn: Zero-argument callable to invoke.
        max_retries: Total number of attempts (not additional retries).
            Values below 1 are treated as 1 so `fn` is always called at
            least once instead of silently returning None.
        base_sleep: Sleep in seconds before the second attempt; doubled
            before each subsequent attempt.

    Returns:
        Whatever `fn()` returns on the first successful attempt.

    Raises:
        The exception from `fn` if it is not retriable, or the last
        exception once all attempts are used up.
    """
    attempts = max(1, max_retries)
    for attempt in range(attempts):
        try:
            return fn()
        except Exception as exc:
            status = getattr(exc, "status_code", None)
            retriable = status in _RETRIABLE_CODES or isinstance(exc, _RETRIABLE_EXCEPTIONS)
            # Bare `raise` keeps the original traceback for the caller.
            if not retriable or attempt == attempts - 1:
                raise
            sleep_for = base_sleep * (2 ** attempt)
            label = status if status is not None else type(exc).__name__
            print(f"    retriable {label} — sleeping {sleep_for:.1f}s")
            time.sleep(sleep_for)


def parse_ts(s: Optional[str]) -> Optional[dt.datetime]:
    """Parse an ISO 8601 timestamp string into a timezone-aware datetime.

    Returns None for None or an empty string. A trailing "Z" is read as
    "+00:00"; a value without any offset is tagged as UTC.
    """
    if not s:
        return None
    iso_text = f"{s[:-1]}+00:00" if s.endswith("Z") else s
    parsed = dt.datetime.fromisoformat(iso_text)
    if parsed.tzinfo is not None:
        return parsed
    return parsed.replace(tzinfo=dt.timezone.utc)


# Parse the optional migration window once. An unset or empty variable
# becomes None.
FROM_TS = parse_ts(os.environ.get("LANGFUSE_MIGRATE_FROM_TIMESTAMP") or None)
TO_TS = parse_ts(os.environ.get("LANGFUSE_MIGRATE_TO_TIMESTAMP") or None)
print(f"Time filter: from={FROM_TS}  to={TO_TS}")

1. Score Configs

Score configs must be migrated first so that scores created in the destination can reference them by ID. We keep a config_id_map to remap references in later sections.

from langfuse.api import ConfigCategory

# Source score-config ID -> destination score-config ID. Later sections use
# this to remap `config_id` references.
config_id_map: Dict[str, str] = {}

# Index *all* existing destination configs by name. Reading only the first
# page would miss configs beyond the first 100 and create duplicates of them.
existing_by_name = {}
dst_page = 1
while True:
    dst_resp = with_retries(lambda: dst.api.score_configs.get(page=dst_page, limit=100))
    if not dst_resp.data:
        break
    for c in dst_resp.data:
        existing_by_name[c.name] = c
    if dst_page >= getattr(dst_resp.meta, "total_pages", dst_page):
        break
    dst_page += 1

page = 1
migrated = skipped = failed = 0
while True:
    resp = with_retries(lambda: src.api.score_configs.get(page=page, limit=100))
    if not resp.data:
        break
    for cfg in resp.data:
        # A config with the same name already exists: reuse its ID instead of
        # creating a duplicate.
        if cfg.name in existing_by_name:
            config_id_map[cfg.id] = existing_by_name[cfg.name].id
            skipped += 1
            continue
        try:
            # The Neural Inverse API only accepts `categories` for CATEGORICAL configs
            # (BOOLEAN categories are auto-generated server-side) and only
            # accepts `min_value` / `max_value` for NUMERIC configs. Build the
            # kwargs conditionally so we don't send fields the server rejects.
            create_kwargs: Dict[str, Any] = {
                "name": cfg.name,
                "data_type": cfg.data_type,
                "description": cfg.description,
            }
            if cfg.data_type == "CATEGORICAL" and cfg.categories:
                create_kwargs["categories"] = [
                    ConfigCategory(label=c.label, value=c.value) for c in cfg.categories
                ]
            if cfg.data_type == "NUMERIC":
                if cfg.min_value is not None:
                    create_kwargs["min_value"] = cfg.min_value
                if cfg.max_value is not None:
                    create_kwargs["max_value"] = cfg.max_value
            created = with_retries(lambda: dst.api.score_configs.create(**create_kwargs))
            config_id_map[cfg.id] = created.id
            migrated += 1
        except Exception as exc:
            # Best effort: report the failure and continue with the next config.
            print(f"  failed to migrate score config '{cfg.name}': {exc}")
            failed += 1
    # Stop after the last page; if the response carries no `total_pages`,
    # the default makes this the last page.
    if page >= getattr(resp.meta, "total_pages", page):
        break
    page += 1

print(f"Score configs — migrated: {migrated}, skipped (already present): {skipped}, failed: {failed}")

2. Custom Model Definitions

Only user-defined models are migrated. Neural Inverse-managed models are skipped — they exist on every project by default.

# Page through the source model definitions and re-create the user-defined
# ones in the destination. Counters are reported in the final print.
page = 1
migrated = skipped = failed = 0
while True:
    resp = with_retries(lambda: src.api.models.list(page=page, limit=100))
    if not resp.data:
        break
    for m in resp.data:
        # Managed models are not copied; they are counted as skipped.
        if getattr(m, "is_langfuse_managed", False):
            skipped += 1
            continue
        try:
            # The server rejects requests that mix flat prices with
            # pricing_tiers. If the source uses pricing_tiers (e.g. token-
            # tiered models like gpt-image-1), forward them and omit the
            # flat prices; otherwise send only the flat prices.
            create_kwargs: Dict[str, Any] = {
                "model_name": m.model_name,
                "match_pattern": m.match_pattern,
                "start_date": m.start_date,
                "unit": m.unit,
                "tokenizer_id": m.tokenizer_id,
                "tokenizer_config": m.tokenizer_config,
            }
            if getattr(m, "pricing_tiers", None):
                create_kwargs["pricing_tiers"] = m.pricing_tiers
            else:
                if m.input_price is not None:
                    create_kwargs["input_price"] = m.input_price
                if m.output_price is not None:
                    create_kwargs["output_price"] = m.output_price
                if m.total_price is not None:
                    create_kwargs["total_price"] = m.total_price
            with_retries(lambda: dst.api.models.create(**create_kwargs))
            migrated += 1
        except Exception as exc:
            # A 409 status or an "already exists" message is counted as
            # skipped rather than failed, so re-running the cell is harmless.
            if getattr(exc, "status_code", None) == 409 or "already exists" in str(exc).lower():
                skipped += 1
            else:
                print(f"  failed to migrate model '{m.model_name}': {exc}")
                failed += 1
    # Stop after the last page; if the response carries no `total_pages`,
    # the default makes this the last page.
    if page >= getattr(resp.meta, "total_pages", page):
        break
    page += 1

print(f"Custom models — migrated: {migrated}, skipped: {skipped}, failed: {failed}")

3. Prompts

Every version of every prompt is migrated. The destination assigns fresh version numbers; the original source version is recorded in commit_message so it stays traceable. Tags and labels are preserved as-is.

from urllib.parse import quote

from langfuse.api import (
    CreateChatPromptRequest,
    CreateChatPromptType,
    CreateTextPromptRequest,
    CreateTextPromptType,
    Prompt_Chat,
    Prompt_Text,
)

# Pass 1: collect every (name, version) pair from the paginated prompt listing.
page = 1
prompt_versions = []  # (name, version) tuples
while True:
    resp = with_retries(lambda: src.api.prompts.list(page=page, limit=100))
    if not resp.data:
        break
    for meta in resp.data:
        for v in meta.versions:
            prompt_versions.append((meta.name, v))
    if page >= getattr(resp.meta, "total_pages", page):
        break
    page += 1

# Create versions in ascending order per prompt name so they are created in
# the destination in the same relative order as in the source.
prompt_versions.sort(key=lambda x: (x[0], x[1]))
print(f"Found {len(prompt_versions)} prompt versions on source")

# Pass 2: fetch each version from the source and re-create it in the destination.
migrated = failed = 0
for name, version in prompt_versions:
    try:
        # Folder prompts (names containing '/') must have their path segments
        # URL-encoded on the GET endpoint — the Fern client does not do this
        # automatically. The request body on create() keeps the raw name.
        encoded_name = quote(name, safe="")
        p = with_retries(lambda: src.api.prompts.get(encoded_name, version=version))
        # Record the source version number so it stays traceable.
        commit_msg = f"Migrated from source. Original version: {version}."
        if isinstance(p, Prompt_Chat):
            req = CreateChatPromptRequest(
                name=p.name,
                type=CreateChatPromptType.CHAT,
                prompt=p.prompt,
                labels=p.labels or [],
                tags=p.tags or [],
                config=p.config,
                commit_message=commit_msg,
            )
        elif isinstance(p, Prompt_Text):
            req = CreateTextPromptRequest(
                name=p.name,
                type=CreateTextPromptType.TEXT,
                prompt=p.prompt,
                labels=p.labels or [],
                tags=p.tags or [],
                config=p.config,
                commit_message=commit_msg,
            )
        else:
            # Neither chat nor text: count as failed and move on.
            print(f"  unknown prompt type for {name} v{version}: {type(p).__name__}")
            failed += 1
            continue
        with_retries(lambda: dst.api.prompts.create(request=req))
        migrated += 1
    except Exception as exc:
        # Best effort: report the failure and continue with the next version.
        print(f"  failed {name} v{version}: {exc}")
        failed += 1

print(f"Prompts — migrated: {migrated}, failed: {failed}")

4. Observations (via OTLP ingestion endpoint)

This is the core of the migration. We push each source trace (and its nested observations) into the destination's OTLP ingestion endpoint at /api/public/otel/v1/traces. The OTLP endpoint is the only way to set the authoritative start_time / end_time of an observation at ingest — the legacy REST ingestion rewrites those fields.

Self-hosted deployments: the v2 Observations API is currently Cloud-only. If you are migrating from a self-hosted Neural Inverse instance, replace the observations.get_many(...) calls below with src.api.trace.get(trace.id).observations (which hits the v1 endpoint). A Neural Inverse self-hosted migration path for v2 is on the roadmap.

from opentelemetry import trace as otel_trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.trace import NonRecordingSpan, SpanContext, TraceFlags

# --- Build a dedicated OTLP pipeline that points at the destination project ---
_auth = base64.b64encode(
    f"{DEST_CFG['public_key']}:{DEST_CFG['secret_key']}".encode("utf-8")
).decode("ascii")

otlp_provider = TracerProvider(resource=Resource.create({"service.name": "langfuse-migration"}))
otlp_provider.add_span_processor(
    SimpleSpanProcessor(
        OTLPSpanExporter(
            endpoint=f"{DEST_CFG['base_url']}/api/public/otel/v1/traces",
            headers={
                "Authorization": f"Basic {_auth}",
                "x-langfuse-ingestion-version": "4",
            },
        )
    )
)
migration_tracer = otlp_provider.get_tracer("langfuse-migration")


def _to_ns(d: Optional[dt.datetime]) -> Optional[int]:
    """Convert a datetime to integer nanoseconds since the Unix epoch.

    Naive datetimes are interpreted as UTC. The conversion uses integer
    arithmetic only: going through `d.timestamp() * 1e9` rounds to the nearest
    float, which cannot represent present-day epoch nanoseconds exactly and
    can shift a timestamp by a fraction of a microsecond.

    Args:
        d: The datetime to convert, or None.

    Returns:
        Nanoseconds since 1970-01-01T00:00:00Z, or None when `d` is None.
    """
    if d is None:
        return None
    if d.tzinfo is None:
        d = d.replace(tzinfo=dt.timezone.utc)
    delta = d - dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
    whole_seconds = delta.days * 86_400 + delta.seconds
    return whole_seconds * 1_000_000_000 + delta.microseconds * 1_000


def _set_scalar(span, key: str, value: Any) -> None:
    """Set attribute `key` on `span`, skipping None.

    str / bool / int / float values are passed through unchanged; any other
    value is serialized with `dumps` first.
    """
    if value is None:
        return
    is_primitive = isinstance(value, (str, bool, int, float))
    span.set_attribute(key, value if is_primitive else dumps(value))


def _as_attr_string(value: Any) -> Optional[str]:
    """Turn an input/output/metadata value into an attribute string.

    None and strings are returned untouched — the Observations API v2 already
    hands these fields back as raw JSON strings, so re-encoding them would
    double-encode. Parsed dicts/lists (as returned by the Trace listing) and
    any other value are serialized with `dumps`, which keeps non-ASCII text
    readable.
    """
    if value is None or isinstance(value, str):
        return value
    return dumps(value)


def _set_trace_attrs(span, trace_obj) -> None:
    """Copy the trace-level fields of `trace_obj` onto `span` as langfuse.* attributes."""
    # Simple scalar fields: (attribute key, field on the trace object).
    scalar_fields = (
        ("langfuse.trace.name", "name"),
        ("langfuse.user.id", "user_id"),
        ("langfuse.session.id", "session_id"),
        ("langfuse.release", "release"),
        ("langfuse.version", "version"),
    )
    for attr_key, field_name in scalar_fields:
        _set_scalar(span, attr_key, getattr(trace_obj, field_name))
    # A missing environment is written as "default".
    _set_scalar(span, "langfuse.environment", trace_obj.environment or "default")

    if trace_obj.public is not None:
        span.set_attribute("langfuse.trace.public", bool(trace_obj.public))
    if trace_obj.tags:
        # Neural Inverse accepts both a string[] attribute and a JSON string.
        span.set_attribute("langfuse.trace.tags", list(trace_obj.tags))

    # Input / output go through _as_attr_string so strings are not re-encoded.
    io_fields = (
        ("langfuse.trace.input", "input"),
        ("langfuse.trace.output", "output"),
    )
    for attr_key, field_name in io_fields:
        encoded = _as_attr_string(getattr(trace_obj, field_name))
        if encoded is not None:
            span.set_attribute(attr_key, encoded)

    # Dict metadata is flattened to one attribute per key; any other non-None
    # value is stored whole under the `_raw` key.
    metadata = trace_obj.metadata
    if isinstance(metadata, dict):
        for meta_key, meta_value in metadata.items():
            _set_scalar(span, f"langfuse.trace.metadata.{meta_key}", meta_value)
        return
    if metadata is not None:
        _set_scalar(span, "langfuse.trace.metadata._raw", metadata)


# Source observation type -> value written to `langfuse.observation.type`.
# Only three destination values are produced: the agent/tool/chain/retriever/
# evaluator/guardrail types collapse to "span" and EMBEDDING to "generation".
# Types missing from this table fall back to "span" at the lookup site.
_OBS_TYPE_MAP = {
    "SPAN": "span",
    "GENERATION": "generation",
    "EVENT": "event",
    "AGENT": "span",
    "TOOL": "span",
    "CHAIN": "span",
    "RETRIEVER": "span",
    "EVALUATOR": "span",
    "EMBEDDING": "generation",
    "GUARDRAIL": "span",
}


def _set_observation_attrs(span, obs) -> None:
    """Copy the fields of source observation `obs` onto `span` as langfuse.* attributes.

    Common fields are always written; model, usage, cost, completion-time and
    prompt fields are written only when the mapped type is "generation".
    """
    obs_type = _OBS_TYPE_MAP.get(obs.type, "span")
    span.set_attribute("langfuse.observation.type", obs_type)

    # Simple scalar fields: (attribute key, field on the observation).
    scalar_fields = (
        ("langfuse.observation.level", "level"),
        ("langfuse.observation.status_message", "status_message"),
        ("langfuse.version", "version"),
    )
    for attr_key, field_name in scalar_fields:
        _set_scalar(span, attr_key, getattr(obs, field_name))
    # A missing environment is written as "default".
    _set_scalar(span, "langfuse.environment", obs.environment or "default")

    # Input / output go through _as_attr_string so strings are not re-encoded.
    io_fields = (
        ("langfuse.observation.input", "input"),
        ("langfuse.observation.output", "output"),
    )
    for attr_key, field_name in io_fields:
        encoded = _as_attr_string(getattr(obs, field_name))
        if encoded is not None:
            span.set_attribute(attr_key, encoded)

    # Dict metadata is flattened to one attribute per key; any other non-None
    # value is stored whole under the `_raw` key.
    metadata = obs.metadata
    if isinstance(metadata, dict):
        for meta_key, meta_value in metadata.items():
            _set_scalar(span, f"langfuse.observation.metadata.{meta_key}", meta_value)
    elif metadata is not None:
        _set_scalar(span, "langfuse.observation.metadata._raw", metadata)

    if obs_type != "generation":
        return

    # Generation-only attributes. ObservationV2 renames `model` to
    # `provided_model_name`; fall back to `model` for the v1 type used by
    # self-hosted migrations.
    model_name = getattr(obs, "provided_model_name", None) or getattr(obs, "model", None)
    _set_scalar(span, "langfuse.observation.model.name", model_name)

    # Structured details are always JSON-encoded, and only when non-empty.
    json_fields = (
        ("langfuse.observation.model.parameters", "model_parameters"),
        ("langfuse.observation.usage_details", "usage_details"),
        ("langfuse.observation.cost_details", "cost_details"),
    )
    for attr_key, field_name in json_fields:
        details = getattr(obs, field_name)
        if details:
            span.set_attribute(attr_key, dumps(details))

    completion_start = obs.completion_start_time
    if completion_start is not None:
        # Naive timestamps are treated as UTC; a UTC offset is rendered as "Z".
        if completion_start.tzinfo is None:
            completion_start = completion_start.replace(tzinfo=dt.timezone.utc)
        stamp = completion_start.isoformat().replace("+00:00", "Z")
        span.set_attribute("langfuse.observation.completion_start_time", stamp)

    prompt_name = getattr(obs, "prompt_name", None)
    if prompt_name:
        _set_scalar(span, "langfuse.observation.prompt.name", prompt_name)
    prompt_version = getattr(obs, "prompt_version", None)
    if prompt_version is not None:
        span.set_attribute("langfuse.observation.prompt.version", int(prompt_version))


def _parent_ctx(trace_id_hex: str, parent_span_id_hex: Optional[str]):
    """Build an OTel context whose current span is the given remote parent.

    Returns None when `parent_span_id_hex` is empty or None. Otherwise both
    hex strings are parsed as base-16 integers and wrapped in a sampled,
    remote, non-recording span.
    """
    if parent_span_id_hex:
        remote_parent = NonRecordingSpan(
            SpanContext(
                trace_id=int(trace_id_hex, 16),
                span_id=int(parent_span_id_hex, 16),
                is_remote=True,
                trace_flags=TraceFlags(0x01),
            )
        )
        return otel_trace.set_span_in_context(remote_parent)
    return None


# Source trace ID -> destination trace ID (32 hex chars); filled in by
# migrate_one_trace and read by the later sections to remap references.
trace_id_map: Dict[str, str] = {}
# Source observation ID -> destination span ID (16 hex chars); filled in by
# migrate_one_trace and read by the later sections to remap references.
obs_id_map: Dict[str, str] = {}

# Field groups we need from the v2 Observations API. `core` is always included.
# `basic` gives name/level/status/version/environment; `time` adds
# completionStartTime; `io` adds input/output; `metadata` adds metadata;
# `model` adds providedModelName/modelParameters; `usage` adds
# usageDetails/costDetails; `prompt` adds promptName/promptVersion.
OBSERVATIONS_FIELDS = "core,basic,time,io,metadata,model,usage,prompt"


def _fetch_observations_v2(source_trace_id: str) -> List[Any]:
    """Collect every observation of one source trace from the v2 API.

    Follows the response cursor until the API stops returning one.

    Cloud-only. For self-hosted migrations, replace this helper with:

        return with_retries(lambda: src.api.trace.get(source_trace_id)).observations
    """
    collected: List[Any] = []
    next_cursor: Optional[str] = None

    def _request_batch():
        # Reads `next_cursor` at call time, so each call asks for the next batch.
        return src.api.observations.get_many(
            trace_id=source_trace_id,
            fields=OBSERVATIONS_FIELDS,
            limit=1000,
            cursor=next_cursor,
        )

    while True:
        batch = with_retries(_request_batch)
        if batch.data:
            collected.extend(batch.data)
        next_cursor = getattr(batch.meta, "cursor", None)
        if not next_cursor:
            return collected


def _parents_first(observations: List[Any]) -> List[Any]:
    """Return `observations` ordered so every parent precedes its children.

    Ties are broken by (start_time, id). Observations whose parent is not
    part of this trace are treated as having no parent. If the parent links
    form a cycle, the remaining observations are appended in
    (start_time, id) order.
    """
    known_ids = {o.id for o in observations}
    pending = sorted(observations, key=lambda o: (o.start_time, o.id))
    ordered: List[Any] = []
    placed = set()
    while pending:
        deferred = []
        for o in pending:
            parent_id = o.parent_observation_id
            if parent_id and parent_id in known_ids and parent_id not in placed:
                deferred.append(o)
            else:
                ordered.append(o)
                placed.add(o.id)
        if len(deferred) == len(pending):
            # No progress in this pass: parent links are cyclic.
            ordered.extend(deferred)
            break
        pending = deferred
    return ordered


def migrate_one_trace(trace_obj) -> int:
    """Migrate one trace + its observations into the destination via OTLP.

    Emits one synthetic root span carrying the trace-level attributes, then
    one span per source observation, nested according to
    `parent_observation_id`. Updates the module-level `trace_id_map` and
    `obs_id_map`.

    The span ID of every emitted span is assigned by the tracer (the provider
    in this notebook is built without a custom ID generator), so it cannot be
    chosen up front. `obs_id_map` therefore records the ID the destination
    span actually received, and a child is only started once its parent's
    real ID is known. Parent contexts built from a precomputed ID would point
    at spans that do not exist.

    Args:
        trace_obj: Source trace as returned by the trace listing.

    Returns:
        The number of child observations migrated.
    """
    observations = _parents_first(_fetch_observations_v2(trace_obj.id))
    new_trace_hex = to_otel_trace_id(trace_obj.id)
    trace_id_map[trace_obj.id] = new_trace_hex

    # The root span covers the full extent of the observations; with no
    # observations it collapses to the trace's own timestamp.
    trace_start = min((o.start_time for o in observations), default=trace_obj.timestamp)
    trace_end = max(
        (o.end_time or o.start_time for o in observations), default=trace_obj.timestamp
    )

    # Starting the root span under a remote parent context is what forces the
    # destination trace_id; the parent span_id is a deterministic placeholder.
    anchor_sc = SpanContext(
        trace_id=int(new_trace_hex, 16),
        span_id=int(to_otel_span_id(trace_obj.id), 16),
        is_remote=True,
        trace_flags=TraceFlags(0x01),
    )
    root_ctx = otel_trace.set_span_in_context(NonRecordingSpan(anchor_sc))
    root_span = migration_tracer.start_span(
        name=trace_obj.name or "trace",
        start_time=_to_ns(trace_start),
        context=root_ctx,
    )
    try:
        _set_trace_attrs(root_span, trace_obj)
        root_span.set_attribute("langfuse.observation.type", "span")
        root_hex = format(root_span.get_span_context().span_id, "016x")

        # Source observation ID -> real destination span ID, for this trace only.
        emitted: Dict[str, str] = {}
        for obs in observations:
            # Top-level observations, and any whose parent could not be
            # resolved, nest under the root span. Never pass a missing parent
            # on: a None context would start the span in a different trace.
            parent_hex = emitted.get(obs.parent_observation_id) or root_hex
            child = migration_tracer.start_span(
                name=obs.name or obs.type.lower(),
                start_time=_to_ns(obs.start_time),
                context=_parent_ctx(new_trace_hex, parent_hex),
            )
            try:
                actual_hex = format(child.get_span_context().span_id, "016x")
                emitted[obs.id] = actual_hex
                obs_id_map[obs.id] = actual_hex
                _set_observation_attrs(child, obs)
            finally:
                # Always end the span so it is exported; an observation
                # without an end time ends at its start time.
                child.end(end_time=_to_ns(obs.end_time) or _to_ns(obs.start_time))
    finally:
        root_span.end(end_time=_to_ns(trace_end))

    return len(observations)


# --- Drive the migration ---
# Page through the source traces in ascending timestamp order (restricted to
# FROM_TS / TO_TS when set) and migrate them one by one. A failure on one
# trace is reported and counted; the loop then continues with the next trace.
page = 1
limit = 50
total = 0
total_obs = 0
failures = 0
progress_every = 10  # print a progress line every N traces
t_start = time.time()
while True:
    # `fields="core,io"` on trace.list ensures we get input/output/metadata
    # in the listing response and don't need a second call per trace.
    listing = with_retries(lambda: src.api.trace.list(
        page=page,
        limit=limit,
        order_by="timestamp.asc",
        from_timestamp=FROM_TS,
        to_timestamp=TO_TS,
        fields="core,io",
    ))
    if not listing.data:
        break

    # Totals are optional on the response; when absent the progress output
    # falls back to plain counts and the loop ends on the first empty page.
    total_items = getattr(listing.meta, "total_items", None)
    total_pages = getattr(listing.meta, "total_pages", None)
    if page == 1 and total_items is not None:
        print(f"Starting migration: {total_items} trace(s) across {total_pages} page(s)")

    for t in listing.data:
        try:
            obs_count = migrate_one_trace(t)
            total += 1
            total_obs += obs_count
            if total % progress_every == 0:
                elapsed = time.time() - t_start
                rate = total / elapsed if elapsed > 0 else 0
                progress = (
                    f"{total}/{total_items}" if total_items is not None else str(total)
                )
                print(
                    f"  [{progress} traces] observations={total_obs} "
                    f"failed={failures} rate={rate:.1f} traces/s"
                )
        except Exception as exc:
            print(f"  failed trace {t.id}: {exc}")
            failures += 1

    print(
        f"  page {page}/{total_pages or '?'} done — "
        f"cumulative: traces={total}, observations={total_obs}, failed={failures}"
    )
    if total_pages is not None and page >= total_pages:
        break
    page += 1

# Flush the span pipeline before reporting the final totals.
otlp_provider.force_flush()
elapsed = time.time() - t_start
print(
    f"\nObservations — migrated: {total} traces / {total_obs} observations, "
    f"failed: {failures}, elapsed: {elapsed:.1f}s"
)
print(f"  trace_id_map entries: {len(trace_id_map)}")
print(f"  obs_id_map entries:   {len(obs_id_map)}")

5. Scores

Scores are fetched from the source via api.scores.get_many and re-created on the destination via the native create_score helper. Original id is preserved so re-runs are idempotent. config_id is remapped through config_id_map from Section 1; trace_id / observation_id are remapped through trace_id_map / obs_id_map from Section 4.

# Fall back to empty maps when this cell is run without the earlier sections;
# references are then passed through unchanged.
trace_id_map = globals().get("trace_id_map", {})
obs_id_map = globals().get("obs_id_map", {})
config_id_map = globals().get("config_id_map", {})

page = 1
migrated = failed = skipped = 0
while True:
    resp = with_retries(lambda: src.api.scores.get_many(
        page=page,
        limit=100,
        from_timestamp=FROM_TS,
        to_timestamp=TO_TS,
    ))
    if not resp.data:
        break
    for s in resp.data:
        # Remap references; an ID missing from a map is kept as-is.
        new_trace_id = trace_id_map.get(s.trace_id, s.trace_id) if s.trace_id else None
        new_obs_id = obs_id_map.get(s.observation_id, s.observation_id) if s.observation_id else None
        new_cfg_id = config_id_map.get(s.config_id, s.config_id) if s.config_id else None

        # Determine typed value. For CATEGORICAL scores prefer the string
        # value and fall back to the stringified value, but only when there
        # is one: str(None) would migrate the score as the literal "None"
        # instead of letting it be skipped below.
        if s.data_type == "CATEGORICAL":
            fallback = str(s.value) if s.value is not None else None
            value = getattr(s, "string_value", None) or fallback
        else:
            value = s.value

        if value is None:
            skipped += 1
            continue

        try:
            with_retries(lambda: dst.create_score(
                name=s.name,
                value=value,
                data_type=s.data_type,
                trace_id=new_trace_id,
                observation_id=new_obs_id,
                session_id=getattr(s, "session_id", None),
                dataset_run_id=getattr(s, "dataset_run_id", None),
                score_id=s.id,  # preserves ID for idempotent re-runs
                comment=s.comment,
                config_id=new_cfg_id,
                metadata=s.metadata,
                timestamp=s.timestamp,
            ))
            migrated += 1
        except Exception as exc:
            # Best effort: report the failure and continue with the next score.
            print(f"  failed score {s.id} ({s.name}): {exc}")
            failed += 1
    # Stop after the last page; if the response carries no `total_pages`,
    # the default makes this the last page.
    if page >= getattr(resp.meta, "total_pages", page):
        break
    page += 1

# Flush scores (create_score queues events on the dst client)
dst.flush()
print(f"Scores — migrated: {migrated}, skipped (null value): {skipped}, failed: {failed}")

6. Datasets (items + run items)

We migrate datasets themselves via the native create_dataset helper, items via create_dataset_item (preserving source item IDs), and run items via the REST endpoint. Dataset run items reference migrated traces/observations using trace_id_map / obs_id_map.

# Fallback to empty maps when this cell is run standalone (see Scores section).
trace_id_map = globals().get("trace_id_map", {})
obs_id_map = globals().get("obs_id_map", {})

from urllib.parse import quote as _urlquote


def _enc(name: str) -> str:
    return _urlquote(name, safe="")


# Dataset migration: for every dataset in the source project, make sure it
# exists in the destination, copy its items, then re-create its run items so
# they point at the migrated traces/observations.
#
# Uses names defined in earlier cells: src, dst, with_retries, time,
# trace_id_map, obs_id_map and _enc.
# NOTE(review): the lambdas below read loop variables when they are called;
# this is only safe if with_retries invokes the callable before returning —
# confirm against its definition.

# Counters used by the per-dataset progress lines and the final summary.
ds_migrated = ds_skipped = ds_failed = 0
items_migrated = items_failed = 0
runs_created = runs_failed = 0

t_start = time.time()
ds_seen = 0  # count of source datasets processed (new + existing)

page_ds = 1
while True:
    dsresp = with_retries(lambda: src.api.datasets.list(page=page_ds, limit=100))
    if not dsresp.data:
        break

    total_ds = getattr(dsresp.meta, "total_items", None)
    total_pages = getattr(dsresp.meta, "total_pages", None)
    if page_ds == 1 and total_ds is not None:
        print(f"Starting dataset migration: {total_ds} dataset(s) across {total_pages} page(s)")

    for ds in dsresp.data:
        ds_seen += 1
        progress = f"{ds_seen}/{total_ds}" if total_ds is not None else str(ds_seen)
        print(f"  [{progress}] '{ds.name}'")

        # Create dataset if not present. Path parameters must be URL-encoded
        # for folder datasets (names containing "/").
        try:
            dst.api.datasets.get(dataset_name=_enc(ds.name))
            ds_skipped += 1
            print("    dataset already exists in destination — skipping create")
        except Exception as exc:
            # Only a 404 means "not there yet"; any other lookup error is
            # counted as a failure and the dataset is skipped entirely.
            if getattr(exc, "status_code", None) != 404:
                print(f"    failed dataset lookup: {exc}")
                ds_failed += 1
                continue
            try:
                with_retries(lambda: dst.create_dataset(
                    name=ds.name, description=ds.description, metadata=ds.metadata
                ))
                ds_migrated += 1
                print("    created dataset in destination")
            except Exception as exc2:
                print(f"    failed to create dataset: {exc2}")
                ds_failed += 1
                continue

        # Items (preserve source IDs so run items link correctly)
        ds_items_before = items_migrated
        ds_items_failed_before = items_failed
        page_item = 1
        while True:
            iresp = with_retries(lambda: src.api.dataset_items.list(
                dataset_name=ds.name, page=page_item, limit=100
            ))
            if not iresp.data:
                break
            for it in iresp.data:
                try:
                    with_retries(lambda: dst.create_dataset_item(
                        dataset_name=ds.name,
                        id=it.id,
                        input=it.input,
                        expected_output=it.expected_output,
                        metadata=it.metadata,
                        # Remap to the migrated IDs; fall back to the source ID
                        # when no mapping is known.
                        source_trace_id=trace_id_map.get(it.source_trace_id, it.source_trace_id) if it.source_trace_id else None,
                        source_observation_id=obs_id_map.get(it.source_observation_id, it.source_observation_id) if it.source_observation_id else None,
                        status=it.status,
                    ))
                    items_migrated += 1
                except Exception as exc:
                    print(f"      failed item {it.id}: {exc}")
                    items_failed += 1
            # A missing or None total_pages is treated as "this was the last
            # page" instead of raising TypeError on the comparison.
            if page_item >= (getattr(iresp.meta, "total_pages", None) or page_item):
                break
            page_item += 1
        ds_items = items_migrated - ds_items_before
        ds_items_fail = items_failed - ds_items_failed_before
        print(f"    items: {ds_items} migrated, {ds_items_fail} failed")

        # Run items. Path params (dataset_name, run_name) need URL-encoding.
        ds_runs_before = runs_created
        ds_runs_failed_before = runs_failed
        run_count = 0
        page_run = 1
        while True:
            rresp = with_retries(lambda: src.api.datasets.get_runs(
                dataset_name=_enc(ds.name), page=page_run, limit=100
            ))
            if not rresp.data:
                break
            for run_meta in rresp.data:
                run_count += 1
                # The run listing is only used for names/metadata; the run
                # items themselves are read from the per-run fetch.
                try:
                    run_full = with_retries(lambda: src.api.datasets.get_run(
                        dataset_name=_enc(ds.name), run_name=_enc(run_meta.name)
                    ))
                except Exception as exc:
                    print(f"      failed run fetch '{run_meta.name}': {exc}")
                    runs_failed += 1
                    continue
                for ri in run_full.dataset_run_items:
                    new_trace = trace_id_map.get(ri.trace_id, ri.trace_id) if ri.trace_id else None
                    new_obs = obs_id_map.get(ri.observation_id, ri.observation_id) if ri.observation_id else None
                    if not new_trace and not new_obs:
                        # Nothing to link to: count as failed, and say so, so
                        # the summary's failure count can be traced back.
                        print(
                            f"      skipped run item for run '{run_meta.name}' "
                            f"(item {ri.dataset_item_id}): no trace or observation link"
                        )
                        runs_failed += 1
                        continue
                    try:
                        with_retries(lambda: dst.api.dataset_run_items.create(
                            run_name=run_meta.name,
                            dataset_item_id=ri.dataset_item_id,
                            trace_id=new_trace,
                            observation_id=new_obs,
                            run_description=run_meta.description,
                            metadata=run_meta.metadata,
                        ))
                        runs_created += 1
                    except Exception as exc:
                        print(f"      failed run item for run '{run_meta.name}': {exc}")
                        runs_failed += 1
            if page_run >= (getattr(rresp.meta, "total_pages", None) or page_run):
                break
            page_run += 1
        ds_runs = runs_created - ds_runs_before
        ds_runs_fail = runs_failed - ds_runs_failed_before
        print(
            f"    runs: {run_count} run(s), {ds_runs} run item(s) linked, "
            f"{ds_runs_fail} failed"
        )

    # total_pages was read from this page's meta above; None means unknown,
    # which ends the loop after the current page.
    if page_ds >= (total_pages or page_ds):
        break
    page_ds += 1

elapsed = time.time() - t_start
print(
    f"\nDatasets — new: {ds_migrated}, existing: {ds_skipped}, failed: {ds_failed}  |  "
    f"items: {items_migrated} migrated / {items_failed} failed  |  "
    f"run items: {runs_created} linked / {runs_failed} failed  |  "
    f"elapsed: {elapsed:.1f}s"
)

Was this page helpful?