Skip to content

Module blueye.sdk.connection

View Source
from __future__ import annotations

import importlib.metadata

import logging

import platform

import queue

import threading

import uuid

from typing import Any, Callable, Dict, List, NamedTuple, Tuple

import blueye.protocol

import proto

import zmq

logger = logging.getLogger(__name__)

class WatchdogPublisher(threading.Thread):

    def __init__(self, parent_drone: "blueye.sdk.Drone", context: zmq.Context = None):

        super().__init__(daemon=True)

        self._parent_drone = parent_drone

        self._context = context or zmq.Context().instance()

        self._socket = self._context.socket(zmq.PUB)

        self._socket.connect(f"tcp://{self._parent_drone._ip}:5557")

        self._exit_flag = threading.Event()

    def run(self):

        duration = 0

        WATCHDOG_DELAY = 1

        while not self._exit_flag.wait(WATCHDOG_DELAY):

            self.pet_watchdog(duration)

            duration += 1

    def pet_watchdog(self, duration):

        msg = blueye.protocol.WatchdogCtrl(

            connection_duration={"value": duration}, client_id=self._parent_drone.client_id

        )

        self._socket.send_multipart(

            [

                bytes(msg._pb.DESCRIPTOR.full_name, "utf-8"),

                blueye.protocol.WatchdogCtrl.serialize(msg),

            ]

        )

    def stop(self):

        """Stop the watchdog thread started by run()"""

        self._exit_flag.set()

class Callback(NamedTuple):

    """Specifications for callback for telemetry messages"""

    message_filter: List[proto.messages.Message]

    function: Callable[[str, proto.message.Message], None]

    pass_raw_data: bool

    uuid_hex: str

    kwargs: Dict[str, Any]

class TelemetryClient(threading.Thread):

    def __init__(self, parent_drone: "blueye.sdk.Drone", context: zmq.Context = None):

        super().__init__(daemon=True)

        self._parent_drone = parent_drone

        self._context = context or zmq.Context().instance()

        self._socket = self._context.socket(zmq.SUB)

        self._socket.connect(f"tcp://{self._parent_drone._ip}:5555")

        self._socket.setsockopt_string(zmq.SUBSCRIBE, "")

        self._exit_flag = threading.Event()

        self._state_lock = threading.Lock()

        self._callbacks: List[Callback] = []

        self._state: Dict[proto.message.Message, bytes] = {}

        """`_state` is dictionary of the latest received messages, where the key is the protobuf

        message class, eg. blueye.protocol.DepthTel and the value is the serialized protobuf

        message"""

    def _handle_message(self, msg: Tuple[bytes, bytes]):

        msg_type_name = msg[0].decode("utf-8").replace("blueye.protocol.", "")

        try:

            msg_type = blueye.protocol.__getattribute__(msg_type_name)

        except AttributeError:

            # If a new telemetry message is introduced before the SDK is updated this can

            # be a common occurence, so choosing to log with info instead of warning

            logger.info(f"Ignoring unknown message type: {msg_type_name}")

            return

        msg_payload = msg[1]

        with self._state_lock:

            self._state[msg_type] = msg_payload

        for callback in self._callbacks:

            if msg_type in callback.message_filter or callback.message_filter == []:

                if callback.pass_raw_data:

                    callback.function(msg_type_name, msg_payload, **callback.kwargs)

                else:

                    msg_deserialized = msg_type.deserialize(msg_payload)

                    callback.function(msg_type_name, msg_deserialized, **callback.kwargs)

    def run(self):

        poller = zmq.Poller()

        poller.register(self._socket, zmq.POLLIN)

        while not self._exit_flag.is_set():

            events_to_be_processed = poller.poll(10)

            if len(events_to_be_processed) > 0:

                msg = self._socket.recv_multipart()

                self._handle_message(msg)

    def add_callback(

        self,

        msg_filter: List[proto.message.Message],

        callback_function: Callable[[str, proto.message.Message], None],

        raw: bool,

        **kwargs,

    ):

        uuid_hex = uuid.uuid1().hex

        self._callbacks.append(Callback(msg_filter, callback_function, raw, uuid_hex, kwargs))

        return uuid_hex

    def remove_callback(self, callback_id):

        try:

            self._callbacks.pop([cb.uuid_hex for cb in self._callbacks].index(callback_id))

        except ValueError:

            logger.warning(f"Callback with id {callback_id} not found, ignoring")

    def get(self, key: proto.message.Message):

        with self._state_lock:

            return self._state[key]

    def stop(self):

        self._exit_flag.set()

class CtrlClient(threading.Thread):

    def __init__(

        self,

        parent_drone: "blueye.sdk.Drone",

        context: zmq.Context = None,

    ):

        super().__init__(daemon=True)

        self._context = context or zmq.Context().instance()

        self._parent_drone = parent_drone

        self._drone_pub_socket = self._context.socket(zmq.PUB)

        self._drone_pub_socket.connect(f"tcp://{self._parent_drone._ip}:5557")

        self._messages_to_send = queue.Queue()

        self._exit_flag = threading.Event()

    def run(self):

        while not self._exit_flag.is_set():

            try:

                msg = self._messages_to_send.get(timeout=0.1)

                self._drone_pub_socket.send_multipart(

                    [

                        bytes(msg._pb.DESCRIPTOR.full_name, "utf-8"),

                        msg.__class__.serialize(msg),

                    ]

                )

            except queue.Empty:

                # No messages to send, so we can

                continue

    def stop(self):

        self._exit_flag.set()

    def set_lights(self, value: float):

        msg = blueye.protocol.LightsCtrl(lights={"value": value})

        self._messages_to_send.put(msg)

    def set_guest_port_lights(self, value: float):

        msg = blueye.protocol.GuestportLightsCtrl(lights={"value": value})

        self._messages_to_send.put(msg)

    def set_water_density(self, value: float):

        msg = blueye.protocol.WaterDensityCtrl(density={"value": value})

        self._messages_to_send.put(msg)

    def set_tilt_velocity(self, value: float):

        msg = blueye.protocol.TiltVelocityCtrl(velocity={"value": value})

        self._messages_to_send.put(msg)

    def set_tilt_stabilization(self, enabled: bool):

        msg = blueye.protocol.TiltStabilizationCtrl(state={"enabled": enabled})

        self._messages_to_send.put(msg)

    def set_motion_input(

        self, surge: float, sway: float, heave: float, yaw: float, slow: float, boost: float

    ):

        msg = blueye.protocol.MotionInputCtrl(

            motion_input={

                "surge": surge,

                "sway": sway,

                "heave": heave,

                "yaw": yaw,

                "slow": slow,

                "boost": boost,

            }

        )

        self._messages_to_send.put(msg)

    def set_auto_depth_state(self, enabled: bool):

        msg = blueye.protocol.AutoDepthCtrl(state={"enabled": enabled})

        self._messages_to_send.put(msg)

    def set_auto_heading_state(self, enabled: bool):

        msg = blueye.protocol.AutoHeadingCtrl(state={"enabled": enabled})

        self._messages_to_send.put(msg)

    def set_recording_state(self, main_enabled: bool, guestport_enabled: bool):

        msg = blueye.protocol.RecordCtrl(

            record_on={"main": main_enabled, "guestport": guestport_enabled}

        )

        self._messages_to_send.put(msg)

    def take_still_picture(self):

        msg = blueye.protocol.TakePictureCtrl()

        self._messages_to_send.put(msg)

    def set_gripper_velocities(self, grip: float, rotation: float):

        msg = blueye.protocol.GripperCtrl(

            gripper_velocities={"grip_velocity": grip, "rotate_velocity": rotation}

        )

        self._messages_to_send.put(msg)

