Module blueye.sdk.connection
View Source
from __future__ import annotations
import importlib.metadata
import logging
import platform
import queue
import threading
import uuid
from typing import Any, Callable, Dict, List, NamedTuple, Tuple
import blueye.protocol
import proto
import zmq
logger = logging.getLogger(__name__)
class WatchdogPublisher(threading.Thread):
def __init__(self, parent_drone: "blueye.sdk.Drone", context: zmq.Context = None):
super().__init__(daemon=True)
self._parent_drone = parent_drone
self._context = context or zmq.Context().instance()
self._socket = self._context.socket(zmq.PUB)
self._socket.connect(f"tcp://{self._parent_drone._ip}:5557")
self._exit_flag = threading.Event()
def run(self):
duration = 0
WATCHDOG_DELAY = 1
while not self._exit_flag.wait(WATCHDOG_DELAY):
self.pet_watchdog(duration)
duration += 1
def pet_watchdog(self, duration):
msg = blueye.protocol.WatchdogCtrl(
connection_duration={"value": duration}, client_id=self._parent_drone.client_id
)
self._socket.send_multipart(
[
bytes(msg._pb.DESCRIPTOR.full_name, "utf-8"),
blueye.protocol.WatchdogCtrl.serialize(msg),
]
)
def stop(self):
"""Stop the watchdog thread started by run()"""
self._exit_flag.set()
class Callback(NamedTuple):
"""Specifications for callback for telemetry messages"""
message_filter: List[proto.messages.Message]
function: Callable[[str, proto.message.Message], None]
pass_raw_data: bool
uuid_hex: str
kwargs: Dict[str, Any]
class TelemetryClient(threading.Thread):
def __init__(self, parent_drone: "blueye.sdk.Drone", context: zmq.Context = None):
super().__init__(daemon=True)
self._parent_drone = parent_drone
self._context = context or zmq.Context().instance()
self._socket = self._context.socket(zmq.SUB)
self._socket.connect(f"tcp://{self._parent_drone._ip}:5555")
self._socket.setsockopt_string(zmq.SUBSCRIBE, "")
self._exit_flag = threading.Event()
self._state_lock = threading.Lock()
self._callbacks: List[Callback] = []
self._state: Dict[proto.message.Message, bytes] = {}
"""`_state` is dictionary of the latest received messages, where the key is the protobuf
message class, eg. blueye.protocol.DepthTel and the value is the serialized protobuf
message"""
def _handle_message(self, msg: Tuple[bytes, bytes]):
msg_type_name = msg[0].decode("utf-8").replace("blueye.protocol.", "")
try:
msg_type = blueye.protocol.__getattribute__(msg_type_name)
except AttributeError:
# If a new telemetry message is introduced before the SDK is updated this can
# be a common occurence, so choosing to log with info instead of warning
logger.info(f"Ignoring unknown message type: {msg_type_name}")
return
msg_payload = msg[1]
with self._state_lock:
self._state[msg_type] = msg_payload
for callback in self._callbacks:
if msg_type in callback.message_filter or callback.message_filter == []:
if callback.pass_raw_data:
callback.function(msg_type_name, msg_payload, **callback.kwargs)
else:
msg_deserialized = msg_type.deserialize(msg_payload)
callback.function(msg_type_name, msg_deserialized, **callback.kwargs)
def run(self):
poller = zmq.Poller()
poller.register(self._socket, zmq.POLLIN)
while not self._exit_flag.is_set():
events_to_be_processed = poller.poll(10)
if len(events_to_be_processed) > 0:
msg = self._socket.recv_multipart()
self._handle_message(msg)
def add_callback(
self,
msg_filter: List[proto.message.Message],
callback_function: Callable[[str, proto.message.Message], None],
raw: bool,
**kwargs,
):
uuid_hex = uuid.uuid1().hex
self._callbacks.append(Callback(msg_filter, callback_function, raw, uuid_hex, kwargs))
return uuid_hex
def remove_callback(self, callback_id):
try:
self._callbacks.pop([cb.uuid_hex for cb in self._callbacks].index(callback_id))
except ValueError:
logger.warning(f"Callback with id {callback_id} not found, ignoring")
def get(self, key: proto.message.Message):
with self._state_lock:
return self._state[key]
def stop(self):
self._exit_flag.set()
class CtrlClient(threading.Thread):
def __init__(
self,
parent_drone: "blueye.sdk.Drone",
context: zmq.Context = None,
):
super().__init__(daemon=True)
self._context = context or zmq.Context().instance()
self._parent_drone = parent_drone
self._drone_pub_socket = self._context.socket(zmq.PUB)
self._drone_pub_socket.connect(f"tcp://{self._parent_drone._ip}:5557")
self._messages_to_send = queue.Queue()
self._exit_flag = threading.Event()
def run(self):
while not self._exit_flag.is_set():
try:
msg = self._messages_to_send.get(timeout=0.1)
self._drone_pub_socket.send_multipart(
[
bytes(msg._pb.DESCRIPTOR.full_name, "utf-8"),
msg.__class__.serialize(msg),
]
)
except queue.Empty:
# No messages to send, so we can
continue
def stop(self):
self._exit_flag.set()
def set_lights(self, value: float):
msg = blueye.protocol.LightsCtrl(lights={"value": value})
self._messages_to_send.put(msg)
def set_guest_port_lights(self, value: float):
msg = blueye.protocol.GuestportLightsCtrl(lights={"value": value})
self._messages_to_send.put(msg)
def set_water_density(self, value: float):
msg = blueye.protocol.WaterDensityCtrl(density={"value": value})
self._messages_to_send.put(msg)
def set_tilt_velocity(self, value: float):
msg = blueye.protocol.TiltVelocityCtrl(velocity={"value": value})
self._messages_to_send.put(msg)
def set_tilt_stabilization(self, enabled: bool):
msg = blueye.protocol.TiltStabilizationCtrl(state={"enabled": enabled})
self._messages_to_send.put(msg)
def set_motion_input(
self, surge: float, sway: float, heave: float, yaw: float, slow: float, boost: float
):
msg = blueye.protocol.MotionInputCtrl(
motion_input={
"surge": surge,
"sway": sway,
"heave": heave,
"yaw": yaw,
"slow": slow,
"boost": boost,
}
)
self._messages_to_send.put(msg)
def set_auto_depth_state(self, enabled: bool):
msg = blueye.protocol.AutoDepthCtrl(state={"enabled": enabled})
self._messages_to_send.put(msg)
def set_auto_heading_state(self, enabled: bool):
msg = blueye.protocol.AutoHeadingCtrl(state={"enabled": enabled})
self._messages_to_send.put(msg)
def set_recording_state(self, main_enabled: bool, guestport_enabled: bool):
msg = blueye.protocol.RecordCtrl(
record_on={"main": main_enabled, "guestport": guestport_enabled}
)
self._messages_to_send.put(msg)
def take_still_picture(self):
msg = blueye.protocol.TakePictureCtrl()
self._messages_to_send.put(msg)
def set_gripper_velocities(self, grip: float, rotation: float):
msg = blueye.protocol.GripperCtrl(
gripper_velocities={"grip_velocity": grip, "rotate_velocity": rotation}
)
self._messages_to_send.put(msg)
class ReqRepClient(threading.Thread):
def __init__(self, parent_drone: "blueye.sdk.Drone", context: zmq.Context = None):
super().__init__(daemon=True)
self._context = context or zmq.Context().instance()
self._parent_drone = parent_drone
self._socket = self._context.socket(zmq.REQ)
self._socket.connect(f"tcp://{self._parent_drone._ip}:5556")
self._requests_to_send = queue.Queue()
self._exit_flag = threading.Event()
@staticmethod
def _get_client_info() -> blueye.protocol.ClientInfo:
client_info = blueye.protocol.ClientInfo(
type="SDK",
version=f"{importlib.metadata.version('blueye.sdk')}",
device_type="Computer",
platform=f"{platform.system()}",
platform_version=f"{platform.release()}",
name=f"{platform.node()}",
)
return client_info
@staticmethod
def _parse_type_to_string(msg: proto.message.MessageMeta | str) -> str:
message_type = (
msg.meta.full_name.replace("blueye.protocol.", "")
if type(msg) is proto.message.MessageMeta
else msg.replace("blueye.protocol.", "")
)
return message_type
def run(self):
while not self._exit_flag.is_set():
try:
msg, response_type, response_callback_queue = self._requests_to_send.get(
timeout=0.1
)
self._socket.send_multipart(
[
bytes(msg._pb.DESCRIPTOR.full_name, "utf-8"),
msg.__class__.serialize(msg),
]
)
except queue.Empty:
# No requests to send, so we can
continue
# TODO: Deal with timeout
resp = self._socket.recv_multipart()
resp_deserialized = response_type.deserialize(resp[1])
response_callback_queue.put(resp_deserialized)
def stop(self):
self._exit_flag.set()
def _send_request_get_response(
self,
request: proto.message.Message,
expected_response: proto.message.Message,
timeout: float,
):
response_queue = queue.Queue(maxsize=1)
self._requests_to_send.put((request, expected_response, response_queue))
try:
return response_queue.get(timeout=timeout)
except queue.Empty:
raise blueye.protocol.exceptions.ResponseTimeout(
"No response received from drone before timeout"
)
def ping(self, timeout: float) -> blueye.protocol.PingRep:
request = blueye.protocol.PingReq()
return self._send_request_get_response(request, blueye.protocol.PingRep, timeout)
def get_camera_parameters(
self, camera: blueye.protocol.Camera, timeout: float = 0.05
) -> blueye.protocol.CameraParameters:
request = blueye.protocol.GetCameraParametersReq(camera=camera)
response = self._send_request_get_response(
request, blueye.protocol.GetCameraParametersRep, timeout
)
return response.camera_parameters
def set_camera_parameters(
self,
parameters: blueye.protocol.CameraParameters,
timeout: float = 0.05,
):
request = blueye.protocol.SetCameraParametersReq(camera_parameters=parameters)
return self._send_request_get_response(
request, blueye.protocol.SetCameraParametersRep, timeout
)
def get_overlay_parameters(self, timeout: float = 0.05) -> blueye.protocol.OverlayParameters:
request = blueye.protocol.GetOverlayParametersReq()
response = self._send_request_get_response(
request, blueye.protocol.GetOverlayParametersRep, timeout
)
return response.overlay_parameters
def set_overlay_parameters(
self, parameters: blueye.protocol.OverlayParameters, timeout: float = 0.05
):
request = blueye.protocol.SetOverlayParametersReq(overlay_parameters=parameters)
return self._send_request_get_response(
request, blueye.protocol.SetOverlayParametersRep, timeout
)
def sync_time(self, time: int, timeout: float = 0.05):
request = blueye.protocol.SyncTimeReq(
time={"unix_timestamp": {"seconds": time, "nanos": 0}}
)
return self._send_request_get_response(request, blueye.protocol.SyncTimeRep, timeout)
def connect_client(
self, client_info: blueye.protocol.ClientInfo = None, timeout: float = 0.05
) -> blueye.protocol.ConnectClientRep:
client = client_info or self._get_client_info()
request = blueye.protocol.ConnectClientReq(client_info=client)
return self._send_request_get_response(request, blueye.protocol.ConnectClientRep, timeout)
def disconnect_client(
self, client_id: int, timeout: float = 0.05
) -> blueye.protocol.DisconnectClientRep:
request = blueye.protocol.DisconnectClientReq(client_id=client_id)
return self._send_request_get_response(
request, blueye.protocol.DisconnectClientRep, timeout
)
def set_telemetry_msg_publish_frequency(
self, msg: proto.message.MessageMeta | str, frequency: float, timeout: float = 0.05
) -> blueye.protocol.SetPubFrequencyRep:
message_type = self._parse_type_to_string(msg)
request = blueye.protocol.SetPubFrequencyReq(
message_type=message_type,
frequency=frequency,
)
return self._send_request_get_response(request, blueye.protocol.SetPubFrequencyRep, timeout)
def get_telemetry_msg(self, msg: proto.message.MessageMeta | str, timeout: float = 0.05):
message_type = self._parse_type_to_string(msg)
request = blueye.protocol.GetTelemetryReq(message_type=message_type)
return self._send_request_get_response(request, blueye.protocol.GetTelemetryRep, timeout)
Variables
logger
Classes
Callback
class Callback(
/,
*args,
**kwargs
)
Specifications for callback for telemetry messages
View Source
class Callback(NamedTuple):
"""Specifications for callback for telemetry messages"""
message_filter: List[proto.messages.Message]
function: Callable[[str, proto.message.Message], None]
pass_raw_data: bool
uuid_hex: str
kwargs: Dict[str, Any]
Ancestors (in MRO)
- builtins.tuple
Class variables
function
kwargs
message_filter
pass_raw_data
uuid_hex
Methods
count
def count(
self,
value,
/
)
Return number of occurrences of value.
index
def index(
self,
value,
start=0,
stop=9223372036854775807,
/
)
Return first index of value.
Raises ValueError if the value is not present.
CtrlClient
class CtrlClient(
parent_drone: "'blueye.sdk.Drone'",
context: 'zmq.Context' = None
)
A class that represents a thread of control.
This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.
View Source
class CtrlClient(threading.Thread):
def __init__(
self,
parent_drone: "blueye.sdk.Drone",
context: zmq.Context = None,
):
super().__init__(daemon=True)
self._context = context or zmq.Context().instance()
self._parent_drone = parent_drone
self._drone_pub_socket = self._context.socket(zmq.PUB)
self._drone_pub_socket.connect(f"tcp://{self._parent_drone._ip}:5557")
self._messages_to_send = queue.Queue()
self._exit_flag = threading.Event()
def run(self):
while not self._exit_flag.is_set():
try:
msg = self._messages_to_send.get(timeout=0.1)
self._drone_pub_socket.send_multipart(
[
bytes(msg._pb.DESCRIPTOR.full_name, "utf-8"),
msg.__class__.serialize(msg),
]
)
except queue.Empty:
# No messages to send, so we can
continue
def stop(self):
self._exit_flag.set()
def set_lights(self, value: float):
msg = blueye.protocol.LightsCtrl(lights={"value": value})
self._messages_to_send.put(msg)
def set_guest_port_lights(self, value: float):
msg = blueye.protocol.GuestportLightsCtrl(lights={"value": value})
self._messages_to_send.put(msg)
def set_water_density(self, value: float):
msg = blueye.protocol.WaterDensityCtrl(density={"value": value})
self._messages_to_send.put(msg)
def set_tilt_velocity(self, value: float):
msg = blueye.protocol.TiltVelocityCtrl(velocity={"value": value})
self._messages_to_send.put(msg)
def set_tilt_stabilization(self, enabled: bool):
msg = blueye.protocol.TiltStabilizationCtrl(state={"enabled": enabled})
self._messages_to_send.put(msg)
def set_motion_input(
self, surge: float, sway: float, heave: float, yaw: float, slow: float, boost: float
):
msg = blueye.protocol.MotionInputCtrl(
motion_input={
"surge": surge,
"sway": sway,
"heave": heave,
"yaw": yaw,
"slow": slow,
"boost": boost,
}
)
self._messages_to_send.put(msg)
def set_auto_depth_state(self, enabled: bool):
msg = blueye.protocol.AutoDepthCtrl(state={"enabled": enabled})
self._messages_to_send.put(msg)
def set_auto_heading_state(self, enabled: bool):
msg = blueye.protocol.AutoHeadingCtrl(state={"enabled": enabled})
self._messages_to_send.put(msg)
def set_recording_state(self, main_enabled: bool, guestport_enabled: bool):
msg = blueye.protocol.RecordCtrl(
record_on={"main": main_enabled, "guestport": guestport_enabled}
)
self._messages_to_send.put(msg)
def take_still_picture(self):
msg = blueye.protocol.TakePictureCtrl()
self._messages_to_send.put(msg)
def set_gripper_velocities(self, grip: float, rotation: float):
msg = blueye.protocol.GripperCtrl(
gripper_velocities={"grip_velocity": grip, "rotate_velocity": rotation}
)
self._messages_to_send.put(msg)
Ancestors (in MRO)
- threading.Thread
Instance variables
daemon
A boolean value indicating whether this thread is a daemon thread.
This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.
The entire Python program exits when only daemon threads are left.
ident
Thread identifier of this thread or None if it has not been started.
This is a nonzero integer. See the get_ident() function. Thread identifiers may be recycled when a thread exits and another thread is created. The identifier is available even after the thread has exited.
name
A string used for identification purposes only.
It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.
native_id
Native integral thread ID of this thread, or None if it has not been started.
This is a non-negative integer. See the get_native_id() function. This represents the Thread ID as reported by the kernel.
Methods
getName
def getName(
self
)
Return a string used for identification purposes only.
This method is deprecated, use the name attribute instead.
View Source
def getName(self):
"""Return a string used for identification purposes only.
This method is deprecated, use the name attribute instead.
"""
import warnings
warnings.warn('getName() is deprecated, get the name attribute instead',
DeprecationWarning, stacklevel=2)
return self.name
isDaemon
def isDaemon(
self
)
Return whether this thread is a daemon.
This method is deprecated, use the daemon attribute instead.
View Source
def isDaemon(self):
"""Return whether this thread is a daemon.
This method is deprecated, use the daemon attribute instead.
"""
import warnings
warnings.warn('isDaemon() is deprecated, get the daemon attribute instead',
DeprecationWarning, stacklevel=2)
return self.daemon
is_alive
def is_alive(
self
)
Return whether the thread is alive.
This method returns True just before the run() method starts until just after the run() method terminates. See also the module function enumerate().
View Source
def is_alive(self):
"""Return whether the thread is alive.
This method returns True just before the run() method starts until just
after the run() method terminates. See also the module function
enumerate().
"""
assert self._initialized, "Thread.__init__() not called"
if self._is_stopped or not self._started.is_set():
return False
self._wait_for_tstate_lock(False)
return not self._is_stopped
join
def join(
self,
timeout=None
)
Wait until the thread terminates.
This blocks the calling thread until the thread whose join() method is called terminates -- either normally or through an unhandled exception or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened -- if the thread is still alive, the join() call timed out.
When the timeout argument is not present or None, the operation will block until the thread terminates.
A thread can be join()ed many times.
join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.
View Source
def join(self, timeout=None):
"""Wait until the thread terminates.
This blocks the calling thread until the thread whose join() method is
called terminates -- either normally or through an unhandled exception
or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a
floating point number specifying a timeout for the operation in seconds
(or fractions thereof). As join() always returns None, you must call
is_alive() after join() to decide whether a timeout happened -- if the
thread is still alive, the join() call timed out.
When the timeout argument is not present or None, the operation will
block until the thread terminates.
A thread can be join()ed many times.
join() raises a RuntimeError if an attempt is made to join the current
thread as that would cause a deadlock. It is also an error to join() a
thread before it has been started and attempts to do so raises the same
exception.
"""
if not self._initialized:
raise RuntimeError("Thread.__init__() not called")
if not self._started.is_set():
raise RuntimeError("cannot join thread before it is started")
if self is current_thread():
raise RuntimeError("cannot join current thread")
if timeout is None:
self._wait_for_tstate_lock()
else:
# the behavior of a negative timeout isn't documented, but
# historically .join(timeout=x) for x<0 has acted as if timeout=0
self._wait_for_tstate_lock(timeout=max(timeout, 0))
run
def run(
self
)
Method representing the thread's activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
View Source
def run(self):
while not self._exit_flag.is_set():
try:
msg = self._messages_to_send.get(timeout=0.1)
self._drone_pub_socket.send_multipart(
[
bytes(msg._pb.DESCRIPTOR.full_name, "utf-8"),
msg.__class__.serialize(msg),
]
)
except queue.Empty:
# No messages to send, so we can
continue
setDaemon
def setDaemon(
self,
daemonic
)
Set whether this thread is a daemon.
This method is deprecated, use the .daemon property instead.
View Source
def setDaemon(self, daemonic):
"""Set whether this thread is a daemon.
This method is deprecated, use the .daemon property instead.
"""
import warnings
warnings.warn('setDaemon() is deprecated, set the daemon attribute instead',
DeprecationWarning, stacklevel=2)
self.daemon = daemonic
setName
def setName(
self,
name
)
Set the name string for this thread.
This method is deprecated, use the name attribute instead.
View Source
def setName(self, name):
"""Set the name string for this thread.
This method is deprecated, use the name attribute instead.
"""
import warnings
warnings.warn('setName() is deprecated, set the name attribute instead',
DeprecationWarning, stacklevel=2)
self.name = name
set_auto_depth_state
def set_auto_depth_state(
self,
enabled: 'bool'
)
View Source
def set_auto_depth_state(self, enabled: bool):
msg = blueye.protocol.AutoDepthCtrl(state={"enabled": enabled})
self._messages_to_send.put(msg)
set_auto_heading_state
def set_auto_heading_state(
self,
enabled: 'bool'
)
View Source
def set_auto_heading_state(self, enabled: bool):
msg = blueye.protocol.AutoHeadingCtrl(state={"enabled": enabled})
self._messages_to_send.put(msg)
set_gripper_velocities
def set_gripper_velocities(
self,
grip: 'float',
rotation: 'float'
)
View Source
def set_gripper_velocities(self, grip: float, rotation: float):
msg = blueye.protocol.GripperCtrl(
gripper_velocities={"grip_velocity": grip, "rotate_velocity": rotation}
)
self._messages_to_send.put(msg)
set_guest_port_lights
def set_guest_port_lights(
self,
value: 'float'
)
View Source
def set_guest_port_lights(self, value: float):
msg = blueye.protocol.GuestportLightsCtrl(lights={"value": value})
self._messages_to_send.put(msg)
set_lights
def set_lights(
self,
value: 'float'
)
View Source
def set_lights(self, value: float):
msg = blueye.protocol.LightsCtrl(lights={"value": value})
self._messages_to_send.put(msg)
set_motion_input
def set_motion_input(
self,
surge: 'float',
sway: 'float',
heave: 'float',
yaw: 'float',
slow: 'float',
boost: 'float'
)
View Source
def set_motion_input(
self, surge: float, sway: float, heave: float, yaw: float, slow: float, boost: float
):
msg = blueye.protocol.MotionInputCtrl(
motion_input={
"surge": surge,
"sway": sway,
"heave": heave,
"yaw": yaw,
"slow": slow,
"boost": boost,
}
)
self._messages_to_send.put(msg)
set_recording_state
def set_recording_state(
self,
main_enabled: 'bool',
guestport_enabled: 'bool'
)
View Source
def set_recording_state(self, main_enabled: bool, guestport_enabled: bool):
msg = blueye.protocol.RecordCtrl(
record_on={"main": main_enabled, "guestport": guestport_enabled}
)
self._messages_to_send.put(msg)
set_tilt_stabilization
def set_tilt_stabilization(
self,
enabled: 'bool'
)
View Source
def set_tilt_stabilization(self, enabled: bool):
msg = blueye.protocol.TiltStabilizationCtrl(state={"enabled": enabled})
self._messages_to_send.put(msg)
set_tilt_velocity
def set_tilt_velocity(
self,
value: 'float'
)
View Source
def set_tilt_velocity(self, value: float):
msg = blueye.protocol.TiltVelocityCtrl(velocity={"value": value})
self._messages_to_send.put(msg)
set_water_density
def set_water_density(
self,
value: 'float'
)
View Source
def set_water_density(self, value: float):
msg = blueye.protocol.WaterDensityCtrl(density={"value": value})
self._messages_to_send.put(msg)
start
def start(
self
)
Start the thread's activity.
It must be called at most once per thread object. It arranges for the object's run() method to be invoked in a separate thread of control.
This method will raise a RuntimeError if called more than once on the same thread object.
View Source
def start(self):
"""Start the thread's activity.
It must be called at most once per thread object. It arranges for the
object's run() method to be invoked in a separate thread of control.
This method will raise a RuntimeError if called more than once on the
same thread object.
"""
if not self._initialized:
raise RuntimeError("thread.__init__() not called")
if self._started.is_set():
raise RuntimeError("threads can only be started once")
with _active_limbo_lock:
_limbo[self] = self
try:
_start_new_thread(self._bootstrap, ())
except Exception:
with _active_limbo_lock:
del _limbo[self]
raise
self._started.wait()
stop
def stop(
self
)
View Source
def stop(self):
self._exit_flag.set()
take_still_picture
def take_still_picture(
self
)
View Source
def take_still_picture(self):
msg = blueye.protocol.TakePictureCtrl()
self._messages_to_send.put(msg)
ReqRepClient
class ReqRepClient(
parent_drone: "'blueye.sdk.Drone'",
context: 'zmq.Context' = None
)
A class that represents a thread of control.
This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.
View Source
class ReqRepClient(threading.Thread):
def __init__(self, parent_drone: "blueye.sdk.Drone", context: zmq.Context = None):
super().__init__(daemon=True)
self._context = context or zmq.Context().instance()
self._parent_drone = parent_drone
self._socket = self._context.socket(zmq.REQ)
self._socket.connect(f"tcp://{self._parent_drone._ip}:5556")
self._requests_to_send = queue.Queue()
self._exit_flag = threading.Event()
@staticmethod
def _get_client_info() -> blueye.protocol.ClientInfo:
client_info = blueye.protocol.ClientInfo(
type="SDK",
version=f"{importlib.metadata.version('blueye.sdk')}",
device_type="Computer",
platform=f"{platform.system()}",
platform_version=f"{platform.release()}",
name=f"{platform.node()}",
)
return client_info
@staticmethod
def _parse_type_to_string(msg: proto.message.MessageMeta | str) -> str:
message_type = (
msg.meta.full_name.replace("blueye.protocol.", "")
if type(msg) is proto.message.MessageMeta
else msg.replace("blueye.protocol.", "")
)
return message_type
def run(self):
while not self._exit_flag.is_set():
try:
msg, response_type, response_callback_queue = self._requests_to_send.get(
timeout=0.1
)
self._socket.send_multipart(
[
bytes(msg._pb.DESCRIPTOR.full_name, "utf-8"),
msg.__class__.serialize(msg),
]
)
except queue.Empty:
# No requests to send, so we can
continue
# TODO: Deal with timeout
resp = self._socket.recv_multipart()
resp_deserialized = response_type.deserialize(resp[1])
response_callback_queue.put(resp_deserialized)
def stop(self):
self._exit_flag.set()
def _send_request_get_response(
self,
request: proto.message.Message,
expected_response: proto.message.Message,
timeout: float,
):
response_queue = queue.Queue(maxsize=1)
self._requests_to_send.put((request, expected_response, response_queue))
try:
return response_queue.get(timeout=timeout)
except queue.Empty:
raise blueye.protocol.exceptions.ResponseTimeout(
"No response received from drone before timeout"
)
def ping(self, timeout: float) -> blueye.protocol.PingRep:
request = blueye.protocol.PingReq()
return self._send_request_get_response(request, blueye.protocol.PingRep, timeout)
def get_camera_parameters(
self, camera: blueye.protocol.Camera, timeout: float = 0.05
) -> blueye.protocol.CameraParameters:
request = blueye.protocol.GetCameraParametersReq(camera=camera)
response = self._send_request_get_response(
request, blueye.protocol.GetCameraParametersRep, timeout
)
return response.camera_parameters
def set_camera_parameters(
self,
parameters: blueye.protocol.CameraParameters,
timeout: float = 0.05,
):
request = blueye.protocol.SetCameraParametersReq(camera_parameters=parameters)
return self._send_request_get_response(
request, blueye.protocol.SetCameraParametersRep, timeout
)
def get_overlay_parameters(self, timeout: float = 0.05) -> blueye.protocol.OverlayParameters:
request = blueye.protocol.GetOverlayParametersReq()
response = self._send_request_get_response(
request, blueye.protocol.GetOverlayParametersRep, timeout
)
return response.overlay_parameters
def set_overlay_parameters(
self, parameters: blueye.protocol.OverlayParameters, timeout: float = 0.05
):
request = blueye.protocol.SetOverlayParametersReq(overlay_parameters=parameters)
return self._send_request_get_response(
request, blueye.protocol.SetOverlayParametersRep, timeout
)
def sync_time(self, time: int, timeout: float = 0.05):
request = blueye.protocol.SyncTimeReq(
time={"unix_timestamp": {"seconds": time, "nanos": 0}}
)
return self._send_request_get_response(request, blueye.protocol.SyncTimeRep, timeout)
def connect_client(
self, client_info: blueye.protocol.ClientInfo = None, timeout: float = 0.05
) -> blueye.protocol.ConnectClientRep:
client = client_info or self._get_client_info()
request = blueye.protocol.ConnectClientReq(client_info=client)
return self._send_request_get_response(request, blueye.protocol.ConnectClientRep, timeout)
def disconnect_client(
self, client_id: int, timeout: float = 0.05
) -> blueye.protocol.DisconnectClientRep:
request = blueye.protocol.DisconnectClientReq(client_id=client_id)
return self._send_request_get_response(
request, blueye.protocol.DisconnectClientRep, timeout
)
def set_telemetry_msg_publish_frequency(
self, msg: proto.message.MessageMeta | str, frequency: float, timeout: float = 0.05
) -> blueye.protocol.SetPubFrequencyRep:
message_type = self._parse_type_to_string(msg)
request = blueye.protocol.SetPubFrequencyReq(
message_type=message_type,
frequency=frequency,
)
return self._send_request_get_response(request, blueye.protocol.SetPubFrequencyRep, timeout)
def get_telemetry_msg(self, msg: proto.message.MessageMeta | str, timeout: float = 0.05):
message_type = self._parse_type_to_string(msg)
request = blueye.protocol.GetTelemetryReq(message_type=message_type)
return self._send_request_get_response(request, blueye.protocol.GetTelemetryRep, timeout)
Ancestors (in MRO)
- threading.Thread
Instance variables
daemon
A boolean value indicating whether this thread is a daemon thread.
This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.
The entire Python program exits when only daemon threads are left.
ident
Thread identifier of this thread or None if it has not been started.
This is a nonzero integer. See the get_ident() function. Thread identifiers may be recycled when a thread exits and another thread is created. The identifier is available even after the thread has exited.
name
A string used for identification purposes only.
It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.
native_id
Native integral thread ID of this thread, or None if it has not been started.
This is a non-negative integer. See the get_native_id() function. This represents the Thread ID as reported by the kernel.
Methods
connect_client
def connect_client(
self,
client_info: 'blueye.protocol.ClientInfo' = None,
timeout: 'float' = 0.05
) -> 'blueye.protocol.ConnectClientRep'
View Source
def connect_client(
self, client_info: blueye.protocol.ClientInfo = None, timeout: float = 0.05
) -> blueye.protocol.ConnectClientRep:
client = client_info or self._get_client_info()
request = blueye.protocol.ConnectClientReq(client_info=client)
return self._send_request_get_response(request, blueye.protocol.ConnectClientRep, timeout)
disconnect_client
def disconnect_client(
self,
client_id: 'int',
timeout: 'float' = 0.05
) -> 'blueye.protocol.DisconnectClientRep'
View Source
def disconnect_client(
self, client_id: int, timeout: float = 0.05
) -> blueye.protocol.DisconnectClientRep:
request = blueye.protocol.DisconnectClientReq(client_id=client_id)
return self._send_request_get_response(
request, blueye.protocol.DisconnectClientRep, timeout
)
getName
def getName(
self
)
Return a string used for identification purposes only.
This method is deprecated, use the name attribute instead.
View Source
def getName(self):
"""Return a string used for identification purposes only.
This method is deprecated, use the name attribute instead.
"""
import warnings
warnings.warn('getName() is deprecated, get the name attribute instead',
DeprecationWarning, stacklevel=2)
return self.name
get_camera_parameters
def get_camera_parameters(
self,
camera: 'blueye.protocol.Camera',
timeout: 'float' = 0.05
) -> 'blueye.protocol.CameraParameters'
View Source
def get_camera_parameters(
self, camera: blueye.protocol.Camera, timeout: float = 0.05
) -> blueye.protocol.CameraParameters:
request = blueye.protocol.GetCameraParametersReq(camera=camera)
response = self._send_request_get_response(
request, blueye.protocol.GetCameraParametersRep, timeout
)
return response.camera_parameters
get_overlay_parameters
def get_overlay_parameters(
self,
timeout: 'float' = 0.05
) -> 'blueye.protocol.OverlayParameters'
View Source
def get_overlay_parameters(self, timeout: float = 0.05) -> blueye.protocol.OverlayParameters:
request = blueye.protocol.GetOverlayParametersReq()
response = self._send_request_get_response(
request, blueye.protocol.GetOverlayParametersRep, timeout
)
return response.overlay_parameters
get_telemetry_msg
def get_telemetry_msg(
self,
msg: 'proto.message.MessageMeta | str',
timeout: 'float' = 0.05
)
View Source
def get_telemetry_msg(self, msg: proto.message.MessageMeta | str, timeout: float = 0.05):
message_type = self._parse_type_to_string(msg)
request = blueye.protocol.GetTelemetryReq(message_type=message_type)
return self._send_request_get_response(request, blueye.protocol.GetTelemetryRep, timeout)
isDaemon
def isDaemon(
self
)
Return whether this thread is a daemon.
This method is deprecated, use the daemon attribute instead.
View Source
def isDaemon(self):
"""Return whether this thread is a daemon.
This method is deprecated, use the daemon attribute instead.
"""
import warnings
warnings.warn('isDaemon() is deprecated, get the daemon attribute instead',
DeprecationWarning, stacklevel=2)
return self.daemon
is_alive
def is_alive(
self
)
Return whether the thread is alive.
This method returns True just before the run() method starts until just after the run() method terminates. See also the module function enumerate().
View Source
def is_alive(self):
"""Return whether the thread is alive.
This method returns True just before the run() method starts until just
after the run() method terminates. See also the module function
enumerate().
"""
assert self._initialized, "Thread.__init__() not called"
if self._is_stopped or not self._started.is_set():
return False
self._wait_for_tstate_lock(False)
return not self._is_stopped
join
def join(
self,
timeout=None
)
Wait until the thread terminates.
This blocks the calling thread until the thread whose join() method is called terminates -- either normally or through an unhandled exception or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened -- if the thread is still alive, the join() call timed out.
When the timeout argument is not present or None, the operation will block until the thread terminates.
A thread can be join()ed many times.
join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.
View Source
def join(self, timeout=None):
"""Wait until the thread terminates.
This blocks the calling thread until the thread whose join() method is
called terminates -- either normally or through an unhandled exception
or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a
floating point number specifying a timeout for the operation in seconds
(or fractions thereof). As join() always returns None, you must call
is_alive() after join() to decide whether a timeout happened -- if the
thread is still alive, the join() call timed out.
When the timeout argument is not present or None, the operation will
block until the thread terminates.
A thread can be join()ed many times.
join() raises a RuntimeError if an attempt is made to join the current
thread as that would cause a deadlock. It is also an error to join() a
thread before it has been started and attempts to do so raises the same
exception.
"""
if not self._initialized:
raise RuntimeError("Thread.__init__() not called")
if not self._started.is_set():
raise RuntimeError("cannot join thread before it is started")
if self is current_thread():
raise RuntimeError("cannot join current thread")
if timeout is None:
self._wait_for_tstate_lock()
else:
# the behavior of a negative timeout isn't documented, but
# historically .join(timeout=x) for x<0 has acted as if timeout=0
self._wait_for_tstate_lock(timeout=max(timeout, 0))
ping
def ping(
self,
timeout: 'float'
) -> 'blueye.protocol.PingRep'
View Source
def ping(self, timeout: float) -> blueye.protocol.PingRep:
request = blueye.protocol.PingReq()
return self._send_request_get_response(request, blueye.protocol.PingRep, timeout)
run
def run(
self
)
Method representing the thread's activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
View Source
def run(self):
while not self._exit_flag.is_set():
try:
msg, response_type, response_callback_queue = self._requests_to_send.get(
timeout=0.1
)
self._socket.send_multipart(
[
bytes(msg._pb.DESCRIPTOR.full_name, "utf-8"),
msg.__class__.serialize(msg),
]
)
except queue.Empty:
# No requests to send, so we can
continue
# TODO: Deal with timeout
resp = self._socket.recv_multipart()
resp_deserialized = response_type.deserialize(resp[1])
response_callback_queue.put(resp_deserialized)
setDaemon
def setDaemon(
self,
daemonic
)
Set whether this thread is a daemon.
This method is deprecated, use the .daemon property instead.
View Source
def setDaemon(self, daemonic):
"""Set whether this thread is a daemon.
This method is deprecated, use the .daemon property instead.
"""
import warnings
warnings.warn('setDaemon() is deprecated, set the daemon attribute instead',
DeprecationWarning, stacklevel=2)
self.daemon = daemonic
setName
def setName(
self,
name
)
Set the name string for this thread.
This method is deprecated, use the name attribute instead.
View Source
def setName(self, name):
"""Set the name string for this thread.
This method is deprecated, use the name attribute instead.
"""
import warnings
warnings.warn('setName() is deprecated, set the name attribute instead',
DeprecationWarning, stacklevel=2)
self.name = name
set_camera_parameters
def set_camera_parameters(
self,
parameters: 'blueye.protocol.CameraParameters',
timeout: 'float' = 0.05
)
View Source
def set_camera_parameters(
self,
parameters: blueye.protocol.CameraParameters,
timeout: float = 0.05,
):
request = blueye.protocol.SetCameraParametersReq(camera_parameters=parameters)
return self._send_request_get_response(
request, blueye.protocol.SetCameraParametersRep, timeout
)
set_overlay_parameters
def set_overlay_parameters(
self,
parameters: 'blueye.protocol.OverlayParameters',
timeout: 'float' = 0.05
)
View Source
def set_overlay_parameters(
self, parameters: blueye.protocol.OverlayParameters, timeout: float = 0.05
):
request = blueye.protocol.SetOverlayParametersReq(overlay_parameters=parameters)
return self._send_request_get_response(
request, blueye.protocol.SetOverlayParametersRep, timeout
)
set_telemetry_msg_publish_frequency
def set_telemetry_msg_publish_frequency(
self,
msg: 'proto.message.MessageMeta | str',
frequency: 'float',
timeout: 'float' = 0.05
) -> 'blueye.protocol.SetPubFrequencyRep'
View Source
def set_telemetry_msg_publish_frequency(
self, msg: proto.message.MessageMeta | str, frequency: float, timeout: float = 0.05
) -> blueye.protocol.SetPubFrequencyRep:
message_type = self._parse_type_to_string(msg)
request = blueye.protocol.SetPubFrequencyReq(
message_type=message_type,
frequency=frequency,
)
return self._send_request_get_response(request, blueye.protocol.SetPubFrequencyRep, timeout)
start
def start(
self
)
Start the thread's activity.
It must be called at most once per thread object. It arranges for the object's run() method to be invoked in a separate thread of control.
This method will raise a RuntimeError if called more than once on the same thread object.
View Source
def start(self):
"""Start the thread's activity.
It must be called at most once per thread object. It arranges for the
object's run() method to be invoked in a separate thread of control.
This method will raise a RuntimeError if called more than once on the
same thread object.
"""
if not self._initialized:
raise RuntimeError("thread.__init__() not called")
if self._started.is_set():
raise RuntimeError("threads can only be started once")
with _active_limbo_lock:
_limbo[self] = self
try:
_start_new_thread(self._bootstrap, ())
except Exception:
with _active_limbo_lock:
del _limbo[self]
raise
self._started.wait()
stop
def stop(
self
)
View Source
def stop(self):
self._exit_flag.set()
sync_time
def sync_time(
self,
time: 'int',
timeout: 'float' = 0.05
)
View Source
def sync_time(self, time: int, timeout: float = 0.05):
request = blueye.protocol.SyncTimeReq(
time={"unix_timestamp": {"seconds": time, "nanos": 0}}
)
return self._send_request_get_response(request, blueye.protocol.SyncTimeRep, timeout)
TelemetryClient
class TelemetryClient(
parent_drone: "'blueye.sdk.Drone'",
context: 'zmq.Context' = None
)
A class that represents a thread of control.
This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.
View Source
class TelemetryClient(threading.Thread):
def __init__(self, parent_drone: "blueye.sdk.Drone", context: zmq.Context = None):
super().__init__(daemon=True)
self._parent_drone = parent_drone
self._context = context or zmq.Context().instance()
self._socket = self._context.socket(zmq.SUB)
self._socket.connect(f"tcp://{self._parent_drone._ip}:5555")
self._socket.setsockopt_string(zmq.SUBSCRIBE, "")
self._exit_flag = threading.Event()
self._state_lock = threading.Lock()
self._callbacks: List[Callback] = []
self._state: Dict[proto.message.Message, bytes] = {}
"""`_state` is dictionary of the latest received messages, where the key is the protobuf
message class, eg. blueye.protocol.DepthTel and the value is the serialized protobuf
message"""
def _handle_message(self, msg: Tuple[bytes, bytes]):
msg_type_name = msg[0].decode("utf-8").replace("blueye.protocol.", "")
try:
msg_type = blueye.protocol.__getattribute__(msg_type_name)
except AttributeError:
# If a new telemetry message is introduced before the SDK is updated this can
# be a common occurence, so choosing to log with info instead of warning
logger.info(f"Ignoring unknown message type: {msg_type_name}")
return
msg_payload = msg[1]
with self._state_lock:
self._state[msg_type] = msg_payload
for callback in self._callbacks:
if msg_type in callback.message_filter or callback.message_filter == []:
if callback.pass_raw_data:
callback.function(msg_type_name, msg_payload, **callback.kwargs)
else:
msg_deserialized = msg_type.deserialize(msg_payload)
callback.function(msg_type_name, msg_deserialized, **callback.kwargs)
def run(self):
poller = zmq.Poller()
poller.register(self._socket, zmq.POLLIN)
while not self._exit_flag.is_set():
events_to_be_processed = poller.poll(10)
if len(events_to_be_processed) > 0:
msg = self._socket.recv_multipart()
self._handle_message(msg)
def add_callback(
self,
msg_filter: List[proto.message.Message],
callback_function: Callable[[str, proto.message.Message], None],
raw: bool,
**kwargs,
):
uuid_hex = uuid.uuid1().hex
self._callbacks.append(Callback(msg_filter, callback_function, raw, uuid_hex, kwargs))
return uuid_hex
def remove_callback(self, callback_id):
try:
self._callbacks.pop([cb.uuid_hex for cb in self._callbacks].index(callback_id))
except ValueError:
logger.warning(f"Callback with id {callback_id} not found, ignoring")
def get(self, key: proto.message.Message):
with self._state_lock:
return self._state[key]
def stop(self):
self._exit_flag.set()
Ancestors (in MRO)
- threading.Thread
Instance variables
daemon
A boolean value indicating whether this thread is a daemon thread.
This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.
The entire Python program exits when only daemon threads are left.
ident
Thread identifier of this thread or None if it has not been started.
This is a nonzero integer. See the get_ident() function. Thread identifiers may be recycled when a thread exits and another thread is created. The identifier is available even after the thread has exited.
name
A string used for identification purposes only.
It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.
native_id
Native integral thread ID of this thread, or None if it has not been started.
This is a non-negative integer. See the get_native_id() function. This represents the Thread ID as reported by the kernel.
Methods
add_callback
def add_callback(
self,
msg_filter: 'List[proto.message.Message]',
callback_function: 'Callable[[str, proto.message.Message], None]',
raw: 'bool',
**kwargs
)
View Source
def add_callback(
self,
msg_filter: List[proto.message.Message],
callback_function: Callable[[str, proto.message.Message], None],
raw: bool,
**kwargs,
):
uuid_hex = uuid.uuid1().hex
self._callbacks.append(Callback(msg_filter, callback_function, raw, uuid_hex, kwargs))
return uuid_hex
get
def get(
self,
key: 'proto.message.Message'
)
View Source
def get(self, key: proto.message.Message):
with self._state_lock:
return self._state[key]
getName
def getName(
self
)
Return a string used for identification purposes only.
This method is deprecated, use the name attribute instead.
View Source
def getName(self):
"""Return a string used for identification purposes only.
This method is deprecated, use the name attribute instead.
"""
import warnings
warnings.warn('getName() is deprecated, get the name attribute instead',
DeprecationWarning, stacklevel=2)
return self.name
isDaemon
def isDaemon(
self
)
Return whether this thread is a daemon.
This method is deprecated, use the daemon attribute instead.
View Source
def isDaemon(self):
"""Return whether this thread is a daemon.
This method is deprecated, use the daemon attribute instead.
"""
import warnings
warnings.warn('isDaemon() is deprecated, get the daemon attribute instead',
DeprecationWarning, stacklevel=2)
return self.daemon
is_alive
def is_alive(
self
)
Return whether the thread is alive.
This method returns True just before the run() method starts until just after the run() method terminates. See also the module function enumerate().
View Source
def is_alive(self):
"""Return whether the thread is alive.
This method returns True just before the run() method starts until just
after the run() method terminates. See also the module function
enumerate().
"""
assert self._initialized, "Thread.__init__() not called"
if self._is_stopped or not self._started.is_set():
return False
self._wait_for_tstate_lock(False)
return not self._is_stopped
join
def join(
self,
timeout=None
)
Wait until the thread terminates.
This blocks the calling thread until the thread whose join() method is called terminates -- either normally or through an unhandled exception or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened -- if the thread is still alive, the join() call timed out.
When the timeout argument is not present or None, the operation will block until the thread terminates.
A thread can be join()ed many times.
join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.
View Source
def join(self, timeout=None):
"""Wait until the thread terminates.
This blocks the calling thread until the thread whose join() method is
called terminates -- either normally or through an unhandled exception
or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a
floating point number specifying a timeout for the operation in seconds
(or fractions thereof). As join() always returns None, you must call
is_alive() after join() to decide whether a timeout happened -- if the
thread is still alive, the join() call timed out.
When the timeout argument is not present or None, the operation will
block until the thread terminates.
A thread can be join()ed many times.
join() raises a RuntimeError if an attempt is made to join the current
thread as that would cause a deadlock. It is also an error to join() a
thread before it has been started and attempts to do so raises the same
exception.
"""
if not self._initialized:
raise RuntimeError("Thread.__init__() not called")
if not self._started.is_set():
raise RuntimeError("cannot join thread before it is started")
if self is current_thread():
raise RuntimeError("cannot join current thread")
if timeout is None:
self._wait_for_tstate_lock()
else:
# the behavior of a negative timeout isn't documented, but
# historically .join(timeout=x) for x<0 has acted as if timeout=0
self._wait_for_tstate_lock(timeout=max(timeout, 0))
remove_callback
def remove_callback(
self,
callback_id
)
View Source
def remove_callback(self, callback_id):
try:
self._callbacks.pop([cb.uuid_hex for cb in self._callbacks].index(callback_id))
except ValueError:
logger.warning(f"Callback with id {callback_id} not found, ignoring")
run
def run(
self
)
Method representing the thread's activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
View Source
def run(self):
poller = zmq.Poller()
poller.register(self._socket, zmq.POLLIN)
while not self._exit_flag.is_set():
events_to_be_processed = poller.poll(10)
if len(events_to_be_processed) > 0:
msg = self._socket.recv_multipart()
self._handle_message(msg)
setDaemon
def setDaemon(
self,
daemonic
)
Set whether this thread is a daemon.
This method is deprecated, use the .daemon property instead.
View Source
def setDaemon(self, daemonic):
"""Set whether this thread is a daemon.
This method is deprecated, use the .daemon property instead.
"""
import warnings
warnings.warn('setDaemon() is deprecated, set the daemon attribute instead',
DeprecationWarning, stacklevel=2)
self.daemon = daemonic
setName
def setName(
self,
name
)
Set the name string for this thread.
This method is deprecated, use the name attribute instead.
View Source
def setName(self, name):
"""Set the name string for this thread.
This method is deprecated, use the name attribute instead.
"""
import warnings
warnings.warn('setName() is deprecated, set the name attribute instead',
DeprecationWarning, stacklevel=2)
self.name = name
start
def start(
self
)
Start the thread's activity.
It must be called at most once per thread object. It arranges for the object's run() method to be invoked in a separate thread of control.
This method will raise a RuntimeError if called more than once on the same thread object.
View Source
def start(self):
"""Start the thread's activity.
It must be called at most once per thread object. It arranges for the
object's run() method to be invoked in a separate thread of control.
This method will raise a RuntimeError if called more than once on the
same thread object.
"""
if not self._initialized:
raise RuntimeError("thread.__init__() not called")
if self._started.is_set():
raise RuntimeError("threads can only be started once")
with _active_limbo_lock:
_limbo[self] = self
try:
_start_new_thread(self._bootstrap, ())
except Exception:
with _active_limbo_lock:
del _limbo[self]
raise
self._started.wait()
stop
def stop(
self
)
View Source
def stop(self):
self._exit_flag.set()
WatchdogPublisher
class WatchdogPublisher(
parent_drone: "'blueye.sdk.Drone'",
context: 'zmq.Context' = None
)
A class that represents a thread of control.
This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.
View Source
class WatchdogPublisher(threading.Thread):
def __init__(self, parent_drone: "blueye.sdk.Drone", context: zmq.Context = None):
super().__init__(daemon=True)
self._parent_drone = parent_drone
self._context = context or zmq.Context().instance()
self._socket = self._context.socket(zmq.PUB)
self._socket.connect(f"tcp://{self._parent_drone._ip}:5557")
self._exit_flag = threading.Event()
def run(self):
duration = 0
WATCHDOG_DELAY = 1
while not self._exit_flag.wait(WATCHDOG_DELAY):
self.pet_watchdog(duration)
duration += 1
def pet_watchdog(self, duration):
msg = blueye.protocol.WatchdogCtrl(
connection_duration={"value": duration}, client_id=self._parent_drone.client_id
)
self._socket.send_multipart(
[
bytes(msg._pb.DESCRIPTOR.full_name, "utf-8"),
blueye.protocol.WatchdogCtrl.serialize(msg),
]
)
def stop(self):
"""Stop the watchdog thread started by run()"""
self._exit_flag.set()
Ancestors (in MRO)
- threading.Thread
Instance variables
daemon
A boolean value indicating whether this thread is a daemon thread.
This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.
The entire Python program exits when only daemon threads are left.
ident
Thread identifier of this thread or None if it has not been started.
This is a nonzero integer. See the get_ident() function. Thread identifiers may be recycled when a thread exits and another thread is created. The identifier is available even after the thread has exited.
name
A string used for identification purposes only.
It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.
native_id
Native integral thread ID of this thread, or None if it has not been started.
This is a non-negative integer. See the get_native_id() function. This represents the Thread ID as reported by the kernel.
Methods
getName
def getName(
self
)
Return a string used for identification purposes only.
This method is deprecated, use the name attribute instead.
View Source
def getName(self):
"""Return a string used for identification purposes only.
This method is deprecated, use the name attribute instead.
"""
import warnings
warnings.warn('getName() is deprecated, get the name attribute instead',
DeprecationWarning, stacklevel=2)
return self.name
isDaemon
def isDaemon(
self
)
Return whether this thread is a daemon.
This method is deprecated, use the daemon attribute instead.
View Source
def isDaemon(self):
"""Return whether this thread is a daemon.
This method is deprecated, use the daemon attribute instead.
"""
import warnings
warnings.warn('isDaemon() is deprecated, get the daemon attribute instead',
DeprecationWarning, stacklevel=2)
return self.daemon
is_alive
def is_alive(
self
)
Return whether the thread is alive.
This method returns True just before the run() method starts until just after the run() method terminates. See also the module function enumerate().
View Source
def is_alive(self):
"""Return whether the thread is alive.
This method returns True just before the run() method starts until just
after the run() method terminates. See also the module function
enumerate().
"""
assert self._initialized, "Thread.__init__() not called"
if self._is_stopped or not self._started.is_set():
return False
self._wait_for_tstate_lock(False)
return not self._is_stopped
join
def join(
self,
timeout=None
)
Wait until the thread terminates.
This blocks the calling thread until the thread whose join() method is called terminates -- either normally or through an unhandled exception or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened -- if the thread is still alive, the join() call timed out.
When the timeout argument is not present or None, the operation will block until the thread terminates.
A thread can be join()ed many times.
join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.
View Source
def join(self, timeout=None):
"""Wait until the thread terminates.
This blocks the calling thread until the thread whose join() method is
called terminates -- either normally or through an unhandled exception
or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a
floating point number specifying a timeout for the operation in seconds
(or fractions thereof). As join() always returns None, you must call
is_alive() after join() to decide whether a timeout happened -- if the
thread is still alive, the join() call timed out.
When the timeout argument is not present or None, the operation will
block until the thread terminates.
A thread can be join()ed many times.
join() raises a RuntimeError if an attempt is made to join the current
thread as that would cause a deadlock. It is also an error to join() a
thread before it has been started and attempts to do so raises the same
exception.
"""
if not self._initialized:
raise RuntimeError("Thread.__init__() not called")
if not self._started.is_set():
raise RuntimeError("cannot join thread before it is started")
if self is current_thread():
raise RuntimeError("cannot join current thread")
if timeout is None:
self._wait_for_tstate_lock()
else:
# the behavior of a negative timeout isn't documented, but
# historically .join(timeout=x) for x<0 has acted as if timeout=0
self._wait_for_tstate_lock(timeout=max(timeout, 0))
pet_watchdog
def pet_watchdog(
self,
duration
)
View Source
def pet_watchdog(self, duration):
msg = blueye.protocol.WatchdogCtrl(
connection_duration={"value": duration}, client_id=self._parent_drone.client_id
)
self._socket.send_multipart(
[
bytes(msg._pb.DESCRIPTOR.full_name, "utf-8"),
blueye.protocol.WatchdogCtrl.serialize(msg),
]
)
run
def run(
self
)
Method representing the thread's activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
View Source
def run(self):
duration = 0
WATCHDOG_DELAY = 1
while not self._exit_flag.wait(WATCHDOG_DELAY):
self.pet_watchdog(duration)
duration += 1
setDaemon
def setDaemon(
self,
daemonic
)
Set whether this thread is a daemon.
This method is deprecated, use the .daemon property instead.
View Source
def setDaemon(self, daemonic):
"""Set whether this thread is a daemon.
This method is deprecated, use the .daemon property instead.
"""
import warnings
warnings.warn('setDaemon() is deprecated, set the daemon attribute instead',
DeprecationWarning, stacklevel=2)
self.daemon = daemonic
setName
def setName(
self,
name
)
Set the name string for this thread.
This method is deprecated, use the name attribute instead.
View Source
def setName(self, name):
"""Set the name string for this thread.
This method is deprecated, use the name attribute instead.
"""
import warnings
warnings.warn('setName() is deprecated, set the name attribute instead',
DeprecationWarning, stacklevel=2)
self.name = name
start
def start(
self
)
Start the thread's activity.
It must be called at most once per thread object. It arranges for the object's run() method to be invoked in a separate thread of control.
This method will raise a RuntimeError if called more than once on the same thread object.
View Source
def start(self):
"""Start the thread's activity.
It must be called at most once per thread object. It arranges for the
object's run() method to be invoked in a separate thread of control.
This method will raise a RuntimeError if called more than once on the
same thread object.
"""
if not self._initialized:
raise RuntimeError("thread.__init__() not called")
if self._started.is_set():
raise RuntimeError("threads can only be started once")
with _active_limbo_lock:
_limbo[self] = self
try:
_start_new_thread(self._bootstrap, ())
except Exception:
with _active_limbo_lock:
del _limbo[self]
raise
self._started.wait()
stop
def stop(
self
)
Stop the watchdog thread started by run()
View Source
def stop(self):
"""Stop the watchdog thread started by run()"""
self._exit_flag.set()