Source code for pyelink.data

"""EyeLink data retrieval and buffering.

This module provides the DataBuffer class for managing sample and raw data
retrieval and buffering from the EyeLink tracker.
"""

from __future__ import annotations

import logging
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from .core import EyeLink

import numpy as np

from .utils import RingBuffer

logger = logging.getLogger(__name__)


[docs] class DataBuffer: """Handles data retrieval and buffering from EyeLink tracker. This class manages: - Sample and raw data retrieval from tracker - Buffering data in a ring buffer - Background threads for continuous data collection """
[docs] def __init__( self, device: EyeLink, buffer_length: int = 0, use_buffer: bool = False, read_from_tracker_buffer: bool = True, record_raw_data: bool = False, ) -> None: """Initialize sample buffer. Args: device: Connected EyeLink instance buffer_length: Number of samples to store in ring buffer (0 = no buffering) use_buffer: Store data from tracker buffer in RingBuffer read_from_tracker_buffer: Use getNextData() instead of getNewestSample() (the former draws from an internal buffer and should miss fewer samples) record_raw_data: Whether raw pupil/CR data is being recorded """ self.device = device self.buffer_length = buffer_length self.use_buffer = use_buffer self.read_from_tracker_buffer = read_from_tracker_buffer self.record_raw_data = record_raw_data # Eye indices self.left_eye = 0 self.right_eye = 1 self.binocular = 2 # Timestamp tracking self.t_old = 0 # Initialize ring buffer if needed if buffer_length != 0: self.buf = RingBuffer(maxlen=buffer_length) # Thread pool executor for background data collection self._executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="DataBuffer") self._sample_future: Future | None = None self._raw_future: Future | None = None self._sample_stop = True self._raw_stop = True
[docs] def get_timestamp(self) -> float: """Get timestamp of latest sample. Returns: Timestamp in milliseconds, or np.nan if not connected """ timestamp = np.nan if self.device.realconnect: sample = self.device.get_newest_sample() if sample is not None: # Use getRawSampleTime() if recording raw data, otherwise use getTime() timestamp = sample.getRawSampleTime() if self.record_raw_data else sample.getTime() return timestamp
[docs] def get_sample(self, write_to_edf: bool = False) -> tuple[int | float, list | None]: """Get the latest gaze sample over the link. The link must have been activated first: tracker.startRecording(0, 0, 1, 1) If both eyes are used, the left one is chosen by default. Eye indices: 0 - left eye 1 - right eye 2 - binocular (returns [lx, ly, lpupil, rx, ry, rpupil]) Args: write_to_edf: Write gaze data to edf-file as messages Returns: tuple: (timestamp, sample_info) - timestamp: Sample timestamp or -1 if no sample - sample_info: List of sample data or None """ # Use these values if nothing else is produced t = -1 sample_info = None # Check which eye is being recorded eye_used = self.device.eye_available() if self.device.realconnect: # Keep getting samples until a new sample is found timeout = 0.010 # Don't wait for new samples longer than 10 ms t0 = time.time() while (time.time() - t0) < timeout: sample = self.device.get_newest_sample() if sample is not None: t = sample.getTime() if t != self.t_old: sample_info = self._unpack_sample(t, sample, eye_used, write_to_edf) break self.t_old = t return t, sample_info
[docs] def get_sample_from_buffer(self, write_to_edf: bool = False) -> tuple[int | float, list | None]: """Get the latest gaze sample from the EyeLink buffer. Uses getNextData() which draws from an internal buffer and should miss fewer samples than getNewestSample(). Data type constants from pylink: 200 - Sample data 4 - ENDBLINK event 8 - ENDFIX event 0x3F/0 - No data available Args: write_to_edf: Write gaze data to edf-file as messages Returns: tuple: (timestamp, sample_info) """ # Use these values if nothing else is produced t = -1 sample_info = None # Check which eye is being recorded eye_used = self.device.eye_available() if self.device.realconnect: # Keep getting samples until a new sample is found timeout = 0.010 t0 = time.time() while (time.time() - t0) < timeout: data_type = self.device.get_next_data() if data_type == 200: # Sample data sample = self.device.get_float_data() if sample is not None: t = sample.getTime() sample_info = self._unpack_sample(t, sample, eye_used, write_to_edf) break elif data_type in {0x3F, 0}: # No data available break else: continue return t, sample_info
def _unpack_sample(self, t: float, sample: object, eye_used: int, write_to_edf: bool) -> list: """Convert sample into list format. Args: t: Sample timestamp sample: Sample object from tracker eye_used: Eye index (0=left, 1=right, 2=binocular) write_to_edf: Whether to write sample as message to EDF Returns: list: Sample data [x, y, pupil] or [lx, ly, lpupil, rx, ry, rpupil] """ # Extract gaze positions and write as msg if eye_used == self.right_eye and sample.isRightSample(): gaze_position = sample.getRightEye().getGaze() pupil_size = sample.getRightEye().getPupilSize() sample_info = [*gaze_position, pupil_size] sample_str = " ".join([str(t), str(gaze_position[0]), str(gaze_position[1]), str(pupil_size)]) elif eye_used == self.left_eye and sample.isLeftSample(): gaze_position = sample.getLeftEye().getGaze() pupil_size = sample.getLeftEye().getPupilSize() sample_info = [*gaze_position, pupil_size] sample_str = " ".join([str(t), str(gaze_position[0]), str(gaze_position[1]), str(pupil_size)]) elif eye_used == self.binocular and sample.isBinocular(): r = sample.getRightEye().getGaze() pr = sample.getRightEye().getPupilSize() l = sample.getLeftEye().getGaze() pl = sample.getLeftEye().getPupilSize() sample_info = [l[0], l[1], pl, r[0], r[1], pr] sample_str = " ".join([ str(t), str(sample_info[0]), str(sample_info[1]), str(sample_info[2]), str(sample_info[3]), str(sample_info[4]), str(sample_info[5]), ]) # Write to edf? if write_to_edf: self.device.send_message(sample_str) # Add data to buffer if activated if self.buffer_length != 0: self.buf.append([t, *sample_info]) return sample_info
[docs] def get_raw_sample(self, write_to_edf: bool = False) -> tuple[int | float, list | None]: """Get the latest raw sample over the link. The link must have been activated first: tracker.startRecording(0, 0, 1, 1) Args: write_to_edf: Write data to edf-file as messages Returns: tuple: (timestamp, raw_data) """ # Use these values if nothing else is produced t = -1 raw = None # Check which eye is being recorded eye_used = self.device.eye_available() if self.device.realconnect: # Keep getting samples until a new sample is found timeout = 0.010 t0 = time.time() while (time.time() - t0) < timeout: rawsample = self.device.get_newest_sample() if rawsample is not None: t = rawsample.getRawSampleTime() # if the timestamps between old and new samples differ, it's new if t != self.t_old: raw = self._unpack_raw_sample(t, rawsample, eye_used, write_to_edf) break self.t_old = t return t, raw
[docs] def get_raw_sample_from_buffer(self, write_to_edf: bool = False) -> tuple[int | float, list | None]: """Get the latest raw sample from the EyeLink buffer. Data type constants from pylink: 200 - Sample data 0x3F/0 - No data available Args: write_to_edf: Write data to edf-file as messages Returns: tuple: (timestamp, raw_data) """ # Use these values if nothing else is produced t = -1 raw = None # Check which eye is being recorded eye_used = self.device.eye_available() if self.device.realconnect: # Keep getting samples until a new sample is found timeout = 0.010 t0 = time.time() while (time.time() - t0) < timeout: data_type = self.device.get_next_data() if data_type == 200: # Sample data rawsample = self.device.get_float_data() # if the timestamps between old and new samples differ, it's new if rawsample is not None: t = rawsample.getRawSampleTime() if t != self.t_old: raw = self._unpack_raw_sample(t, rawsample, eye_used, write_to_edf) self.t_old = t break elif data_type in {0x3F, 0}: # No data available break else: continue return t, raw
def _unpack_raw_sample(self, t: float, rawsample: object, eye_used: int, write_to_edf: bool) -> list: """Convert raw sample into message string. Args: t: Timestamp rawsample: Raw sample object eye_used: Eye index write_to_edf: Write to EDF file Returns: list: Raw sample data """ # Extract gaze positions and write as msg if eye_used == self.right_eye: raw = [ t, rawsample.getRightrRawPupil()[0], rawsample.getRightrRawPupil()[1], rawsample.getRightPupilArea(), rawsample.getRightPupilDimension()[0], rawsample.getRightPupilDimension()[1], rawsample.getRightRawCr()[0], rawsample.getRightRawCr()[1], rawsample.getRightCrArea(), rawsample.getRightRawCr2()[0], rawsample.getRightRawCr2()[1], rawsample.getRightCrArea2(), ] msg = " ".join(["R", " ".join([str(r) for r in raw])]) if write_to_edf: self.device.send_message(msg) elif eye_used == self.left_eye: raw = [ t, rawsample.getLeftrRawPupil()[0], rawsample.getLeftrRawPupil()[1], rawsample.getLeftPupilArea(), rawsample.getLeftPupilDimension()[0], rawsample.getLeftPupilDimension()[1], rawsample.getLeftRawCr()[0], rawsample.getLeftRawCr()[1], rawsample.getLeftCrArea(), rawsample.getLeftRawCr2()[0], rawsample.getLeftRawCr2()[1], rawsample.getLeftCrArea2(), ] msg = " ".join(["L", " ".join([str(l) for l in raw])]) if write_to_edf: self.device.send_message(msg) elif eye_used == self.binocular: raw_l = [ t, rawsample.getLeftrRawPupil()[0], rawsample.getLeftrRawPupil()[1], rawsample.getLeftPupilArea(), rawsample.getLeftPupilDimension()[0], rawsample.getLeftPupilDimension()[1], rawsample.getLeftRawCr()[0], rawsample.getLeftRawCr()[1], rawsample.getLeftCrArea(), rawsample.getLeftRawCr2()[0], rawsample.getLeftRawCr2()[1], rawsample.getLeftCrArea2(), ] raw_r = [ rawsample.getRightrRawPupil()[0], rawsample.getRightrRawPupil()[1], rawsample.getRightPupilArea(), rawsample.getRightPupilDimension()[0], rawsample.getRightPupilDimension()[1], rawsample.getRightRawCr()[0], rawsample.getRightRawCr()[1], rawsample.getRightCrArea(), rawsample.getRightRawCr2()[0], rawsample.getRightRawCr2()[1], rawsample.getRightCrArea2(), ] raw = raw_l + raw_r msg = " ".join([ "L", " ".join([str(lraw) for lraw in raw_l]), "R", " ".join([str(rraw) for rraw in raw_r]), ]) if write_to_edf: self.device.send_message(msg) # Add data to buffer if activated if self.buffer_length != 0: self.buf.append(raw) else: logger.error("Something went wrong...") return raw
[docs] def start_sample_thread(self) -> None: """Start the sample thread for continuous sampling.""" if self._sample_future is not None and not self._sample_future.done(): logger.warning("Sample thread already running") return self._sample_stop = False self._sample_future = self._executor.submit(self._sample_loop) logger.info("Sample thread started")
def _sample_loop(self) -> None: """Continuously read raw data into the ring buffer (called by sample thread).""" k = 0 while True: if self._sample_stop: break if self.read_from_tracker_buffer: self.get_sample_from_buffer(write_to_edf=False) else: self.get_sample(write_to_edf=False) if np.mod(k, 10) == 0: time.sleep(0.001) k += 1
[docs] def stop_sample_thread(self) -> None: """Stop sample thread and wait for it to finish.""" if self._sample_future is None: return self._sample_stop = True # Wait for thread to finish gracefully (up to 5 seconds) try: self._sample_future.result(timeout=5.0) except TimeoutError: logger.warning("Sample thread did not stop within timeout") except Exception: logger.exception("Exception in sample thread")
[docs] def start_raw_thread(self) -> None: """Start the raw thread for continuous raw data collection.""" if self._raw_future is not None and not self._raw_future.done(): logger.warning("Raw thread already running") return self._raw_stop = False self._raw_future = self._executor.submit(self._raw_loop) logger.info("Raw thread started")
def _raw_loop(self) -> None: """Continuously read raw data into the ring buffer (called by raw thread).""" k = 0 while True: if self._raw_stop: break if self.read_from_tracker_buffer: self.get_raw_sample_from_buffer(write_to_edf=True) else: self.get_raw_sample(write_to_edf=True) if np.mod(k, 10) == 0: time.sleep(0.001) k += 1
[docs] def stop_raw_thread(self) -> None: """Stop raw thread and wait for it to finish.""" if self._raw_future is None: return self._raw_stop = True # Wait for thread to finish gracefully (up to 5 seconds) try: self._raw_future.result(timeout=5.0) except TimeoutError: logger.warning("Raw thread did not stop within timeout") except Exception: logger.exception("Exception in raw thread")
[docs] def shutdown(self) -> None: """Shutdown the thread pool executor gracefully.""" self.stop_sample_thread() self.stop_raw_thread() self._executor.shutdown(wait=True, cancel_futures=True) logger.info("DataBuffer executor shutdown complete")
[docs] def flush_samples(self) -> None: """Clear the sample buffer.""" if hasattr(self, "buf"): self.buf.clear()