class ReqRepClient(threading.Thread):

    def __init__(self, parent_drone: "blueye.sdk.Drone", context: zmq.Context = None):

        super().__init__(daemon=True)

        self._context = context or zmq.Context().instance()

        self._parent_drone = parent_drone

        self._socket = self._context.socket(zmq.REQ)

        self._socket.connect(f"tcp://{self._parent_drone._ip}:5556")

        self._requests_to_send = queue.Queue()

        self._exit_flag = threading.Event()

    @staticmethod

    def _get_client_info() -> blueye.protocol.ClientInfo:

        client_info = blueye.protocol.ClientInfo(

            type="SDK",

            version=f"{importlib.metadata.version('blueye.sdk')}",

            device_type="Computer",

            platform=f"{platform.system()}",

            platform_version=f"{platform.release()}",

            name=f"{platform.node()}",

        )

        return client_info

    @staticmethod

    def _parse_type_to_string(msg: proto.message.MessageMeta | str) -> str:

        message_type = (

            msg.meta.full_name.replace("blueye.protocol.", "")

            if type(msg) is proto.message.MessageMeta

            else msg.replace("blueye.protocol.", "")

        )

        return message_type

    def run(self):

        while not self._exit_flag.is_set():

            try:

                msg, response_type, response_callback_queue = self._requests_to_send.get(

                    timeout=0.1

                )

                self._socket.send_multipart(

                    [

                        bytes(msg._pb.DESCRIPTOR.full_name, "utf-8"),

                        msg.__class__.serialize(msg),

                    ]

                )

            except queue.Empty:

                # No requests to send, so we can

                continue

            # TODO: Deal with timeout

            resp = self._socket.recv_multipart()

            resp_deserialized = response_type.deserialize(resp[1])

            response_callback_queue.put(resp_deserialized)

    def stop(self):

        self._exit_flag.set()

    def _send_request_get_response(

        self,

        request: proto.message.Message,

        expected_response: proto.message.Message,

        timeout: float,

    ):

        response_queue = queue.Queue(maxsize=1)

        self._requests_to_send.put((request, expected_response, response_queue))

        try:

            return response_queue.get(timeout=timeout)

        except queue.Empty:

            raise blueye.protocol.exceptions.ResponseTimeout(

                "No response received from drone before timeout"

            )

    def ping(self, timeout: float) -> blueye.protocol.PingRep:

        request = blueye.protocol.PingReq()

        return self._send_request_get_response(request, blueye.protocol.PingRep, timeout)

    def get_camera_parameters(

        self, camera: blueye.protocol.Camera, timeout: float = 0.05

    ) -> blueye.protocol.CameraParameters:

        request = blueye.protocol.GetCameraParametersReq(camera=camera)

        response = self._send_request_get_response(

            request, blueye.protocol.GetCameraParametersRep, timeout

        )

        return response.camera_parameters

    def set_camera_parameters(

        self,

        parameters: blueye.protocol.CameraParameters,

        timeout: float = 0.05,

    ):

        request = blueye.protocol.SetCameraParametersReq(camera_parameters=parameters)

        return self._send_request_get_response(

            request, blueye.protocol.SetCameraParametersRep, timeout

        )

    def get_overlay_parameters(self, timeout: float = 0.05) -> blueye.protocol.OverlayParameters:

        request = blueye.protocol.GetOverlayParametersReq()

        response = self._send_request_get_response(

            request, blueye.protocol.GetOverlayParametersRep, timeout

        )

        return response.overlay_parameters

    def set_overlay_parameters(

        self, parameters: blueye.protocol.OverlayParameters, timeout: float = 0.05

    ):

        request = blueye.protocol.SetOverlayParametersReq(overlay_parameters=parameters)

        return self._send_request_get_response(

            request, blueye.protocol.SetOverlayParametersRep, timeout

        )

    def sync_time(self, time: int, timeout: float = 0.05):

        request = blueye.protocol.SyncTimeReq(

            time={"unix_timestamp": {"seconds": time, "nanos": 0}}

        )

        return self._send_request_get_response(request, blueye.protocol.SyncTimeRep, timeout)

    def connect_client(

        self, client_info: blueye.protocol.ClientInfo = None, timeout: float = 0.05

    ) -> blueye.protocol.ConnectClientRep:

        client = client_info or self._get_client_info()

        request = blueye.protocol.ConnectClientReq(client_info=client)

        return self._send_request_get_response(request, blueye.protocol.ConnectClientRep, timeout)

    def disconnect_client(

        self, client_id: int, timeout: float = 0.05

    ) -> blueye.protocol.DisconnectClientRep:

        request = blueye.protocol.DisconnectClientReq(client_id=client_id)

        return self._send_request_get_response(

            request, blueye.protocol.DisconnectClientRep, timeout

        )

    def set_telemetry_msg_publish_frequency(

        self, msg: proto.message.MessageMeta | str, frequency: float, timeout: float = 0.05

    ) -> blueye.protocol.SetPubFrequencyRep:

        message_type = self._parse_type_to_string(msg)

        request = blueye.protocol.SetPubFrequencyReq(

            message_type=message_type,

            frequency=frequency,

        )

        return self._send_request_get_response(request, blueye.protocol.SetPubFrequencyRep, timeout)

    def get_telemetry_msg(self, msg: proto.message.MessageMeta | str, timeout: float = 0.05):

        message_type = self._parse_type_to_string(msg)

        request = blueye.protocol.GetTelemetryReq(message_type=message_type)

        return self._send_request_get_response(request, blueye.protocol.GetTelemetryRep, timeout)

Variables

logger

Classes

Callback

class Callback(
    /,
    *args,
    **kwargs
)

Specifications for callback for telemetry messages

View Source
class Callback(NamedTuple):

    """Specifications for callback for telemetry messages"""

    message_filter: List[proto.messages.Message]

    function: Callable[[str, proto.message.Message], None]

    pass_raw_data: bool

    uuid_hex: str

    kwargs: Dict[str, Any]

Ancestors (in MRO)

  • builtins.tuple

Class variables

function
kwargs
message_filter
pass_raw_data
uuid_hex

Methods

count

def count(
    self,
    value,
    /
)

Return number of occurrences of value.

index

def index(
    self,
    value,
    start=0,
    stop=9223372036854775807,
    /
)

Return first index of value.

Raises ValueError if the value is not present.

CtrlClient

class CtrlClient(
    parent_drone: "'blueye.sdk.Drone'",
    context: 'zmq.Context' = None
)

A class that represents a thread of control.

This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.

