MLflow Helper

MLflow integration utilities for experiment tracking and result management.

Recommendation

Create dedicated experiments for evaluations. Don't mix with production traces.
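One way to follow this advice is to point the evaluation at its own experiment via the environment variables that the default settings loader reads (documented under `mlflow_settings` below). The URI value here is a placeholder:

```python
import os

# Keep evaluation runs in a dedicated experiment, separate from production traces.
# These variables are read when no explicit MLFlowSettings object is passed.
os.environ["EVAL_MLFLOW_TRACKING_URI"] = "http://localhost:5000"
os.environ["EVAL_MLFLOW_EXPERIMENT_NAME"] = "rag-evaluations"
```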

Async (preferred)

ragpill.evaluate_testset_with_mlflow async

evaluate_testset_with_mlflow(testset, task, mlflow_settings=None, model_params=None)

Evaluate a testset with comprehensive MLflow logging and tracking.

This function orchestrates the complete evaluation workflow:

  1. Sets up MLflow experiment and starts a run
  2. Wraps the task with MLflow tracing
  3. Evaluates all test cases using the provided task
  4. Cleans up LLMJudge traces (which clutter the UI)
  5. Maps traces to evaluation results
  6. Logs metrics, parameters, and assessments to MLflow
  7. Tags traces with case metadata for filtering and analysis

The function automatically:

  • Logs overall accuracy
  • Logs accuracy per tag for granular analysis
  • Attaches feedback/assessments to each trace
  • Preserves trace IDs for later inspection
  • Logs model parameters for reproducibility

Parameters:

testset (Dataset[Any, Any, CaseMetadataT], required)
    The dataset to evaluate, created via load_testset or constructed manually using Case objects.

task (TaskType, required)
    The task to evaluate; can be either a synchronous or asynchronous callable. Should accept inputs of type InputsT and return outputs of type OutputT. Example: async def my_agent(question: str) -> str: ...

mlflow_settings (MLFlowSettings | None, default None)
    MLflow configuration settings. If None, loads from environment variables:
      - EVAL_MLFLOW_TRACKING_URI: MLflow tracking server URI
      - EVAL_MLFLOW_EXPERIMENT_NAME: Experiment name for grouping runs
      - EVAL_MLFLOW_TRACKING_USERNAME: Authentication username (if needed)
      - EVAL_MLFLOW_TRACKING_PASSWORD: Authentication password (if needed)

model_params (dict[str, str] | None, default None)
    Optional dictionary of model/system parameters to log for reproducibility. Example: {"system_prompt": "...", "model": "gpt-4o", "temperature": "0.7", "retrieval_k": "5", "rerank_model": "..."}

Returns:

pandas.DataFrame: Evaluation results with columns:
  - inputs: Test case input
  - output: Task output
  - evaluator_result: Boolean pass/fail result
  - evaluator_data: Evaluator-specific data (e.g., rubric for LLMJudge)
  - evaluator_reason: Explanation for the result
  - expected: Whether a pass was expected
  - attributes: JSON-encoded custom attributes
  - tags: Set of tags for categorization
  - task_duration: Time taken for task execution
  - evaluator_name: Name of the evaluator
  - case_name: Name of the test case
  - case_id: Unique identifier for the case
  - source_type: "LLM_JUDGE" or "CODE"
  - source_id: Evaluator class name
  - input_key: Hash of the input
  - trace_id: MLflow trace ID for inspection
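These columns make it straightforward to pull out failing cases for review. A sketch with stand-in data (not real output), keeping only a few of the documented columns:

```python
import pandas as pd

# Hypothetical results frame using a subset of the documented columns.
results_df = pd.DataFrame(
    {
        "case_name": ["greeting", "refund_policy"],
        "evaluator_result": [True, False],
        "evaluator_reason": ["matches rubric", "missing citation"],
        "trace_id": ["tr-1", "tr-2"],
    }
)

# Failed cases, with the evaluator's explanation and the MLflow trace to inspect.
failures = results_df.loc[
    ~results_df["evaluator_result"],
    ["case_name", "evaluator_reason", "trace_id"],
]
```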

Example
import mlflow
from ragpill.csv.testset import load_testset, default_evaluator_classes
from ragpill.mlflow_helper import evaluate_testset_with_mlflow
from ragpill.settings import MLFlowSettings

# Load test dataset
testset = load_testset(
    csv_path="testset.csv",
    evaluator_classes=default_evaluator_classes,
)

# Define your task
async def my_agent(question: str) -> str:
    # Your agent logic here
    return f"Answer to: {question}"

# Run evaluation with MLflow tracking
results_df = await evaluate_testset_with_mlflow(
    testset=testset,
    task=my_agent,
    model_params={
        "model": "gpt-4o-mini",
        "temperature": "0.7",
        "system_prompt": "You are a helpful assistant",
    }
)

# Analyze results
print(f"Overall accuracy: {results_df['evaluator_result'].mean():.2%}")
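The per-tag accuracy that the function logs can also be recomputed locally from the returned frame. A sketch with stand-in data, assuming `tags` holds a set of tags per row as documented above:

```python
import pandas as pd

# Stand-in results frame: one boolean result and a set of tags per case.
results_df = pd.DataFrame(
    {
        "evaluator_result": [True, False, True],
        "tags": [{"retrieval"}, {"retrieval", "math"}, {"math"}],
    }
)

# Explode the tag sets so each (case, tag) pair becomes its own row,
# then average the boolean results per tag.
per_tag = (
    results_df.explode("tags")
    .groupby("tags")["evaluator_result"]
    .mean()
)
```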
Note

This function will start and end an MLflow run. Make sure MLflow tracking is properly configured before calling this function.

See Also

load_testset: Create test datasets from CSV files
MLFlowSettings: MLflow configuration settings

Source code in src/ragpill/mlflow_helper.py
async def evaluate_testset_with_mlflow(
    testset: Dataset[Any, Any, CaseMetadataT],
    task: TaskType,
    mlflow_settings: MLFlowSettings | None = None,
    model_params: dict[str, str] | None = None,
) -> pd.DataFrame:
    """Evaluate a testset with comprehensive MLflow logging and tracking.

    This function orchestrates the complete evaluation workflow:

    1. Sets up MLflow experiment and starts a run
    2. Wraps the task with MLflow tracing
    3. Evaluates all test cases using the provided task
    4. Cleans up LLMJudge traces (which clutter the UI)
    5. Maps traces to evaluation results
    6. Logs metrics, parameters, and assessments to MLflow
    7. Tags traces with case metadata for filtering and analysis

    The function automatically:

    - Logs overall accuracy
    - Logs accuracy per tag for granular analysis
    - Attaches feedback/assessments to each trace
    - Preserves trace IDs for later inspection
    - Logs model parameters for reproducibility

    Args:
        testset: The dataset to evaluate, created via
            [`load_testset`][ragpill.csv.testset.load_testset]
            or constructed manually using [`Case`](https://ai.pydantic.dev/api/pydantic_evals/dataset/#pydantic_evals.dataset.Case) objects
        task: The task to evaluate - can be either synchronous or asynchronous callable.
            Should accept inputs of type `InputsT` and return outputs of type `OutputT`.
            Example: `async def my_agent(question: str) -> str: ...`
        mlflow_settings: MLflow configuration settings. If None, loads from environment variables:
            - `EVAL_MLFLOW_TRACKING_URI`: MLflow tracking server URI
            - `EVAL_MLFLOW_EXPERIMENT_NAME`: Experiment name for grouping runs
            - `EVAL_MLFLOW_TRACKING_USERNAME`: Authentication username (if needed)
            - `EVAL_MLFLOW_TRACKING_PASSWORD`: Authentication password (if needed)
        model_params: Optional dictionary of model/system parameters to log for reproducibility.
            Examples: `{"system_prompt": "...", "model": "gpt-4o", "temperature": "0.7",
            "retrieval_k": "5", "rerank_model": "..."}`

    Returns:
        pandas.DataFrame: Evaluation results with columns:
            - `inputs`: Test case input
            - `output`: Task output
            - `evaluator_result`: Boolean pass/fail result
            - `evaluator_data`: Evaluator-specific data (e.g., rubric for LLMJudge)
            - `evaluator_reason`: Explanation for the result
            - `expected`: Whether pass was expected
            - `attributes`: JSON-encoded custom attributes
            - `tags`: Set of tags for categorization
            - `task_duration`: Time taken for task execution
            - `evaluator_name`: Name of the evaluator
            - `case_name`: Name of the test case
            - `case_id`: Unique identifier for the case
            - `source_type`: "LLM_JUDGE" or "CODE"
            - `source_id`: Evaluator class name
            - `input_key`: Hash of the input
            - `trace_id`: MLflow trace ID for inspection

    Example:
        ```python
        import mlflow
        from ragpill.csv.testset import load_testset, default_evaluator_classes
        from ragpill.mlflow_helper import evaluate_testset_with_mlflow
        from ragpill.settings import MLFlowSettings

        # Load test dataset
        testset = load_testset(
            csv_path="testset.csv",
            evaluator_classes=default_evaluator_classes,
        )

        # Define your task
        async def my_agent(question: str) -> str:
            # Your agent logic here
            return f"Answer to: {question}"

        # Run evaluation with MLflow tracking
        results_df = await evaluate_testset_with_mlflow(
            testset=testset,
            task=my_agent,
            model_params={
                "model": "gpt-4o-mini",
                "temperature": "0.7",
                "system_prompt": "You are a helpful assistant",
            }
        )

        # Analyze results
        print(f"Overall accuracy: {results_df['evaluator_result'].mean():.2%}")
        ```

    Note:
        This function will start and end an MLflow run. Make sure MLflow tracking
        is properly configured before calling this function.

    See Also:
        [`load_testset`][ragpill.csv.testset.load_testset]:
            Create test datasets from CSV files
        [`MLFlowSettings`][ragpill.settings.MLFlowSettings]:
            MLflow configuration settings
    """
    mlflow_settings = mlflow_settings or MLFlowSettings()  # pyright: ignore[reportCallIssue]
    _setup_mlflow_experiment(mlflow_settings)
    _fix_evaluator_global_flag(testset)
    testsetresults = await testset.evaluate(_mlflow_runnable_wrapper(task))
    experiment, latest_run_id = _delete_llm_judge_traces(mlflow_settings)
    input_key_trace_map = _get_input_key_trace_id_map(experiment, latest_run_id)
    input_key_report_case_map = _get_input_key_report_case_map(testsetresults, testset)
    eval_metadata_map = _get_evaluation_id_eval_metadata_map(testset)
    assert set(input_key_trace_map.keys()) == set(input_key_report_case_map.keys()), (
        "Input keys in traces and testsetresults do not match."
    )
    eval_result_df = _create_evaluation_dataframe(
        input_key_trace_map,
        input_key_report_case_map,
        eval_metadata_map,
    )
    _upload_mlflow(eval_result_df, input_key_report_case_map, model_params)

    mlflow.end_run()
    return eval_result_df

Sync wrapper

Use this when await is not available (plain scripts, CLI tools, synchronous test suites). It runs the async version in a dedicated thread, so it is safe to call from both sync and async contexts — including Jupyter notebooks and FastAPI route handlers.

ragpill.evaluate_testset_with_mlflow_sync

evaluate_testset_with_mlflow_sync(testset, task, mlflow_settings=None, model_params=None)

Synchronous wrapper around evaluate_testset_with_mlflow.

Prefer the async version when possible. Use this wrapper when you cannot use await — for example in plain scripts, CLI tools, or synchronous test suites.

Internally, this runs the async function via asyncio.run() inside a fresh thread from a ThreadPoolExecutor. That thread has no running event loop, so asyncio.run() always succeeds — even when the caller is already inside a running event loop (e.g. Jupyter, FastAPI, or an asyncio-based test runner).
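The thread-plus-asyncio.run pattern described here can be sketched generically, with a toy coroutine standing in for the real evaluation:

```python
import asyncio
import concurrent.futures

async def compute() -> int:
    # Stand-in for the real async evaluation work.
    await asyncio.sleep(0)
    return 42

def run_sync() -> int:
    # A fresh worker thread has no running event loop, so asyncio.run()
    # is safe here even if the calling thread is already inside a loop.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        return executor.submit(asyncio.run, compute()).result()

result = run_sync()
```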

Parameters:

testset (Dataset[Any, Any, CaseMetadataT], required)
    The dataset to evaluate.

task (TaskType, required)
    The task to evaluate — sync or async callable.

mlflow_settings (MLFlowSettings | None, default None)
    MLflow configuration. If None, loaded from environment variables.

model_params (dict[str, str] | None, default None)
    Optional model/system parameters to log for reproducibility.

Returns:

pandas.DataFrame: Same evaluation results as the async version.

See Also

evaluate_testset_with_mlflow: The async version of this function.

Source code in src/ragpill/mlflow_helper.py
def evaluate_testset_with_mlflow_sync(
    testset: Dataset[Any, Any, CaseMetadataT],
    task: TaskType,
    mlflow_settings: MLFlowSettings | None = None,
    model_params: dict[str, str] | None = None,
) -> pd.DataFrame:
    """Synchronous wrapper around [`evaluate_testset_with_mlflow`][ragpill.mlflow_helper.evaluate_testset_with_mlflow].

    Prefer the async version when possible. Use this wrapper when you cannot use `await` —
    for example in plain scripts, CLI tools, or synchronous test suites.

    Internally, this runs the async function via `asyncio.run()` inside a fresh thread from a
    `ThreadPoolExecutor`. That thread has no running event loop, so `asyncio.run()` always
    succeeds — even when the *caller* is already inside a running event loop (e.g. Jupyter,
    FastAPI, or an `asyncio`-based test runner).

    Args:
        testset: The dataset to evaluate.
        task: The task to evaluate — sync or async callable.
        mlflow_settings: MLflow configuration. If None, loaded from environment variables.
        model_params: Optional model/system parameters to log for reproducibility.

    Returns:
        pandas.DataFrame: Same evaluation results as the async version.

    See Also:
        [`evaluate_testset_with_mlflow`][ragpill.mlflow_helper.evaluate_testset_with_mlflow]:
            The async version of this function.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(
            asyncio.run,
            evaluate_testset_with_mlflow(testset, task, mlflow_settings, model_params),
        )
        return future.result()
