blueye.sdk.mission

mission

Classes:

  • Mission

    Class for handling mission planning with the drone

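Functions:

  • create_waypoint_instruction

    Helper function to create waypoint instructions.

  • export_to_json

    Export the mission to a JSON file.

  • import_from_json

    Import a mission from a JSON file.

  • prepare_new_mission

    Creates a mission from a list of instructions.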

Mission

Mission(parent_drone: Drone)

Class for handling mission planning with the drone

Example usage
  1. Create some instructions

    import blueye.protocol as bp
    go_to_seabed = bp.Instruction(go_to_seabed_command={"desired_speed": 0.3})
    take_picture = bp.Instruction(camera_command={
        "camera_action": bp.CameraAction.CAMERA_ACTION_TAKE_PHOTO
    })
    go_to_surface = bp.Instruction(go_to_surface_command={"desired_speed": 0.3})
    

  2. Create a mission with the instructions

    from blueye.sdk.mission import prepare_new_mission
    mission = prepare_new_mission(
        instruction_list=[go_to_seabed, take_picture, go_to_surface],
        mission_id=0,
        mission_name="Go to seabed and take a picture",
    )
    

  3. Load and run the new mission

    # `drone` is assumed to be an already-connected Drone instance
    drone.mission.load_and_run(mission)
    

Parameters:

  • parent_drone (Drone) –

    The parent drone instance.

Methods:

  • clear

    Clear the currently loaded mission.

  • get_active

    Get the current active mission.

  • get_status

    Get the current mission status.

  • load_and_run

    Clears any previous mission, loads the given mission, and runs it.

  • pause

    Pause the currently running mission.

  • run

    Run the currently loaded mission.

  • send_new

    Send a new mission to the drone.

Source code in blueye/sdk/mission.py
def __init__(self, parent_drone: "Drone"):
    """Initialize the Mission class.

    Args:
        parent_drone: The parent drone instance.
    """
    self._parent_drone = parent_drone

clear

clear()

Clear the currently loaded mission.

Raises:

  • RuntimeError

    If the connected drone does not meet the required Blunux version.

Source code in blueye/sdk/mission.py
def clear(self):
    """Clear the currently loaded mission.

    Raises:
        RuntimeError: If the connected drone does not meet the required Blunux version.
    """
    self._parent_drone._verify_required_blunux_version("4.0.5")
    self._parent_drone._ctrl_client.clear_mission()

get_active

get_active() -> Mission

Get the current active mission.

Returns:

  • Mission

    The current active mission. The mission will be empty if no mission is running.

Raises:

  • RuntimeError

    If the connected drone does not meet the required Blunux version.

Source code in blueye/sdk/mission.py
def get_active(self) -> bp.Mission:
    """Get the current active mission.

    Returns:
        The current active mission. The mission will be empty if no mission is running.

    Raises:
        RuntimeError: If the connected drone does not meet the required Blunux version.
    """
    self._parent_drone._verify_required_blunux_version("4.0.5")
    return self._parent_drone._req_rep_client.get_active_mission().mission

get_status

get_status() -> Optional[MissionStatus]

Get the current mission status.

Returns:

  • Optional[MissionStatus]

    The current mission status, or None if no telemetry data has been received.

Raises:

  • RuntimeError

    If the connected drone does not meet the required Blunux version.

Source code in blueye/sdk/mission.py
def get_status(self) -> Optional[bp.MissionStatus]:
    """Get the current mission status.

    Returns:
        The current mission status, or None if no telemetry data has been received.

    Raises:
        RuntimeError: If the connected drone does not meet the required Blunux version.
    """
    self._parent_drone._verify_required_blunux_version("4.0.5")
    msg = self._parent_drone.telemetry.get(bp.MissionStatusTel)
    if msg is None:
        return None
    else:
        return msg.mission_status

load_and_run

load_and_run(mission: Mission, timeout: float = 2.0)

Clears any previous mission, loads the given mission, and runs it.

Parameters:

  • mission (Mission) –

    The mission to load and run.

  • timeout (float, default: 2.0 ) –

    The maximum time to wait for the mission to be ready before raising an exception.

Raises:

  • RuntimeError

    If the connected drone does not meet the required Blunux version.

  • TimeoutError

    If the mission does not become ready within the specified timeout.

Source code in blueye/sdk/mission.py
def load_and_run(self, mission: bp.Mission, timeout: float = 2.0):
    """Clears any previous mission, loads the given mission, and runs it.

    Args:
        mission: The mission to load and run.
        timeout: The maximum time to wait for the mission to be ready before raising an exception.

    Raises:
        RuntimeError: If the connected drone does not meet the required Blunux version.
        TimeoutError: If the mission does not become ready within the specified timeout.
    """
    self._parent_drone._verify_required_blunux_version("4.0.5")
    if self.get_status().state not in (
        bp.MissionState.MISSION_STATE_INACTIVE,
        bp.MissionState.MISSION_STATE_COMPLETED,
    ):
        # Clear the previous mission if it exists
        self.clear()

        # Wait until the mission state becomes MISSION_STATE_INACTIVE
        start_time = time.time()
        while self.get_status().state != bp.MissionState.MISSION_STATE_INACTIVE:
            if time.time() - start_time > timeout:
                raise TimeoutError(
                    "Mission status did not become inactive within the timeout period."
                )
            time.sleep(0.1)

    # Send the new mission
    self.send_new(mission)

    # Wait until the mission state becomes MISSION_STATE_READY
    start_time = time.time()
    while self.get_status().state != bp.MissionState.MISSION_STATE_READY:
        if time.time() - start_time > timeout:
            raise TimeoutError("Mission status did not become ready within the timeout period.")
        if self.get_status().state == bp.MissionState.MISSION_STATE_FAILED_TO_LOAD_MISSION:
            raise RuntimeError("Failed to load the mission. Check the mission instructions.")
        time.sleep(0.1)

    # Run the mission
    self.run()
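
The wait-then-run logic in the listing above comes down to polling a state until it matches or a deadline passes. The following is a minimal sketch of that pattern, independent of the SDK. `wait_for_state` and the string states are hypothetical stand-ins, not part of `blueye.sdk`. Unlike the listing, the sketch tolerates a getter that returns `None`, which `get_status` is documented to do before any telemetry has been received.

```python
import time
from typing import Callable, Optional


def wait_for_state(
    get_state: Callable[[], Optional[str]],
    wanted: str,
    failed: Optional[str] = None,
    timeout: float = 2.0,
    interval: float = 0.1,
) -> None:
    """Poll get_state() until it returns `wanted`.

    Hypothetical helper: raises TimeoutError once the deadline has passed
    and RuntimeError if the optional `failed` state is observed.
    """
    deadline = time.monotonic() + timeout
    while True:
        state = get_state()  # may be None if nothing has been received yet
        if state == wanted:
            return
        if failed is not None and state == failed:
            raise RuntimeError(f"Entered failure state {failed!r}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"State did not become {wanted!r} within {timeout} s")
        time.sleep(interval)


# Simulated status source: no data at first, then "inactive", then "ready".
states = iter([None, "inactive", "ready"])
wait_for_state(lambda: next(states), wanted="ready", interval=0.01)
```

With a real drone, the getter would wrap `get_status()` and return its `state` attribute, or `None` when no status is available yet. That wiring is an assumption and is not shown here.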

pause

pause()

Pause the currently running mission.

Raises:

  • RuntimeError

    If the connected drone does not meet the required Blunux version.

Source code in blueye/sdk/mission.py
def pause(self):
    """Pause the currently running mission.

    Raises:
        RuntimeError: If the connected drone does not meet the required Blunux version.
    """
    self._parent_drone._verify_required_blunux_version("4.0.5")
    self._parent_drone._ctrl_client.pause_mission()

run

run()

Run the currently loaded mission.

The mission state must be MISSION_STATE_READY before calling this method.

Raises:

  • RuntimeError

    If the connected drone does not meet the required Blunux version.

Source code in blueye/sdk/mission.py
def run(self):
    """Run the currently loaded mission.

    The mission state must be MISSION_STATE_READY before calling this method.

    Raises:
        RuntimeError: If the connected drone does not meet the required Blunux version.
    """
    self._parent_drone._verify_required_blunux_version("4.0.5")
    self._parent_drone._ctrl_client.run_mission()

send_new

send_new(mission: Mission)

Send a new mission to the drone.

Parameters:

  • mission (Mission) –

    The mission to send to the drone.

Raises:

  • RuntimeError

    If the connected drone does not meet the required Blunux version.

Source code in blueye/sdk/mission.py
def send_new(self, mission: bp.Mission):
    """Send a new mission to the drone.

    Args:
        mission: The mission to send to the drone.

    Raises:
        RuntimeError: If the connected drone does not meet the required Blunux version.
    """
    self._parent_drone._verify_required_blunux_version("4.0.5")
    self._parent_drone._req_rep_client.set_mission(mission)

create_waypoint_instruction

create_waypoint_instruction(
    waypoint_name: str,
    latitude: float,
    longitude: float,
    depth: float,
    speed_to_target: float = 0.6,
    speed_to_depth: float = 0.3,
    circle_of_acceptance: float = 1,
    waypoint_id: int = 0,
) -> Instruction

Helper function to create waypoint instructions.

Parameters:

  • waypoint_name (str) –

    The name of the waypoint.

  • latitude (float) –

    The latitude of the waypoint (WGS 84 decimal format). Needs to be in the range [-90, 90].

  • longitude (float) –

    The longitude of the waypoint (WGS 84 decimal format). Needs to be in the range [-180, 180].

  • depth (float) –

    The depth of the waypoint (meters below surface).

  • circle_of_acceptance (float, default: 1 ) –

    The radius of the circle of acceptance (meters).

  • speed_to_target (float, default: 0.6 ) –

    The speed to the waypoint (m/s).

  • speed_to_depth (float, default: 0.3 ) –

    The speed used when moving to the waypoint depth.

  • waypoint_id (int, default: 0 ) –

    The ID of the waypoint.

Raises:

  • ValueError

    If latitude or longitude are out of bounds.

Returns:

  • Instruction

    An Instruction object with the specified waypoint details.

Source code in blueye/sdk/mission.py
def create_waypoint_instruction(
    waypoint_name: str,
    latitude: float,
    longitude: float,
    depth: float,
    speed_to_target: float = 0.6,
    speed_to_depth: float = 0.3,
    circle_of_acceptance: float = 1,
    waypoint_id: int = 0,
) -> bp.Instruction:
    """
    Helper function to create waypoint instructions.

    Args:
        waypoint_name (str): The name of the waypoint.
        latitude (float): The latitude of the waypoint (WGS 84 decimal format). Needs to be in the
                          range [-90, 90].
        longitude (float): The longitude of the waypoint (WGS 84 decimal format). Needs to be in the
                           range [-180, 180].
        depth (float): The depth of the waypoint (meters below surface).
        circle_of_acceptance: The radius of the circle of acceptance (meters).
        speed_to_target: The speed to the waypoint (m/s).
        waypoint_id: The ID of the waypoint.

    Raises:
        ValueError: If latitude or longitude are out of bounds.

    Returns:
        Instruction: An Instruction object with the specified waypoint details.
    """
    if not (-90 <= latitude <= 90):
        raise ValueError(f"Latitude must be between -90 and 90 degrees, got {latitude}")
    if not (-180 <= longitude <= 180):
        raise ValueError(f"Longitude must be between -180 and 180 degrees, got {longitude}")
    global_position = bp.LatLongPosition()
    global_position.latitude = latitude
    global_position.longitude = longitude

    depth_set_point = bp.DepthSetPoint()
    depth_set_point.depth = depth
    depth_set_point.depth_zero_reference = bp.DepthZeroReference.DEPTH_ZERO_REFERENCE_SURFACE
    depth_set_point.speed_to_depth = speed_to_depth

    waypoint = bp.Waypoint()
    waypoint.id = waypoint_id
    waypoint.name = waypoint_name
    waypoint.global_position = global_position
    waypoint.circle_of_acceptance = circle_of_acceptance
    waypoint.speed_to_target = speed_to_target
    waypoint.depth_set_point = depth_set_point

    waypoint_command = bp.WaypointCommand(waypoint=waypoint)

    instruction = bp.Instruction({"waypoint_command": waypoint_command})
    return instruction
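
The bounds check at the top of the function can be tried in isolation. The sketch below repeats only that check in a standalone function, so it runs without the SDK. `validate_position` is a hypothetical name, and the coordinates are illustrative values.

```python
def validate_position(latitude: float, longitude: float) -> None:
    """Raise ValueError if the coordinates fall outside the WGS 84 ranges."""
    if not (-90 <= latitude <= 90):
        raise ValueError(f"Latitude must be between -90 and 90 degrees, got {latitude}")
    if not (-180 <= longitude <= 180):
        raise ValueError(f"Longitude must be between -180 and 180 degrees, got {longitude}")


# The first pair is within bounds, the second has an invalid latitude.
for lat, lon in [(63.4305, 10.3951), (91.0, 10.0)]:
    try:
        validate_position(lat, lon)
        print(f"({lat}, {lon}) accepted")
    except ValueError as err:
        print(f"rejected: {err}")
```

Both ranges are inclusive, so the poles and the antimeridian are accepted. Validating before building the message means a bad coordinate fails on the client rather than after the mission has been sent.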

export_to_json

export_to_json(
    mission: Mission,
    output_path: Optional[Path | str] = None,
)

Export the mission to a JSON file

This allows you to save the mission to a file for later use or to share it with others.

Parameters:

  • mission (Mission) –

    The mission to export

  • output_path (Optional[Path | str], default: None ) –

    The path to write the JSON file to. If None, the mission is written to BlueyeMission.json in the current directory. If the path is an existing directory, the mission is written to BlueyeMission.json in that directory. Otherwise, the mission is written to the specified file.

Source code in blueye/sdk/mission.py
def export_to_json(mission: bp.Mission, output_path: Optional[Path | str] = None):
    """Export the mission to a JSON file

    This allows you to save the mission to a file for later use or to share it with others.

    Args:
        mission: The mission to export
        output_path: The path to write the JSON file to. If `None` the mission will be written to
                     the current directory with the name `BlueyeMission.json`. If the path is a
                     directory, the mission will be written to that directory with the name
                     `BlueyeMission.json`. Else the mission will be written to the specified file.
    """
    if output_path is None:
        output_path = Path("BlueyeMission.json")
    else:
        if type(output_path) == str:
            output_path = Path(output_path)
        if output_path.is_dir():
            output_path = output_path.joinpath("BlueyeMission.json")

    logger.debug(f'Exporting mission "{mission.name}" to {output_path}')
    with open(output_path, "w") as f:
        f.write(MessageToJson(mission._pb))
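
The three cases for `output_path` can be checked without a drone or a mission object. The sketch below isolates just the path resolution. `resolve_output_path` is a hypothetical helper written for illustration and is not part of the SDK.

```python
import tempfile
from pathlib import Path
from typing import Optional, Union

DEFAULT_NAME = "BlueyeMission.json"


def resolve_output_path(output_path: Optional[Union[Path, str]] = None) -> Path:
    """Mirror the three documented cases: None, a directory, or a file path."""
    if output_path is None:
        return Path(DEFAULT_NAME)  # relative to the current directory
    path = Path(output_path)  # accepts both str and Path
    if path.is_dir():
        return path / DEFAULT_NAME
    return path


with tempfile.TemporaryDirectory() as tmp:
    print(resolve_output_path(None))                       # default file name
    print(resolve_output_path(tmp))                        # directory given as str
    print(resolve_output_path(Path(tmp) / "survey.json"))  # explicit file path
```

Note that `is_dir()` is true only for a directory that already exists, so a path to a directory that has not been created yet is treated as a file name.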

import_from_json

import_from_json(input_path: Path | str) -> Mission

Import a mission from a JSON file

This allows you to load a mission from a file that was previously exported.

Parameters:

  • input_path (Path | str) –

    The path to the JSON file to import

Returns:

  • Mission

    The imported mission

Source code in blueye/sdk/mission.py
def import_from_json(input_path: Path | str) -> bp.Mission:
    """Import a mission from a JSON file

    This allows you to load a mission from a file that was previously exported.

    Args:
        input_path: The path to the JSON file to import

    Returns:
        The imported mission
    """
    if type(input_path) == str:
        input_path = Path(input_path)
    logger.debug(f"Importing mission from {input_path}")

    with open(input_path, "r") as f:
        json_data = f.read()
        mission = bp.Mission()
        Parse(json_data, mission._pb)
        return mission
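
Export and import are meant to be used as a pair: a mission written to disk should come back unchanged. The sketch below shows that round trip with the standard `json` module and a dataclass. `FakeMission`, `export_fake` and `import_fake` are hypothetical stand-ins for illustration. The real functions serialize a protobuf message with `MessageToJson` and `Parse` instead.

```python
import json
import tempfile
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import List


@dataclass
class FakeMission:
    """Hypothetical stand-in for bp.Mission; not the real message type."""

    id: int = 0
    name: str = ""
    instructions: List[dict] = field(default_factory=list)


def export_fake(mission: FakeMission, path: Path) -> None:
    """Write the mission fields to `path` as JSON."""
    path.write_text(json.dumps(asdict(mission), indent=2))


def import_fake(path: Path) -> FakeMission:
    """Read a JSON file written by export_fake back into a FakeMission."""
    return FakeMission(**json.loads(path.read_text()))


original = FakeMission(
    id=1,
    name="Pier survey",
    instructions=[{"id": 0, "kind": "go_to_seabed"}],
)
with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "BlueyeMission.json"
    export_fake(original, target)
    restored = import_fake(target)

# The restored object compares equal field by field.
assert restored == original
```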

prepare_new_mission

prepare_new_mission(
    instruction_list: List[Instruction],
    mission_id: int = 0,
    mission_name: str = "",
) -> Mission

Creates a mission from a list of instructions

Automatically assigns each instruction an ID based on its position in the list, starting from 0.

Parameters:

  • mission_id (int, default: 0 ) –

    ID of the mission

  • mission_name (str, default: '' ) –

    Name of the mission

  • instruction_list (List[Instruction]) –

    List of instructions to create the mission from

Raises:

  • ValueError

    If the number of instructions exceeds 50, which is the maximum allowed.

Returns:

  • Mission

    A mission object with the instructions and their respective IDs

Source code in blueye/sdk/mission.py
def prepare_new_mission(
    instruction_list: List[bp.Instruction],
    mission_id: int = 0,
    mission_name: str = "",
) -> bp.Mission:
    """Creates a mission from a list of instructions

    Automatically assigns an ID to each instruction based on the order they are in the list.

    Args:
        mission_id: ID of the mission
        mission_name: Name of the mission
        instruction_list: List of instructions to create the mission from

    Raises:
        ValueError: If the number of instructions exceeds 50, which is the maximum allowed.

    Returns:
        A mission object with the instructions and their respective IDs
    """
    if len(instruction_list) > 50:
        raise ValueError(
            "A mission can only contain up to 50 instructions. "
            f"Received {len(instruction_list)} instructions."
        )
    logger.debug(
        f'Preparing the "{mission_name}" mission, with ID {mission_id} and '
        f"{len(instruction_list)} instructions"
    )
    instruction_id = 0

    # Deep copy the instructions to avoid modifying the original ones
    for instruction in instruction_list:
        instruction_copy = copy.deepcopy(instruction)
        instruction_copy.id = instruction_id
        instruction_id += 1
        instruction_list[instruction_list.index(instruction)] = instruction_copy
    return bp.Mission(id=mission_id, name=mission_name, instructions=instruction_list)
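
The listing writes the numbered copies back into the caller's list and finds each slot with `list.index`, which returns the first element that compares equal. The sketch below shows one alternative under stated assumptions: it numbers by position with `enumerate` and builds a new list, so the same instruction can appear twice and the input stays untouched. `FakeInstruction` and `assign_ids` are hypothetical stand-ins, not SDK names.

```python
import copy
from dataclasses import dataclass
from typing import List

MAX_INSTRUCTIONS = 50  # limit stated in the documentation above


@dataclass
class FakeInstruction:
    """Hypothetical stand-in for bp.Instruction."""

    command: str
    id: int = 0


def assign_ids(instructions: List[FakeInstruction]) -> List[FakeInstruction]:
    """Return deep copies of the instructions, numbered by list position."""
    if len(instructions) > MAX_INSTRUCTIONS:
        raise ValueError(
            f"A mission can only contain up to {MAX_INSTRUCTIONS} instructions. "
            f"Received {len(instructions)} instructions."
        )
    numbered = []
    for position, instruction in enumerate(instructions):
        duplicate = copy.deepcopy(instruction)  # leave the caller's object alone
        duplicate.id = position
        numbered.append(duplicate)
    return numbered


photo = FakeInstruction("take_photo")
plan = [FakeInstruction("go_to_seabed"), photo, photo, FakeInstruction("go_to_surface")]
numbered = assign_ids(plan)
print([item.id for item in numbered])  # [0, 1, 2, 3]
print(photo.id)                        # 0, the original is unchanged
```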