View Source
class CtrlClient(threading.Thread):

    def __init__(

        self,

        parent_drone: "blueye.sdk.Drone",

        context: zmq.Context = None,

    ):

        super().__init__(daemon=True)

        self._context = context or zmq.Context().instance()

        self._parent_drone = parent_drone

        self._drone_pub_socket = self._context.socket(zmq.PUB)

        self._drone_pub_socket.connect(f"tcp://{self._parent_drone._ip}:5557")

        self._messages_to_send = queue.Queue()

        self._exit_flag = threading.Event()

    def run(self):

        while not self._exit_flag.is_set():

            try:

                msg = self._messages_to_send.get(timeout=0.1)

                self._drone_pub_socket.send_multipart(

                    [

                        bytes(msg._pb.DESCRIPTOR.full_name, "utf-8"),

                        msg.__class__.serialize(msg),

                    ]

                )

            except queue.Empty:

                # No messages to send, so we can

                continue

    def stop(self):

        self._exit_flag.set()

    def set_lights(self, value: float):

        msg = blueye.protocol.LightsCtrl(lights={"value": value})

        self._messages_to_send.put(msg)

    def set_guest_port_lights(self, value: float):

        msg = blueye.protocol.GuestportLightsCtrl(lights={"value": value})

        self._messages_to_send.put(msg)

    def set_water_density(self, value: float):

        msg = blueye.protocol.WaterDensityCtrl(density={"value": value})

        self._messages_to_send.put(msg)

    def set_tilt_velocity(self, value: float):

        msg = blueye.protocol.TiltVelocityCtrl(velocity={"value": value})

        self._messages_to_send.put(msg)

    def set_tilt_stabilization(self, enabled: bool):

        msg = blueye.protocol.TiltStabilizationCtrl(state={"enabled": enabled})

        self._messages_to_send.put(msg)

    def set_motion_input(

        self, surge: float, sway: float, heave: float, yaw: float, slow: float, boost: float

    ):

        msg = blueye.protocol.MotionInputCtrl(

            motion_input={

                "surge": surge,

                "sway": sway,

                "heave": heave,

                "yaw": yaw,

                "slow": slow,

                "boost": boost,

            }

        )

        self._messages_to_send.put(msg)

    def set_auto_depth_state(self, enabled: bool):

        msg = blueye.protocol.AutoDepthCtrl(state={"enabled": enabled})

        self._messages_to_send.put(msg)

    def set_auto_heading_state(self, enabled: bool):

        msg = blueye.protocol.AutoHeadingCtrl(state={"enabled": enabled})

        self._messages_to_send.put(msg)

    def set_recording_state(self, main_enabled: bool, guestport_enabled: bool):

        msg = blueye.protocol.RecordCtrl(

            record_on={"main": main_enabled, "guestport": guestport_enabled}

        )

        self._messages_to_send.put(msg)

    def take_still_picture(self):

        msg = blueye.protocol.TakePictureCtrl()

        self._messages_to_send.put(msg)

    def set_gripper_velocities(self, grip: float, rotation: float):

        msg = blueye.protocol.GripperCtrl(

            gripper_velocities={"grip_velocity": grip, "rotate_velocity": rotation}

        )

        self._messages_to_send.put(msg)

Ancestors (in MRO)

  • threading.Thread

Instance variables

daemon

A boolean value indicating whether this thread is a daemon thread.

This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.

The entire Python program exits when only daemon threads are left.

ident

Thread identifier of this thread or None if it has not been started.

This is a nonzero integer. See the get_ident() function. Thread identifiers may be recycled when a thread exits and another thread is created. The identifier is available even after the thread has exited.

name

A string used for identification purposes only.

It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.

native_id

Native integral thread ID of this thread, or None if it has not been started.

This is a non-negative integer. See the get_native_id() function. This represents the Thread ID as reported by the kernel.

Methods

getName

def getName(
    self
)

Return a string used for identification purposes only.

This method is deprecated, use the name attribute instead.

View Source
    def getName(self):

        """Return a string used for identification purposes only.

        This method is deprecated, use the name attribute instead.

        """

        import warnings

        warnings.warn('getName() is deprecated, get the name attribute instead',

                      DeprecationWarning, stacklevel=2)

        return self.name

isDaemon

def isDaemon(
    self
)

Return whether this thread is a daemon.

This method is deprecated, use the daemon attribute instead.

View Source
    def isDaemon(self):

        """Return whether this thread is a daemon.

        This method is deprecated, use the daemon attribute instead.

        """

        import warnings

        warnings.warn('isDaemon() is deprecated, get the daemon attribute instead',

                      DeprecationWarning, stacklevel=2)

        return self.daemon

is_alive

def is_alive(
    self
)

Return whether the thread is alive.

This method returns True just before the run() method starts until just after the run() method terminates. See also the module function enumerate().

View Source
    def is_alive(self):

        """Return whether the thread is alive.

        This method returns True just before the run() method starts until just

        after the run() method terminates. See also the module function

        enumerate().

        """

        assert self._initialized, "Thread.__init__() not called"

        if self._is_stopped or not self._started.is_set():

            return False

        self._wait_for_tstate_lock(False)

        return not self._is_stopped

join

def join(
    self,
    timeout=None
)

Wait until the thread terminates.

This blocks the calling thread until the thread whose join() method is called terminates -- either normally or through an unhandled exception or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened -- if the thread is still alive, the join() call timed out.

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be join()ed many times.

join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.

View Source
    def join(self, timeout=None):

        """Wait until the thread terminates.

        This blocks the calling thread until the thread whose join() method is

        called terminates -- either normally or through an unhandled exception

        or until the optional timeout occurs.

        When the timeout argument is present and not None, it should be a

        floating point number specifying a timeout for the operation in seconds

        (or fractions thereof). As join() always returns None, you must call

        is_alive() after join() to decide whether a timeout happened -- if the

        thread is still alive, the join() call timed out.

        When the timeout argument is not present or None, the operation will

        block until the thread terminates.

        A thread can be join()ed many times.

        join() raises a RuntimeError if an attempt is made to join the current

        thread as that would cause a deadlock. It is also an error to join() a

        thread before it has been started and attempts to do so raises the same

        exception.

        """

        if not self._initialized:

            raise RuntimeError("Thread.__init__() not called")

        if not self._started.is_set():

            raise RuntimeError("cannot join thread before it is started")

        if self is current_thread():

            raise RuntimeError("cannot join current thread")

        if timeout is None:

            self._wait_for_tstate_lock()

        else:

            # the behavior of a negative timeout isn't documented, but

            # historically .join(timeout=x) for x<0 has acted as if timeout=0

            self._wait_for_tstate_lock(timeout=max(timeout, 0))

run

def run(
    self
)

Method representing the thread's activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

View Source
    def run(self):

        while not self._exit_flag.is_set():

            try:

                msg = self._messages_to_send.get(timeout=0.1)

                self._drone_pub_socket.send_multipart(

                    [

                        bytes(msg._pb.DESCRIPTOR.full_name, "utf-8"),

                        msg.__class__.serialize(msg),

                    ]

                )

            except queue.Empty:

                # No messages to send, so we can

                continue

setDaemon

def setDaemon(
    self,
    daemonic
)

Set whether this thread is a daemon.

This method is deprecated, use the .daemon property instead.

View Source
    def setDaemon(self, daemonic):

        """Set whether this thread is a daemon.

        This method is deprecated, use the .daemon property instead.

        """

        import warnings

        warnings.warn('setDaemon() is deprecated, set the daemon attribute instead',

                      DeprecationWarning, stacklevel=2)

        self.daemon = daemonic

setName

def setName(
    self,
    name
)

Set the name string for this thread.

This method is deprecated, use the name attribute instead.

View Source
    def setName(self, name):

        """Set the name string for this thread.

        This method is deprecated, use the name attribute instead.

        """

        import warnings

        warnings.warn('setName() is deprecated, set the name attribute instead',

                      DeprecationWarning, stacklevel=2)

        self.name = name

set_auto_depth_state

def set_auto_depth_state(
    self,
    enabled: 'bool'
)
View Source
    def set_auto_depth_state(self, enabled: bool):

        msg = blueye.protocol.AutoDepthCtrl(state={"enabled": enabled})

        self._messages_to_send.put(msg)

set_auto_heading_state

