Module blueye.sdk.drone
View Source
#!/usr/bin/env python3
from __future__ import annotations
import logging
import time
from datetime import datetime
from json import JSONDecodeError
from typing import Callable, Dict, List, Optional
import blueye.protocol
import proto
import requests
from packaging import version
from .battery import Battery
from .camera import Camera
from .connection import CtrlClient, ReqRepClient, TelemetryClient, WatchdogPublisher
from .constants import WaterDensities
from .guestport import (
Gripper,
GuestPortCamera,
GuestPortLight,
Peripheral,
device_to_peripheral,
)
from .logs import LegacyLogs, Logs
from .motion import Motion
logger = logging.getLogger(__name__)
class Config:
    """Settings of the drone that can be changed through the SDK

    *Arguments*:

    * parent_drone: The `Drone` instance whose control and request/reply clients are used to
                    send the settings
    """

    def __init__(self, parent_drone: "Drone"):
        self._parent_drone = parent_drone
        self._water_density = WaterDensities.salty

    @property
    def water_density(self):
        """Get or set the current water density for increased pressure sensor accuracy

        Older software versions will assume a water density of 1025 grams per liter.

        The WaterDensities class contains typical densities for salty-, brackish-, and fresh water
        (these are the same values that the Blueye app uses).
        """
        return self._water_density

    @water_density.setter
    def water_density(self, density: float):
        # Send first: if the client raises (eg. a ConnectionError because the drone is not
        # connected) the locally stored value must not claim a density that was never sent.
        self._parent_drone._ctrl_client.set_water_density(density)
        self._water_density = density

    def set_drone_time(self, time: int):
        """Set the system time for the drone

        This method is used to set the system time for the drone. The argument `time` is expected to
        be a Unix timestamp (ie. the number of seconds since the epoch).
        """
        self._parent_drone._req_rep_client.sync_time(time)
class _NoConnectionClient:
"""A client that raises a ConnectionError if you use any of its functions"""
def __getattr__(self, name):
def method(*args, **kwargs):
raise ConnectionError(
"The connection to the drone is not established, "
"try calling the connect method before retrying"
)
return method
class Telemetry:
    """Access to the telemetry messages of a drone

    All calls are forwarded to the telemetry watcher and request/reply client of the parent
    drone.
    """

    def __init__(self, parent_drone: "Drone"):
        self._parent_drone = parent_drone

    def set_msg_publish_frequency(self, msg: proto.message.Message, frequency: float):
        """Set the publishing frequency of a specific telemetry message

        Raises a RuntimeError if the drone fails to set the frequency. Possible causes could be a
        frequency outside the valid range, or an incorrect message type.

        *Arguments*:

        * msg (proto.message.Message): The message to set the frequency of. Needs to be one of the
                                       messages in blueye.protocol that end in Tel, eg.
                                       blueye.protocol.DepthTel
        * frequency (float): The frequency in Hz. Valid range is (0 .. 100).
        """
        req_rep = self._parent_drone._req_rep_client
        response = req_rep.set_telemetry_msg_publish_frequency(msg, frequency)
        if response.success:
            return
        raise RuntimeError("Could not set telemetry message frequency")

    def add_msg_callback(
        self,
        msg_filter: List[proto.message.Message],
        callback: Callable[[str, proto.message.Message], None],
        raw: bool = False,
        **kwargs,
    ) -> str:
        """Register a telemetry message callback

        The callback is called each time a message of the type is received

        *Arguments*:

        * msg_filter: A list of message types to register the callback for.
                      Eg. `[blueye.protocol.DepthTel, blueye.protocol.Imu1Tel]`. If the list is
                      empty the callback will be registered for all message types
        * callback: The callback function. It should be minimal and return as fast as possible to
                    not block the telemetry communication. It is called with two arguments, the
                    message type name and the message object
        * raw: Pass the raw data instead of the deserialized message to the callback function
        * kwargs: Additional keyword arguments to pass to the callback function

        *Returns*:

        * uuid: Callback id. Can be used to remove callback in the future
        """
        watcher = self._parent_drone._telemetry_watcher
        return watcher.add_callback(msg_filter, callback, raw, **kwargs)

    def remove_msg_callback(self, callback_id: str) -> Optional[str]:
        """Remove a telemetry message callback

        *Arguments*:

        * callback_id: The callback id, as returned by `add_msg_callback`
        """
        watcher = self._parent_drone._telemetry_watcher
        watcher.remove_callback(callback_id)

    def get(
        self, msg_type: proto.message.Message, deserialize=True
    ) -> Optional[proto.message.Message | bytes]:
        """Get the latest telemetry message of the specified type

        *Arguments*:

        * msg_type: The message type to get. Eg. blueye.protocol.DepthTel
        * deserialize: If True, the message will be deserialized before being returned. If False,
                       the raw bytes will be returned.

        *Returns*:

        * The latest message of the specified type, or None if no message has been received yet
        """
        drone = self._parent_drone
        try:
            payload = drone._telemetry_watcher.get(msg_type)
        except KeyError:
            # The watcher has nothing stored for this type. The drone is only asked directly
            # when its software version is 3.3 or newer.
            if version.parse(drone.software_version_short) < version.parse("3.3"):
                return None
            payload = drone._req_rep_client.get_telemetry_msg(msg_type).payload.value
            if payload == b"":
                return None
        return msg_type.deserialize(payload) if deserialize else payload
