Social rats - PAIR-R24M#

Data from Marshall et al. (2021)¹ (figshare), a multi-animal 3D pose dataset about the dyadic interactions in laboratory rats.


¹ Marshall, J., Klibaite, U., Gellis, A., Aldarondo, D., Olveczky, B., & Dunn, T. W. (2021). The PAIR-R24M Dataset for Multi-animal 3D Pose Estimation. Proceedings of the Neural Information Processing Systems Track on Datasets and Benchmarks, 1. https://datasets-benchmarks-proceedings.neurips.cc/paper/2021/hash/1ff8a7b5dc7a7d1f0ed65aaa29c04b1e-Abstract-round1.html

import numpy as np
import pandas as pd
import xarray as xr
from pathlib import Path
from typing import Optional
from movement.kinematics import compute_velocity, compute_speed
from movement.utils.vector import compute_norm

import ethograph as eto
from ethograph.io.nwb_alignment import align_media_per_trial
import numpy as np
import pandas as pd
import xarray as xr
from pathlib import Path

try:
    _here = Path(__vsc_ipynb_file__).parent
except NameError:
    _here = Path().resolve()

# TODO: adjust to your local copy of the PAIR-R24M dataset
path = r"C:\Users\aksel\Documents\Code\EthoGraph\data\20210119_Recording_SR1_SR2_social_vidtwo\markerDataset.csv"

SESSION = "20210119_Recording_SR1_SR2_social_vidtwo"
DATA_DIR = _here.parent / "data" / SESSION
VIDEO_DIR = DATA_DIR / "videos"
CAMERAS = ["Camera1", "Camera2", "Camera3", "Camera4", "Camera5", "Camera6"]
CHUNK_SIZE = 3500
FPS = 120
def from_pair24_csv(
    file_path: Path | str,
    fps: Optional[float] = None,
) -> xr.Dataset:
    df = pd.read_csv(file_path)

    keypoint_names = [
        "HeadF", "HeadB", "HeadL", "SpineF", "SpineM", "SpineL",
        "Offset1", "Offset2", "HipL", "HipR", "ShoulderL", "ShoulderR",
    ]
    individual_names = ["an1", "an2"]
    position_types = ["aligned", "absolute"]
    n_frames = len(df)
    n_keypoints = len(keypoint_names)
    n_individuals = len(individual_names)
    n_space = 3

    position_array = np.zeros((n_frames, len(position_types), n_space, n_keypoints, n_individuals))
    confidence_array = np.ones((n_frames, len(position_types), n_keypoints, n_individuals))

    for p, pos_type in enumerate(position_types):
        csv_prefix = "alignedPosition" if pos_type == "aligned" else "absolutePosition"
        for i, individual in enumerate(individual_names):
            for j, keypoint in enumerate(keypoint_names):
                for k, coord in enumerate(["x", "y", "z"]):
                    col_name = f"{csv_prefix}_{individual}_{keypoint}_{coord}"
                    if col_name in df.columns:
                        position_array[:, p, k, j, i] = df[col_name].values

    time_coords = np.arange(n_frames, dtype=float) / fps

    ds = xr.Dataset(
        data_vars={
            "position": xr.DataArray(
                position_array,
                dims=["time", "position_type", "space", "keypoints", "individuals"],
            ),
            "confidence": xr.DataArray(
                confidence_array,
                dims=["time", "position_type", "keypoints", "individuals"],
            ),
        },
        coords={
            "time": time_coords,
            "position_type": position_types,
            "space": ["x", "y", "z"],
            "keypoints": keypoint_names,
            "individuals": ["mouse 1", "mouse 2"],
        },
        attrs={"source_software": "DeepLabCut", "fps": fps},
    )

    com_data = np.zeros((n_frames, n_space, n_individuals))
    for i, individual in enumerate(individual_names):
        for j, coord in enumerate(["x", "y", "z"]):
            col_name = f"centerOfmass_{individual}_{coord}"
            if col_name in df.columns:
                com_data[:, j, i] = df[col_name].values
    ds["center_of_mass"] = xr.DataArray(com_data, dims=["time", "space", "individuals"])

    return ds


def split_into_chunks(
    ds_full: xr.Dataset,
    chunk_size: int,
    cameras: list[str],
    fps: float,
) -> tuple[list[xr.Dataset], pd.DataFrame]:
    n_frames = ds_full.sizes["time"]
    n_chunks = n_frames // chunk_size

    datasets = []
    rows = []

    for i in range(n_chunks):
        start_idx = i * chunk_size
        end_idx = start_idx + chunk_size
        start_frame = start_idx

        ds_chunk = ds_full.isel(time=slice(start_idx, end_idx)).copy()
        ds_chunk = ds_chunk.assign_coords(time=np.arange(chunk_size) / fps)
        ds_chunk.attrs["trial"] = i

        ds_chunk["pairwise_distance"] = compute_norm(
            ds_chunk.center_of_mass.sel(individuals="mouse 1")
            - ds_chunk.center_of_mass.sel(individuals="mouse 2")
        )
        ds_chunk["nose_nose_distance"] = compute_norm(
            ds_chunk.position.sel(keypoints="HeadF", individuals="mouse 1", position_type="absolute")
            - ds_chunk.position.sel(keypoints="HeadF", individuals="mouse 2", position_type="absolute")
        )
        ds_chunk["velocity"] = compute_velocity(ds_chunk.position.sel(position_type="aligned"))
        ds_chunk["speed"] = compute_speed(ds_chunk.position.sel(position_type="aligned"))

        for feature in ["nose_nose_distance", "velocity", "speed"]:
            ds_chunk[feature].attrs["type"] = "features"

        datasets.append(ds_chunk)

        row = {"trial": i}
        for cam in cameras:
            row[f"video_{cam}"] = str(VIDEO_DIR / cam / f"{start_frame}.mp4")
        rows.append(row)

    remaining = n_frames % chunk_size
    if remaining > 0:
        print(f"Discarded {remaining} frames at the end (not a full chunk)")
    print(f"Created {len(datasets)} chunks of {chunk_size} frames each")

    return datasets, pd.DataFrame(rows)
ds_full = from_pair24_csv(path, fps=FPS)
ds_full
datasets, session_table = split_into_chunks(
    ds_full, chunk_size=CHUNK_SIZE, cameras=CAMERAS, fps=FPS
)
output_path = DATA_DIR / "Trial_data.nc"
output_path.parent.mkdir(parents=True, exist_ok=True)

# Build NWB alignment
nwb_path = output_path.parent / ".ethograph" / "alignment.nwb"
align_media_per_trial(
    trial_table=session_table,
    stream_rates={"video": float(FPS)},
    output_path=nwb_path,
)

# Build and save TrialTree
dt = eto.from_datasets(datasets)
dt.save(output_path)
print(f"Saved dataset to {output_path}")
from ethograph.labels.converters import write_mapping_file
from ethograph.utils.paths import SETTINGS_DIR

mapping = {
    "Background": 0,
    "Idle": 1,
    "SmallMovement": 2,
    "HeadTilt": 3,
    "Groom": 4,
    "Sniff": 5,
    "Investigate": 6,
    "RearUp": 7,
    "RearDown": 8,
    "CrouchExplore": 9,
    "Amble": 10,
    "Locomotion": 11,
}

mapping_path = output_path.parent / SETTINGS_DIR / "mapping.txt"
write_mapping_file(mapping_path, mapping)
print(f"Saved mapping to {mapping_path}")