def set_auto_heading_state(
    self,
    enabled: 'bool'
)
View Source
    def set_auto_heading_state(self, enabled: bool):

        msg = blueye.protocol.AutoHeadingCtrl(state={"enabled": enabled})

        self._messages_to_send.put(msg)

set_gripper_velocities

def set_gripper_velocities(
    self,
    grip: 'float',
    rotation: 'float'
)
View Source
    def set_gripper_velocities(self, grip: float, rotation: float):

        msg = blueye.protocol.GripperCtrl(

            gripper_velocities={"grip_velocity": grip, "rotate_velocity": rotation}

        )

        self._messages_to_send.put(msg)

set_guest_port_lights

def set_guest_port_lights(
    self,
    value: 'float'
)
View Source
    def set_guest_port_lights(self, value: float):

        msg = blueye.protocol.GuestportLightsCtrl(lights={"value": value})

        self._messages_to_send.put(msg)

set_lights

def set_lights(
    self,
    value: 'float'
)
View Source
    def set_lights(self, value: float):

        msg = blueye.protocol.LightsCtrl(lights={"value": value})

        self._messages_to_send.put(msg)

set_motion_input

def set_motion_input(
    self,
    surge: 'float',
    sway: 'float',
    heave: 'float',
    yaw: 'float',
    slow: 'float',
    boost: 'float'
)
View Source
    def set_motion_input(

        self, surge: float, sway: float, heave: float, yaw: float, slow: float, boost: float

    ):

        msg = blueye.protocol.MotionInputCtrl(

            motion_input={

                "surge": surge,

                "sway": sway,

                "heave": heave,

                "yaw": yaw,

                "slow": slow,

                "boost": boost,

            }

        )

        self._messages_to_send.put(msg)

set_recording_state

def set_recording_state(
    self,
    main_enabled: 'bool',
    guestport_enabled: 'bool'
)
View Source
    def set_recording_state(self, main_enabled: bool, guestport_enabled: bool):

        msg = blueye.protocol.RecordCtrl(

            record_on={"main": main_enabled, "guestport": guestport_enabled}

        )

        self._messages_to_send.put(msg)

set_tilt_stabilization

def set_tilt_stabilization(
    self,
    enabled: 'bool'
)
View Source
    def set_tilt_stabilization(self, enabled: bool):

        msg = blueye.protocol.TiltStabilizationCtrl(state={"enabled": enabled})

        self._messages_to_send.put(msg)

set_tilt_velocity

def set_tilt_velocity(
    self,
    value: 'float'
)
View Source
    def set_tilt_velocity(self, value: float):

        msg = blueye.protocol.TiltVelocityCtrl(velocity={"value": value})

        self._messages_to_send.put(msg)

set_water_density

def set_water_density(
    self,
    value: 'float'
)
View Source
    def set_water_density(self, value: float):

        msg = blueye.protocol.WaterDensityCtrl(density={"value": value})

        self._messages_to_send.put(msg)

start

def start(
    self
)

Start the thread's activity.

It must be called at most once per thread object. It arranges for the object's run() method to be invoked in a separate thread of control.

This method will raise a RuntimeError if called more than once on the same thread object.

View Source
    def start(self):

        """Start the thread's activity.

        It must be called at most once per thread object. It arranges for the

        object's run() method to be invoked in a separate thread of control.

        This method will raise a RuntimeError if called more than once on the

        same thread object.

        """

        if not self._initialized:

            raise RuntimeError("thread.__init__() not called")

        if self._started.is_set():

            raise RuntimeError("threads can only be started once")

        with _active_limbo_lock:

            _limbo[self] = self

        try:

            _start_new_thread(self._bootstrap, ())

        except Exception:

            with _active_limbo_lock:

                del _limbo[self]

            raise

        self._started.wait()

stop

def stop(
    self
)
View Source
    def stop(self):

        self._exit_flag.set()

take_still_picture

def take_still_picture(
    self
)
View Source
    def take_still_picture(self):

        msg = blueye.protocol.TakePictureCtrl()

        self._messages_to_send.put(msg)

ReqRepClient

class ReqRepClient(
    parent_drone: "'blueye.sdk.Drone'",
    context: 'zmq.Context' = None
)

A class that represents a thread of control.

This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.

View Source
class ReqRepClient(threading.Thread):

    def __init__(self, parent_drone: "blueye.sdk.Drone", context: zmq.Context = None):

        super().__init__(daemon=True)

        self._context = context or zmq.Context().instance()

        self._parent_drone = parent_drone

        self._socket = self._context.socket(zmq.REQ)

        self._socket.connect(f"tcp://{self._parent_drone._ip}:5556")

        self._requests_to_send = queue.Queue()

        self._exit_flag = threading.Event()

    @staticmethod

    def _get_client_info() -> blueye.protocol.ClientInfo:

        client_info = blueye.protocol.ClientInfo(

            type="SDK",

            version=f"{importlib.metadata.version('blueye.sdk')}",

            device_type="Computer",

            platform=f"{platform.system()}",

            platform_version=f"{platform.release()}",

            name=f"{platform.node()}",

        )

        return client_info

    @staticmethod

    def _parse_type_to_string(msg: proto.message.MessageMeta | str) -> str:

        message_type = (

            msg.meta.full_name.replace("blueye.protocol.", "")

            if type(msg) is proto.message.MessageMeta

            else msg.replace("blueye.protocol.", "")

        )

        return message_type

    def run(self):

        while not self._exit_flag.is_set():

            try:

                msg, response_type, response_callback_queue = self._requests_to_send.get(

                    timeout=0.1

                )

                self._socket.send_multipart(

                    [

                        bytes(msg._pb.DESCRIPTOR.full_name, "utf-8"),

                        msg.__class__.serialize(msg),

                    ]

                )

            except queue.Empty:

                # No requests to send, so we can

                continue

            # TODO: Deal with timeout

            resp = self._socket.recv_multipart()

            resp_deserialized = response_type.deserialize(resp[1])

            response_callback_queue.put(resp_deserialized)

    def stop(self):

        self._exit_flag.set()

    def _send_request_get_response(

        self,

        request: proto.message.Message,

        expected_response: proto.message.Message,

        timeout: float,

    ):

        response_queue = queue.Queue(maxsize=1)

        self._requests_to_send.put((request, expected_response, response_queue))

        try:

            return response_queue.get(timeout=timeout)

        except queue.Empty:

            raise blueye.protocol.exceptions.ResponseTimeout(

                "No response received from drone before timeout"

            )

    def ping(self, timeout: float) -> blueye.protocol.PingRep:

        request = blueye.protocol.PingReq()

        return self._send_request_get_response(request, blueye.protocol.PingRep, timeout)

    def get_camera_parameters(

        self, camera: blueye.protocol.Camera, timeout: float = 0.05

    ) -> blueye.protocol.CameraParameters:

        request = blueye.protocol.GetCameraParametersReq(camera=camera)

        response = self._send_request_get_response(

            request, blueye.protocol.GetCameraParametersRep, timeout

        )

        return response.camera_parameters

    def set_camera_parameters(

        self,

        parameters: blueye.protocol.CameraParameters,

        timeout: float = 0.05,

    ):

        request = blueye.protocol.SetCameraParametersReq(camera_parameters=parameters)

        return self._send_request_get_response(

            request, blueye.protocol.SetCameraParametersRep, timeout

        )

    def get_overlay_parameters(self, timeout: float = 0.05) -> blueye.protocol.OverlayParameters:

        request = blueye.protocol.GetOverlayParametersReq()

        response = self._send_request_get_response(

            request, blueye.protocol.GetOverlayParametersRep, timeout

        )

        return response.overlay_parameters

    def set_overlay_parameters(

        self, parameters: blueye.protocol.OverlayParameters, timeout: float = 0.05

    ):

        request = blueye.protocol.SetOverlayParametersReq(overlay_parameters=parameters)

        return self._send_request_get_response(

            request, blueye.protocol.SetOverlayParametersRep, timeout

        )

    def sync_time(self, time: int, timeout: float = 0.05):

        request = blueye.protocol.SyncTimeReq(

            time={"unix_timestamp": {"seconds": time, "nanos": 0}}

        )

        return self._send_request_get_response(request, blueye.protocol.SyncTimeRep, timeout)

    def connect_client(

        self, client_info: blueye.protocol.ClientInfo = None, timeout: float = 0.05

    ) -> blueye.protocol.ConnectClientRep:

        client = client_info or self._get_client_info()

        request = blueye.protocol.ConnectClientReq(client_info=client)

        return self._send_request_get_response(request, blueye.protocol.ConnectClientRep, timeout)

    def disconnect_client(

        self, client_id: int, timeout: float = 0.05

    ) -> blueye.protocol.DisconnectClientRep:

        request = blueye.protocol.DisconnectClientReq(client_id=client_id)

        return self._send_request_get_response(

            request, blueye.protocol.DisconnectClientRep, timeout

        )

    def set_telemetry_msg_publish_frequency(

        self, msg: proto.message.MessageMeta | str, frequency: float, timeout: float = 0.05

    ) -> blueye.protocol.SetPubFrequencyRep:

        message_type = self._parse_type_to_string(msg)

        request = blueye.protocol.SetPubFrequencyReq(

            message_type=message_type,

            frequency=frequency,

        )

        return self._send_request_get_response(request, blueye.protocol.SetPubFrequencyRep, timeout)

    def get_telemetry_msg(self, msg: proto.message.MessageMeta | str, timeout: float = 0.05):

        message_type = self._parse_type_to_string(msg)

        request = blueye.protocol.GetTelemetryReq(message_type=message_type)

        return self._send_request_get_response(request, blueye.protocol.GetTelemetryRep, timeout)