class Drone:
    """A class providing an interface to a Blueye drone's functions

    Automatically connects to the drone using the default ip when instantiated, this behaviour can
    be disabled by setting `auto_connect=False`.

    *Arguments*:

    * ip: Address of the drone, used when requesting the drone info over HTTP
    * auto_connect: If True, `connect` is called at the end of the constructor
    * timeout: Passed on to `connect` when `auto_connect` is True
    * disconnect_other_clients: Passed on to `connect` when `auto_connect` is True
    """

    def __init__(
        self,
        ip="192.168.1.101",
        auto_connect=True,
        timeout=10,
        disconnect_other_clients=False,
    ):
        self._ip = ip
        self.camera = Camera(self, is_guestport_camera=False)
        self.motion = Motion(self)
        self.logs = Logs(self)
        self.legacy_logs = LegacyLogs(self)
        self.config = Config(self)
        self.battery = Battery(self)
        self.telemetry = Telemetry(self)
        self.connected = False
        # No id is known until `connect` has received one from the drone
        self.client_id: Optional[int] = None
        self.in_control: bool = False
        # Placeholders that raise a ConnectionError on use until `connect` replaces them
        self._watchdog_publisher = _NoConnectionClient()
        self._telemetry_watcher = _NoConnectionClient()
        self._req_rep_client = _NoConnectionClient()
        self._ctrl_client = _NoConnectionClient()
        self.peripherals: Optional[List[Peripheral]] = None
        """This list holds the peripherals connected to the drone. If it is `None`, then no
        Guestport telemetry message has been received yet."""

        if auto_connect is True:
            self.connect(timeout=timeout, disconnect_other_clients=disconnect_other_clients)

    def _verify_required_blunux_version(self, requirement: str):
        """Verify that Blunux version is higher than requirement

        requirement needs to be a string that's able to be parsed by version.parse()

        Raises a RuntimeError if the Blunux version of the connected drone does not match or exceed
        the requirement.
        """
        if version.parse(self.software_version_short) < version.parse(requirement):
            raise RuntimeError(
                f"Blunux version of connected drone is {self.software_version_short}. Version "
                f"{requirement} or higher is required."
            )

    def _update_drone_info(self, timeout: float = 3):
        """Request and store information about the connected drone

        Stores the feature list, software version (full and short form), serial number and
        hardware id from the drone's `/diagnostics/drone_info` HTTP endpoint.

        Raises a ConnectionError if the request times out, fails to connect, or does not return
        valid JSON.
        """
        try:
            response = requests.get(
                f"http://{self._ip}/diagnostics/drone_info", timeout=timeout
            ).json()
        except (
            requests.ConnectTimeout,
            requests.ReadTimeout,
            requests.ConnectionError,
            JSONDecodeError,
        ) as e:
            raise ConnectionError("Could not establish connection with drone") from e
        try:
            # filter(None, ...) drops the empty strings an empty feature field would produce
            self.features = list(filter(None, response["features"].split(",")))
        except KeyError:
            # Drone versions older than 1.4.7 did not have this field.
            self.features = []
        self.software_version = response["sw_version"]
        # Only the part in front of the first "-" is kept for version comparisons
        self.software_version_short = self.software_version.split("-")[0]
        self.serial_number = response["serial_number"]
        self.uuid = response["hardware_id"]

    @staticmethod
    def _drone_info_callback(msg_type: str, msg: blueye.protocol.DroneInfoTel, drone: Drone):
        """Telemetry callback that creates the peripherals from a DroneInfoTel message"""
        # Check if the GuestPortInfo has been initialized
        if msg.drone_info.gp._pb.ByteSize() != 0:
            drone._create_peripherals_from_drone_info(msg.drone_info.gp)
            # Remove the callback after the first message has been received
            drone.telemetry.remove_msg_callback(drone._drone_info_cb_id)

    def _create_peripherals_from_drone_info(self, gp_info: blueye.protocol.GuestPortInfo):
        """Rebuild `peripherals` from the devices listed on the three guest ports

        Lights, cameras and grippers are additionally stored as `external_light`,
        `external_camera` and `gripper`.
        """
        self.peripherals = []
        for port in (gp_info.gp1, gp_info.gp2, gp_info.gp3):
            for device in port.device_list.devices:
                peripheral = device_to_peripheral(self, port.guest_port_number, device)
                self.peripherals.append(peripheral)
                if isinstance(peripheral, GuestPortLight):
                    self.external_light = peripheral
                elif isinstance(peripheral, GuestPortCamera):
                    self.external_camera = peripheral
                elif isinstance(peripheral, Gripper):
                    self.gripper = peripheral

    def _clean_up_clients(self):
        """Stop the client threads and put the not-connected placeholders back"""
        self._watchdog_publisher.stop()
        self._telemetry_watcher.stop()
        self._req_rep_client.stop()
        self._ctrl_client.stop()
        self._watchdog_publisher = _NoConnectionClient()
        self._telemetry_watcher = _NoConnectionClient()
        self._req_rep_client = _NoConnectionClient()
        self._ctrl_client = _NoConnectionClient()

    def connect(
        self,
        client_info: blueye.protocol.ClientInfo = None,
        timeout: float = 4,
        disconnect_other_clients: bool = False,
    ):
        """Establish a connection to the drone

        Spawns several threads for receiving telemetry, sending control messages and publishing
        watchdog messages.

        When a watchdog message is received by the drone the thrusters are armed, so to stop the
        drone from moving unexpectedly when connecting all thruster set points are set to zero when
        connecting.

        ** Arguments **

        - *client_info*: Information about the client connecting, if None the SDK will attempt to
                         read it from the environment
        - *timeout*: Seconds to wait for connection. The first connection on boot can be a little
                     slower than the following ones
        - *disconnect_other_clients*: If True, disconnect clients until drone reports that we are in
                                      control

        ** Raises **

        - *ConnectionError*: If the connection attempt fails
        - *RuntimeError*: If the Blunux version of the connected drone is too old
        """
        logger.info(f"Attempting to connect to drone at {self._ip}")
        self._update_drone_info(timeout=timeout)
        self._verify_required_blunux_version("3.2")

        self._telemetry_watcher = TelemetryClient(self)
        self._ctrl_client = CtrlClient(self)
        self._watchdog_publisher = WatchdogPublisher(self)
        self._req_rep_client = ReqRepClient(self)

        self._telemetry_watcher.start()
        self._req_rep_client.start()
        self._ctrl_client.start()
        self._watchdog_publisher.start()

        try:
            self.ping()
            connect_resp = self._req_rep_client.connect_client(client_info=client_info)
        except blueye.protocol.exceptions.ResponseTimeout as e:
            # The clients were already started above, stop them again so a failed attempt
            # does not leave their threads running behind a drone that reports not connected
            self._clean_up_clients()
            raise ConnectionError("Could not establish connection with drone") from e

        logger.info(f"Connection successful, client id: {connect_resp.client_id}")
        logger.info(f"Client id in control: {connect_resp.client_id_in_control}")
        logger.info(f"There are {len(connect_resp.connected_clients)-1} other clients connected")
        self.client_id = connect_resp.client_id
        self.in_control = connect_resp.client_id == connect_resp.client_id_in_control
        self.connected = True

        if disconnect_other_clients and not self.in_control:
            self.take_control()

        self._drone_info_cb_id = self.telemetry.add_msg_callback(
            [blueye.protocol.DroneInfoTel],
            Drone._drone_info_callback,
            False,
            drone=self,
        )

        if self.in_control:
            # The drone runs from a read-only filesystem, and as such does not keep any state,
            # therefore when we connect to it we should send the current time
            current_time = int(time.time())
            time_formatted = datetime.fromtimestamp(current_time).strftime("%d. %b %Y %H:%M")
            logger.debug(f"Setting current time to {current_time} ({time_formatted})")
            self.config.set_drone_time(current_time)
            logger.debug("Disabling thrusters")
            self.motion.send_thruster_setpoint(0, 0, 0, 0)

    def disconnect(self):
        """Disconnects the connection, allowing another client to take control of the drone"""
        try:
            self._req_rep_client.disconnect_client(self.client_id)
        except blueye.protocol.exceptions.ResponseTimeout:
            # If there's no response the connection is likely already closed, so we can just
            # continue to stop threads and disconnect
            pass
        self._clean_up_clients()
        self.connected = False

    @property
    def connected_clients(self) -> Optional[List[blueye.protocol.ConnectedClient]]:
        """Get a list of connected clients

        Returns None if no ConnectedClientsTel message is available.
        """
        clients_tel = self.telemetry.get(blueye.protocol.ConnectedClientsTel)
        if clients_tel is None:
            return None
        return list(clients_tel.connected_clients)

    @property
    def client_in_control(self) -> Optional[int]:
        """Get the client id of the client in control of the drone

        Returns None if no ConnectedClientsTel message is available.
        """
        clients_tel = self.telemetry.get(blueye.protocol.ConnectedClientsTel)
        if clients_tel is None:
            return None
        return clients_tel.client_id_in_control

    def take_control(self, timeout=1):
        """Take control of the drone, disconnecting other clients

        Will disconnect other clients until the client is in control of the drone.
        Raises a RuntimeError if the client could not take control of the drone in the given time.
        """
        start_time = time.time()
        client_in_control = self.client_in_control
        while self.client_id != client_in_control:
            if time.time() - start_time > timeout:
                raise RuntimeError("Could not take control of the drone in the given time")
            # Each reply reports which client is in control after the disconnect
            resp = self._req_rep_client.disconnect_client(client_in_control)
            client_in_control = resp.client_id_in_control
        self.in_control = True

    @property
    def lights(self) -> Optional[float]:
        """Get or set the intensity of the drone lights

        *Arguments*:

        * brightness (float): Set the intensity of the drone light (0..1)

        *Returns*:

        * brightness (float): The intensity of the drone light (0..1), or None if no LightsTel
                              message is available

        Setting a brightness outside of 0..1 raises a ValueError.
        """
        lights_tel = self.telemetry.get(blueye.protocol.LightsTel)
        if lights_tel is None:
            # Same contract as the other telemetry properties: no message, no value
            return None
        return lights_tel.lights.value

    @lights.setter
    def lights(self, brightness: float):
        if not 0 <= brightness <= 1:
            raise ValueError("Error occured while trying to set lights to: " f"{brightness}")
        self._ctrl_client.set_lights(brightness)

    @property
    def depth(self) -> Optional[float]:
        """Get the current depth in meters

        *Returns*:

        * depth (float): The depth in meters of water column, or None if no DepthTel message is
                         available
        """
        depth_tel = self.telemetry.get(blueye.protocol.DepthTel)
        if depth_tel is None:
            return None
        return depth_tel.depth.value

    @property
    def pose(self) -> Optional[dict]:
        """Get the current orientation of the drone

        *Returns*:

        * pose (dict): Dictionary with roll, pitch, and yaw in degrees, from 0 to 359. None if no
                       AttitudeTel message is available
        """
        attitude_tel = self.telemetry.get(blueye.protocol.AttitudeTel)
        if attitude_tel is None:
            return None
        attitude = attitude_tel.attitude
        pose = {
            "roll": (attitude.roll + 360) % 360,
            "pitch": (attitude.pitch + 360) % 360,
            "yaw": (attitude.yaw + 360) % 360,
        }
        return pose

    @property
    def error_flags(self) -> Optional[Dict[str, bool]]:
        """Get the error flags

        *Returns*:

        * error_flags (dict): The error flags as bools in a dictionary, or None if no
                              ErrorFlagsTel message is available
        """
        error_flags_tel = self.telemetry.get(blueye.protocol.ErrorFlagsTel)
        if error_flags_tel is None:
            return None
        error_flags_msg = error_flags_tel.error_flags
        # Every attribute that is not a dunder is reported as a flag
        return {
            flag: getattr(error_flags_msg, flag)
            for flag in dir(error_flags_msg)
            if not flag.startswith("__")
        }

    @property
    def active_video_streams(self) -> Optional[Dict[str, int]]:
        """Get the number of currently active connections to the video stream

        Every client connected to the RTSP stream (does not matter if it's directly from GStreamer,
        or from the Blueye app) counts as one connection.

        Returns a dictionary with the keys "main" and "guestport", or None if no NStreamersTel
        message is available.
        """
        n_streamers_msg_tel = self.telemetry.get(blueye.protocol.NStreamersTel)
        if n_streamers_msg_tel is None:
            return None
        n_streamers_msg = n_streamers_msg_tel.n_streamers
        return {"main": n_streamers_msg.main, "guestport": n_streamers_msg.guestport}

    def ping(self, timeout: float = 1.0):
        """Ping drone

        Raises a ResponseTimeout exception if the drone does not respond within the timeout period.
        """
        self._req_rep_client.ping(timeout)
