blueye.sdk.drone

drone

Classes:

  • Config

    Handles the configuration settings for the Blueye drone.

  • Drone

    A class providing an interface to a Blueye drone's functions.

  • Telemetry

    Handles the telemetry messages for the Blueye drone.

Config

Config(parent_drone: 'Drone')

Handles the configuration settings for the Blueye drone.

Parameters:

  • parent_drone (Drone) –

    The parent drone instance.

Methods:

  • set_drone_time

    Set the system time for the drone.

Attributes:

  • water_density

    Get or set the current water density for increased pressure sensor accuracy.

Source code in blueye/sdk/drone.py
def __init__(self, parent_drone: "Drone"):
    """Initialize the Config class.

    Args:
        parent_drone (Drone): The parent drone instance.
    """
    self._parent_drone = parent_drone
    self._water_density = WaterDensities.salty

water_density property writable

water_density

Get or set the current water density for increased pressure sensor accuracy.

Older software versions will assume a water density of 1025 grams per liter.

The WaterDensities class contains typical densities for salty-, brackish-, and fresh water (these are the same values that the Blueye app uses).

Parameters:

  • density (float) –

    The water density to set.
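
To illustrate why the density setting matters, here is a minimal sketch of the hydrostatic relation between gauge pressure and depth. It is not the drone's actual implementation. The function name and the fresh-water value of 997 g/L are assumptions made for illustration; 1025 g/L is the default mentioned above.

```python
G = 9.80665  # standard gravity in m/s^2


def depth_from_gauge_pressure(pressure_pa: float, density_g_per_l: float) -> float:
    """Return the hydrostatic depth in meters for a gauge pressure in pascal.

    1 g/L equals 1 kg/m^3, so the density can be used directly in p = rho * g * h.
    """
    return pressure_pa / (density_g_per_l * G)


# The same gauge pressure (1 bar) interpreted with two different densities.
depth_salty = depth_from_gauge_pressure(100_000, 1025)
depth_fresh = depth_from_gauge_pressure(100_000, 997)  # 997 g/L is an assumed value
print(f"salty: {depth_salty:.2f} m, fresh: {depth_fresh:.2f} m")
```

At roughly ten meters, the two densities give depths that differ by a few tenths of a meter. On a connected drone, the setting itself is changed by assigning a float to drone.config.water_density.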

set_drone_time

set_drone_time(time: int)

Set the system time for the drone.

The time argument is expected to be a Unix timestamp (i.e., the number of seconds since the epoch).

Parameters:

  • time (int) –

    The Unix timestamp to set.
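
A minimal sketch of producing a suitable timestamp and passing it on. The helper names unix_timestamp and sync_clock are hypothetical, and drone is assumed to be a connected Drone instance. Note that connect already sends the current time when the client is in control, so an explicit call is mostly useful for re-syncing.

```python
import time
from datetime import datetime, timezone


def unix_timestamp(moment=None) -> int:
    """Return whole seconds since the epoch for `moment` (default: now).

    Pass a timezone-aware datetime; a naive one is interpreted as local time.
    """
    if moment is None:
        return int(time.time())
    return int(moment.timestamp())


def sync_clock(drone, moment=None) -> int:
    """Send the timestamp to the drone and return the value that was sent."""
    timestamp = unix_timestamp(moment)
    drone.config.set_drone_time(timestamp)
    return timestamp
```

For example, sync_clock(drone, datetime(2024, 1, 1, tzinfo=timezone.utc)) would send 1704067200.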

Source code in blueye/sdk/drone.py
def set_drone_time(self, time: int):
    """Set the system time for the drone.

    This method is used to set the system time for the drone. The argument `time` is expected to
    be a Unix timestamp (i.e., the number of seconds since the epoch).

    Args:
        time (int): The Unix timestamp to set.
    """
    self._parent_drone._req_rep_client.sync_time(time)

Drone

Drone(
    ip="192.168.1.101",
    auto_connect=True,
    timeout=10,
    disconnect_other_clients=False,
    connect_as_observer=False,
)

A class providing an interface to a Blueye drone's functions.

Automatically connects to the drone using the default IP when instantiated. This behavior can be disabled by setting auto_connect=False.

Parameters:

  • ip (str, default: '192.168.1.101' ) –

    The IP address of the drone.

  • auto_connect (bool, default: True ) –

    Whether to automatically connect to the drone.

  • timeout (int, default: 10 ) –

    The timeout for the connection attempt, in seconds.

  • disconnect_other_clients (bool, default: False ) –

    Whether to disconnect other clients.

  • connect_as_observer (bool, default: False ) –

    Whether to connect as an observer.
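
The parameters above can be combined to defer the connection, which makes it easier to handle a failed attempt. The following is a sketch only: open_drone is a hypothetical helper, and the import is done lazily so the function can be defined without the blueye.sdk package installed.

```python
def open_drone(ip: str = "192.168.1.101", timeout: float = 10):
    """Create a Drone without connecting, then connect explicitly.

    Returns the connected drone, or None if the connection attempt fails.
    """
    from blueye.sdk import Drone  # requires the blueye.sdk package

    drone = Drone(ip=ip, auto_connect=False)
    try:
        drone.connect(timeout=timeout)
    except ConnectionError:
        # Raised by connect() when the drone cannot be reached in time.
        return None
    return drone
```

A RuntimeError caused by an outdated Blunux version is deliberately left to propagate, since retrying would not help in that case.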

Methods:

  • connect

    Establish a connection to the drone.

  • disconnect

    Disconnect from the drone, allowing another client to take control.

  • ping

    Ping the drone.

  • take_control

    Take control of the drone, disconnecting other clients.

Attributes:

  • active_video_streams (Dict[str, int]) –

    Get the number of currently active connections to the video stream.

  • altitude (Optional[float]) –

    Get the current altitude in meters.

  • client_in_control (Optional[int]) –

    Get the client ID of the client in control of the drone.

  • connected_clients (Optional[List[ConnectedClient]]) –

    Get a list of connected clients.

  • depth (Optional[float]) –

    Get the current depth in meters.

  • dive_time (Optional[int]) –

    Get the amount of time the drone has been submerged.

  • error_flags (Dict[str, bool]) –

    Get the error flags.

  • lights (Optional[float]) –

    Get or set the intensity of the drone lights.

  • peripherals (Optional[List[Peripheral]]) –

    This list holds the peripherals connected to the drone. If it is None, then no Guestport telemetry message has been received yet.

  • pose (Optional[Dict[str, float]]) –

    Get the current orientation of the drone.

  • water_temperature (Optional[float]) –

    Get the water temperature in degrees Celsius.

Source code in blueye/sdk/drone.py
def __init__(
    self,
    ip="192.168.1.101",
    auto_connect=True,
    timeout=10,
    disconnect_other_clients=False,
    connect_as_observer=False,
):
    """Initialize the Drone class.

    Args:
        ip (str, optional):
            The IP address of the drone.
        auto_connect (bool, optional):
            Whether to automatically connect to the drone.
        timeout (int, optional):
            The timeout for the connection attempt.
        disconnect_other_clients (bool, optional):
            Whether to disconnect other clients.
        connect_as_observer (bool, optional):
            Whether to connect as an observer.
    """
    self._ip = ip
    self.camera = Camera(self, is_guestport_camera=False)
    self.motion = Motion(self)
    self.logs = Logs(self)
    self.legacy_logs = LegacyLogs(self)
    self.config = Config(self)
    self.battery = Battery(self)
    self.telemetry = Telemetry(self)
    self.connected = False
    self.client_id: int = None
    self.in_control: bool = False
    self._watchdog_publisher = _NoConnectionClient()
    self._telemetry_watcher = _NoConnectionClient()
    self._req_rep_client = _NoConnectionClient()
    self._ctrl_client = _NoConnectionClient()

    self.peripherals: Optional[List[Peripheral]] = None
    """This list holds the peripherals connected to the drone. If it is `None`, then no
    Guestport telemetry message has been received yet."""

    if auto_connect is True:
        self.connect(
            timeout=timeout,
            disconnect_other_clients=disconnect_other_clients,
            connect_as_observer=connect_as_observer,
        )

active_video_streams property

active_video_streams: Dict[str, int]

Get the number of currently active connections to the video stream.

Every client connected to the RTSP stream (does not matter if it's directly from GStreamer, or from the Blueye app) counts as one connection.

Returns:

  • Dict[str, int]

    A dictionary with the number of active connections to the main and guestport video streams.

altitude property

altitude: Optional[float]

Get the current altitude in meters.

If the drone has a DVL or a positioning system with a valid reading, this property will return the current altitude.

Returns:

  • Optional[float]

    The altitude in meters above the bottom.

client_in_control property

client_in_control: Optional[int]

Get the client ID of the client in control of the drone.

Returns:

  • Optional[int]

    The client ID of the client in control, or None if no client is in control.

connected_clients property

connected_clients: Optional[List[ConnectedClient]]

Get a list of connected clients.

Returns:

  • Optional[List[ConnectedClient]]

    A list of connected clients, or None if no clients are connected.

depth property

depth: Optional[float]

Get the current depth in meters.

Returns:

  • Optional[float]

    The depth in meters of water column.

dive_time property

dive_time: Optional[int]

Get the amount of time the drone has been submerged.

The drone starts incrementing this value once it is deeper than 250 mm.

Returns:

  • Optional[int]

    The time in seconds the drone has been submerged.
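
Since the value is a plain number of seconds (or None), a small formatting helper can make it easier to read in logs. This is an illustrative sketch; format_dive_time is a hypothetical name and not part of the SDK.

```python
def format_dive_time(seconds) -> str:
    """Format a dive time in seconds as H:MM:SS, or a placeholder for None."""
    if seconds is None:
        return "--:--:--"
    hours, remainder = divmod(int(seconds), 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{hours}:{minutes:02d}:{secs:02d}"


print(format_dive_time(3725))  # prints 1:02:05
```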

error_flags property

error_flags: Dict[str, bool]

Get the error flags.

Returns:

  • Dict[str, bool]

    The error flags as bools in a dictionary.

lights property writable

lights: Optional[float]

Get or set the intensity of the drone lights.

Parameters:

  • brightness (float) –

    Set the intensity of the drone light (0..1).

Returns:

  • Optional[float]

    The intensity of the drone light (0..1). None if no telemetry message has been received.

Raises:

  • ValueError

    If the brightness is not in the range 0..1.
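
One way to avoid the ValueError is to clamp the value before assigning it. A minimal sketch, assuming drone is a connected Drone instance; set_lights_clamped is a hypothetical helper.

```python
def set_lights_clamped(drone, brightness: float) -> float:
    """Clamp `brightness` to the 0..1 range, apply it, and return the value used."""
    clamped = min(1.0, max(0.0, float(brightness)))
    drone.lights = clamped
    return clamped
```

Whether silently clamping is preferable to letting the error surface depends on the application; for operator-facing tools the exception may be the better signal.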

peripherals instance-attribute

peripherals: Optional[List[Peripheral]] = None

This list holds the peripherals connected to the drone. If it is None, then no Guestport telemetry message has been received yet.

pose property

pose: Optional[Dict[str, float]]

Get the current orientation of the drone.

Returns:

  • Optional[Dict[str, float]]

    A dictionary with roll, pitch, and yaw in degrees, from 0 to 359.
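
For control or plotting code it is often more convenient to work with signed angles. The sketch below converts the 0 to 359 degree values into the range -180 to 180; the function names are hypothetical and the conversion is plain arithmetic, not an SDK feature.

```python
def to_signed_degrees(angle: float) -> float:
    """Map an angle in [0, 360) to the equivalent angle in [-180, 180)."""
    return (angle + 180.0) % 360.0 - 180.0


def signed_pose(pose):
    """Convert every angle in a pose dictionary; pass None through unchanged."""
    if pose is None:
        return None
    return {axis: to_signed_degrees(value) for axis, value in pose.items()}
```

With this mapping a roll of 350 degrees becomes -10 degrees, while 10 degrees stays 10 degrees.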

water_temperature property

water_temperature: Optional[float]

Get the water temperature in degrees Celsius.

Returns:

  • Optional[float]

    The water temperature in degrees Celsius.
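
Several of the properties above are typed as Optional, so calling code should be prepared for None. A minimal sketch of collecting a few readings while skipping missing ones; telemetry_snapshot and its key names are hypothetical.

```python
def telemetry_snapshot(drone) -> dict:
    """Collect a few Optional readings, keeping only those that are not None."""
    readings = {
        "depth_m": drone.depth,
        "altitude_m": drone.altitude,
        "water_temperature_c": drone.water_temperature,
        "dive_time_s": drone.dive_time,
    }
    # Compare against None explicitly so that valid zero readings are kept.
    return {name: value for name, value in readings.items() if value is not None}
```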

connect

connect(
    client_info: ClientInfo = None,
    timeout: float = 4,
    disconnect_other_clients: bool = False,
    connect_as_observer: bool = False,
)

Establish a connection to the drone.

Spawns several threads for receiving telemetry, sending control messages, and publishing watchdog messages.

When a watchdog message is received by the drone, the thrusters are armed. To stop the drone from moving unexpectedly when connecting, all thruster set points are set to zero when connecting.

Parameters:

  • client_info (ClientInfo, default: None ) –

    Information about the client connecting. If None, the SDK will attempt to read it from the environment.

  • timeout (float, default: 4 ) –

    Seconds to wait for connection.

  • disconnect_other_clients (bool, default: False ) –

    If True, disconnect clients until the drone reports that we are in control.

  • connect_as_observer (bool, default: False ) –

    If True, the client will not be promoted to in control of the drone.

Raises:

  • ConnectionError

    If the connection attempt fails.

  • RuntimeError

    If the Blunux version of the connected drone is too old.

Source code in blueye/sdk/drone.py
def connect(
    self,
    client_info: blueye.protocol.ClientInfo = None,
    timeout: float = 4,
    disconnect_other_clients: bool = False,
    connect_as_observer: bool = False,
):
    """Establish a connection to the drone.

    Spawns several threads for receiving telemetry, sending control messages, and publishing
    watchdog messages.

    When a watchdog message is received by the drone, the thrusters are armed. To stop the drone
    from moving unexpectedly when connecting, all thruster set points are set to zero when
    connecting.

    Args:
        client_info (blueye.protocol.ClientInfo, optional):
            Information about the client connecting. If None, the SDK will attempt to read it
            from the environment.
        timeout (float, optional):
            Seconds to wait for connection.
        disconnect_other_clients (bool, optional):
            If True, disconnect clients until the drone reports that we are in control.
        connect_as_observer (bool, optional):
            If True, the client will not be promoted to in control of the drone.

    Raises:
        ConnectionError: If the connection attempt fails.
        RuntimeError: If the Blunux version of the connected drone is too old.
    """
    logger.info(f"Attempting to connect to drone at {self._ip}")
    self._update_drone_info(timeout=timeout)
    self._verify_required_blunux_version("3.2")

    self._telemetry_watcher = TelemetryClient(self)
    self._ctrl_client = CtrlClient(self)
    self._watchdog_publisher = WatchdogPublisher(self)
    self._req_rep_client = ReqRepClient(self)

    self._telemetry_watcher.start()
    self._req_rep_client.start()
    self._ctrl_client.start()
    self._watchdog_publisher.start()

    try:
        self.ping()
        connect_resp = self._req_rep_client.connect_client(
            client_info=client_info, is_observer=connect_as_observer
        )
    except blueye.protocol.exceptions.ResponseTimeout as e:
        raise ConnectionError("Could not establish connection with drone") from e
    logger.info(f"Connection successful, client id: {connect_resp.client_id}")
    logger.info(f"Client id in control: {connect_resp.client_id_in_control}")
    logger.info(f"There are {len(connect_resp.connected_clients)-1} other clients connected")
    self.client_id = connect_resp.client_id
    self.in_control = connect_resp.client_id == connect_resp.client_id_in_control
    self.connected = True
    if disconnect_other_clients and not self.in_control and not connect_as_observer:
        self.take_control()
    self._drone_info_cb_id = self.telemetry.add_msg_callback(
        [blueye.protocol.DroneInfoTel],
        Drone._drone_info_callback,
        False,
        drone=self,
    )

    if self.in_control:
        # The drone runs from a read-only filesystem, and as such does not keep any state,
        # therefore when we connect to it we should send the current time
        current_time = int(time.time())
        time_formatted = datetime.fromtimestamp(current_time).strftime("%d. %b %Y %H:%M")
        logger.debug(f"Setting current time to {current_time} ({time_formatted})")
        self.config.set_drone_time(current_time)
        logger.debug("Disabling thrusters")
        self.motion.send_thruster_setpoint(0, 0, 0, 0)

disconnect

disconnect()

Disconnect from the drone, allowing another client to take control.
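
To make sure disconnect is always called, connect and disconnect can be paired in a context manager. This is a sketch under the assumption that the drone was created with auto_connect=False; drone_session is a hypothetical helper, not part of the SDK.

```python
from contextlib import contextmanager


@contextmanager
def drone_session(drone, **connect_kwargs):
    """Connect on entry and disconnect on exit, even if the body raises."""
    drone.connect(**connect_kwargs)
    try:
        yield drone
    finally:
        drone.disconnect()
```

It could then be used as: with drone_session(drone, timeout=4) as d: print(d.depth)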

Source code in blueye/sdk/drone.py
def disconnect(self):
    """Disconnects the connection, allowing another client to take control of the drone."""
    try:
        self._req_rep_client.disconnect_client(self.client_id)
    except blueye.protocol.exceptions.ResponseTimeout:
        # If there's no response the connection is likely already closed, so we can just
        # continue to stop threads and disconnect
        pass
    self._watchdog_publisher.stop()
    self._telemetry_watcher.stop()
    self._req_rep_client.stop()
    self._ctrl_client.stop()

    self._watchdog_publisher = _NoConnectionClient()
    self._telemetry_watcher = _NoConnectionClient()
    self._req_rep_client = _NoConnectionClient()
    self._ctrl_client = _NoConnectionClient()

    self.connected = False

ping

ping(timeout: float = 1.0)

Ping the drone.

Parameters:

  • timeout (float, default: 1.0 ) –

    The timeout for the ping request.

Raises:

  • ResponseTimeout

    If the drone does not respond within the timeout period.
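
A small wrapper can turn the exception into a boolean for health checks. In this sketch is_reachable and its timeout_error parameter are hypothetical; with the SDK installed one would pass blueye.protocol.exceptions.ResponseTimeout instead of the broad default.

```python
def is_reachable(drone, timeout: float = 1.0, timeout_error=Exception) -> bool:
    """Return True if the drone answers a ping within `timeout`."""
    try:
        drone.ping(timeout)
    except timeout_error:
        # The broad default is a placeholder; narrow it to the SDK's
        # ResponseTimeout in real code so other errors are not hidden.
        return False
    return True
```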

Source code in blueye/sdk/drone.py
def ping(self, timeout: float = 1.0):
    """Ping the drone.

    Args:
        timeout (float, optional): The timeout for the ping request.

    Raises:
        blueye.protocol.exceptions.ResponseTimeout:
            If the drone does not respond within the timeout period.
    """
    self._req_rep_client.ping(timeout)

take_control

take_control(timeout=1)

Take control of the drone, disconnecting other clients.

Will disconnect other clients until the client is in control of the drone.

Parameters:

  • timeout (int, default: 1 ) –

    The timeout for taking control.

Raises:

  • RuntimeError

    If the client could not take control of the drone in the given time.
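
Because taking control can fail, callers may want to fall back to read-only behavior instead of aborting. A minimal sketch, assuming drone is a connected Drone instance; try_take_control is a hypothetical helper.

```python
def try_take_control(drone, timeout: float = 1) -> bool:
    """Attempt to take control and report whether this client is now in control."""
    try:
        drone.take_control(timeout=timeout)
    except RuntimeError:
        # take_control gave up after `timeout`; another client is still in control.
        return False
    return bool(drone.in_control)
```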

Source code in blueye/sdk/drone.py
def take_control(self, timeout=1):
    """Take control of the drone, disconnecting other clients.

    Will disconnect other clients until the client is in control of the drone.

    Args:
        timeout (int, optional): The timeout for taking control.

    Raises:
        RuntimeError: If the client could not take control of the drone in the given time.
    """
    start_time = time.time()
    client_in_control = self.client_in_control
    while self.client_id != client_in_control:
        if time.time() - start_time > timeout:
            raise RuntimeError("Could not take control of the drone in the given time")
        resp = self._req_rep_client.disconnect_client(client_in_control)
        client_in_control = resp.client_id_in_control
    self.in_control = True

Telemetry

Telemetry(parent_drone: 'Drone')

Handles the telemetry messages for the Blueye drone.

Parameters:

  • parent_drone (Drone) –

    The parent drone instance.

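Methods:

  • add_msg_callback

    Register a telemetry message callback.

  • get

    Get the latest telemetry message of the specified type.

  • remove_msg_callback

    Remove a telemetry message callback.

  • set_msg_publish_frequency

    Set the publishing frequency of a specific telemetry message.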

Source code in blueye/sdk/drone.py
def __init__(self, parent_drone: "Drone"):
    """Initialize the Telemetry class.

    Args:
        parent_drone (Drone): The parent drone instance.
    """
    self._parent_drone = parent_drone

add_msg_callback

add_msg_callback(
    msg_filter: List[Message],
    callback: Callable[[str, Message], None],
    raw: bool = False,
    **kwargs: Dict[str, Any]
) -> str

Register a telemetry message callback.

The callback is called each time a message of the type is received.

Parameters:

  • msg_filter (List[Message]) –

    A list of message types to register the callback for. E.g., [blueye.protocol.DepthTel, blueye.protocol.Imu1Tel]. If the list is empty, the callback will be registered for all message types.

  • callback (Callable[[str, Message], None]) –

    The callback function. It should be minimal and return as fast as possible to not block the telemetry communication. It is called with two arguments, the message type name and the message object.

  • raw (bool, default: False ) –

    Pass the raw data instead of the deserialized message to the callback function.

  • **kwargs (Dict[str, Any], default: {} ) –

    Additional keyword arguments to pass to the callback function.

Returns:

  • str

    The UUID of the callback.
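
Since the callback should return quickly, one common pattern is to hand messages over to a queue and process them elsewhere. The sketch below shows that pattern; make_queue_callback is a hypothetical helper, and dropping messages when the queue is full is a design choice of this example.

```python
import queue


def make_queue_callback(target: queue.Queue):
    """Build a callback that only enqueues messages and never blocks."""

    def callback(msg_type_name, msg, **kwargs):
        try:
            target.put_nowait((msg_type_name, msg))
        except queue.Full:
            # Drop the message rather than stall the telemetry thread.
            pass

    return callback
```

It would be registered with something like drone.telemetry.add_msg_callback([blueye.protocol.DepthTel], make_queue_callback(q)), with a separate worker consuming q.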

Source code in blueye/sdk/drone.py
def add_msg_callback(
    self,
    msg_filter: List[proto.message.Message],
    callback: Callable[[str, proto.message.Message], None],
    raw: bool = False,
    **kwargs: Dict[str, Any],
) -> str:
    """Register a telemetry message callback.

    The callback is called each time a message of the type is received.

    Args:
        msg_filter (List[proto.message.Message]):
            A list of message types to register the callback for. E.g.,
            `[blueye.protocol.DepthTel, blueye.protocol.Imu1Tel]`. If the list is empty, the
            callback will be registered for all message types.
        callback (Callable[[str, proto.message.Message], None]):
            The callback function. It should be minimal and return as fast as possible to not
            block the telemetry communication. It is called with two arguments, the message
            type name and the message object.
        raw (bool, optional):
            Pass the raw data instead of the deserialized message to the callback function.
        **kwargs:
            Additional keyword arguments to pass to the callback function.

    Returns:
        The UUID of the callback.
    """
    uuid_hex = self._parent_drone._telemetry_watcher.add_callback(
        msg_filter, callback, raw, **kwargs
    )
    return uuid_hex

get

get(
    msg_type: Message, deserialize=True
) -> Optional[Message | bytes]

Get the latest telemetry message of the specified type.

Parameters:

  • msg_type (Message) –

    The message type to get. E.g., blueye.protocol.DepthTel.

  • deserialize (bool, default: True ) –

    If True, the message will be deserialized before being returned. If False, the raw bytes will be returned.

Returns:

  • Optional[Message | bytes]

    The latest message of the specified type, or None if no message has been received yet.
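
Because get returns None until a message of the requested type is available, code that runs right after connecting may need to wait briefly. A minimal polling sketch; wait_for_message and its default values are hypothetical.

```python
import time


def wait_for_message(drone, msg_type, timeout: float = 5.0, poll_interval: float = 0.1):
    """Poll telemetry.get until a message arrives or `timeout` seconds pass.

    Returns the message, or None if nothing was received in time.
    """
    deadline = time.monotonic() + timeout
    while True:
        msg = drone.telemetry.get(msg_type)
        if msg is not None:
            return msg
        if time.monotonic() >= deadline:
            return None
        time.sleep(poll_interval)
```

For example, wait_for_message(drone, blueye.protocol.DepthTel) would return the first depth message available within five seconds. For continuous updates a callback is usually the better fit than polling.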

Source code in blueye/sdk/drone.py
def get(
    self, msg_type: proto.message.Message, deserialize=True
) -> Optional[proto.message.Message | bytes]:
    """Get the latest telemetry message of the specified type.

    Args:
        msg_type (proto.message.Message):
            The message type to get. E.g., blueye.protocol.DepthTel.
        deserialize (bool, optional):
            If True, the message will be deserialized before being returned. If False, the raw
            bytes will be returned.

    Returns:
        The latest message of the specified type, or None if no message has been received yet.
    """
    try:
        msg = self._parent_drone._telemetry_watcher.get(msg_type)
    except KeyError:
        if version.parse(self._parent_drone.software_version_short) >= version.parse("3.3"):
            msg = self._parent_drone._req_rep_client.get_telemetry_msg(msg_type).payload.value
            if msg == b"":
                return None
        else:
            return None
    if deserialize:
        return msg_type.deserialize(msg)
    else:
        return msg

remove_msg_callback

remove_msg_callback(callback_id: str) -> Optional[str]

Remove a telemetry message callback.

Parameters:

  • callback_id (str) –

    The callback ID from when the callback was registered.
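
Registration and removal can be paired so that a callback never outlives the code that needs it. The following sketch wraps both calls in a context manager; subscribed is a hypothetical helper, and telemetry is assumed to be the drone.telemetry object.

```python
from contextlib import contextmanager


@contextmanager
def subscribed(telemetry, msg_filter, callback, **kwargs):
    """Register a callback on entry and remove it again on exit."""
    callback_id = telemetry.add_msg_callback(msg_filter, callback, **kwargs)
    try:
        yield callback_id
    finally:
        telemetry.remove_msg_callback(callback_id)
```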

Source code in blueye/sdk/drone.py
def remove_msg_callback(self, callback_id: str) -> Optional[str]:
    """Remove a telemetry message callback.

    Args:
        callback_id (str): The callback ID from when the callback was registered.
    """
    self._parent_drone._telemetry_watcher.remove_callback(callback_id)

set_msg_publish_frequency

set_msg_publish_frequency(msg: Message, frequency: float)

Set the publishing frequency of a specific telemetry message.

Parameters:

  • msg (Message) –

    The message to set the frequency of. Needs to be one of the messages in blueye.protocol that end in Tel, e.g., blueye.protocol.DepthTel.

  • frequency (float) –

    The frequency in Hz. Valid range is (0 .. 100).

Raises:

  • RuntimeError

    If the drone fails to set the frequency. Possible causes could be a frequency outside the valid range, or an incorrect message type.
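
Clearly invalid frequencies can be rejected locally before the request is sent. In this sketch set_frequency_checked is a hypothetical helper; it only rejects values outside 0 to 100 and leaves the boundary values for the drone to judge, since the text does not say whether the endpoints are included.

```python
def set_frequency_checked(telemetry, msg_type, frequency: float) -> bool:
    """Set the publish frequency and return True on success, False if rejected.

    Raises ValueError for frequencies that are clearly outside 0..100 Hz.
    """
    if not 0 <= frequency <= 100:
        raise ValueError(f"frequency must be within 0..100 Hz, got {frequency}")
    try:
        telemetry.set_msg_publish_frequency(msg_type, frequency)
    except RuntimeError:
        # The drone refused the request, e.g. because of the message type.
        return False
    return True
```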

Source code in blueye/sdk/drone.py
def set_msg_publish_frequency(self, msg: proto.message.Message, frequency: float):
    """Set the publishing frequency of a specific telemetry message.

    Args:
        msg (proto.message.Message): The message to set the frequency of. Needs to be one of the
                                     messages in blueye.protocol that end in Tel, e.g.,
                                     blueye.protocol.DepthTel.
        frequency (float): The frequency in Hz. Valid range is (0 .. 100).

    Raises:
        RuntimeError:
            If the drone fails to set the frequency.  Possible causes could be a frequency
            outside the valid range, or an incorrect message type.
    """
    resp = self._parent_drone._req_rep_client.set_telemetry_msg_publish_frequency(
        msg, frequency
    )
    if not resp.success:
        raise RuntimeError("Could not set telemetry message frequency")