Ancestors (in MRO)

  • threading.Thread

Instance variables

daemon

A boolean value indicating whether this thread is a daemon thread.

This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.

The entire Python program exits when only daemon threads are left.

ident

Thread identifier of this thread or None if it has not been started.

This is a nonzero integer. See the get_ident() function. Thread identifiers may be recycled when a thread exits and another thread is created. The identifier is available even after the thread has exited.

name

A string used for identification purposes only.

It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.

native_id

Native integral thread ID of this thread, or None if it has not been started.

This is a non-negative integer. See the get_native_id() function. This represents the Thread ID as reported by the kernel.

Methods

connect_client

def connect_client(
    self,
    client_info: 'blueye.protocol.ClientInfo' = None,
    timeout: 'float' = 0.05
) -> 'blueye.protocol.ConnectClientRep'
View Source
    def connect_client(

        self, client_info: blueye.protocol.ClientInfo = None, timeout: float = 0.05

    ) -> blueye.protocol.ConnectClientRep:

        client = client_info or self._get_client_info()

        request = blueye.protocol.ConnectClientReq(client_info=client)

        return self._send_request_get_response(request, blueye.protocol.ConnectClientRep, timeout)

disconnect_client

def disconnect_client(
    self,
    client_id: 'int',
    timeout: 'float' = 0.05
) -> 'blueye.protocol.DisconnectClientRep'
View Source
    def disconnect_client(

        self, client_id: int, timeout: float = 0.05

    ) -> blueye.protocol.DisconnectClientRep:

        request = blueye.protocol.DisconnectClientReq(client_id=client_id)

        return self._send_request_get_response(

            request, blueye.protocol.DisconnectClientRep, timeout

        )

getName

def getName(
    self
)

Return a string used for identification purposes only.

This method is deprecated, use the name attribute instead.

View Source
    def getName(self):

        """Return a string used for identification purposes only.

        This method is deprecated, use the name attribute instead.

        """

        import warnings

        warnings.warn('getName() is deprecated, get the name attribute instead',

                      DeprecationWarning, stacklevel=2)

        return self.name

get_camera_parameters

def get_camera_parameters(
    self,
    camera: 'blueye.protocol.Camera',
    timeout: 'float' = 0.05
) -> 'blueye.protocol.CameraParameters'
View Source
    def get_camera_parameters(

        self, camera: blueye.protocol.Camera, timeout: float = 0.05

    ) -> blueye.protocol.CameraParameters:

        request = blueye.protocol.GetCameraParametersReq(camera=camera)

        response = self._send_request_get_response(

            request, blueye.protocol.GetCameraParametersRep, timeout

        )

        return response.camera_parameters

get_overlay_parameters

def get_overlay_parameters(
    self,
    timeout: 'float' = 0.05
) -> 'blueye.protocol.OverlayParameters'
View Source
    def get_overlay_parameters(self, timeout: float = 0.05) -> blueye.protocol.OverlayParameters:

        request = blueye.protocol.GetOverlayParametersReq()

        response = self._send_request_get_response(

            request, blueye.protocol.GetOverlayParametersRep, timeout

        )

        return response.overlay_parameters

get_telemetry_msg

def get_telemetry_msg(
    self,
    msg: 'proto.message.MessageMeta | str',
    timeout: 'float' = 0.05
)
View Source
    def get_telemetry_msg(self, msg: proto.message.MessageMeta | str, timeout: float = 0.05):

        message_type = self._parse_type_to_string(msg)

        request = blueye.protocol.GetTelemetryReq(message_type=message_type)

        return self._send_request_get_response(request, blueye.protocol.GetTelemetryRep, timeout)

isDaemon

def isDaemon(
    self
)

Return whether this thread is a daemon.

This method is deprecated, use the daemon attribute instead.

View Source
    def isDaemon(self):

        """Return whether this thread is a daemon.

        This method is deprecated, use the daemon attribute instead.

        """

        import warnings

        warnings.warn('isDaemon() is deprecated, get the daemon attribute instead',

                      DeprecationWarning, stacklevel=2)

        return self.daemon

is_alive

def is_alive(
    self
)

Return whether the thread is alive.

This method returns True just before the run() method starts until just after the run() method terminates. See also the module function enumerate().

View Source
    def is_alive(self):

        """Return whether the thread is alive.

        This method returns True just before the run() method starts until just

        after the run() method terminates. See also the module function

        enumerate().

        """

        assert self._initialized, "Thread.__init__() not called"

        if self._is_stopped or not self._started.is_set():

            return False

        self._wait_for_tstate_lock(False)

        return not self._is_stopped

join

def join(
    self,
    timeout=None
)

Wait until the thread terminates.

This blocks the calling thread until the thread whose join() method is called terminates -- either normally or through an unhandled exception or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened -- if the thread is still alive, the join() call timed out.

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be join()ed many times.

join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.

View Source
    def join(self, timeout=None):

        """Wait until the thread terminates.

        This blocks the calling thread until the thread whose join() method is

        called terminates -- either normally or through an unhandled exception

        or until the optional timeout occurs.

        When the timeout argument is present and not None, it should be a

        floating point number specifying a timeout for the operation in seconds

        (or fractions thereof). As join() always returns None, you must call

        is_alive() after join() to decide whether a timeout happened -- if the

        thread is still alive, the join() call timed out.

        When the timeout argument is not present or None, the operation will

        block until the thread terminates.

        A thread can be join()ed many times.

        join() raises a RuntimeError if an attempt is made to join the current

        thread as that would cause a deadlock. It is also an error to join() a

        thread before it has been started and attempts to do so raises the same

        exception.

        """

        if not self._initialized:

            raise RuntimeError("Thread.__init__() not called")

        if not self._started.is_set():

            raise RuntimeError("cannot join thread before it is started")

        if self is current_thread():

            raise RuntimeError("cannot join current thread")

        if timeout is None:

            self._wait_for_tstate_lock()

        else:

            # the behavior of a negative timeout isn't documented, but

            # historically .join(timeout=x) for x<0 has acted as if timeout=0

            self._wait_for_tstate_lock(timeout=max(timeout, 0))