Variables
logger
Classes
Config
class Config(
parent_drone: "'Drone'"
)
View Source
class Config:
    """Settings of the drone that can be changed through the SDK

    *Arguments*:

    * parent_drone: The `Drone` instance whose control and request/reply clients are used to
                    send the settings
    """

    def __init__(self, parent_drone: "Drone"):
        self._parent_drone = parent_drone
        self._water_density = WaterDensities.salty

    @property
    def water_density(self):
        """Get or set the current water density for increased pressure sensor accuracy

        Older software versions will assume a water density of 1025 grams per liter.

        The WaterDensities class contains typical densities for salty-, brackish-, and fresh water
        (these are the same values that the Blueye app uses).
        """
        return self._water_density

    @water_density.setter
    def water_density(self, density: float):
        # Send first: if the client raises (eg. a ConnectionError because the drone is not
        # connected) the locally stored value must not claim a density that was never sent.
        self._parent_drone._ctrl_client.set_water_density(density)
        self._water_density = density

    def set_drone_time(self, time: int):
        """Set the system time for the drone

        This method is used to set the system time for the drone. The argument `time` is expected to
        be a Unix timestamp (ie. the number of seconds since the epoch).
        """
        self._parent_drone._req_rep_client.sync_time(time)
Instance variables
water_density
Get or set the current water density for increased pressure sensor accuracy
Older software versions will assume a water density of 1025 grams per liter.
The WaterDensities class contains typical densities for salty-, brackish-, and fresh water (these are the same values that the Blueye app uses).
Methods
set_drone_time
def set_drone_time(
self,
time: 'int'
)
Set the system time for the drone
This method is used to set the system time for the drone. The argument time is expected to be a Unix timestamp (ie. the number of seconds since the epoch).
View Source
def set_drone_time(self, time: int):
    """Set the system time for the drone

    The value is handed to the `sync_time` request of the parent drone's request/reply client.

    *Arguments*:

    * time (int): A Unix timestamp (ie. the number of seconds since the epoch)
    """
    req_rep = self._parent_drone._req_rep_client
    req_rep.sync_time(time)
