Module blueye.sdk.logs
View Source
from __future__ import annotations
import logging
import zlib
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Callable, Iterator, List, Optional, Tuple
import blueye.protocol as bp
import dateutil.parser
import proto
import requests
import tabulate
from google.protobuf.internal.decoder import _DecodeVarint as decodeVarint
from packaging import version
from .utils import deserialize_any_to_message
logger = logging.getLogger(__name__)
def human_readable_filesize(binsize: int) -> str:
"""Convert bytes to human readable string"""
suffix = "B"
num = binsize
for unit in ["", "Ki", "Mi"]:
if abs(num) < 1024.0:
return f"{num:3.1f} {unit}{suffix}"
num /= 1024.0
return f"{num:.1f} Gi{suffix}"
def decompress_log(log: bytes) -> bytes:
"""Decompress a log file"""
return zlib.decompressobj(wbits=zlib.MAX_WBITS | 16).decompress(log)
class LogStream:
"""Class for streaming a log
Creates a stream from a downloaded log file. Iterate over the object to get the next log record.
"""
def __init__(
self, log: bytes, decompress: bool = True
) -> Iterator[
Tuple[
proto.datetime_helpers.DatetimeWithNanoseconds, # Real time clock
timedelta, # Time since first message
proto.message.MessageMeta, # Message type
proto.message.Message, # Message contents
]
]:
if decompress:
self.decompressed_log = decompress_log(log)
else:
self.decompressed_log = log
self.pos = 0
self.start_monotonic: proto.datetime_helpers.DatetimeWithNanoseconds = 0
def __iter__(self):
return self
def __next__(self):
if self.pos < len(self.decompressed_log):
msg_size, pos_msg_start = decodeVarint(self.decompressed_log, self.pos)
msg_data = self.decompressed_log[pos_msg_start : (pos_msg_start + msg_size)]
if len(msg_data) < msg_size:
raise EOFError("Not enough bytes to read message")
self.pos = pos_msg_start + msg_size
msg = bp.BinlogRecord.deserialize(msg_data)
payload_type, payload_msg_deserialized = deserialize_any_to_message(msg)
if self.start_monotonic == 0:
self.start_monotonic = msg.clock_monotonic
return (
msg.unix_timestamp,
msg.clock_monotonic - self.start_monotonic,
payload_type,
payload_msg_deserialized,
)
else:
raise StopIteration
class LogFile:
def __init__(
self,
name: str,
is_dive: bool,
filesize: int,
start_time: int,
max_depth_magnitude: int,
ip: str,
):
self.name = name
self.is_dive = is_dive
self.filesize = filesize
self.start_time: datetime = datetime.fromtimestamp(start_time, tz=timezone.utc)
self.max_depth_magnitude = max_depth_magnitude
self.download_url = f"http://{ip}/logs/{self.name}/binlog"
self.content = None
self._formatted_values = [
self.name,
self.start_time.strftime("%d. %b %Y %H:%M"),
f"{self.max_depth_magnitude} m",
human_readable_filesize(self.filesize),
]
def download(
self,
output_path: Optional[Path | str] = None,
write_to_file: bool = True,
timeout: float = 1,
overwrite_cache: bool = False,
) -> bytes:
"""Download a log file from the drone
*Arguments*:
* `output_path`:
Path to write the log file to. If `None`, the log will be written to the
current working directory. If the path is a directory, the log will be
downloaded to that directory with its original name. Else the log will be
downloaded to the specified path.
* `write_to_file`:
If True, the log will be written to the specified path. If False, the
log will only be returned as a bytes object.
* `timeout`:
Seconds to wait for response
* `overwrite_cache`:
If True, the log will be downloaded even if it is already been downloaded.
*Returns*:
The compressed log file as a bytes object.
"""
if self.content is None or overwrite_cache:
self.content = requests.get(self.download_url, timeout=timeout).content
if write_to_file:
if output_path is None:
output_path = Path(f"{self.name}.bez")
else:
if type(output_path) == str:
output_path = Path(output_path)
if output_path.is_dir():
output_path = output_path.joinpath(f"{self.name}.bez")
with open(output_path, "wb") as f:
f.write(self.content)
return self.content
def parse_to_stream(self) -> LogStream:
"""Parse the log file to a stream
Will download the log if it is not already downloaded.
*Returns*:
A `LogStream` object
"""
return LogStream(self.download(write_to_file=False))
def __format__(self, format_specifier):
if format_specifier == "with_header":
return tabulate.tabulate(
[self], headers=["Name", "Time", "Max depth", "Size"], tablefmt="plain"
)
else:
return tabulate.tabulate([self], tablefmt="plain")
def __str__(self):
return f"{self}"
def __getitem__(self, item):
return self._formatted_values[item]
class Logs:
def __init__(self, parent_drone, auto_download_index=False):
self._parent_drone = parent_drone
self.auto_download_index = auto_download_index
self.index_downloaded = False
self._logs = {}
if auto_download_index:
self.refresh_log_index()
def refresh_log_index(self):
"""Refresh the log index from the drone
This is method is run on the first log access by default, but if you would like to check
for new log files it can be called at any time.
"""
if not self._parent_drone.connected:
raise ConnectionError(
"The connection to the drone is not established, try calling the connect method "
"before retrying"
)
logger.debug("Refreshing log index")
logs_endpoint = f"http://{self._parent_drone._ip}/logs"
logs: List[dict] = requests.get(logs_endpoint).json()
if version.parse(self._parent_drone.software_version_short) < version.parse("3.3"):
# Extend index with dive info, sends a request for each log file so can be quite slow
# for drones with many logs. Not necessary for Blunux >= 3.3 as dive info is included in
# the index.
logger.debug(f"Getting dive info for {len(logs)} logs")
for index, log in enumerate(logs):
dive_info = requests.get(f"{logs_endpoint}/{log['name']}/dive_info").json()
logs[index].update(dive_info)
# Instantiate log objects for each log
logger.debug(f"Creating log objects for {len(logs)} logs")
for log in logs:
if log["has_binlog"]:
self._logs[log["name"]] = LogFile(
log["name"],
log["is_dive"],
log["binlog_size"],
log["start_time"],
log["max_depth_magnitude"],
self._parent_drone._ip,
)
else:
logger.info(f"Log {log['name']} does not have a binlog, ignoring")
self.index_downloaded = True
def __len__(self):
if not self.index_downloaded:
self.refresh_log_index()
return len(self._logs)
def __getitem__(self, item):
if not self.index_downloaded:
self.refresh_log_index()
if type(item) == str:
try:
return self._logs[item]
except KeyError:
raise KeyError(f"A log with the name '{item}' does not exist")
elif isinstance(item, slice):
logs_slice = Logs(self._parent_drone)
for log in list(self._logs.values())[item]:
logs_slice._logs[log.name] = log
logs_slice.index_downloaded = True
return logs_slice
else:
try:
return list(self._logs.values())[item]
except IndexError:
raise IndexError(
f"Tried to access log nr {item}, "
+ f"but there are only {len(self._logs.values())} logs available"
)
def __str__(self):
return tabulate.tabulate(
self, headers=["Name", "Time", "Max depth", "Size"], tablefmt="plain"
)
def filter(self, filter_func: Callable[[LogFile], bool]) -> Logs:
"""Return a new Logs object with only those matching the filter
Eg. to get logs classified as a dive:
```
dive_logs = myDrone.logs.filter(lambda log: log.is_dive)
```
or to get all logs with a max depth greater than 10m:
```
deep_logs = myDrone.logs.filter(lambda log: log.max_depth_magnitude > 10)
```
"""
if not self.index_downloaded:
self.refresh_log_index()
filtered_logs = Logs(self._parent_drone)
filtered_logs.index_downloaded = True
for log in self:
if filter_func(log):
filtered_logs._logs[log.name] = log
return filtered_logs
class LegacyLogFile:
"""
This class is a container for a log file stored on the drone
The drone lists the file name, max depth, start time, and file size for each log,
and you can show this information by printing the log object, eg. on a Drone
object called `myDrone`:
```
print(myDrone.logs[0])
```
or, if you want to display the header you can format the object with `with_header`:
```
print(f"{myDrone.logs[0]:with_header}")
```
Calling the download() method on a log object will pull the CSV (Comma Separated
Value) file from the drone to your local filesystem.
"""
def __init__(self, maxdepth, name, timestamp, binsize, ip):
self.maxdepth = maxdepth
self.name = name
self.timestamp: datetime = dateutil.parser.isoparse(timestamp)
self.binsize = binsize
self.download_path = "http://" + ip + "/logcsv/" + name
self._formatted_values = [
self.name,
self.timestamp.strftime("%d. %b %Y %H:%M"),
f"{self.maxdepth/1000:.2f} m",
human_readable_filesize(self.binsize),
]
def download(self, output_path=None, output_name=None, downsample_divisor=10):
"""
Download the specified log to your local file system
If you specify an output_path the log file will be downloaded to that directory
instead of the current one.
Specifying output_name will overwrite the default file name with whatever you
have specified (be sure to include the .csv extension).
The drone samples the log content at 10 Hz, and by default this function downsamples this
rate to 1 Hz.
"""
log = requests.get(self.download_path, params={"divisor": downsample_divisor}).content
if output_path is None:
output_path = "./"
if output_name is None:
output_name = self.name
with open(f"{output_path}{output_name}", "wb") as f:
f.write(log)
def __format__(self, format_specifier):
if format_specifier == "with_header":
return tabulate.tabulate(
[self], headers=["Name", "Time", "Max depth", "Size"], tablefmt="plain"
)
else:
return tabulate.tabulate([self], tablefmt="plain")
def __str__(self):
return f"{self}"
def __getitem__(self, item):
return self._formatted_values[item]
class LegacyLogs:
"""This class is an index of the legacy csv log files stored on the drone
To show the available logs you simply print this object, ie. if your Drone object
is called `myDrone`, you can do:
```
print(myDrone.legacy_logs)
```
This will print a list of all available logs, with some of their metadata, such as
name and maxdepth.
You can access logfile objects either by index or by name. Eg. if you want the first
logfile in the list you can do `myDrone.logs[0]`, or if you want some particular log you
can do `myDrone.logs["exampleName0001.csv"]`. You can even give it a slice, so if you want
the last 10 logs you can do `myDrone.logs[-10:]`.
"""
def __init__(self, parent_drone, auto_download_index=False):
self.ip = parent_drone._ip
self._parent_drone = parent_drone
self.index_downloaded = False
if auto_download_index:
self.refresh_log_index()
else:
self._logs = {}
def _get_list_of_logs_from_drone(self, get_all: bool):
list_of_dictionaries = requests.get(
"http://" + self.ip + "/logcsv", params={"all": True} if get_all else {}
).json()
return list_of_dictionaries
def _build_log_files_from_dictionary(self, list_of_logs_in_dictionaries):
loglist = {}
for log in list_of_logs_in_dictionaries:
try:
loglist[log["name"]] = LegacyLogFile(
log["maxdepth"], log["name"], log["timestamp"], log["binsize"], self.ip
)
except dateutil.parser.ParserError:
logger.warning(
f"Could not parse timestamp for log {log['name']}, skipping this log file"
)
return loglist
def refresh_log_index(self, get_all_logs=False):
"""Refresh the log index from the drone
This is method is run on the first log access by default, but if you would like to check
for new log files it can be called at any time.
Pass with `get_all_logs=True` to include logs that are not classified as dives.
"""
if not self._parent_drone.connected:
raise ConnectionError(
"The connection to the drone is not established, try calling the connect method "
"before retrying"
)
list_of_logs_in_dictionaries = self._get_list_of_logs_from_drone(get_all_logs)
self._logs = self._build_log_files_from_dictionary(list_of_logs_in_dictionaries)
self.index_downloaded = True
def __len__(self):
if not self.index_downloaded:
self.refresh_log_index()
return len(self._logs)
def __getitem__(self, item):
if not self.index_downloaded:
self.refresh_log_index()
if type(item) == str:
try:
return self._logs[item]
except KeyError:
raise KeyError(f"A log with the name '{item}' does not exist")
else:
try:
return list(self._logs.values())[item]
except IndexError:
raise IndexError(
f"Tried to access log nr {item}, "
+ f"but there are only {len(self._logs.values())} logs available"
)
def __str__(self):
return tabulate.tabulate(
self, headers=["Name", "Time", "Max depth", "Size"], tablefmt="plain"
)
Variables
logger
Functions
decompress_log
def decompress_log(
log: 'bytes'
) -> 'bytes'
Decompress a log file
View Source
def decompress_log(log: bytes) -> bytes:
"""Decompress a log file"""
return zlib.decompressobj(wbits=zlib.MAX_WBITS | 16).decompress(log)
human_readable_filesize
def human_readable_filesize(
binsize: 'int'
) -> 'str'
Convert bytes to human readable string
View Source
def human_readable_filesize(binsize: int) -> str:
"""Convert bytes to human readable string"""
suffix = "B"
num = binsize
for unit in ["", "Ki", "Mi"]:
if abs(num) < 1024.0:
return f"{num:3.1f} {unit}{suffix}"
num /= 1024.0
return f"{num:.1f} Gi{suffix}"
Classes
LegacyLogFile
class LegacyLogFile(
maxdepth,
name,
timestamp,
binsize,
ip
)
This class is a container for a log file stored on the drone
The drone lists the file name, max depth, start time, and file size for each log,
and you can show this information by printing the log object, eg. on a Drone
object called myDrone
:
print(myDrone.logs[0])
or, if you want to display the header you can format the object with with_header
:
print(f"{myDrone.logs[0]:with_header}")
Calling the download() method on a log object will pull the CSV (Comma Separated Value) file from the drone to your local filesystem.
View Source
class LegacyLogFile:
"""
This class is a container for a log file stored on the drone
The drone lists the file name, max depth, start time, and file size for each log,
and you can show this information by printing the log object, eg. on a Drone
object called `myDrone`:
```
print(myDrone.logs[0])
```
or, if you want to display the header you can format the object with `with_header`:
```
print(f"{myDrone.logs[0]:with_header}")
```
Calling the download() method on a log object will pull the CSV (Comma Separated
Value) file from the drone to your local filesystem.
"""
def __init__(self, maxdepth, name, timestamp, binsize, ip):
self.maxdepth = maxdepth
self.name = name
self.timestamp: datetime = dateutil.parser.isoparse(timestamp)
self.binsize = binsize
self.download_path = "http://" + ip + "/logcsv/" + name
self._formatted_values = [
self.name,
self.timestamp.strftime("%d. %b %Y %H:%M"),
f"{self.maxdepth/1000:.2f} m",
human_readable_filesize(self.binsize),
]
def download(self, output_path=None, output_name=None, downsample_divisor=10):
"""
Download the specified log to your local file system
If you specify an output_path the log file will be downloaded to that directory
instead of the current one.
Specifying output_name will overwrite the default file name with whatever you
have specified (be sure to include the .csv extension).
The drone samples the log content at 10 Hz, and by default this function downsamples this
rate to 1 Hz.
"""
log = requests.get(self.download_path, params={"divisor": downsample_divisor}).content
if output_path is None:
output_path = "./"
if output_name is None:
output_name = self.name
with open(f"{output_path}{output_name}", "wb") as f:
f.write(log)
def __format__(self, format_specifier):
if format_specifier == "with_header":
return tabulate.tabulate(
[self], headers=["Name", "Time", "Max depth", "Size"], tablefmt="plain"
)
else:
return tabulate.tabulate([self], tablefmt="plain")
def __str__(self):
return f"{self}"
def __getitem__(self, item):
return self._formatted_values[item]
Methods
download
def download(
self,
output_path=None,
output_name=None,
downsample_divisor=10
)
Download the specified log to your local file system
If you specify an output_path the log file will be downloaded to that directory instead of the current one.
Specifying output_name will overwrite the default file name with whatever you have specified (be sure to include the .csv extension).
The drone samples the log content at 10 Hz, and by default this function downsamples this rate to 1 Hz.
View Source
def download(self, output_path=None, output_name=None, downsample_divisor=10):
"""
Download the specified log to your local file system
If you specify an output_path the log file will be downloaded to that directory
instead of the current one.
Specifying output_name will overwrite the default file name with whatever you
have specified (be sure to include the .csv extension).
The drone samples the log content at 10 Hz, and by default this function downsamples this
rate to 1 Hz.
"""
log = requests.get(self.download_path, params={"divisor": downsample_divisor}).content
if output_path is None:
output_path = "./"
if output_name is None:
output_name = self.name
with open(f"{output_path}{output_name}", "wb") as f:
f.write(log)
LegacyLogs
class LegacyLogs(
parent_drone,
auto_download_index=False
)
This class is an index of the legacy csv log files stored on the drone
To show the available logs you simply print this object, ie. if your Drone object
is called myDrone
, you can do:
print(myDrone.legacy_logs)
This will print a list of all available logs, with some of their metadata, such as name and maxdepth.
You can access logfile objects either by index or by name. Eg. if you want the first
logfile in the list you can do myDrone.logs[0]
, or if you want some particular log you
can do myDrone.logs["exampleName0001.csv"]
. You can even give it a slice, so if you want
the last 10 logs you can do myDrone.logs[-10:]
.
View Source
class LegacyLogs:
"""This class is an index of the legacy csv log files stored on the drone
To show the available logs you simply print this object, ie. if your Drone object
is called `myDrone`, you can do:
```
print(myDrone.legacy_logs)
```
This will print a list of all available logs, with some of their metadata, such as
name and maxdepth.
You can access logfile objects either by index or by name. Eg. if you want the first
logfile in the list you can do `myDrone.logs[0]`, or if you want some particular log you
can do `myDrone.logs["exampleName0001.csv"]`. You can even give it a slice, so if you want
the last 10 logs you can do `myDrone.logs[-10:]`.
"""
def __init__(self, parent_drone, auto_download_index=False):
self.ip = parent_drone._ip
self._parent_drone = parent_drone
self.index_downloaded = False
if auto_download_index:
self.refresh_log_index()
else:
self._logs = {}
def _get_list_of_logs_from_drone(self, get_all: bool):
list_of_dictionaries = requests.get(
"http://" + self.ip + "/logcsv", params={"all": True} if get_all else {}
).json()
return list_of_dictionaries
def _build_log_files_from_dictionary(self, list_of_logs_in_dictionaries):
loglist = {}
for log in list_of_logs_in_dictionaries:
try:
loglist[log["name"]] = LegacyLogFile(
log["maxdepth"], log["name"], log["timestamp"], log["binsize"], self.ip
)
except dateutil.parser.ParserError:
logger.warning(
f"Could not parse timestamp for log {log['name']}, skipping this log file"
)
return loglist
def refresh_log_index(self, get_all_logs=False):
"""Refresh the log index from the drone
This is method is run on the first log access by default, but if you would like to check
for new log files it can be called at any time.
Pass with `get_all_logs=True` to include logs that are not classified as dives.
"""
if not self._parent_drone.connected:
raise ConnectionError(
"The connection to the drone is not established, try calling the connect method "
"before retrying"
)
list_of_logs_in_dictionaries = self._get_list_of_logs_from_drone(get_all_logs)
self._logs = self._build_log_files_from_dictionary(list_of_logs_in_dictionaries)
self.index_downloaded = True
def __len__(self):
if not self.index_downloaded:
self.refresh_log_index()
return len(self._logs)
def __getitem__(self, item):
if not self.index_downloaded:
self.refresh_log_index()
if type(item) == str:
try:
return self._logs[item]
except KeyError:
raise KeyError(f"A log with the name '{item}' does not exist")
else:
try:
return list(self._logs.values())[item]
except IndexError:
raise IndexError(
f"Tried to access log nr {item}, "
+ f"but there are only {len(self._logs.values())} logs available"
)
def __str__(self):
return tabulate.tabulate(
self, headers=["Name", "Time", "Max depth", "Size"], tablefmt="plain"
)
Methods
refresh_log_index
def refresh_log_index(
self,
get_all_logs=False
)
Refresh the log index from the drone
This is method is run on the first log access by default, but if you would like to check for new log files it can be called at any time.
Pass with get_all_logs=True
to include logs that are not classified as dives.
View Source
def refresh_log_index(self, get_all_logs=False):
"""Refresh the log index from the drone
This is method is run on the first log access by default, but if you would like to check
for new log files it can be called at any time.
Pass with `get_all_logs=True` to include logs that are not classified as dives.
"""
if not self._parent_drone.connected:
raise ConnectionError(
"The connection to the drone is not established, try calling the connect method "
"before retrying"
)
list_of_logs_in_dictionaries = self._get_list_of_logs_from_drone(get_all_logs)
self._logs = self._build_log_files_from_dictionary(list_of_logs_in_dictionaries)
self.index_downloaded = True
LogFile
class LogFile(
name: 'str',
is_dive: 'bool',
filesize: 'int',
start_time: 'int',
max_depth_magnitude: 'int',
ip: 'str'
)
View Source
class LogFile:
def __init__(
self,
name: str,
is_dive: bool,
filesize: int,
start_time: int,
max_depth_magnitude: int,
ip: str,
):
self.name = name
self.is_dive = is_dive
self.filesize = filesize
self.start_time: datetime = datetime.fromtimestamp(start_time, tz=timezone.utc)
self.max_depth_magnitude = max_depth_magnitude
self.download_url = f"http://{ip}/logs/{self.name}/binlog"
self.content = None
self._formatted_values = [
self.name,
self.start_time.strftime("%d. %b %Y %H:%M"),
f"{self.max_depth_magnitude} m",
human_readable_filesize(self.filesize),
]
def download(
self,
output_path: Optional[Path | str] = None,
write_to_file: bool = True,
timeout: float = 1,
overwrite_cache: bool = False,
) -> bytes:
"""Download a log file from the drone
*Arguments*:
* `output_path`:
Path to write the log file to. If `None`, the log will be written to the
current working directory. If the path is a directory, the log will be
downloaded to that directory with its original name. Else the log will be
downloaded to the specified path.
* `write_to_file`:
If True, the log will be written to the specified path. If False, the
log will only be returned as a bytes object.
* `timeout`:
Seconds to wait for response
* `overwrite_cache`:
If True, the log will be downloaded even if it is already been downloaded.
*Returns*:
The compressed log file as a bytes object.
"""
if self.content is None or overwrite_cache:
self.content = requests.get(self.download_url, timeout=timeout).content
if write_to_file:
if output_path is None:
output_path = Path(f"{self.name}.bez")
else:
if type(output_path) == str:
output_path = Path(output_path)
if output_path.is_dir():
output_path = output_path.joinpath(f"{self.name}.bez")
with open(output_path, "wb") as f:
f.write(self.content)
return self.content
def parse_to_stream(self) -> LogStream:
"""Parse the log file to a stream
Will download the log if it is not already downloaded.
*Returns*:
A `LogStream` object
"""
return LogStream(self.download(write_to_file=False))
def __format__(self, format_specifier):
if format_specifier == "with_header":
return tabulate.tabulate(
[self], headers=["Name", "Time", "Max depth", "Size"], tablefmt="plain"
)
else:
return tabulate.tabulate([self], tablefmt="plain")
def __str__(self):
return f"{self}"
def __getitem__(self, item):
return self._formatted_values[item]
Methods
download
def download(
self,
output_path: 'Optional[Path | str]' = None,
write_to_file: 'bool' = True,
timeout: 'float' = 1,
overwrite_cache: 'bool' = False
) -> 'bytes'
Download a log file from the drone
Arguments:
output_path
: Path to write the log file to. IfNone
, the log will be written to the current working directory. If the path is a directory, the log will be downloaded to that directory with its original name. Else the log will be downloaded to the specified path.write_to_file
: If True, the log will be written to the specified path. If False, the log will only be returned as a bytes object.timeout
: Seconds to wait for responseoverwrite_cache
: If True, the log will be downloaded even if it is already been downloaded.
Returns:
The compressed log file as a bytes object.
View Source
def download(
self,
output_path: Optional[Path | str] = None,
write_to_file: bool = True,
timeout: float = 1,
overwrite_cache: bool = False,
) -> bytes:
"""Download a log file from the drone
*Arguments*:
* `output_path`:
Path to write the log file to. If `None`, the log will be written to the
current working directory. If the path is a directory, the log will be
downloaded to that directory with its original name. Else the log will be
downloaded to the specified path.
* `write_to_file`:
If True, the log will be written to the specified path. If False, the
log will only be returned as a bytes object.
* `timeout`:
Seconds to wait for response
* `overwrite_cache`:
If True, the log will be downloaded even if it is already been downloaded.
*Returns*:
The compressed log file as a bytes object.
"""
if self.content is None or overwrite_cache:
self.content = requests.get(self.download_url, timeout=timeout).content
if write_to_file:
if output_path is None:
output_path = Path(f"{self.name}.bez")
else:
if type(output_path) == str:
output_path = Path(output_path)
if output_path.is_dir():
output_path = output_path.joinpath(f"{self.name}.bez")
with open(output_path, "wb") as f:
f.write(self.content)
return self.content
parse_to_stream
def parse_to_stream(
self
) -> 'LogStream'
Parse the log file to a stream
Will download the log if it is not already downloaded.
Returns:
A LogStream
object
View Source
def parse_to_stream(self) -> LogStream:
"""Parse the log file to a stream
Will download the log if it is not already downloaded.
*Returns*:
A `LogStream` object
"""
return LogStream(self.download(write_to_file=False))
LogStream
class LogStream(
log: 'bytes',
decompress: 'bool' = True
)
Class for streaming a log
Creates a stream from a downloaded log file. Iterate over the object to get the next log record.
View Source
class LogStream:
"""Class for streaming a log
Creates a stream from a downloaded log file. Iterate over the object to get the next log record.
"""
def __init__(
self, log: bytes, decompress: bool = True
) -> Iterator[
Tuple[
proto.datetime_helpers.DatetimeWithNanoseconds, # Real time clock
timedelta, # Time since first message
proto.message.MessageMeta, # Message type
proto.message.Message, # Message contents
]
]:
if decompress:
self.decompressed_log = decompress_log(log)
else:
self.decompressed_log = log
self.pos = 0
self.start_monotonic: proto.datetime_helpers.DatetimeWithNanoseconds = 0
def __iter__(self):
return self
def __next__(self):
if self.pos < len(self.decompressed_log):
msg_size, pos_msg_start = decodeVarint(self.decompressed_log, self.pos)
msg_data = self.decompressed_log[pos_msg_start : (pos_msg_start + msg_size)]
if len(msg_data) < msg_size:
raise EOFError("Not enough bytes to read message")
self.pos = pos_msg_start + msg_size
msg = bp.BinlogRecord.deserialize(msg_data)
payload_type, payload_msg_deserialized = deserialize_any_to_message(msg)
if self.start_monotonic == 0:
self.start_monotonic = msg.clock_monotonic
return (
msg.unix_timestamp,
msg.clock_monotonic - self.start_monotonic,
payload_type,
payload_msg_deserialized,
)
else:
raise StopIteration
Logs
class Logs(
parent_drone,
auto_download_index=False
)
View Source
class Logs:
def __init__(self, parent_drone, auto_download_index=False):
self._parent_drone = parent_drone
self.auto_download_index = auto_download_index
self.index_downloaded = False
self._logs = {}
if auto_download_index:
self.refresh_log_index()
def refresh_log_index(self):
"""Refresh the log index from the drone
This is method is run on the first log access by default, but if you would like to check
for new log files it can be called at any time.
"""
if not self._parent_drone.connected:
raise ConnectionError(
"The connection to the drone is not established, try calling the connect method "
"before retrying"
)
logger.debug("Refreshing log index")
logs_endpoint = f"http://{self._parent_drone._ip}/logs"
logs: List[dict] = requests.get(logs_endpoint).json()
if version.parse(self._parent_drone.software_version_short) < version.parse("3.3"):
# Extend index with dive info, sends a request for each log file so can be quite slow
# for drones with many logs. Not necessary for Blunux >= 3.3 as dive info is included in
# the index.
logger.debug(f"Getting dive info for {len(logs)} logs")
for index, log in enumerate(logs):
dive_info = requests.get(f"{logs_endpoint}/{log['name']}/dive_info").json()
logs[index].update(dive_info)
# Instantiate log objects for each log
logger.debug(f"Creating log objects for {len(logs)} logs")
for log in logs:
if log["has_binlog"]:
self._logs[log["name"]] = LogFile(
log["name"],
log["is_dive"],
log["binlog_size"],
log["start_time"],
log["max_depth_magnitude"],
self._parent_drone._ip,
)
else:
logger.info(f"Log {log['name']} does not have a binlog, ignoring")
self.index_downloaded = True
def __len__(self):
if not self.index_downloaded:
self.refresh_log_index()
return len(self._logs)
def __getitem__(self, item):
if not self.index_downloaded:
self.refresh_log_index()
if type(item) == str:
try:
return self._logs[item]
except KeyError:
raise KeyError(f"A log with the name '{item}' does not exist")
elif isinstance(item, slice):
logs_slice = Logs(self._parent_drone)
for log in list(self._logs.values())[item]:
logs_slice._logs[log.name] = log
logs_slice.index_downloaded = True
return logs_slice
else:
try:
return list(self._logs.values())[item]
except IndexError:
raise IndexError(
f"Tried to access log nr {item}, "
+ f"but there are only {len(self._logs.values())} logs available"
)
def __str__(self):
return tabulate.tabulate(
self, headers=["Name", "Time", "Max depth", "Size"], tablefmt="plain"
)
def filter(self, filter_func: Callable[[LogFile], bool]) -> Logs:
"""Return a new Logs object with only those matching the filter
Eg. to get logs classified as a dive:
```
dive_logs = myDrone.logs.filter(lambda log: log.is_dive)
```
or to get all logs with a max depth greater than 10m:
```
deep_logs = myDrone.logs.filter(lambda log: log.max_depth_magnitude > 10)
```
"""
if not self.index_downloaded:
self.refresh_log_index()
filtered_logs = Logs(self._parent_drone)
filtered_logs.index_downloaded = True
for log in self:
if filter_func(log):
filtered_logs._logs[log.name] = log
return filtered_logs
Methods
filter
def filter(
self,
filter_func: 'Callable[[LogFile], bool]'
) -> 'Logs'
Return a new Logs object with only those matching the filter
Eg. to get logs classified as a dive:
dive_logs = myDrone.logs.filter(lambda log: log.is_dive)
or to get all logs with a max depth greater than 10m:
deep_logs = myDrone.logs.filter(lambda log: log.max_depth_magnitude > 10)
View Source
def filter(self, filter_func: Callable[[LogFile], bool]) -> Logs:
"""Return a new Logs object with only those matching the filter
Eg. to get logs classified as a dive:
```
dive_logs = myDrone.logs.filter(lambda log: log.is_dive)
```
or to get all logs with a max depth greater than 10m:
```
deep_logs = myDrone.logs.filter(lambda log: log.max_depth_magnitude > 10)
```
"""
if not self.index_downloaded:
self.refresh_log_index()
filtered_logs = Logs(self._parent_drone)
filtered_logs.index_downloaded = True
for log in self:
if filter_func(log):
filtered_logs._logs[log.name] = log
return filtered_logs
refresh_log_index
def refresh_log_index(
self
)
Refresh the log index from the drone
This is method is run on the first log access by default, but if you would like to check for new log files it can be called at any time.
View Source
def refresh_log_index(self):
"""Refresh the log index from the drone
This is method is run on the first log access by default, but if you would like to check
for new log files it can be called at any time.
"""
if not self._parent_drone.connected:
raise ConnectionError(
"The connection to the drone is not established, try calling the connect method "
"before retrying"
)
logger.debug("Refreshing log index")
logs_endpoint = f"http://{self._parent_drone._ip}/logs"
logs: List[dict] = requests.get(logs_endpoint).json()
if version.parse(self._parent_drone.software_version_short) < version.parse("3.3"):
# Extend index with dive info, sends a request for each log file so can be quite slow
# for drones with many logs. Not necessary for Blunux >= 3.3 as dive info is included in
# the index.
logger.debug(f"Getting dive info for {len(logs)} logs")
for index, log in enumerate(logs):
dive_info = requests.get(f"{logs_endpoint}/{log['name']}/dive_info").json()
logs[index].update(dive_info)
# Instantiate log objects for each log
logger.debug(f"Creating log objects for {len(logs)} logs")
for log in logs:
if log["has_binlog"]:
self._logs[log["name"]] = LogFile(
log["name"],
log["is_dive"],
log["binlog_size"],
log["start_time"],
log["max_depth_magnitude"],
self._parent_drone._ip,
)
else:
logger.info(f"Log {log['name']} does not have a binlog, ignoring")
self.index_downloaded = True