ping

def ping(
    self,
    timeout: 'float'
) -> 'blueye.protocol.PingRep'
View Source
    def ping(self, timeout: float) -> blueye.protocol.PingRep:

        request = blueye.protocol.PingReq()

        return self._send_request_get_response(request, blueye.protocol.PingRep, timeout)

run

def run(
    self
)

Method representing the thread's activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

View Source
    def run(self):

        while not self._exit_flag.is_set():

            try:

                msg, response_type, response_callback_queue = self._requests_to_send.get(

                    timeout=0.1

                )

                self._socket.send_multipart(

                    [

                        bytes(msg._pb.DESCRIPTOR.full_name, "utf-8"),

                        msg.__class__.serialize(msg),

                    ]

                )

            except queue.Empty:

                # No requests to send, so we can

                continue

            # TODO: Deal with timeout

            resp = self._socket.recv_multipart()

            resp_deserialized = response_type.deserialize(resp[1])

            response_callback_queue.put(resp_deserialized)

setDaemon

def setDaemon(
    self,
    daemonic
)

Set whether this thread is a daemon.

This method is deprecated, use the .daemon property instead.

View Source
    def setDaemon(self, daemonic):

        """Set whether this thread is a daemon.

        This method is deprecated, use the .daemon property instead.

        """

        import warnings

        warnings.warn('setDaemon() is deprecated, set the daemon attribute instead',

                      DeprecationWarning, stacklevel=2)

        self.daemon = daemonic

setName

def setName(
    self,
    name
)

Set the name string for this thread.

This method is deprecated, use the name attribute instead.

View Source
    def setName(self, name):

        """Set the name string for this thread.

        This method is deprecated, use the name attribute instead.

        """

        import warnings

        warnings.warn('setName() is deprecated, set the name attribute instead',

                      DeprecationWarning, stacklevel=2)

        self.name = name

set_camera_parameters

def set_camera_parameters(
    self,
    parameters: 'blueye.protocol.CameraParameters',
    timeout: 'float' = 0.05
)
View Source
    def set_camera_parameters(

        self,

        parameters: blueye.protocol.CameraParameters,

        timeout: float = 0.05,

    ):

        request = blueye.protocol.SetCameraParametersReq(camera_parameters=parameters)

        return self._send_request_get_response(

            request, blueye.protocol.SetCameraParametersRep, timeout

        )

set_overlay_parameters

def set_overlay_parameters(
    self,
    parameters: 'blueye.protocol.OverlayParameters',
    timeout: 'float' = 0.05
)
View Source
    def set_overlay_parameters(

        self, parameters: blueye.protocol.OverlayParameters, timeout: float = 0.05

    ):

        request = blueye.protocol.SetOverlayParametersReq(overlay_parameters=parameters)

        return self._send_request_get_response(

            request, blueye.protocol.SetOverlayParametersRep, timeout

        )

set_telemetry_msg_publish_frequency

def set_telemetry_msg_publish_frequency(
    self,
    msg: 'proto.message.MessageMeta | str',
    frequency: 'float',
    timeout: 'float' = 0.05
) -> 'blueye.protocol.SetPubFrequencyRep'
View Source
    def set_telemetry_msg_publish_frequency(

        self, msg: proto.message.MessageMeta | str, frequency: float, timeout: float = 0.05

    ) -> blueye.protocol.SetPubFrequencyRep:

        message_type = self._parse_type_to_string(msg)

        request = blueye.protocol.SetPubFrequencyReq(

            message_type=message_type,

            frequency=frequency,

        )

        return self._send_request_get_response(request, blueye.protocol.SetPubFrequencyRep, timeout)

start

def start(
    self
)

Start the thread's activity.

It must be called at most once per thread object. It arranges for the object's run() method to be invoked in a separate thread of control.

This method will raise a RuntimeError if called more than once on the same thread object.

View Source
    def start(self):

        """Start the thread's activity.

        It must be called at most once per thread object. It arranges for the

        object's run() method to be invoked in a separate thread of control.

        This method will raise a RuntimeError if called more than once on the

        same thread object.

        """

        if not self._initialized:

            raise RuntimeError("thread.__init__() not called")

        if self._started.is_set():

            raise RuntimeError("threads can only be started once")

        with _active_limbo_lock:

            _limbo[self] = self

        try:

            _start_new_thread(self._bootstrap, ())

        except Exception:

            with _active_limbo_lock:

                del _limbo[self]

            raise

        self._started.wait()

stop

def stop(
    self
)
View Source
    def stop(self):

        self._exit_flag.set()

sync_time

def sync_time(
    self,
    time: 'int',
    timeout: 'float' = 0.05
)
View Source
    def sync_time(self, time: int, timeout: float = 0.05):

        request = blueye.protocol.SyncTimeReq(

            time={"unix_timestamp": {"seconds": time, "nanos": 0}}

        )

        return self._send_request_get_response(request, blueye.protocol.SyncTimeRep, timeout)

TelemetryClient

class TelemetryClient(
    parent_drone: "'blueye.sdk.Drone'",
    context: 'zmq.Context' = None
)

A class that represents a thread of control.

This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.

View Source
class TelemetryClient(threading.Thread):

    def __init__(self, parent_drone: "blueye.sdk.Drone", context: zmq.Context = None):

        super().__init__(daemon=True)

        self._parent_drone = parent_drone

        self._context = context or zmq.Context().instance()

        self._socket = self._context.socket(zmq.SUB)

        self._socket.connect(f"tcp://{self._parent_drone._ip}:5555")

        self._socket.setsockopt_string(zmq.SUBSCRIBE, "")

        self._exit_flag = threading.Event()

        self._state_lock = threading.Lock()

        self._callbacks: List[Callback] = []

        self._state: Dict[proto.message.Message, bytes] = {}

        """`_state` is dictionary of the latest received messages, where the key is the protobuf

        message class, eg. blueye.protocol.DepthTel and the value is the serialized protobuf

        message"""

    def _handle_message(self, msg: Tuple[bytes, bytes]):

        msg_type_name = msg[0].decode("utf-8").replace("blueye.protocol.", "")

        try:

            msg_type = blueye.protocol.__getattribute__(msg_type_name)

        except AttributeError:

            # If a new telemetry message is introduced before the SDK is updated this can

            # be a common occurence, so choosing to log with info instead of warning

            logger.info(f"Ignoring unknown message type: {msg_type_name}")

            return

        msg_payload = msg[1]

        with self._state_lock:

            self._state[msg_type] = msg_payload

        for callback in self._callbacks:

            if msg_type in callback.message_filter or callback.message_filter == []:

                if callback.pass_raw_data:

                    callback.function(msg_type_name, msg_payload, **callback.kwargs)

                else:

                    msg_deserialized = msg_type.deserialize(msg_payload)

                    callback.function(msg_type_name, msg_deserialized, **callback.kwargs)

    def run(self):

        poller = zmq.Poller()

        poller.register(self._socket, zmq.POLLIN)

        while not self._exit_flag.is_set():

            events_to_be_processed = poller.poll(10)

            if len(events_to_be_processed) > 0:

                msg = self._socket.recv_multipart()

                self._handle_message(msg)

    def add_callback(

        self,

        msg_filter: List[proto.message.Message],

        callback_function: Callable[[str, proto.message.Message], None],

        raw: bool,

        **kwargs,

    ):

        uuid_hex = uuid.uuid1().hex

        self._callbacks.append(Callback(msg_filter, callback_function, raw, uuid_hex, kwargs))

        return uuid_hex

    def remove_callback(self, callback_id):

        try:

            self._callbacks.pop([cb.uuid_hex for cb in self._callbacks].index(callback_id))

        except ValueError:

            logger.warning(f"Callback with id {callback_id} not found, ignoring")

    def get(self, key: proto.message.Message):

        with self._state_lock:

            return self._state[key]

    def stop(self):

        self._exit_flag.set()