Drone
class Drone(
ip='192.168.1.101',
auto_connect=True,
timeout=10,
disconnect_other_clients=False
)
A class providing an interface to a Blueye drone's functions
Automatically connects to the drone using the default ip when instantiated, this behaviour can be disabled by setting auto_connect=False.
View Source
class Drone:
    """A class providing an interface to a Blueye drone's functions

    Automatically connects to the drone using the default ip when instantiated, this behaviour can
    be disabled by setting `auto_connect=False`.

    *Arguments*:

    * ip: Address of the drone, used when requesting the drone info over HTTP
    * auto_connect: If True, `connect` is called at the end of the constructor
    * timeout: Passed on to `connect` when `auto_connect` is True
    * disconnect_other_clients: Passed on to `connect` when `auto_connect` is True
    """

    def __init__(
        self,
        ip="192.168.1.101",
        auto_connect=True,
        timeout=10,
        disconnect_other_clients=False,
    ):
        self._ip = ip
        self.camera = Camera(self, is_guestport_camera=False)
        self.motion = Motion(self)
        self.logs = Logs(self)
        self.legacy_logs = LegacyLogs(self)
        self.config = Config(self)
        self.battery = Battery(self)
        self.telemetry = Telemetry(self)
        self.connected = False
        # No id is known until `connect` has received one from the drone
        self.client_id: Optional[int] = None
        self.in_control: bool = False
        # Placeholders that raise a ConnectionError on use until `connect` replaces them
        self._watchdog_publisher = _NoConnectionClient()
        self._telemetry_watcher = _NoConnectionClient()
        self._req_rep_client = _NoConnectionClient()
        self._ctrl_client = _NoConnectionClient()
        self.peripherals: Optional[List[Peripheral]] = None
        """This list holds the peripherals connected to the drone. If it is `None`, then no
        Guestport telemetry message has been received yet."""

        if auto_connect is True:
            self.connect(timeout=timeout, disconnect_other_clients=disconnect_other_clients)

    def _verify_required_blunux_version(self, requirement: str):
        """Verify that Blunux version is higher than requirement

        requirement needs to be a string that's able to be parsed by version.parse()

        Raises a RuntimeError if the Blunux version of the connected drone does not match or exceed
        the requirement.
        """
        if version.parse(self.software_version_short) < version.parse(requirement):
            raise RuntimeError(
                f"Blunux version of connected drone is {self.software_version_short}. Version "
                f"{requirement} or higher is required."
            )

    def _update_drone_info(self, timeout: float = 3):
        """Request and store information about the connected drone

        Stores the feature list, software version (full and short form), serial number and
        hardware id from the drone's `/diagnostics/drone_info` HTTP endpoint.

        Raises a ConnectionError if the request times out, fails to connect, or does not return
        valid JSON.
        """
        try:
            response = requests.get(
                f"http://{self._ip}/diagnostics/drone_info", timeout=timeout
            ).json()
        except (
            requests.ConnectTimeout,
            requests.ReadTimeout,
            requests.ConnectionError,
            JSONDecodeError,
        ) as e:
            raise ConnectionError("Could not establish connection with drone") from e
        try:
            # filter(None, ...) drops the empty strings an empty feature field would produce
            self.features = list(filter(None, response["features"].split(",")))
        except KeyError:
            # Drone versions older than 1.4.7 did not have this field.
            self.features = []
        self.software_version = response["sw_version"]
        # Only the part in front of the first "-" is kept for version comparisons
        self.software_version_short = self.software_version.split("-")[0]
        self.serial_number = response["serial_number"]
        self.uuid = response["hardware_id"]

    @staticmethod
    def _drone_info_callback(msg_type: str, msg: blueye.protocol.DroneInfoTel, drone: Drone):
        """Telemetry callback that creates the peripherals from a DroneInfoTel message"""
        # Check if the GuestPortInfo has been initialized
        if msg.drone_info.gp._pb.ByteSize() != 0:
            drone._create_peripherals_from_drone_info(msg.drone_info.gp)
            # Remove the callback after the first message has been received
            drone.telemetry.remove_msg_callback(drone._drone_info_cb_id)

    def _create_peripherals_from_drone_info(self, gp_info: blueye.protocol.GuestPortInfo):
        """Rebuild `peripherals` from the devices listed on the three guest ports

        Lights, cameras and grippers are additionally stored as `external_light`,
        `external_camera` and `gripper`.
        """
        self.peripherals = []
        for port in (gp_info.gp1, gp_info.gp2, gp_info.gp3):
            for device in port.device_list.devices:
                peripheral = device_to_peripheral(self, port.guest_port_number, device)
                self.peripherals.append(peripheral)
                if isinstance(peripheral, GuestPortLight):
                    self.external_light = peripheral
                elif isinstance(peripheral, GuestPortCamera):
                    self.external_camera = peripheral
                elif isinstance(peripheral, Gripper):
                    self.gripper = peripheral

    def _clean_up_clients(self):
        """Stop the client threads and put the not-connected placeholders back"""
        self._watchdog_publisher.stop()
        self._telemetry_watcher.stop()
        self._req_rep_client.stop()
        self._ctrl_client.stop()
        self._watchdog_publisher = _NoConnectionClient()
        self._telemetry_watcher = _NoConnectionClient()
        self._req_rep_client = _NoConnectionClient()
        self._ctrl_client = _NoConnectionClient()

    def connect(
        self,
        client_info: blueye.protocol.ClientInfo = None,
        timeout: float = 4,
        disconnect_other_clients: bool = False,
    ):
        """Establish a connection to the drone

        Spawns several threads for receiving telemetry, sending control messages and publishing
        watchdog messages.

        When a watchdog message is received by the drone the thrusters are armed, so to stop the
        drone from moving unexpectedly when connecting all thruster set points are set to zero when
        connecting.

        ** Arguments **

        - *client_info*: Information about the client connecting, if None the SDK will attempt to
                         read it from the environment
        - *timeout*: Seconds to wait for connection. The first connection on boot can be a little
                     slower than the following ones
        - *disconnect_other_clients*: If True, disconnect clients until drone reports that we are in
                                      control

        ** Raises **

        - *ConnectionError*: If the connection attempt fails
        - *RuntimeError*: If the Blunux version of the connected drone is too old
        """
        logger.info(f"Attempting to connect to drone at {self._ip}")
        self._update_drone_info(timeout=timeout)
        self._verify_required_blunux_version("3.2")

        self._telemetry_watcher = TelemetryClient(self)
        self._ctrl_client = CtrlClient(self)
        self._watchdog_publisher = WatchdogPublisher(self)
        self._req_rep_client = ReqRepClient(self)

        self._telemetry_watcher.start()
        self._req_rep_client.start()
        self._ctrl_client.start()
        self._watchdog_publisher.start()

        try:
            self.ping()
            connect_resp = self._req_rep_client.connect_client(client_info=client_info)
        except blueye.protocol.exceptions.ResponseTimeout as e:
            # The clients were already started above, stop them again so a failed attempt
            # does not leave their threads running behind a drone that reports not connected
            self._clean_up_clients()
            raise ConnectionError("Could not establish connection with drone") from e

        logger.info(f"Connection successful, client id: {connect_resp.client_id}")
        logger.info(f"Client id in control: {connect_resp.client_id_in_control}")
        logger.info(f"There are {len(connect_resp.connected_clients)-1} other clients connected")
        self.client_id = connect_resp.client_id
        self.in_control = connect_resp.client_id == connect_resp.client_id_in_control
        self.connected = True

        if disconnect_other_clients and not self.in_control:
            self.take_control()

        self._drone_info_cb_id = self.telemetry.add_msg_callback(
            [blueye.protocol.DroneInfoTel],
            Drone._drone_info_callback,
            False,
            drone=self,
        )

        if self.in_control:
            # The drone runs from a read-only filesystem, and as such does not keep any state,
            # therefore when we connect to it we should send the current time
            current_time = int(time.time())
            time_formatted = datetime.fromtimestamp(current_time).strftime("%d. %b %Y %H:%M")
            logger.debug(f"Setting current time to {current_time} ({time_formatted})")
            self.config.set_drone_time(current_time)
            logger.debug("Disabling thrusters")
            self.motion.send_thruster_setpoint(0, 0, 0, 0)

    def disconnect(self):
        """Disconnects the connection, allowing another client to take control of the drone"""
        try:
            self._req_rep_client.disconnect_client(self.client_id)
        except blueye.protocol.exceptions.ResponseTimeout:
            # If there's no response the connection is likely already closed, so we can just
            # continue to stop threads and disconnect
            pass
        self._clean_up_clients()
        self.connected = False

    @property
    def connected_clients(self) -> Optional[List[blueye.protocol.ConnectedClient]]:
        """Get a list of connected clients

        Returns None if no ConnectedClientsTel message is available.
        """
        clients_tel = self.telemetry.get(blueye.protocol.ConnectedClientsTel)
        if clients_tel is None:
            return None
        return list(clients_tel.connected_clients)

    @property
    def client_in_control(self) -> Optional[int]:
        """Get the client id of the client in control of the drone

        Returns None if no ConnectedClientsTel message is available.
        """
        clients_tel = self.telemetry.get(blueye.protocol.ConnectedClientsTel)
        if clients_tel is None:
            return None
        return clients_tel.client_id_in_control

    def take_control(self, timeout=1):
        """Take control of the drone, disconnecting other clients

        Will disconnect other clients until the client is in control of the drone.
        Raises a RuntimeError if the client could not take control of the drone in the given time.
        """
        start_time = time.time()
        client_in_control = self.client_in_control
        while self.client_id != client_in_control:
            if time.time() - start_time > timeout:
                raise RuntimeError("Could not take control of the drone in the given time")
            # Each reply reports which client is in control after the disconnect
            resp = self._req_rep_client.disconnect_client(client_in_control)
            client_in_control = resp.client_id_in_control
        self.in_control = True

    @property
    def lights(self) -> Optional[float]:
        """Get or set the intensity of the drone lights

        *Arguments*:

        * brightness (float): Set the intensity of the drone light (0..1)

        *Returns*:

        * brightness (float): The intensity of the drone light (0..1), or None if no LightsTel
                              message is available

        Setting a brightness outside of 0..1 raises a ValueError.
        """
        lights_tel = self.telemetry.get(blueye.protocol.LightsTel)
        if lights_tel is None:
            # Same contract as the other telemetry properties: no message, no value
            return None
        return lights_tel.lights.value

    @lights.setter
    def lights(self, brightness: float):
        if not 0 <= brightness <= 1:
            raise ValueError("Error occured while trying to set lights to: " f"{brightness}")
        self._ctrl_client.set_lights(brightness)

    @property
    def depth(self) -> Optional[float]:
        """Get the current depth in meters

        *Returns*:

        * depth (float): The depth in meters of water column, or None if no DepthTel message is
                         available
        """
        depth_tel = self.telemetry.get(blueye.protocol.DepthTel)
        if depth_tel is None:
            return None
        return depth_tel.depth.value

    @property
    def pose(self) -> Optional[dict]:
        """Get the current orientation of the drone

        *Returns*:

        * pose (dict): Dictionary with roll, pitch, and yaw in degrees, from 0 to 359. None if no
                       AttitudeTel message is available
        """
        attitude_tel = self.telemetry.get(blueye.protocol.AttitudeTel)
        if attitude_tel is None:
            return None
        attitude = attitude_tel.attitude
        pose = {
            "roll": (attitude.roll + 360) % 360,
            "pitch": (attitude.pitch + 360) % 360,
            "yaw": (attitude.yaw + 360) % 360,
        }
        return pose

    @property
    def error_flags(self) -> Optional[Dict[str, bool]]:
        """Get the error flags

        *Returns*:

        * error_flags (dict): The error flags as bools in a dictionary, or None if no
                              ErrorFlagsTel message is available
        """
        error_flags_tel = self.telemetry.get(blueye.protocol.ErrorFlagsTel)
        if error_flags_tel is None:
            return None
        error_flags_msg = error_flags_tel.error_flags
        # Every attribute that is not a dunder is reported as a flag
        return {
            flag: getattr(error_flags_msg, flag)
            for flag in dir(error_flags_msg)
            if not flag.startswith("__")
        }

    @property
    def active_video_streams(self) -> Optional[Dict[str, int]]:
        """Get the number of currently active connections to the video stream

        Every client connected to the RTSP stream (does not matter if it's directly from GStreamer,
        or from the Blueye app) counts as one connection.

        Returns a dictionary with the keys "main" and "guestport", or None if no NStreamersTel
        message is available.
        """
        n_streamers_msg_tel = self.telemetry.get(blueye.protocol.NStreamersTel)
        if n_streamers_msg_tel is None:
            return None
        n_streamers_msg = n_streamers_msg_tel.n_streamers
        return {"main": n_streamers_msg.main, "guestport": n_streamers_msg.guestport}

    def ping(self, timeout: float = 1.0):
        """Ping drone

        Raises a ResponseTimeout exception if the drone does not respond within the timeout period.
        """
        self._req_rep_client.ping(timeout)