Ancestors (in MRO)

  • threading.Thread

Instance variables

daemon

A boolean value indicating whether this thread is a daemon thread.

This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.

The entire Python program exits when only daemon threads are left.

ident

Thread identifier of this thread or None if it has not been started.

This is a nonzero integer. See the get_ident() function. Thread identifiers may be recycled when a thread exits and another thread is created. The identifier is available even after the thread has exited.

name

A string used for identification purposes only.

It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.

native_id

Native integral thread ID of this thread, or None if it has not been started.

This is a non-negative integer. See the get_native_id() function. This represents the Thread ID as reported by the kernel.

Methods

add_callback

def add_callback(
    self,
    msg_filter: 'List[proto.message.Message]',
    callback_function: 'Callable[[str, proto.message.Message], None]',
    raw: 'bool',
    **kwargs
)
View Source
    def add_callback(

        self,

        msg_filter: List[proto.message.Message],

        callback_function: Callable[[str, proto.message.Message], None],

        raw: bool,

        **kwargs,

    ):

        uuid_hex = uuid.uuid1().hex

        self._callbacks.append(Callback(msg_filter, callback_function, raw, uuid_hex, kwargs))

        return uuid_hex

get

def get(
    self,
    key: 'proto.message.Message'
)
View Source
    def get(self, key: proto.message.Message):

        with self._state_lock:

            return self._state[key]

getName

def getName(
    self
)

Return a string used for identification purposes only.

This method is deprecated, use the name attribute instead.

View Source
    def getName(self):

        """Return a string used for identification purposes only.

        This method is deprecated, use the name attribute instead.

        """

        import warnings

        warnings.warn('getName() is deprecated, get the name attribute instead',

                      DeprecationWarning, stacklevel=2)

        return self.name

isDaemon

def isDaemon(
    self
)

Return whether this thread is a daemon.

This method is deprecated, use the daemon attribute instead.

View Source
    def isDaemon(self):

        """Return whether this thread is a daemon.

        This method is deprecated, use the daemon attribute instead.

        """

        import warnings

        warnings.warn('isDaemon() is deprecated, get the daemon attribute instead',

                      DeprecationWarning, stacklevel=2)

        return self.daemon

is_alive

def is_alive(
    self
)

Return whether the thread is alive.

This method returns True just before the run() method starts until just after the run() method terminates. See also the module function enumerate().

View Source
    def is_alive(self):

        """Return whether the thread is alive.

        This method returns True just before the run() method starts until just

        after the run() method terminates. See also the module function

        enumerate().

        """

        assert self._initialized, "Thread.__init__() not called"

        if self._is_stopped or not self._started.is_set():

            return False

        self._wait_for_tstate_lock(False)

        return not self._is_stopped

join

def join(
    self,
    timeout=None
)

Wait until the thread terminates.

This blocks the calling thread until the thread whose join() method is called terminates -- either normally or through an unhandled exception or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened -- if the thread is still alive, the join() call timed out.

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be join()ed many times.

join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.

View Source
    def join(self, timeout=None):

        """Wait until the thread terminates.

        This blocks the calling thread until the thread whose join() method is

        called terminates -- either normally or through an unhandled exception

        or until the optional timeout occurs.

        When the timeout argument is present and not None, it should be a

        floating point number specifying a timeout for the operation in seconds

        (or fractions thereof). As join() always returns None, you must call

        is_alive() after join() to decide whether a timeout happened -- if the

        thread is still alive, the join() call timed out.

        When the timeout argument is not present or None, the operation will

        block until the thread terminates.

        A thread can be join()ed many times.

        join() raises a RuntimeError if an attempt is made to join the current

        thread as that would cause a deadlock. It is also an error to join() a

        thread before it has been started and attempts to do so raises the same

        exception.

        """

        if not self._initialized:

            raise RuntimeError("Thread.__init__() not called")

        if not self._started.is_set():

            raise RuntimeError("cannot join thread before it is started")

        if self is current_thread():

            raise RuntimeError("cannot join current thread")

        if timeout is None:

            self._wait_for_tstate_lock()

        else:

            # the behavior of a negative timeout isn't documented, but

            # historically .join(timeout=x) for x<0 has acted as if timeout=0

            self._wait_for_tstate_lock(timeout=max(timeout, 0))

remove_callback

def remove_callback(
    self,
    callback_id
)
View Source
    def remove_callback(self, callback_id):

        try:

            self._callbacks.pop([cb.uuid_hex for cb in self._callbacks].index(callback_id))

        except ValueError:

            logger.warning(f"Callback with id {callback_id} not found, ignoring")

run

def run(
    self
)

Method representing the thread's activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

View Source
    def run(self):

        poller = zmq.Poller()

        poller.register(self._socket, zmq.POLLIN)

        while not self._exit_flag.is_set():

            events_to_be_processed = poller.poll(10)

            if len(events_to_be_processed) > 0:

                msg = self._socket.recv_multipart()

                self._handle_message(msg)

setDaemon

def setDaemon(
    self,
    daemonic
)

Set whether this thread is a daemon.

This method is deprecated, use the .daemon property instead.

View Source
    def setDaemon(self, daemonic):

        """Set whether this thread is a daemon.

        This method is deprecated, use the .daemon property instead.

        """

        import warnings

        warnings.warn('setDaemon() is deprecated, set the daemon attribute instead',

                      DeprecationWarning, stacklevel=2)

        self.daemon = daemonic

setName

def setName(
    self,
    name
)

Set the name string for this thread.

This method is deprecated, use the name attribute instead.

View Source
    def setName(self, name):

        """Set the name string for this thread.

        This method is deprecated, use the name attribute instead.

        """

        import warnings

        warnings.warn('setName() is deprecated, set the name attribute instead',

                      DeprecationWarning, stacklevel=2)

        self.name = name

start

def start(
    self
)

Start the thread's activity.

It must be called at most once per thread object. It arranges for the object's run() method to be invoked in a separate thread of control.

This method will raise a RuntimeError if called more than once on the same thread object.

View Source
    def start(self):

        """Start the thread's activity.

        It must be called at most once per thread object. It arranges for the

        object's run() method to be invoked in a separate thread of control.

        This method will raise a RuntimeError if called more than once on the

        same thread object.

        """

        if not self._initialized:

            raise RuntimeError("thread.__init__() not called")

        if self._started.is_set():

            raise RuntimeError("threads can only be started once")

        with _active_limbo_lock:

            _limbo[self] = self

        try:

            _start_new_thread(self._bootstrap, ())

        except Exception:

            with _active_limbo_lock:

                del _limbo[self]

            raise

        self._started.wait()

stop

def stop(
    self
)
View Source
    def stop(self):

        self._exit_flag.set()

WatchdogPublisher

class WatchdogPublisher(
    parent_drone: "'blueye.sdk.Drone'",
    context: 'zmq.Context' = None
)

A class that represents a thread of control.

This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.

View Source
class WatchdogPublisher(threading.Thread):

    def __init__(self, parent_drone: "blueye.sdk.Drone", context: zmq.Context = None):

        super().__init__(daemon=True)

        self._parent_drone = parent_drone

        self._context = context or zmq.Context().instance()

        self._socket = self._context.socket(zmq.PUB)

        self._socket.connect(f"tcp://{self._parent_drone._ip}:5557")

        self._exit_flag = threading.Event()

    def run(self):

        duration = 0

        WATCHDOG_DELAY = 1

        while not self._exit_flag.wait(WATCHDOG_DELAY):

            self.pet_watchdog(duration)

            duration += 1

    def pet_watchdog(self, duration):

        msg = blueye.protocol.WatchdogCtrl(

            connection_duration={"value": duration}, client_id=self._parent_drone.client_id

        )

        self._socket.send_multipart(

            [

                bytes(msg._pb.DESCRIPTOR.full_name, "utf-8"),

                blueye.protocol.WatchdogCtrl.serialize(msg),

            ]

        )

    def stop(self):

        """Stop the watchdog thread started by run()"""

        self._exit_flag.set()

Ancestors (in MRO)

  • threading.Thread

Instance variables

daemon

A boolean value indicating whether this thread is a daemon thread.

This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.

The entire Python program exits when only daemon threads are left.

ident

Thread identifier of this thread or None if it has not been started.

This is a nonzero integer. See the get_ident() function. Thread identifiers may be recycled when a thread exits and another thread is created. The identifier is available even after the thread has exited.

name

A string used for identification purposes only.

It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.

native_id

Native integral thread ID of this thread, or None if it has not been started.

This is a non-negative integer. See the get_native_id() function. This represents the Thread ID as reported by the kernel.

Methods

getName

def getName(
    self
)

Return a string used for identification purposes only.

This method is deprecated, use the name attribute instead.

View Source
    def getName(self):

        """Return a string used for identification purposes only.

        This method is deprecated, use the name attribute instead.

        """

        import warnings

        warnings.warn('getName() is deprecated, get the name attribute instead',

                      DeprecationWarning, stacklevel=2)

        return self.name

isDaemon

def isDaemon(
    self
)

Return whether this thread is a daemon.

This method is deprecated, use the daemon attribute instead.

View Source
    def isDaemon(self):

        """Return whether this thread is a daemon.

        This method is deprecated, use the daemon attribute instead.

        """

        import warnings

        warnings.warn('isDaemon() is deprecated, get the daemon attribute instead',

                      DeprecationWarning, stacklevel=2)

        return self.daemon

is_alive

def is_alive(
    self
)

Return whether the thread is alive.

This method returns True just before the run() method starts until just after the run() method terminates. See also the module function enumerate().

View Source
    def is_alive(self):

        """Return whether the thread is alive.

        This method returns True just before the run() method starts until just

        after the run() method terminates. See also the module function

        enumerate().

        """

        assert self._initialized, "Thread.__init__() not called"

        if self._is_stopped or not self._started.is_set():

            return False

        self._wait_for_tstate_lock(False)

        return not self._is_stopped

join

def join(
    self,
    timeout=None
)

Wait until the thread terminates.

This blocks the calling thread until the thread whose join() method is called terminates -- either normally or through an unhandled exception or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened -- if the thread is still alive, the join() call timed out.

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be join()ed many times.

join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.

View Source
    def join(self, timeout=None):

        """Wait until the thread terminates.

        This blocks the calling thread until the thread whose join() method is

        called terminates -- either normally or through an unhandled exception

        or until the optional timeout occurs.

        When the timeout argument is present and not None, it should be a

        floating point number specifying a timeout for the operation in seconds

        (or fractions thereof). As join() always returns None, you must call

        is_alive() after join() to decide whether a timeout happened -- if the

        thread is still alive, the join() call timed out.

        When the timeout argument is not present or None, the operation will

        block until the thread terminates.

        A thread can be join()ed many times.

        join() raises a RuntimeError if an attempt is made to join the current

        thread as that would cause a deadlock. It is also an error to join() a

        thread before it has been started and attempts to do so raises the same

        exception.

        """

        if not self._initialized:

            raise RuntimeError("Thread.__init__() not called")

        if not self._started.is_set():

            raise RuntimeError("cannot join thread before it is started")

        if self is current_thread():

            raise RuntimeError("cannot join current thread")

        if timeout is None:

            self._wait_for_tstate_lock()

        else:

            # the behavior of a negative timeout isn't documented, but

            # historically .join(timeout=x) for x<0 has acted as if timeout=0

            self._wait_for_tstate_lock(timeout=max(timeout, 0))

pet_watchdog

def pet_watchdog(
    self,
    duration
)
View Source
    def pet_watchdog(self, duration):

        msg = blueye.protocol.WatchdogCtrl(

            connection_duration={"value": duration}, client_id=self._parent_drone.client_id

        )

        self._socket.send_multipart(

            [

                bytes(msg._pb.DESCRIPTOR.full_name, "utf-8"),

                blueye.protocol.WatchdogCtrl.serialize(msg),

            ]

        )

run

def run(
    self
)

Method representing the thread's activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

View Source
    def run(self):

        duration = 0

        WATCHDOG_DELAY = 1

        while not self._exit_flag.wait(WATCHDOG_DELAY):

            self.pet_watchdog(duration)

            duration += 1

setDaemon

def setDaemon(
    self,
    daemonic
)

Set whether this thread is a daemon.

This method is deprecated, use the .daemon property instead.

View Source
    def setDaemon(self, daemonic):

        """Set whether this thread is a daemon.

        This method is deprecated, use the .daemon property instead.

        """

        import warnings

        warnings.warn('setDaemon() is deprecated, set the daemon attribute instead',

                      DeprecationWarning, stacklevel=2)

        self.daemon = daemonic

setName

def setName(
    self,
    name
)

Set the name string for this thread.

This method is deprecated, use the name attribute instead.

View Source
    def setName(self, name):

        """Set the name string for this thread.

        This method is deprecated, use the name attribute instead.

        """

        import warnings

        warnings.warn('setName() is deprecated, set the name attribute instead',

                      DeprecationWarning, stacklevel=2)

        self.name = name

start

def start(
    self
)

Start the thread's activity.

It must be called at most once per thread object. It arranges for the object's run() method to be invoked in a separate thread of control.

This method will raise a RuntimeError if called more than once on the same thread object.

View Source
    def start(self):

        """Start the thread's activity.

        It must be called at most once per thread object. It arranges for the

        object's run() method to be invoked in a separate thread of control.

        This method will raise a RuntimeError if called more than once on the

        same thread object.

        """

        if not self._initialized:

            raise RuntimeError("thread.__init__() not called")

        if self._started.is_set():

            raise RuntimeError("threads can only be started once")

        with _active_limbo_lock:

            _limbo[self] = self

        try:

            _start_new_thread(self._bootstrap, ())

        except Exception:

            with _active_limbo_lock:

                del _limbo[self]

            raise

        self._started.wait()

stop

def stop(
    self
)

Stop the watchdog thread started by run()

View Source
    def stop(self):

        """Stop the watchdog thread started by run()"""

        self._exit_flag.set()