Instance variables
active_video_streams
Get the number of currently active connections to the video stream
Every client connected to the RTSP stream (does not matter if it's directly from GStreamer, or from the Blueye app) counts as one connection.
client_in_control
Get the client id of the client in control of the drone
connected_clients
Get a list of connected clients
depth
Get the current depth in meters
Returns:
- depth (float): The depth in meters of water column.
error_flags
Get the error flags
Returns:
- error_flags (dict): The error flags as bools in a dictionary
lights
Get or set the intensity of the drone lights
Arguments:
- brightness (float): Set the intensity of the drone light (0..1)
Returns:
- brightness (float): The intensity of the drone light (0..1)
pose
Get the current orientation of the drone
Returns:
- pose (dict): Dictionary with roll, pitch, and yaw in degrees, from 0 to 359.
Methods
connect
def connect(
self,
client_info: 'blueye.protocol.ClientInfo' = None,
timeout: 'float' = 4,
disconnect_other_clients: 'bool' = False
)
Establish a connection to the drone
Spawns several threads for receiving telemetry, sending control messages and publishing watchdog messages.
When a watchdog message is received by the drone the thrusters are armed, so to stop the drone from moving unexpectedly when connecting all thruster set points are set to zero when connecting.
Arguments:
- client_info: Information about the client connecting, if None the SDK will attempt to read it from the environment
- timeout: Seconds to wait for connection. The first connection on boot can be a little slower than the following ones
- disconnect_other_clients: If True, disconnect clients until drone reports that we are in control
Raises:
- ConnectionError: If the connection attempt fails
- RuntimeError: If the Blunux version of the connected drone is too old
View Source
def connect(
    self,
    client_info: blueye.protocol.ClientInfo = None,
    timeout: float = 4,
    disconnect_other_clients: bool = False,
):
    """Establish a connection to the drone

    Spawns several threads for receiving telemetry, sending control messages and publishing
    watchdog messages.

    When a watchdog message is received by the drone the thrusters are armed, so to stop the
    drone from moving unexpectedly when connecting all thruster set points are set to zero when
    connecting.

    ** Arguments **

    - *client_info*: Information about the client connecting, if None the SDK will attempt to
                     read it from the environment
    - *timeout*: Seconds to wait for connection. The first connection on boot can be a little
                 slower than the following ones
    - *disconnect_other_clients*: If True, disconnect clients until drone reports that we are in
                                  control

    ** Raises **

    - *ConnectionError*: If the connection attempt fails
    - *RuntimeError*: If the Blunux version of the connected drone is too old
    """
    logger.info(f"Attempting to connect to drone at {self._ip}")
    self._update_drone_info(timeout=timeout)
    self._verify_required_blunux_version("3.2")

    self._telemetry_watcher = TelemetryClient(self)
    self._ctrl_client = CtrlClient(self)
    self._watchdog_publisher = WatchdogPublisher(self)
    self._req_rep_client = ReqRepClient(self)

    self._telemetry_watcher.start()
    self._req_rep_client.start()
    self._ctrl_client.start()
    self._watchdog_publisher.start()

    try:
        self.ping()
        connect_resp = self._req_rep_client.connect_client(client_info=client_info)
    except blueye.protocol.exceptions.ResponseTimeout as e:
        # The clients were already started above, stop them again and restore the
        # placeholders so a failed attempt does not leave their threads running
        self._watchdog_publisher.stop()
        self._telemetry_watcher.stop()
        self._req_rep_client.stop()
        self._ctrl_client.stop()
        self._watchdog_publisher = _NoConnectionClient()
        self._telemetry_watcher = _NoConnectionClient()
        self._req_rep_client = _NoConnectionClient()
        self._ctrl_client = _NoConnectionClient()
        raise ConnectionError("Could not establish connection with drone") from e

    logger.info(f"Connection successful, client id: {connect_resp.client_id}")
    logger.info(f"Client id in control: {connect_resp.client_id_in_control}")
    logger.info(f"There are {len(connect_resp.connected_clients)-1} other clients connected")
    self.client_id = connect_resp.client_id
    self.in_control = connect_resp.client_id == connect_resp.client_id_in_control
    self.connected = True

    if disconnect_other_clients and not self.in_control:
        self.take_control()

    self._drone_info_cb_id = self.telemetry.add_msg_callback(
        [blueye.protocol.DroneInfoTel],
        Drone._drone_info_callback,
        False,
        drone=self,
    )

    if self.in_control:
        # The drone runs from a read-only filesystem, and as such does not keep any state,
        # therefore when we connect to it we should send the current time
        current_time = int(time.time())
        time_formatted = datetime.fromtimestamp(current_time).strftime("%d. %b %Y %H:%M")
        logger.debug(f"Setting current time to {current_time} ({time_formatted})")
        self.config.set_drone_time(current_time)
        logger.debug("Disabling thrusters")
        self.motion.send_thruster_setpoint(0, 0, 0, 0)
disconnect
def disconnect(
self
)
Disconnects the connection, allowing another client to take control of the drone
View Source
def disconnect(self):
    """Disconnects the connection, allowing another client to take control of the drone

    First asks the drone to disconnect this client, then stops every communication client and
    swaps each of them for a `_NoConnectionClient` placeholder before marking the drone as not
    connected.
    """
    try:
        self._req_rep_client.disconnect_client(self.client_id)
    except blueye.protocol.exceptions.ResponseTimeout:
        # No reply most likely means the connection is already gone, so carry on with
        # stopping the clients and tearing down the local state
        pass

    # Order matters: all clients are stopped (in this order) before any of them is replaced
    client_attributes = (
        "_watchdog_publisher",
        "_telemetry_watcher",
        "_req_rep_client",
        "_ctrl_client",
    )
    for attribute in client_attributes:
        getattr(self, attribute).stop()
    for attribute in client_attributes:
        setattr(self, attribute, _NoConnectionClient())

    self.connected = False
ping
def ping(
self,
timeout: 'float' = 1.0
)
Ping drone
Raises a ResponseTimeout exception if the drone does not respond within the timeout period.
View Source
def ping(self, timeout: float = 1.0):
    """Ping drone

    The request is forwarded to the request/reply client together with the timeout.

    Raises a ResponseTimeout exception if the drone does not respond within the timeout period.

    *Arguments*:

    * timeout: The timeout handed to the request/reply client's ping
    """
    req_rep_client = self._req_rep_client
    req_rep_client.ping(timeout)
take_control
def take_control(
self,
timeout=1
)
Take control of the drone, disconnecting other clients
Will disconnect other clients until the client is in control of the drone. Raises a RuntimeError if the client could not take control of the drone in the given time.
View Source
def take_control(self, timeout=1):
    """Take control of the drone, disconnecting other clients

    Will disconnect other clients until the client is in control of the drone.
    Raises a RuntimeError if the client could not take control of the drone in the given time.

    *Arguments*:

    * timeout: Maximum time in seconds to keep disconnecting other clients. The limit is
               checked before each disconnect request is sent.
    """
    # Measure elapsed time with the monotonic clock so that adjustments of the system clock
    # can neither trigger a premature timeout nor extend the wait
    start_time = time.monotonic()
    client_in_control = self.client_in_control
    while self.client_id != client_in_control:
        if time.monotonic() - start_time > timeout:
            raise RuntimeError("Could not take control of the drone in the given time")
        # Disconnect whoever is in control and read back who is in control afterwards
        resp = self._req_rep_client.disconnect_client(client_in_control)
        client_in_control = resp.client_id_in_control
    self.in_control = True
Telemetry
class Telemetry(
parent_drone: "'Drone'"
)
View Source
class Telemetry:
    """Access point for the telemetry messages of a drone

    Every method forwards to the telemetry watcher or the request/reply client held by the
    parent drone.
    """

    def __init__(self, parent_drone: "Drone"):
        self._parent_drone = parent_drone

    def set_msg_publish_frequency(self, msg: proto.message.Message, frequency: float):
        """Set the publishing frequency of a specific telemetry message

        Raises a RuntimeError if the drone reports that the frequency could not be set.
        Possible causes could be a frequency outside the valid range, or an incorrect
        message type.

        *Arguments*:

        * msg (proto.message.Message): The message to set the frequency of. Needs to be one of
                                       the messages in blueye.protocol that end in Tel, eg.
                                       blueye.protocol.DepthTel
        * frequency (float): The frequency in Hz. Valid range is (0 .. 100).
        """
        client = self._parent_drone._req_rep_client
        reply = client.set_telemetry_msg_publish_frequency(msg, frequency)
        if reply.success:
            return
        raise RuntimeError("Could not set telemetry message frequency")

    def add_msg_callback(
        self,
        msg_filter: List[proto.message.Message],
        callback: Callable[[str, proto.message.Message], None],
        raw: bool = False,
        **kwargs,
    ) -> str:
        """Register a telemetry message callback

        The callback is called each time a message of the type is received

        *Arguments*:

        * msg_filter: A list of message types to register the callback for.
                      Eg. `[blueye.protocol.DepthTel, blueye.protocol.Imu1Tel]`. If the list is
                      empty the callback will be registered for all message types
        * callback: The callback function. It should be minimal and return as fast as possible
                    to not block the telemetry communication. It is called with two arguments,
                    the message type name and the message object
        * raw: Pass the raw data instead of the deserialized message to the callback function
        * kwargs: Additional keyword arguments to pass to the callback function

        *Returns*:

        * uuid: Callback id. Can be used to remove callback in the future
        """
        watcher = self._parent_drone._telemetry_watcher
        return watcher.add_callback(msg_filter, callback, raw, **kwargs)

    def remove_msg_callback(self, callback_id: str) -> Optional[str]:
        """Remove a telemetry message callback

        The removal is delegated to the telemetry watcher; this method itself returns nothing.

        *Arguments*:

        * callback_id: The callback id
        """
        watcher = self._parent_drone._telemetry_watcher
        watcher.remove_callback(callback_id)

    def get(
        self, msg_type: proto.message.Message, deserialize=True
    ) -> Optional[proto.message.Message | bytes]:
        """Get the latest telemetry message of the specified type

        The telemetry watcher is asked first. If it raises a KeyError for the type, the message
        is requested from the drone instead, provided the drone software version is at least
        3.3.

        *Arguments*:

        * msg_type: The message type to get. Eg. blueye.protocol.DepthTel
        * deserialize: If True, the message will be deserialized before being returned. If
                       False, the raw bytes will be returned.

        *Returns*:

        * The latest message of the specified type, or None if no message has been received yet
        """
        drone = self._parent_drone
        try:
            payload = drone._telemetry_watcher.get(msg_type)
        except KeyError:
            can_request = version.parse(drone.software_version_short) >= version.parse("3.3")
            if not can_request:
                return None
            payload = drone._req_rep_client.get_telemetry_msg(msg_type).payload.value
            # An empty payload from the drone is treated as "no message"
            if payload == b"":
                return None
        return msg_type.deserialize(payload) if deserialize else payload
Methods
add_msg_callback
def add_msg_callback(
self,
msg_filter: 'List[proto.message.Message]',
callback: 'Callable[[str, proto.message.Message], None]',
raw: 'bool' = False,
**kwargs
) -> 'str'
Register a telemetry message callback
The callback is called each time a message of the type is received
Arguments:
- msg_filter: A list of message types to register the callback for. Eg. `[blueye.protocol.DepthTel, blueye.protocol.Imu1Tel]`. If the list is empty the callback will be registered for all message types
- callback: The callback function. It should be minimal and return as fast as possible to not block the telemetry communication. It is called with two arguments, the message type name and the message object
- raw: Pass the raw data instead of the deserialized message to the callback function
- kwargs: Additional keyword arguments to pass to the callback function
Returns:
- uuid: Callback id. Can be used to remove callback in the future
View Source
def add_msg_callback(
    self,
    msg_filter: List[proto.message.Message],
    callback: Callable[[str, proto.message.Message], None],
    raw: bool = False,
    **kwargs,
) -> str:
    """Register a telemetry message callback

    The callback is called each time a message of the type is received

    *Arguments*:

    * msg_filter: A list of message types to register the callback for.
                  Eg. `[blueye.protocol.DepthTel, blueye.protocol.Imu1Tel]`. If the list is
                  empty the callback will be registered for all message types
    * callback: The callback function. It should be minimal and return as fast as possible to
                not block the telemetry communication. It is called with two arguments, the
                message type name and the message object
    * raw: Pass the raw data instead of the deserialized message to the callback function
    * kwargs: Additional keyword arguments to pass to the callback function

    *Returns*:

    * uuid: Callback id. Can be used to remove callback in the future
    """
    # Registration is handled by the parent drone's telemetry watcher, which supplies the id
    watcher = self._parent_drone._telemetry_watcher
    return watcher.add_callback(msg_filter, callback, raw, **kwargs)
get
def get(
self,
msg_type: 'proto.message.Message',
deserialize=True
) -> 'Optional[proto.message.Message | bytes]'
Get the latest telemetry message of the specified type
Arguments:
- msg_type: The message type to get. Eg. blueye.protocol.DepthTel
- deserialize: If True, the message will be deserialized before being returned. If False, the raw bytes will be returned.
Returns:
- The latest message of the specified type, or None if no message has been received yet
View Source
def get(
    self, msg_type: proto.message.Message, deserialize=True
) -> Optional[proto.message.Message | bytes]:
    """Get the latest telemetry message of the specified type

    The telemetry watcher is asked first. If it raises a KeyError for the type, the message is
    requested from the drone instead, provided the drone software version is at least 3.3.

    *Arguments*:

    * msg_type: The message type to get. Eg. blueye.protocol.DepthTel
    * deserialize: If True, the message will be deserialized before being returned. If False,
                   the raw bytes will be returned.

    *Returns*:

    * The latest message of the specified type, or None if no message has been received yet
    """
    drone = self._parent_drone
    try:
        payload = drone._telemetry_watcher.get(msg_type)
    except KeyError:
        can_request = version.parse(drone.software_version_short) >= version.parse("3.3")
        if not can_request:
            return None
        payload = drone._req_rep_client.get_telemetry_msg(msg_type).payload.value
        # An empty payload from the drone is treated as "no message"
        if payload == b"":
            return None
    return msg_type.deserialize(payload) if deserialize else payload
remove_msg_callback
def remove_msg_callback(
self,
callback_id: 'str'
) -> 'Optional[str]'
Remove a telemetry message callback
Arguments:
- callback_id: The callback id
View Source
def remove_msg_callback(self, callback_id: str) -> Optional[str]:
    """Remove a telemetry message callback

    The removal is delegated to the telemetry watcher; this method itself returns nothing.

    *Arguments*:

    * callback_id: The callback id
    """
    watcher = self._parent_drone._telemetry_watcher
    watcher.remove_callback(callback_id)
set_msg_publish_frequency
def set_msg_publish_frequency(
self,
msg: 'proto.message.Message',
frequency: 'float'
)
Set the publishing frequency of a specific telemetry message
Raises a RuntimeError if the drone fails to set the frequency. Possible causes could be a frequency outside the valid range, or an incorrect message type.
Arguments:
- msg (proto.message.Message): The message to set the frequency of. Needs to be one of the messages in blueye.protocol that end in Tel, eg. blueye.protocol.DepthTel
- frequency (float): The frequency in Hz. Valid range is (0 .. 100).
View Source
def set_msg_publish_frequency(self, msg: proto.message.Message, frequency: float):
    """Set the publishing frequency of a specific telemetry message

    Raises a RuntimeError if the drone reports that the frequency could not be set. Possible
    causes could be a frequency outside the valid range, or an incorrect message type.

    *Arguments*:

    * msg (proto.message.Message): The message to set the frequency of. Needs to be one of the
                                   messages in blueye.protocol that end in Tel, eg.
                                   blueye.protocol.DepthTel
    * frequency (float): The frequency in Hz. Valid range is (0 .. 100).
    """
    client = self._parent_drone._req_rep_client
    reply = client.set_telemetry_msg_publish_frequency(msg, frequency)
    if reply.success:
        return
    raise RuntimeError("Could not set telemetry message frequency")