Module awsrun.commands.aws.last
Display the last CloudTrail events in an account.
Overview
last provides an easier way to review CloudTrail events in either an
interactive or non-interactive manner depending on whether or not the
--interactive flag was supplied. In both cases, the events are grouped
together by the user that caused them.
With no additional arguments besides specification of --region, the last
command retrieves the past hour of write events up to a maximum of 1,000
events per account/region pair. Newest events are shown at the top. If output
is not being redirected, events from the same user are displayed in the same
color.
$ awsrun --account 100200300400 last --region us-east-1
Loading events, 1000 max per acct/region, use --max-events to change
100200300400/us-east-1: 2020-04-08 00:43:43-05:00 logs.amazonaws.com CreateLogGroup ECSClusterRole/i-XXXXXXXXXXXXXXXXX
100200300400/us-east-1: 2020-04-08 00:43:39-05:00 logs.amazonaws.com CreateLogGroup ECSClusterRole/i-XXXXXXXXXXXXXXXXX
100200300400/us-east-1: 2020-04-08 00:43:38-05:00 logs.amazonaws.com CreateLogGroup ECSClusterRole/i-XXXXXXXXXXXXXXXXX
100200300400/us-east-1: 2020-04-08 00:43:35-05:00 logs.amazonaws.com CreateLogGroup ECSClusterRole/i-XXXXXXXXXXXXXXXXX
The first column of output is the account/region where the event was collected. The second column is the event timestamp. The third column is the event source. The fourth column is the event name. And the last column is the "user" that generated the event.
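The per-user coloring described above can be sketched with a defaultdict whose factory pulls the next entry from a cycling palette, similar to the cf_map used in regional_execute in the source listing. The function name and the plain-string palette are assumptions made to keep the sketch dependency-free; the command itself cycles through colorama styles.

```python
from collections import defaultdict
from itertools import cycle


def assign_colors(usernames, colors=("blue", "green", "yellow")):
    """Pair each username with a color, reusing it on repeat sightings.

    `colors` is a hypothetical palette of plain strings standing in for
    terminal color codes.
    """
    palette = cycle(colors)
    # The factory runs only the first time a username is looked up.
    color_for = defaultdict(lambda: next(palette))
    return [(user, color_for[user]) for user in usernames]


pairs = assign_colors(["alice", "bob", "alice", "carol", "dave"])
for user, color in pairs:
    print(f"{user}: {color}")
```

Because the palette is finite and cycled, two different users can end up sharing a color once there are more users than palette entries.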
Users can specify time ranges using the --start and --end flags. These take
ISO 8601 formatted dates: 2020-04-07T22:30:00-0500. When specifying a time
range, use either --hours N or the --start/--end flags, but not both at the
same time. If none of these flags are specified, the past hour of events is
retrieved by default. For example, the following command retrieves a 1-minute
window of write events:
$ awsrun --account 100200300400 last --region us-east-1 --start 2020-04-07T00:20:00-0500 --end 2020-04-07T00:21:00-0500
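As a rough illustration of how these time flags become a start/end pair, the sketch below parses the timestamps with the same strptime format string that the _isodate helper uses in the source listing, and derives a window from an hour count the way the --hours path does. The names parse_isodate and window_from_hours are hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Same format string as the module's _isodate helper.
ISO_FORMAT = "%Y-%m-%dT%H:%M:%S%z"


def parse_isodate(text):
    """Parse a timestamp such as 2020-04-07T00:20:00-0500."""
    return datetime.strptime(text, ISO_FORMAT)


def window_from_hours(hours, now=None):
    """Return (start, end) covering the last `hours` hours, ending at `now`."""
    end = now if now is not None else datetime.now(timezone.utc)
    return end - timedelta(hours=hours), end


start = parse_isodate("2020-04-07T00:20:00-0500")
end = parse_isodate("2020-04-07T00:21:00-0500")
print(end - start)  # 0:01:00
```

The offset is required by this format string, so a timestamp without a trailing UTC offset would be rejected.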
As stated before, only write events are retrieved by default, but users can
filter events by any of the supported lookup attributes in CloudTrail via the
--attribute flag: EventId, EventName, ReadOnly, Username, ResourceType,
ResourceName, EventSource, or AccessKeyId. To filter on console logins for the
past 12 hours:
$ awsrun --account 100200300400 last --region us-east-1 --hours 12 --attribute EventName=ConsoleLogin
Loading events, 1000 max per acct/region, use --max-events to change
100200300400/us-east-1: 2020-04-08 00:56:17-05:00 signin.amazonaws.com ConsoleLogin Operator/user@example.com
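For readers curious how a KEY=VALUE filter reaches CloudTrail, the following sketch turns such a pair into the LookupAttributes list that the source listing passes to the lookup_events paginator. The helper names are hypothetical, and the parsing shown here is only an approximation of what awsrun's AppendAttributeValuePair action does.

```python
def parse_pair(text):
    """Split 'KEY=VALUE' into (key, value); hypothetical helper."""
    key, sep, value = text.partition("=")
    if not sep or not key or not value:
        raise ValueError(f"expected KEY=VALUE, got {text!r}")
    return key, value


def build_lookup_attributes(attrs):
    """Flatten {key: [values]} into CloudTrail LookupAttributes entries."""
    return [
        {"AttributeKey": key, "AttributeValue": value}
        for key, values in attrs.items()
        for value in values
    ]


key, value = parse_pair("EventName=ConsoleLogin")
lookup = build_lookup_attributes({key: [value]})
print(lookup)
```

The resulting list would be supplied as the LookupAttributes argument, alongside StartTime and EndTime, when paginating lookup_events.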
Two shorthand attribute filters exist. The --all flag selects all events,
including the read-only events. The --console flag filters on console logins
as above:
$ awsrun --account 100200300400 last --region us-east-1 --hours 12 --console
Loading events, 1000 max per acct/region, use --max-events to change
100200300400/us-east-1: 2020-04-08 00:56:17-05:00 signin.amazonaws.com ConsoleLogin Operator/user@example.com
To minimize memory footprint and load on AWS servers, a maximum of 1,000
events are pulled from an account/region pair. Use the --max-events flag to
override the value.
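The event cap can be pictured with the sketch below, which follows the shape of _retrieve_events in the source listing: pages are consumed one at a time, duplicate event IDs are skipped, and the limit is checked after each page. The function name and the hand-built pages are assumptions for illustration; real pages come from the CloudTrail paginator.

```python
def collect_events(pages, max_events):
    """Gather unique events from pages, stopping once the cap is reached."""
    events = []
    seen_ids = set()
    for page in pages:
        for event in page["Events"]:
            if event["EventId"] in seen_ids:
                continue  # skip duplicates returned by the API
            seen_ids.add(event["EventId"])
            events.append(event)
        # The cap is checked per page, so the result can overshoot
        # max_events by up to one page worth of events.
        if len(events) >= max_events:
            break
    return events


pages = [
    {"Events": [{"EventId": "a"}, {"EventId": "b"}]},
    {"Events": [{"EventId": "b"}, {"EventId": "c"}]},
    {"Events": [{"EventId": "d"}]},
]
ids = [e["EventId"] for e in collect_events(pages, max_events=3)]
print(ids)  # ['a', 'b', 'c']
```

Checking the limit per page rather than per event keeps the loop simple, at the cost of the limit being approximate when it is not a multiple of the page size.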
Finally, for a TUI (terminal user interface) that lets you interactively
explore events, specify the --interactive flag. Follow the on-screen
instructions to interact with the TUI.
Reference
Synopsis
$ awsrun [options] last [command options]
Configuration
The following is the syntax for the options that can be specified in the user configuration file:
Commands:
  last:
    hours: INT
    max_events: INT
    region:
      - STRING
    all: BOOLEAN
    console: BOOLEAN
    attributes:
      STRING:
        - STRING
    interactive: BOOLEAN
Command Options
Some options can be overridden on the awsrun CLI via command line flags. In those cases, the CLI flags are specified next to the option name below:
hours, --hours INT
- Specifies how many hours of events to retrieve. The default value is 1 hour. Note: The number of events retrieved will not exceed max_events.
max_events, --max-events INT
- Specifies the upper limit on the number of events to retrieve on a per account per region basis. The default value is 1,000 events.
region, --region REGION
- Run the command in the specified regions. When specifying multiple values on the command line, repeat the flag for each value.
all, --all
- Retrieve all CloudTrail events including read-only events. The default value is False. This option is mutually exclusive with the console and attributes options.
console, --console
- Retrieve only console login CloudTrail events. The default value is False. This option is mutually exclusive with the all and attributes options.
attributes, --attribute KEY=VALUE
- Retrieve only CloudTrail events matching the attribute key and value. The possible key values are: EventId, EventName, ReadOnly, Username, ResourceType, ResourceName, EventSource, and AccessKeyId. Due to limitations in the CloudTrail API, only one attribute can be specified. This option is mutually exclusive with the all and console options.
interactive, --interactive
- Open an interactive TUI (terminal user interface) instead of printing events to the console. The default value is False.
The following is a sample configuration to add a permanent filter in your
configuration file for DeleteStack events using the attributes configuration
option:
Commands:
  last:
    attributes:
      EventName:
        - DeleteStack
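The restrictions listed for the all, console, and attributes options can be summarized in a small sketch that follows the checks in regional_from_cli in the source listing. The function name resolve_filters is hypothetical, it raises ValueError where the command reports a parser error, and it does not reproduce the mutual-exclusion check that argparse performs in the real command.

```python
LOOKUP_ATTRIBUTES = [
    "EventId", "EventName", "ReadOnly", "Username",
    "ResourceType", "ResourceName", "EventSource", "AccessKeyId",
]


def resolve_filters(all_events=False, console=False, attributes=None):
    """Return the {key: [value]} filter implied by the three options."""
    attrs = dict(attributes or {})
    if not (all_events or console or attrs):
        return {"ReadOnly": ["false"]}  # default: write events only
    if console:
        return {"EventName": ["ConsoleLogin"]}
    if len(attrs) > 1:
        raise ValueError("only one lookup attribute may be used")
    if any(len(values) > 1 for values in attrs.values()):
        raise ValueError("only one lookup value may be specified")
    if any(key not in LOOKUP_ATTRIBUTES for key in attrs):
        raise ValueError("invalid attribute")
    return attrs  # empty when all_events is set, meaning no filter


print(resolve_filters(attributes={"EventName": ["DeleteStack"]}))
```

With the sample configuration above, the resolved filter is the single EventName/DeleteStack pair, which is the most the CloudTrail lookup API accepts in one request.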
#
# Copyright 2023 FMR LLC <opensource@fidelity.com>
#
# SPDX-License-Identifier: Apache-2.0
#
"""Display the last CloudTrail events in an account.
## Overview
`last` provides an easier way to review CloudTrail events in either an
interactive or non-interactive manner depending on whether or not the
`--interactive` flag was supplied. In both cases, the events are grouped
together by the user that caused them.
With no additional arguments besides specification of `--region`, the last
command retrieves the past one hour of write events up to a maximum of 1,000
events per account/region pair. Newest events are shown at the top. If output
is not being redirected, events from the same user are displayed in the same
color.
$ awsrun --account 100200300400 last --region us-east-1
Loading events, 1000 max per acct/region, use --max-events to change
100200300400/us-east-1: 2020-04-08 00:43:43-05:00 logs.amazonaws.com CreateLogGroup ECSClusterRole/i-XXXXXXXXXXXXXXXXX
100200300400/us-east-1: 2020-04-08 00:43:39-05:00 logs.amazonaws.com CreateLogGroup ECSClusterRole/i-XXXXXXXXXXXXXXXXX
100200300400/us-east-1: 2020-04-08 00:43:38-05:00 logs.amazonaws.com CreateLogGroup ECSClusterRole/i-XXXXXXXXXXXXXXXXX
100200300400/us-east-1: 2020-04-08 00:43:35-05:00 logs.amazonaws.com CreateLogGroup ECSClusterRole/i-XXXXXXXXXXXXXXXXX
The first column of output is the account/region where the event was
collected. The second column is the event timestamp. The third column is the
event source. The fourth column is the event name. And the last column is the
"user" that generated the event.
Users can specify time ranges using `--start` and `--end` flags. These take
ISO 8601 formatted dates: `2020-04-07T22:30:00-0500`. When specifying a time
range, use either `--hours` N or `--start`/`--end` flags, but not both at the
same time. If none of these flags are specified, the past hour of events is
retrieved by default. For example, the following command retrieves a 1-minute
window of write events:
$ awsrun --account 100200300400 last --region us-east-1 --start 2020-04-07T00:20:00-0500 --end 2020-04-07T00:21:00-0500
As stated before, only write events are retrieved, but users can filter events
by any of the supported lookup attributes in CloudTrail via the `--attribute` flag:
EventId, EventName, ReadOnly, Username, ResourceType, ResourceName,
EventSource, or AccessKeyId. To filter on console logins for the past 12
hours:
$ awsrun --account 100200300400 last --region us-east-1 --hours 12 --attribute EventName=ConsoleLogin
Loading events, 1000 max per acct/region, use --max-events to change
100200300400/us-east-1: 2020-04-08 00:56:17-05:00 signin.amazonaws.com ConsoleLogin Operator/user@example.com
Two shorthand attribute filters exist. The `--all` flag will select all events
including the read-only events. The `--console` flag will filter on console
logins as above:
$ awsrun --account 100200300400 last --region us-east-1 --hours 12 --console
Loading events, 1000 max per acct/region, use --max-events to change
100200300400/us-east-1: 2020-04-08 00:56:17-05:00 signin.amazonaws.com ConsoleLogin Operator/user@example.com
To minimize memory footprint and load on AWS servers, a maximum of 1,000
events are pulled from an account/region pair. Use the `--max-events` flag to
override the value.
Finally, for a TUI (terminal user interface) that lets you interactively
explore events, specify the `--interactive` flag. Follow the on-screen
instructions to interact with the TUI.
## Reference
### Synopsis
$ awsrun [options] last [command options]
### Configuration
The following is the syntax for the options that can be specified in the user
configuration file:
Commands:
last:
hours: INT
max_events: INT
region:
- STRING
all: BOOLEAN
console: BOOLEAN
attributes:
STRING:
- STRING
interactive: BOOLEAN
### Command Options
Some options can be overridden on the awsrun CLI via command line flags. In
those cases, the CLI flags are specified next to the option name below:
`hours`, `--hours INT`
: Specifies how many hours of events to retrieve. The default value is 1
hour. Note: The number of events retrieved will not exceed `max_events`.
`max_events`, `--max-events INT`
: Specifies the upper limit on the number of events to retrieve on a per account
per region basis. The default value is 1,000 events.
`region`, `--region REGION`
: Run the command in the specified regions. When specifying multiple values on
the command line, use multiple flags for each value.
`all`, `--all`
: Retrieve all CloudTrail events including read-only events. The default value
is False. This option is mutually exclusive with the `console` and `attributes`
options.
`console`, `--console`
: Retrieve only console login CloudTrail events. The default value is False.
This option is mutually exclusive with the `all` and `attributes` options.
`attributes`, `--attribute KEY=VALUE`
: Retrieve only CloudTrail events matching the attribute key and value. The
possible key values are: `EventId`, `EventName`, `ReadOnly`, `Username`,
`ResourceType`, `ResourceName`, `EventSource`, and `AccessKeyId`. Due to
limitations in the CloudTrail API, only one attribute can be specified. This
option is mutually exclusive with the `all` and `console` options.
`interactive`, `--interactive`
: Open an interactive TUI (terminal user interface) instead of printing events
to the console. The default value is False.
The following is a sample configuration to add a permanent filter in your
configuration file for `DeleteStack` events using the `attributes` configuration
option:
Commands:
last:
attributes:
EventName:
- DeleteStack
"""
import json
import sys
import warnings
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from functools import partial
from itertools import chain, cycle
from typing import Optional

from awsrun.argparse import AppendAttributeValuePair
from awsrun.config import Bool, Dict, Int, List, Str
from awsrun.runner import RegionalCommand

try:
    import pyperclip
    from colorama import Fore, Style, init
    from rich.markdown import Markdown
    from rich.syntax import Syntax
    from textual.app import App, ComposeResult
    from textual.containers import Container, Horizontal, Vertical
    from textual.message import Message
    from textual.reactive import reactive
    from textual.widgets import Button, DataTable, Footer, Header, Input, Static
except ImportError:
    sys.exit(
        """
The 'last' command requires dependencies not installed by default with
awsrun. Please install them with the following command:

    pip install awsrun[last]
"""
    )

# Textual enables ResourceWarnings, which is not the python default,
# so we disable them as we get warnings from boto library that we
# cannot control.
warnings.simplefilter("ignore", ResourceWarning)

# Lookup attributes supported by AWS CloudTrail
LOOKUP_ATTRIBUTES = [
    "EventId",
    "EventName",
    "ReadOnly",
    "Username",
    "ResourceType",
    "ResourceName",
    "EventSource",
    "AccessKeyId",
]


class CLICommand(RegionalCommand):
    """Displays the last CloudTrail events in an account."""

    @classmethod
    def regional_from_cli(cls, parser, argv, cfg):
        time_spec = parser.add_argument_group("Time specification")
        time_spec.add_argument(
            "--hours",
            metavar="N",
            type=int,
            default=cfg("hours", type=Int),
            help="retrieve the last N hours of events",
        )
        time_spec.add_argument(
            "--start",
            type=_isodate,
            help="lookup events starting at YYYY-MM-DDTHH:MM:SS-00:00",
        )
        time_spec.add_argument(
            "--end",
            type=_isodate,
            help="lookup events ending at YYYY-MM-DDTHH:MM:SS-00:00",
        )

        parser.add_argument(
            "--max-events",
            metavar="N",
            type=int,
            default=cfg("max_events", type=Int, default=1000),
            help="limit # of events retrieved",
        )

        filters = parser.add_argument_group("Event filters")
        mut_excl = filters.add_mutually_exclusive_group()
        mut_excl.add_argument(
            "--all",
            action="store_true",
            default=cfg("all", type=Bool, default=False),
            help="include read-only events",
        )
        mut_excl.add_argument(
            "--console",
            action="store_true",
            default=cfg("console", type=Bool, default=False),
            help="include only ConsoleLogin events",
        )
        mut_excl.add_argument(
            "--attribute",
            "-a",
            dest="attributes",
            action=AppendAttributeValuePair,
            default=cfg("attributes", type=Dict(Str, List(Str)), default={}),
            help="filter using attribute in form of ATTR=VALUE",
        )

        tui = parser.add_argument_group("TUI options")
        tui.add_argument(
            "--interactive",
            "-i",
            action="store_true",
            default=cfg("interactive", type=Bool, default=False),
            help="enter interactive mode to view results",
        )

        args = parser.parse_args(argv)

        # If user doesn't specify any filters, then exclude the read-only
        # events as there are far too many of these typically. While our
        # argument parser can support multiple keys and values, the AWS
        # CloudTrail API is lacking considerably in ability to specify
        # filters. One can only use a single lookup attribute and that
        # attribute can only have a single value. We allow the user to
        # explicitly set their own filter or use one of our shorthand
        # filters such as --all or --console.
        if not (args.all or args.console or args.attributes):
            args.attributes["ReadOnly"] = ["false"]
        elif args.console:
            args.attributes["EventName"] = ["ConsoleLogin"]
        elif len(args.attributes) > 1:
            parser.error("only one lookup attribute may be used per AWS")
        elif any(len(v) > 1 for v in args.attributes.values()):
            parser.error("only one lookup value may be specified per AWS")
        elif any(a not in LOOKUP_ATTRIBUTES for a in args.attributes):
            parser.error(
                f"invalid attribute, must be one of {', '.join(LOOKUP_ATTRIBUTES)}"
            )

        # If no time spec flags provided, default to the last 1 hour of events.
        if not (args.hours or args.start or args.end):
            args.hours = 1

        # If only --hours was specified, then compute a start and end time as
        # the CLICommand constructor only accepts a start and end time.
        if args.hours and not (args.start or args.end):
            args.end = datetime.now(timezone.utc)
            args.start = args.end - timedelta(hours=args.hours)
        elif args.hours and (args.start or args.end):
            parser.error("must specify either --hours OR --start/--end flags")
        elif not (args.start and args.end):
            parser.error("must specify both --start and --end flags")

        del args.all
        del args.hours
        del args.console
        return cls(**vars(args))

    def __init__(
        self,
        regions,
        start=None,
        end=None,
        attributes=None,
        max_events=1000,
        interactive=False,
    ):
        super().__init__(regions)

        # Event settings
        self.start = start
        self.end = end
        self.max_events = max_events
        self.attributes = {} if attributes is None else attributes

        # TUI settings
        self.interactive = interactive

        # List of all events in reverse chronological order. We wouldn't have
        # to keep this list if event timestamps were more granular than a
        # second. Given that they are not, it's not possible to accurately
        # reconstruct the series of events from a dict of events by user.
        self.events = []

    def pre_hook(self):
        print(
            f"Loading events, {self.max_events} max per acct/region, use --max-events to change",
            file=sys.stderr,
        )
        if not self.interactive:
            init()  # colorama only needed for the noninteractive version

    def regional_execute(self, session, acct, region):
        ct = session.client("cloudtrail", region_name=region)
        events = self._retrieve_events(ct)

        if self.interactive:
            return events

        cf_map = defaultdict(lambda: next(COLOR_FUNCTIONS))
        return "".join(cf_map[e.username()](f"{acct}/{region}: {e}\n") for e in events)

    def _retrieve_events(self, ct):
        events = []
        event_ids = set()

        for page in _lookup_events(
            ct, start=self.start, end=self.end, attrs=self.attributes
        ):
            for event in page["Events"]:
                event_id = event["EventId"]

                # Check to make sure AWS is not returning duplicate events.
                # I've seen this happen sometimes, so let's ignore the dups.
                if event_id in event_ids:
                    continue
                event_ids.add(event_id)

                event["CloudTrailEvent"] = json.loads(event["CloudTrailEvent"])
                events.append(_UserIdentityType.new(event))

            if self.interactive:
                print(".", end="", file=sys.stderr, flush=True)

            if len(events) >= self.max_events:
                return events

        return events

    def regional_collect_results(self, acct, region, get_result):
        if not self.interactive:
            super().regional_collect_results(acct, region, get_result)
            return
        self.events.append(get_result())

    def post_hook(self):
        if not self.interactive:
            return

        # Let's fire up the TUI!
        app = EventViewer(list(chain(*self.events)))
        app.run()


class _UserIdentityType:
    """Represents a custom event object based on the user identity type.

    The event object wraps an event dict with a convenience method to compute
    the username for a given event. CloudTrail events seem to be a mess when it
    comes to normalizing the username (at least in our environment). The purpose
    of this class is to provide a means to extract a reasonable username from an
    event that can be used to group events together via the `username` method.

    The factory method `new` should be used to create instances of this type. It
    will dispatch to a custom subclass based on the user type. If no custom
    subclass exists, an instance of this class is created. If one wants to add
    new user types, just follow the patterns of the existing subclasses.
    """

    @classmethod
    def new(cls, event):
        """Return a concrete implementation of `_UserIdentityType`."""
        user_type = event["CloudTrailEvent"]["userIdentity"].get("type", "AWSService")
        klass = globals().get(f"_{user_type}Type", cls)
        return klass(event)

    def __init__(self, event):
        self.event = event
        self.ct_event = event["CloudTrailEvent"]
        self.user_identity = self.ct_event["userIdentity"]

    def type(self):
        """Return the user identity type if present, otherwise NO_TYPE."""
        return self.user_identity.get("type", "NO_TYPE")

    def username(self):
        """Return the username associated with an event."""
        if self.event["EventName"].startswith("AssumeRole"):
            user = self._parse_username_from_request_params()
            if user:
                return user
        return self._parse_username()

    def contains(self, s):
        """Return True if the event contains `s` (case-insensitive)."""
        s = s.lower()
        try:
            next(
                _deep_finder(
                    self.event, lambda n: isinstance(n, str) and s in n.lower()
                )
            )
            return True
        except StopIteration:
            return False

    def has_error(self):
        return "errorCode" in self.ct_event

    def _parse_username(self):
        return self.event.get(
            "Username", self.user_identity.get("userName", "NO_USERNAME")
        )

    def _find_resource(self, type_, default=None):
        for resource in self.event["Resources"]:
            if resource["ResourceType"] == type_:
                return resource["ResourceName"]
        return default

    def _parse_username_from_request_params(self):
        params = self.ct_event.get("requestParameters", {})
        arn = params.get("roleArn")
        session_name = params.get("roleSessionName")
        if arn and session_name:
            return _strip_to("/", arn, greedy=True) + "/" + session_name
        return None

    def event_id(self):
        return self.event["EventId"]

    def to_json(self):
        """Return event as JSON string."""
        return json.dumps(self.event, default=str, indent=4)

    def to_row(self):
        return (
            self.event.get("EventTime", ""),
            self.event.get("EventSource", ""),
            self.event.get("EventName", ""),
            self.ct_event.get("errorCode", ""),
        )

    def __str__(self):
        src = self.event.get("EventSource", "")
        time = self.event.get("EventTime", "")
        name = self.event.get("EventName", "")
        error = self.ct_event.get("errorCode", "")
        if error:
            error = f"ERROR: {error}"
        return f"{time} {src:25.25} {name:30.30} {self.username()} {error}"


#############################################################################
# These classes are for custom handling of different user identity types. If
# you want to provide a custom username for a CloudTrail user identity type,
# define your subclass below following the naming convention.
#############################################################################


class _RootType(_UserIdentityType):  # pyright: ignore
    def _parse_username(self):
        # Docs state that Root does not have a username unless an alias has
        # been set for the account, so we try to print the username, if not,
        # we return "ROOT".
        return self.user_identity.get("userName", "ROOT")


class _AWSAccountType(_UserIdentityType):  # pyright: ignore
    def _parse_username(self):
        acct = self.user_identity.get("accountId", "NO_ACCOUNT_ID")
        principal = self.user_identity.get("principalId", "NO_PRINCIPAL_ID")
        return f"{acct}/{_strip_to(':', principal)}"


class _UnknownType(_UserIdentityType):  # pyright: ignore
    def _parse_username(self):
        acct = self.user_identity.get("accountId", "NO_ACCOUNT_ID")
        return f"{acct}/unknown"


class _AWSServiceType(_UserIdentityType):  # pyright: ignore
    def _parse_username(self):
        return self.user_identity.get("invokedBy", "NO_USERNAME")


class _AssumedRoleType(_UserIdentityType):  # pyright: ignore
    def _parse_username(self):
        # For an assumed role type, we get username from the arn because it's
        # the only thing consistent throughout the other type of events, which
        # allows us to match a sequence of events.
        arn = self.user_identity.get("arn")
        if arn:
            return _strip_to("/", arn)
        return self.event.get("Username")


#############################################################################
# Helper functions
#############################################################################


def _isodate(string):
    return datetime.strptime(string, "%Y-%m-%dT%H:%M:%S%z")


def _strip_to(char, string, greedy=False):
    pos = string.rfind(char) if greedy else string.find(char)
    return string[pos + 1 :]


def _lookup_events(ct, start, end, attrs=None):
    attrs = {} if attrs is None else attrs
    return ct.get_paginator("lookup_events").paginate(
        LookupAttributes=[
            {"AttributeKey": k, "AttributeValue": v}
            for k, vs in attrs.items()
            for v in vs
        ],
        StartTime=start,
        EndTime=end,
    )


def _filter_events(events, query):
    """Return list of events filtered by query expression.

    Query expression may consist of one or more terms. Terms are matched using
    logical AND. A term may be prefixed with an optional '-' to exclude events
    containing the term. Terms are case insensitive and are matched as substrings
    in a CloudTrail event including both keys and values.

    For example, to search for CloudTrail events that had errors:

        errorcode

    To search for errors excluding S3 issues:

        errorcode -s3

    To search for errors excluding S3 that are due to rate limiting:

        errorCode -s3 RequestLimitExceeded

    Technically, the above could be shortened:

        -s3 RequestLimitExceeded
    """
    terms = query.split()
    for term in terms:
        if term.startswith("-"):
            events = [e for e in events if not e.contains(term[1:])]
        else:
            events = [e for e in events if e.contains(term)]
    return events


def _deep_finder(node, predicate):
    """Return a generator that yields keys or objects matching `predicate`."""
    if isinstance(node, dict):
        for k, v in node.items():
            if predicate(k):
                yield k
            yield from _deep_finder(v, predicate)
    elif isinstance(node, list):
        for e in node:
            yield from _deep_finder(e, predicate)
    else:
        if predicate(node):
            yield node


def _colorize(color, string):
    return f"{color}{string}{Style.RESET_ALL}"


# An infinite list of partially applied colorize functions
COLOR_FUNCTIONS = cycle(
    [
        partial(_colorize, f"{s}{c}")
        for s in ("", Style.DIM, Style.BRIGHT)
        for c in (
            Fore.BLUE,
            Fore.GREEN,
            Fore.YELLOW,
            Fore.RED,
            Fore.MAGENTA,
            Fore.CYAN,
            Fore.WHITE,
        )
    ]
)


#############################################################################
# Textual TUI
#############################################################################


class Events:
    """Internal representation of the list of filtered events in TUI."""

    def __init__(self, events):
        self.unfiltered_events = events
        self.filter_expr = ""
        self._load_events(events)

    def _load_events(self, events):
        # Need to keep events as a list as we cannot recreate it
        # from the values of the event_by_user or we lose the
        # order of the events as the timestamp is not granular.
        self.events = events
        self.events_by_key = {e.event_id(): e for e in self.events}
        self.events_by_user = defaultdict(list)
        for e in self.events:
            self.events_by_user[e.username()].append(e)

    def all(self):
        return self.events

    def users(self):
        return self.events_by_user.keys()

    def by_user(self, user):
        return self.events_by_user.get(user, [])

    def by_id(self, event_id) -> Optional[_UserIdentityType]:
        return self.events_by_key.get(event_id)

    def filter(self, expr):
        if not expr:  # empty string?
            events = self.unfiltered_events
        else:
            events = _filter_events(self.unfiltered_events, expr)
        self.filter_expr = expr
        self._load_events(events)
        return len(self.events)


class Popup(Container):
    """An offscreen popup for a single input box."""

    DEFAULT_CSS = """
    Popup {
        transition: offset 500ms in_out_cubic;
        padding: 0 2 1 2;
        width: 65;
        height: auto;
        background: $panel;
        color: $text;
    }
    Popup:focus-within {
        offset: 0 0;
    }
    Popup.offscreen {
        offset-y: 100%;
    }
    Popup Static {
        padding: 0 2 0 2;
    }
    Popup Horizontal {
        padding: 1 2 0 2;
        height: auto;
    }
    Popup Input {
        width: 4fr;
        margin-right: 1;
    }
    Popup Button {
        width: 1fr;
        min-width: 5;
    }
    """

    class Changed(Message):
        def __init__(self, value: str) -> None:
            super().__init__()
            self.value = value

    class Closed(Message):
        pass

    def __init__(self, prompt, help_md):
        self.prompt = prompt
        self.help_md = help_md
        super().__init__(classes="offscreen")

    def compose(self) -> ComposeResult:
        yield Static(Markdown(self.help_md))
        with Horizontal():
            yield Input(placeholder=self.prompt)
            yield Button("Close", variant="primary")

    def on_input_submitted(self, event: Input.Submitted) -> None:
        event.stop()
        self.post_message(self.Changed(event.value))

    def on_button_pressed(self, event: Button.Pressed) -> None:
        event.stop()
        self.post_message(self.Closed())


FILTER_MESSAGE = """
## Filter Events

- Search terms are **case-insensitive**
- Terms are matched using a logical **AND**
- To **exclude** a term, prefix with a **hyphen** (**-**)
- **Substrings** in both keys and values are searched
"""


class FilterPopup(Popup):
    # Class needed for the textual "magic" on_filter_popup_changed handler
    class Changed(Popup.Changed):
        pass

    # Class needed for the textual "magic" on_filter_popup_closed handler
    class Closed(Popup.Closed):
        pass

    def __init__(self):
        super().__init__("Enter expression then press ENTER", FILTER_MESSAGE)


EXPORT_MESSAGE = """
## Export Events

- Events matching the current filter are exported
- If no filter is specified, then all are exported
- Events exported as JSON
"""


class ExportPopup(Popup):
    # Class needed for the textual "magic" on_export_popup_changed handler
    class Changed(Popup.Changed):
        pass

    # Class needed for the textual "magic" on_export_popup_closed handler
    class Closed(Popup.Closed):
        pass

    def __init__(self):
        super().__init__("Enter filename then press ENTER", EXPORT_MESSAGE)


class RowTable(Container):
    DEFAULT_CSS = """
    RowTable > DataTable {
        border: solid $accent-lighten-2;
        border-title-align: left;
        height: 100%;
    }
    RowTable > DataTable:focus {
        border: solid $secondary;
    }
    RowTable > DataTable > .datatable--header {
        text-style: bold;
        color: $accent-lighten-2;
        background: black 0%;
    }
    RowTable > DataTable > .datatable--cursor {
        color: $accent-lighten-2;
        text-style: bold;
        background: black 0%;
    }
    RowTable > DataTable > .datatable--hover {
        background: black 0%;
    }
    RowTable > DataTable > .datatable--header-hover {
        color: $accent-lighten-2;
        background: black 0%;
    }
    """

    contents = reactive([], always_update=True)

    class SelectionChanged(Message):
        def __init__(self, row_key):
            self.row_key = row_key
            super().__init__()

    def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None:
        event.stop()
        event.control.border_subtitle = (
            f"{event.cursor_row + 1} of {len(event.control.rows)}"
        )
        self.post_message(self.SelectionChanged(event.row_key))

    def __init__(self, *col_names):
        self.col_names = col_names
        super().__init__()

    def compose(self) -> ComposeResult:
        yield DataTable()

    def focus(self):
        self.query_one(DataTable).focus()

    def on_mount(self):
        dt = self.query_one(DataTable)
        dt.cursor_type = "row"
        dt.add_columns(*self.col_names)


class UserTable(RowTable):
    DEFAULT_CSS = """
    UserTable {
        height: 1fr;
    }
    """

    # Class needed for the textual "magic" on_user_table_selection_changed handler
    class SelectionChanged(RowTable.SelectionChanged):
        pass

    def __init__(self):
        super().__init__("Principal")

    def on_mount(self):
        dt = self.query_one(DataTable)
        dt.show_header = False
        dt.border_title = "Users"

    def watch_contents(self, users):
        dt = self.query_one(DataTable)
        dt.clear()
        for user in users:
            dt.add_row(user, key=user)
        dt.scroll_home()
        dt.border_subtitle = f"0 of {len(users)}"


class EventTable(RowTable):
    DEFAULT_CSS = """
    EventTable {
        height: 2fr;
    }
    """

    # Class needed for the textual "magic" on_event_table_selection_changed handler
    class SelectionChanged(RowTable.SelectionChanged):
        pass

    def __init__(self):
        super().__init__("Time", "Source", "Event", "Error?")

    def on_mount(self):
        self.query_one(DataTable).border_title = "Events"

    def watch_contents(self, events):
        dt = self.query_one(DataTable)
        dt.clear()
        for event in events:
            row = event.to_row()
            if event.has_error():
                error = dt.app.get_css_variables()["error"]
                row = [f"[{error}]{c}[/]" for c in row]
            dt.add_row(*row, key=event.event_id())
        dt.scroll_home()
        dt.border_subtitle = f"0 of {len(events)}"
class EventViewer(App):
CSS = """
Screen {
align: center bottom;
overflow: hidden;
layers: base filter export;
}
FilterPopup {
layer: filter;
}
ExportPopup {
layer: export;
}
#main {
layout: horizontal;
}
.vertical #main {
layout: vertical;
}
#left {
width: 1fr;
}
.vertical #left {
height: 2fr;
}
.hidden {
display: none;
}
"""
TITLE = "CloudTrail Viewer"
BINDINGS = [
("q", "quit", "Quit"),
("c", "copy", "Copy"),
("e", "export_popup", "Export"),
("f", "filter_popup", "Filter"),
("l", "toggle_layout", "Layout"),
("u", "toggle_users", "Users"),
("d", "toggle_dark", "Toggle dark mode"),
]
total_count = reactive(0)
filtered_count = reactive(0)
theme = {
"dark": Syntax.get_theme("dracula"),
"light": Syntax.get_theme("friendly"),
}
    def __init__(self, events):
        super().__init__()
        self.events = Events(events)

    def compose(self) -> ComposeResult:
        yield FilterPopup()  # Hidden off-screen initially
        yield ExportPopup()  # Hidden off-screen initially
        yield Header()
        yield Horizontal(
            Vertical(UserTable(), EventTable(), id="left"),
            EventDetail(),
            id="main",
        )
        yield Footer()

    def on_mount(self):
        self.query_one(Header).tall = True
        self.query_one(FilterPopup).disabled = True
        self.query_one(ExportPopup).disabled = True
        self.total_count = self.filtered_count = len(self.events.all())
        self.populate_data_tables()

    def populate_data_tables(self, focus=True):
        ut = self.query_one(UserTable)
        et = self.query_one(EventTable)
        ed = self.query_one(EventDetail)
        if not self.events.all():
            ut.contents = et.contents = []
            ed.event = None
        elif not ut.has_class("hidden"):
            ut.contents = sorted(self.events.users(), key=str.lower)
            if focus:
                ut.focus()
        else:
            et.contents = self.events.all()
            if focus:
                et.focus()

    def action_toggle_dark(self):
        self.dark = not self.dark
        # In EventDetail we define "event" as reactive with always_update,
        # so this will force watch_event to be called which redraws the JSON
        # with the correct theme. We need to do this as the content of the
        # Static widget with our JSON is colored by rich which does not know
        # about dark mode. So when user toggles, we need to syntax highlight
        # the JSON with an appropriate theme.
        ed = self.query_one(EventDetail)
        ed.event = ed.event

    def action_toggle_layout(self):
        self.toggle_class("vertical")

    def action_copy(self):
        ed = self.query_one(EventDetail)
        if ed.event:
            pyperclip.copy(ed.event.to_json())

    def action_filter_popup(self):
        self.show_popup(FilterPopup)

    def action_export_popup(self):
        self.show_popup(ExportPopup)

    def action_toggle_users(self):
        ut = self.query_one(UserTable)
        ut.toggle_class("hidden")
        self.populate_data_tables()

    def dismiss_popup(self, name):
        p = self.query_one(name)
        p.disabled = True
        p.add_class("offscreen")
        self.set_focus(self.original_focus)
        self.refresh(repaint=True, layout=True)

    def show_popup(self, name):
        self.original_focus = self.query_one("*:focus")
        p = self.query_one(name)
        p.disabled = False
        p.remove_class("offscreen")
        p.query_one(Input).focus()

    def on_filter_popup_closed(self, _) -> None:
        self.dismiss_popup(FilterPopup)

    def on_filter_popup_changed(self, event: FilterPopup.Changed) -> None:
        # Only update the UI if the expression is different
        if event.value != self.events.filter_expr:
            self.filtered_count = self.events.filter(event.value)
            self.query_one(EventDetail).filter_expr = event.value
            self.populate_data_tables(focus=False)
        self.dismiss_popup(FilterPopup)

    def on_export_popup_closed(self, _) -> None:
        self.dismiss_popup(ExportPopup)

    def on_export_popup_changed(self, event: ExportPopup.Changed) -> None:
        try:
            with open(event.value, "w") as out:
                json.dump(
                    [e.event for e in self.events.all()],
                    default=str,
                    indent=4,
                    fp=out,
                )
            self.dismiss_popup(ExportPopup)
        except Exception:
            self.bell()

    def on_user_table_selection_changed(self, message: UserTable.SelectionChanged):
        et = self.query_one(EventTable)
        user = message.row_key
        if user:
            et.contents = self.events.by_user(user)

    def on_event_table_selection_changed(self, message: EventTable.SelectionChanged):
        ed = self.query_one(EventDetail)
        event_key = message.row_key
        if event_key:
            event = self.events.by_id(event_key)
            ed.event = event

    def watch_total_count(self):
        self.update_sub_title()

    def watch_filtered_count(self):
        self.update_sub_title()

    def update_sub_title(self):
        s = f"{self.total_count} events loaded"
        if self.filtered_count != self.total_count:
            s += f", {self.filtered_count} matched"
        self.sub_title = s

class EventDetail(Container, can_focus=True):
    DEFAULT_CSS = """
    EventDetail {
        width: 1fr;
        overflow: auto scroll;
        border: solid $accent-lighten-2;
        border-title-align: left;
    }
    EventDetail:focus {
        border: solid $secondary;
    }
    .vertical EventDetail {
        height: 2fr;
    }
    EventDetail > Static {
        width: auto;
        min-width: 1fr;
    }
    """

    # We need always update for toggling of themes to work.
    event: reactive[Optional[_UserIdentityType]] = reactive[
        Optional[_UserIdentityType]
    ](None, always_update=True)

    def __init__(self):
        super().__init__()
        self.filter_expr = ""

    def on_mount(self):
        self.border_title = "Event Detail"

    def compose(self) -> ComposeResult:
        yield Static()

    def watch_event(self, event):
        content = event.to_json() if event else ""
        matching_lines = self._find_matching_lines(content)
        self.query_one(Static).update(
            Syntax(
                content,
                "json",
                theme=EventViewer.theme["dark" if self.app.dark else "light"],
                highlight_lines=matching_lines,
                line_numbers=True,
            )
        )

    def _find_matching_lines(self, content):
        terms = [t.lower() for t in self.filter_expr.split() if not t.startswith("-")]
        matching_lines = set()
        for n, line in enumerate(content.split("\n"), start=1):
            for term in terms:
                if term in line.lower():
                    matching_lines.add(n)
        return matching_lines
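The line-highlighting logic in `_find_matching_lines` can be tried outside the TUI. The following is a minimal sketch, assuming the same rule as the method above: terms that start with `-` are skipped, and matching is case-insensitive on substrings. The standalone function name `find_matching_lines` and the sample JSON are illustrative only and are not part of the module.

```python
from typing import Set


def find_matching_lines(content: str, filter_expr: str) -> Set[int]:
    """Return 1-based numbers of lines containing any positive filter term."""
    # Terms prefixed with "-" are negations and are never highlighted.
    terms = [t.lower() for t in filter_expr.split() if not t.startswith("-")]
    matching = set()
    for n, line in enumerate(content.split("\n"), start=1):
        lowered = line.lower()
        if any(term in lowered for term in terms):
            matching.add(n)
    return matching


# Hypothetical event JSON, four lines long.
sample = '{\n  "eventName": "CreateLogGroup",\n  "awsRegion": "us-east-1"\n}'
print(sorted(find_matching_lines(sample, "createlog -region")))  # prints [2]
```

The returned set is the same kind of value that `watch_event` passes to `Syntax` as `highlight_lines`.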
Classes
class CLICommand (regions, start=None, end=None, attributes=None, max_events=1000, interactive=False)
-
Displays the last CloudTrail events in an account.
Expand source code
class CLICommand(RegionalCommand):
    """Displays the last CloudTrail events in an account."""

    @classmethod
    def regional_from_cli(cls, parser, argv, cfg):
        time_spec = parser.add_argument_group("Time specification")
        time_spec.add_argument(
            "--hours",
            metavar="N",
            type=int,
            default=cfg("hours", type=Int),
            help="retrieve the last N hours of events",
        )
        time_spec.add_argument(
            "--start",
            type=_isodate,
            help="lookup events starting at YYYY-MM-DDTHH:MM:SS-00:00",
        )
        time_spec.add_argument(
            "--end",
            type=_isodate,
            help="lookup events ending at YYYY-MM-DDTHH:MM:SS-00:00",
        )

        parser.add_argument(
            "--max-events",
            metavar="N",
            type=int,
            default=cfg("max_events", type=Int, default=1000),
            help="limit # of events retrieved",
        )

        filters = parser.add_argument_group("Event filters")
        mut_excl = filters.add_mutually_exclusive_group()
        mut_excl.add_argument(
            "--all",
            action="store_true",
            default=cfg("all", type=Bool, default=False),
            help="include read-only events",
        )
        mut_excl.add_argument(
            "--console",
            action="store_true",
            default=cfg("console", type=Bool, default=False),
            help="include only ConsoleLogin events",
        )
        mut_excl.add_argument(
            "--attribute",
            "-a",
            dest="attributes",
            action=AppendAttributeValuePair,
            default=cfg("attributes", type=Dict(Str, List(Str)), default={}),
            help="filter using attribute in form of ATTR=VALUE",
        )

        tui = parser.add_argument_group("TUI options")
        tui.add_argument(
            "--interactive",
            "-i",
            action="store_true",
            default=cfg("interactive", type=Bool, default=False),
            help="enter interactive mode to view results",
        )

        args = parser.parse_args(argv)

        # If user doesn't specify any filters, then exclude the read-only
        # events as there are far too many of these typically. While our
        # argument parser can support multiple keys and values, the AWS
        # CloudTrail API is lacking considerably in ability to specify
        # filters. One can only use a single lookup attribute and that
        # attribute can only have a single value. We allow the user to
        # explicitly set their own filter or use one of our shorthand
        # filters such as --all or --console.
        if not (args.all or args.console or args.attributes):
            args.attributes["ReadOnly"] = ["false"]
        elif args.console:
            args.attributes["EventName"] = ["ConsoleLogin"]
        elif len(args.attributes) > 1:
            parser.error("only one lookup attribute may be used per AWS")
        elif any(len(v) > 1 for v in args.attributes.values()):
            parser.error("only one lookup value may be specified per AWS")
        elif any(a not in LOOKUP_ATTRIBUTES for a in args.attributes):
            parser.error(
                f"invalid attribute, must be one of {', '.join(LOOKUP_ATTRIBUTES)}"
            )

        # If no time spec flags provided, default to the last 1 hour of events.
        if not (args.hours or args.start or args.end):
            args.hours = 1

        # If only --hours was specified, then compute a start and end time as
        # our CLICommand doesn't support --last.
        if args.hours and not (args.start or args.end):
            args.end = datetime.now(timezone.utc)
            args.start = args.end - timedelta(hours=args.hours)
        elif args.hours and (args.start or args.end):
            parser.error("must specify either --hours OR --start/--end flags")
        elif not (args.start and args.end):
            parser.error("must specify both --start and --end flags")

        del args.all
        del args.hours
        del args.console
        return cls(**vars(args))

    def __init__(
        self,
        regions,
        start=None,
        end=None,
        attributes=None,
        max_events=1000,
        interactive=False,
    ):
        super().__init__(regions)

        # Event settings
        self.start = start
        self.end = end
        self.max_events = max_events
        self.attributes = {} if attributes is None else attributes

        # TUI settings
        self.interactive = interactive

        # List of all events in reverse chronological order. We wouldn't have
        # to keep this list if event timestamps were more granular than a
        # second. Given that they are not, it's not possible to accurately
        # reconstruct the series of events from a dict of events by user.
        self.events = []

    def pre_hook(self):
        print(
            f"Loading events, {self.max_events} max per acct/region, use --max-events to change",
            file=sys.stderr,
        )
        if not self.interactive:
            init()  # colorama only needed for the noninteractive version

    def regional_execute(self, session, acct, region):
        ct = session.client("cloudtrail", region_name=region)
        events = self._retrieve_events(ct)
        if self.interactive:
            return events
        cf_map = defaultdict(lambda: next(COLOR_FUNCTIONS))
        return "".join(cf_map[e.username()](f"{acct}/{region}: {e}\n") for e in events)

    def _retrieve_events(self, ct):
        events = []
        event_ids = set()
        for page in _lookup_events(
            ct, start=self.start, end=self.end, attrs=self.attributes
        ):
            for event in page["Events"]:
                event_id = event["EventId"]
                # Check to make sure AWS is not returning duplicate events.
                # I've seen this happen sometimes, so let's ignore the dups.
                if event_id in event_ids:
                    continue
                event_ids.add(event_id)
                event["CloudTrailEvent"] = json.loads(event["CloudTrailEvent"])
                events.append(_UserIdentityType.new(event))
            if self.interactive:
                print(".", end="", file=sys.stderr, flush=True)
            if len(events) >= self.max_events:
                return events
        return events

    def regional_collect_results(self, acct, region, get_result):
        if not self.interactive:
            super().regional_collect_results(acct, region, get_result)
            return
        self.events.append(get_result())

    def post_hook(self):
        if not self.interactive:
            return
        # Let's fire up the TUI!
        app = EventViewer(list(chain(*self.events)))
        app.run()
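The time-window rules in `regional_from_cli` can be summarized as a small standalone function. This is only a sketch under stated assumptions: the name `resolve_window` and its `now` parameter are hypothetical (added so the result is deterministic), and `ValueError` stands in for the `parser.error` calls used by the command.

```python
from datetime import datetime, timedelta, timezone


def resolve_window(hours=None, start=None, end=None, now=None):
    """Return a (start, end) pair following the --hours/--start/--end rules."""
    # No time flags at all: fall back to the past hour.
    if not (hours or start or end):
        hours = 1
    if hours and not (start or end):
        end = now or datetime.now(timezone.utc)
        start = end - timedelta(hours=hours)
    elif hours and (start or end):
        raise ValueError("must specify either --hours OR --start/--end flags")
    elif not (start and end):
        raise ValueError("must specify both --start and --end flags")
    return start, end


fixed_now = datetime(2020, 4, 8, 6, 0, 0, tzinfo=timezone.utc)
window = resolve_window(hours=3, now=fixed_now)
print(window[1] - window[0])  # prints 3:00:00
```

Keeping the rule in one pure function like this makes the three outcomes (default hour, explicit range, conflicting flags) easy to check without building an argument parser.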
Ancestors
Inherited members
class Events (events)
-
Internal representation of the list of filtered events in TUI.
Expand source code
class Events:
    """Internal representation of the list of filtered events in TUI."""

    def __init__(self, events):
        self.unfiltered_events = events
        self.filter_expr = ""
        self._load_events(events)

    def _load_events(self, events):
        # Need to keep events as a list as we cannot recreate it
        # from the values of the event_by_user or we lose the
        # order of the events as the timestamp is not granular.
        self.events = events
        self.events_by_key = {e.event_id(): e for e in self.events}
        self.events_by_user = defaultdict(list)
        for e in self.events:
            self.events_by_user[e.username()].append(e)

    def all(self):
        return self.events

    def users(self):
        return self.events_by_user.keys()

    def by_user(self, user):
        return self.events_by_user.get(user, [])

    def by_id(self, event_id) -> Optional[_UserIdentityType]:
        return self.events_by_key.get(event_id)

    def filter(self, expr):
        if not expr:  # empty string?
            events = self.unfiltered_events
        else:
            events = _filter_events(self.unfiltered_events, expr)
        self.filter_expr = expr
        self._load_events(events)
        return len(self.events)
Methods
def all(self)
-
Expand source code
def all(self): return self.events
def users(self)
-
Expand source code
def users(self): return self.events_by_user.keys()
def by_user(self, user)
-
Expand source code
def by_user(self, user): return self.events_by_user.get(user, [])
def by_id(self, event_id)
-
Expand source code
def by_id(self, event_id) -> Optional[_UserIdentityType]: return self.events_by_key.get(event_id)
def filter(self, expr)
-
Expand source code
def filter(self, expr):
    if not expr:  # empty string?
        events = self.unfiltered_events
    else:
        events = _filter_events(self.unfiltered_events, expr)
    self.filter_expr = expr
    self._load_events(events)
    return len(self.events)
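The indexing done by `Events._load_events` keeps the original list and builds two lookups on top of it. A minimal sketch of that idea follows, assuming plain dictionaries with `EventId` and `Username` keys in place of the module's event objects; the function name `index_events` and the sample data are illustrative only.

```python
from collections import defaultdict


def index_events(events):
    """Build by-id and by-user lookups without disturbing list order."""
    by_id = {e["EventId"]: e for e in events}
    by_user = defaultdict(list)
    for e in events:
        # Appending in list order keeps each user's events newest-first,
        # even when several events share the same one-second timestamp.
        by_user[e["Username"]].append(e)
    return by_id, by_user


# Hypothetical events, already in reverse chronological order.
events = [
    {"EventId": "e3", "Username": "alice"},
    {"EventId": "e2", "Username": "bob"},
    {"EventId": "e1", "Username": "alice"},
]
by_id, by_user = index_events(events)
print([e["EventId"] for e in by_user["alice"]])  # prints ['e3', 'e1']
```

Because the per-user lists are filled in list order, the ordering never has to be reconstructed from timestamps.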
class Popup (prompt, help_md)
-
An offscreen popup for a single input box.
Initialize a Widget.
Args
*children
- Child widgets.
name
- The name of the widget.
id
- The ID of the widget in the DOM.
classes
- The CSS classes for the widget.
disabled
- Whether the widget is disabled or not.
Expand source code
class Popup(Container): """An offscreen popup for a single input box.""" DEFAULT_CSS = """ Popup { transition: offset 500ms in_out_cubic; padding: 0 2 1 2; width: 65; height: auto; background: $panel; color: $text; } Popup:focus-within { offset: 0 0; } Popup.offscreen { offset-y: 100%; } Popup Static { padding: 0 2 0 2; } Popup Horizontal { padding: 1 2 0 2; height: auto; } Popup Input { width: 4fr; margin-right: 1; } Popup Button { width: 1fr; min-width: 5; } """ class Changed(Message): def __init__(self, value: str) -> None: super().__init__() self.value = value class Closed(Message): pass def __init__(self, prompt, help_md): self.prompt = prompt self.help_md = help_md super().__init__(classes="offscreen") def compose(self) -> ComposeResult: yield Static(Markdown(self.help_md)) with Horizontal(): yield Input(placeholder=self.prompt) yield Button("Close", variant="primary") def on_input_submitted(self, event: Input.Submitted) -> None: event.stop() self.post_message(self.Changed(event.value)) def on_button_pressed(self, event: Button.Pressed) -> None: event.stop() self.post_message(self.Closed())
Ancestors
- textual.containers.Container
- textual.widget.Widget
- textual.dom.DOMNode
- textual.message_pump.MessagePump
Subclasses
- ExportPopup
- FilterPopup
Class variables
var DEFAULT_CSS
var Changed
-
Base class for a message.
var Closed
-
Base class for a message.
var can_focus
var can_focus_children
Methods
def compose(self)
-
Called by Textual to create child widgets.
Extend this to build a UI.
Example
def compose(self) -> ComposeResult: yield Header() yield Container( Tree(), Viewer() ) yield Footer()
Expand source code
def compose(self) -> ComposeResult: yield Static(Markdown(self.help_md)) with Horizontal(): yield Input(placeholder=self.prompt) yield Button("Close", variant="primary")
def on_input_submitted(self, event)
-
Expand source code
def on_input_submitted(self, event: Input.Submitted) -> None: event.stop() self.post_message(self.Changed(event.value))
def on_button_pressed(self, event)
-
Expand source code
def on_button_pressed(self, event: Button.Pressed) -> None: event.stop() self.post_message(self.Closed())
class FilterPopup
-
An offscreen popup for a single input box.
Initialize a Widget.
Args
*children
- Child widgets.
name
- The name of the widget.
id
- The ID of the widget in the DOM.
classes
- The CSS classes for the widget.
disabled
- Whether the widget is disabled or not.
Expand source code
class FilterPopup(Popup): # Class needed for the textual "magic" on_filter_popup_changed handler class Changed(Popup.Changed): pass # Class needed for the textual "magic" on_filter_popup_closed handler class Closed(Popup.Closed): pass def __init__(self): super().__init__("Enter expression then press ENTER", FILTER_MESSAGE)
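The comments above refer to Textual's handler-naming convention, where a message class nested inside a widget class is dispatched to a method named after both. The sketch below only approximates that convention for illustration. It is not Textual's own code, and the helper names `camel_to_snake` and `handler_name` are hypothetical.

```python
import re


def camel_to_snake(name: str) -> str:
    """Convert a CamelCase class name to snake_case."""
    # Insert "_" before every capital letter except the first.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


def handler_name(widget_class: str, message_class: str) -> str:
    """Approximate the handler name for a message nested in a widget class."""
    return f"on_{camel_to_snake(widget_class)}_{camel_to_snake(message_class)}"


print(handler_name("FilterPopup", "Changed"))  # prints on_filter_popup_changed
```

This is why `FilterPopup` and `ExportPopup` each declare their own `Changed` and `Closed` subclasses: a message defined only on `Popup` would carry the `Popup` name instead, and the viewer could not tell the two popups apart by handler name.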
Ancestors
- Popup
- textual.containers.Container
- textual.widget.Widget
- textual.dom.DOMNode
- textual.message_pump.MessagePump
Class variables
var can_focus
var can_focus_children
Inherited members
class ExportPopup
-
An offscreen popup for a single input box.
Initialize a Widget.
Args
*children
- Child widgets.
name
- The name of the widget.
id
- The ID of the widget in the DOM.
classes
- The CSS classes for the widget.
disabled
- Whether the widget is disabled or not.
Expand source code
class ExportPopup(Popup): # Class needed for the textual "magic" on_export_popup_changed handler class Changed(Popup.Changed): pass # Class needed for the textual "magic" on_export_popup_closed handler class Closed(Popup.Closed): pass def __init__(self): super().__init__("Enter filename then press ENTER", EXPORT_MESSAGE)
Ancestors
- Popup
- textual.containers.Container
- textual.widget.Widget
- textual.dom.DOMNode
- textual.message_pump.MessagePump
Class variables
var can_focus
var can_focus_children
Inherited members
class RowTable (*col_names)
-
Simple container widget, with vertical layout.
Initialize a Widget.
Args
*children
- Child widgets.
name
- The name of the widget.
id
- The ID of the widget in the DOM.
classes
- The CSS classes for the widget.
disabled
- Whether the widget is disabled or not.
Expand source code
class RowTable(Container): DEFAULT_CSS = """ RowTable > DataTable { border: solid $accent-lighten-2; border-title-align: left; height: 100%; } RowTable > DataTable:focus { border: solid $secondary; } RowTable > DataTable > .datatable--header { text-style: bold; color: $accent-lighten-2; background: black 0%; } RowTable > DataTable > .datatable--cursor { color: $accent-lighten-2; text-style: bold; background: black 0%; } RowTable > DataTable > .datatable--hover { background: black 0%; } RowTable > DataTable > .datatable--header-hover { color: $accent-lighten-2; background: black 0%; } """ contents = reactive([], always_update=True) class SelectionChanged(Message): def __init__(self, row_key): self.row_key = row_key super().__init__() def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None: event.stop() event.control.border_subtitle = ( f"{event.cursor_row + 1} of {len(event.control.rows)}" ) self.post_message(self.SelectionChanged(event.row_key)) def __init__(self, *col_names): self.col_names = col_names super().__init__() def compose(self) -> ComposeResult: yield DataTable() def focus(self): self.query_one(DataTable).focus() def on_mount(self): dt = self.query_one(DataTable) dt.cursor_type = "row" dt.add_columns(*self.col_names)
Ancestors
- textual.containers.Container
- textual.widget.Widget
- textual.dom.DOMNode
- textual.message_pump.MessagePump
Subclasses
- EventTable
- UserTable
Class variables
var DEFAULT_CSS
var SelectionChanged
-
Base class for a message.
var can_focus
var can_focus_children
Instance variables
var contents
-
Create a reactive attribute.
Args
default
- A default value or callable that returns a default.
layout
- Perform a layout on change.
repaint
- Perform a repaint on change.
init
- Call watchers on initialize (post mount).
always_update
- Call watchers even when the new value equals the old value.
Expand source code
def __get__(self, obj: Reactable, obj_type: type[object]) -> ReactiveType: internal_name = self.internal_name if not hasattr(obj, internal_name): self._initialize_reactive(obj, self.name) if hasattr(obj, self.compute_name): value: ReactiveType old_value = getattr(obj, internal_name) _rich_traceback_omit = True value = getattr(obj, self.compute_name)() setattr(obj, internal_name, value) self._check_watchers(obj, self.name, old_value) return value else: return getattr(obj, internal_name)
Methods
def on_data_table_row_highlighted(self, event)
-
Expand source code
def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None: event.stop() event.control.border_subtitle = ( f"{event.cursor_row + 1} of {len(event.control.rows)}" ) self.post_message(self.SelectionChanged(event.row_key))
def compose(self)
-
Called by Textual to create child widgets.
Extend this to build a UI.
Example
def compose(self) -> ComposeResult: yield Header() yield Container( Tree(), Viewer() ) yield Footer()
Expand source code
def compose(self) -> ComposeResult: yield DataTable()
def focus(self)
-
Give focus to this widget.
Args
scroll_visible
- Scroll parent to make this widget visible.
Returns
The Widget instance.
Expand source code
def focus(self): self.query_one(DataTable).focus()
def on_mount(self)
-
Expand source code
def on_mount(self): dt = self.query_one(DataTable) dt.cursor_type = "row" dt.add_columns(*self.col_names)
class UserTable
-
Simple container widget, with vertical layout.
Initialize a Widget.
Args
*children
- Child widgets.
name
- The name of the widget.
id
- The ID of the widget in the DOM.
classes
- The CSS classes for the widget.
disabled
- Whether the widget is disabled or not.
Expand source code
class UserTable(RowTable): DEFAULT_CSS = """ UserTable { height: 1fr; } """ # Class needed for the textual "magic" on_user_table_selection_changed handler class SelectionChanged(RowTable.SelectionChanged): pass def __init__(self): super().__init__("Principal") def on_mount(self): dt = self.query_one(DataTable) dt.show_header = False dt.border_title = "Users" def watch_contents(self, users): dt = self.query_one(DataTable) dt.clear() for user in users: dt.add_row(user, key=user) dt.scroll_home() dt.border_subtitle = f"0 of {len(users)}"
Ancestors
- RowTable
- textual.containers.Container
- textual.widget.Widget
- textual.dom.DOMNode
- textual.message_pump.MessagePump
Class variables
var DEFAULT_CSS
var can_focus
var can_focus_children
Methods
def on_mount(self)
-
Expand source code
def on_mount(self): dt = self.query_one(DataTable) dt.show_header = False dt.border_title = "Users"
def watch_contents(self, users)
-
Expand source code
def watch_contents(self, users): dt = self.query_one(DataTable) dt.clear() for user in users: dt.add_row(user, key=user) dt.scroll_home() dt.border_subtitle = f"0 of {len(users)}"
Inherited members
class EventTable
-
Simple container widget, with vertical layout.
Initialize a Widget.
Args
*children
- Child widgets.
name
- The name of the widget.
id
- The ID of the widget in the DOM.
classes
- The CSS classes for the widget.
disabled
- Whether the widget is disabled or not.
Expand source code
class EventTable(RowTable): DEFAULT_CSS = """ EventTable { height: 2fr; } """ # Class needed for the textual "magic" on_event_table_selection_changed handler class SelectionChanged(RowTable.SelectionChanged): pass def __init__(self): super().__init__("Time", "Source", "Event", "Error?") def on_mount(self): self.query_one(DataTable).border_title = "Events" def watch_contents(self, events): dt = self.query_one(DataTable) dt.clear() for event in events: row = event.to_row() if event.has_error(): error = dt.app.get_css_variables()["error"] row = [f"[{error}]{c}[/]" for c in row] dt.add_row(*row, key=event.event_id()) dt.scroll_home() dt.border_subtitle = f"0 of {len(events)}"
Ancestors
- RowTable
- textual.containers.Container
- textual.widget.Widget
- textual.dom.DOMNode
- textual.message_pump.MessagePump
Class variables
var DEFAULT_CSS
var can_focus
var can_focus_children
Methods
def on_mount(self)
-
Expand source code
def on_mount(self): self.query_one(DataTable).border_title = "Events"
def watch_contents(self, events)
-
Expand source code
def watch_contents(self, events): dt = self.query_one(DataTable) dt.clear() for event in events: row = event.to_row() if event.has_error(): error = dt.app.get_css_variables()["error"] row = [f"[{error}]{c}[/]" for c in row] dt.add_row(*row, key=event.event_id()) dt.scroll_home() dt.border_subtitle = f"0 of {len(events)}"
Inherited members
class EventViewer (events)
-
The base class for Textual Applications.
Args
driver_class
- Driver class or None to auto-detect. This will be used by some Textual tools.
css_path
- Path to CSS or None to use the CSS_PATH class variable. To load multiple CSS files, pass a list of strings or paths which will be loaded in order.
watch_css
- Reload CSS if the files changed. This is set automatically if you are using textual run with the dev switch.
Raises
CssPathError
- When the supplied CSS path(s) are an unexpected type.
Expand source code
class EventViewer(App): CSS = """ Screen { align: center bottom; overflow: hidden; layers: base filter export; } FilterPopup { layer: filter; } ExportPopup { layer: export; } #main { layout: horizontal; } .vertical #main { layout: vertical; } #left { width: 1fr; } .vertical #left { height: 2fr; } .hidden { display: none; } """ TITLE = "CloudTrail Viewer" BINDINGS = [ ("q", "quit", "Quit"), ("c", "copy", "Copy"), ("e", "export_popup", "Export"), ("f", "filter_popup", "Filter"), ("l", "toggle_layout", "Layout"), ("u", "toggle_users", "Users"), ("d", "toggle_dark", "Toggle dark mode"), ] total_count = reactive(0) filtered_count = reactive(0) theme = { "dark": Syntax.get_theme("dracula"), "light": Syntax.get_theme("friendly"), } def __init__(self, events): super().__init__() self.events = Events(events) def compose(self) -> ComposeResult: yield FilterPopup() # Hidden off-screen initially yield ExportPopup() # Hidden off-screen initially yield Header() yield Horizontal( Vertical(UserTable(), EventTable(), id="left"), EventDetail(), id="main", ) yield Footer() def on_mount(self): self.query_one(Header).tall = True self.query_one(FilterPopup).disabled = True self.query_one(ExportPopup).disabled = True self.total_count = self.filtered_count = len(self.events.all()) self.populate_data_tables() def populate_data_tables(self, focus=True): ut = self.query_one(UserTable) et = self.query_one(EventTable) ed = self.query_one(EventDetail) if not self.events.all(): ut.contents = et.contents = [] ed.event = None elif not ut.has_class("hidden"): ut.contents = sorted(self.events.users(), key=str.lower) if focus: ut.focus() else: et.contents = self.events.all() if focus: et.focus() def action_toggle_dark(self): self.dark = not self.dark # In EventDetail we define "event" as reactive with always_update, # so this will force watch_event to be called which redraws the JSON # with the correct theme. 
We need to do this as the content of the # Static widget with our JSON is colored by rich which does not know # about dark mode. So when user toggles, we need to syntax highlight # the JSON with an appropriate theme. ed = self.query_one(EventDetail) ed.event = ed.event def action_toggle_layout(self): self.toggle_class("vertical") def action_copy(self): ed = self.query_one(EventDetail) if ed.event: pyperclip.copy(ed.event.to_json()) def action_filter_popup(self): self.show_popup(FilterPopup) def action_export_popup(self): self.show_popup(ExportPopup) def action_toggle_users(self): ut = self.query_one(UserTable) ut.toggle_class("hidden") self.populate_data_tables() def dismiss_popup(self, name): p = self.query_one(name) p.disabled = True p.add_class("offscreen") self.set_focus(self.original_focus) self.refresh(repaint=True, layout=True) def show_popup(self, name): self.original_focus = self.query_one("*:focus") p = self.query_one(name) p.disabled = False p.remove_class("offscreen") p.query_one(Input).focus() def on_filter_popup_closed(self, _) -> None: self.dismiss_popup(FilterPopup) def on_filter_popup_changed(self, event: FilterPopup.Changed) -> None: # Only update the UI if the expression is different if event.value != self.events.filter_expr: self.filtered_count = self.events.filter(event.value) self.query_one(EventDetail).filter_expr = event.value self.populate_data_tables(focus=False) self.dismiss_popup(FilterPopup) def on_export_popup_closed(self, _) -> None: self.dismiss_popup(ExportPopup) def on_export_popup_changed(self, event: ExportPopup.Changed) -> None: try: with open(event.value, "w") as out: json.dump( [e.event for e in self.events.all()], default=str, indent=4, fp=out, ) self.dismiss_popup(ExportPopup) except Exception: self.bell() def on_user_table_selection_changed(self, message: UserTable.SelectionChanged): et = self.query_one(EventTable) user = message.row_key if user: et.contents = self.events.by_user(user) def 
on_event_table_selection_changed(self, message: EventTable.SelectionChanged): ed = self.query_one(EventDetail) event_key = message.row_key if event_key: event = self.events.by_id(event_key) ed.event = event def watch_total_count(self): self.update_sub_title() def watch_filtered_count(self): self.update_sub_title() def update_sub_title(self): s = f"{self.total_count} events loaded" if self.filtered_count != self.total_count: s += f", {self.filtered_count} matched" self.sub_title = s
Ancestors
- textual.app.App
- typing.Generic
- textual.dom.DOMNode
- textual.message_pump.MessagePump
Class variables
var CSS
var TITLE
var BINDINGS
var theme
Instance variables
var total_count
-
Create a reactive attribute.
Args
default
- A default value or callable that returns a default.
layout
- Perform a layout on change.
repaint
- Perform a repaint on change.
init
- Call watchers on initialize (post mount).
always_update
- Call watchers even when the new value equals the old value.
Expand source code
def __get__(self, obj: Reactable, obj_type: type[object]) -> ReactiveType: internal_name = self.internal_name if not hasattr(obj, internal_name): self._initialize_reactive(obj, self.name) if hasattr(obj, self.compute_name): value: ReactiveType old_value = getattr(obj, internal_name) _rich_traceback_omit = True value = getattr(obj, self.compute_name)() setattr(obj, internal_name, value) self._check_watchers(obj, self.name, old_value) return value else: return getattr(obj, internal_name)
var filtered_count
-
Create a reactive attribute.
Args
default
- A default value or callable that returns a default.
layout
- Perform a layout on change.
repaint
- Perform a repaint on change.
init
- Call watchers on initialize (post mount).
always_update
- Call watchers even when the new value equals the old value.
Expand source code
def __get__(self, obj: Reactable, obj_type: type[object]) -> ReactiveType: internal_name = self.internal_name if not hasattr(obj, internal_name): self._initialize_reactive(obj, self.name) if hasattr(obj, self.compute_name): value: ReactiveType old_value = getattr(obj, internal_name) _rich_traceback_omit = True value = getattr(obj, self.compute_name)() setattr(obj, internal_name, value) self._check_watchers(obj, self.name, old_value) return value else: return getattr(obj, internal_name)
Methods
def compose(self)
-
Yield child widgets for a container.
This method should be implemented in a subclass.
Expand source code
def compose(self) -> ComposeResult: yield FilterPopup() # Hidden off-screen initially yield ExportPopup() # Hidden off-screen initially yield Header() yield Horizontal( Vertical(UserTable(), EventTable(), id="left"), EventDetail(), id="main", ) yield Footer()
def on_mount(self)
-
Expand source code
def on_mount(self): self.query_one(Header).tall = True self.query_one(FilterPopup).disabled = True self.query_one(ExportPopup).disabled = True self.total_count = self.filtered_count = len(self.events.all()) self.populate_data_tables()
def populate_data_tables(self, focus=True)
-
Expand source code
def populate_data_tables(self, focus=True): ut = self.query_one(UserTable) et = self.query_one(EventTable) ed = self.query_one(EventDetail) if not self.events.all(): ut.contents = et.contents = [] ed.event = None elif not ut.has_class("hidden"): ut.contents = sorted(self.events.users(), key=str.lower) if focus: ut.focus() else: et.contents = self.events.all() if focus: et.focus()
def action_toggle_dark(self)
-
An action to toggle dark mode.
Expand source code
def action_toggle_dark(self): self.dark = not self.dark # In EventDetail we define "event" as reactive with always_update, # so this will force watch_event to be called which redraws the JSON # with the correct theme. We need to do this as the content of the # Static widget with our JSON is colored by rich which does not know # about dark mode. So when user toggles, we need to syntax highlight # the JSON with an appropriate theme. ed = self.query_one(EventDetail) ed.event = ed.event
def action_toggle_layout(self)
-
Expand source code
def action_toggle_layout(self): self.toggle_class("vertical")
def action_copy(self)
-
Expand source code
def action_copy(self): ed = self.query_one(EventDetail) if ed.event: pyperclip.copy(ed.event.to_json())
def action_filter_popup(self)
-
Expand source code
def action_filter_popup(self): self.show_popup(FilterPopup)
def action_export_popup(self)
-
Expand source code
def action_export_popup(self): self.show_popup(ExportPopup)
def action_toggle_users(self)
-
Expand source code
def action_toggle_users(self): ut = self.query_one(UserTable) ut.toggle_class("hidden") self.populate_data_tables()
def dismiss_popup(self, name)
-
Expand source code
def dismiss_popup(self, name): p = self.query_one(name) p.disabled = True p.add_class("offscreen") self.set_focus(self.original_focus) self.refresh(repaint=True, layout=True)
def show_popup(self, name)
-
Expand source code
def show_popup(self, name): self.original_focus = self.query_one("*:focus") p = self.query_one(name) p.disabled = False p.remove_class("offscreen") p.query_one(Input).focus()
def on_filter_popup_closed(self, _)
-
Expand source code
def on_filter_popup_closed(self, _) -> None: self.dismiss_popup(FilterPopup)
def on_filter_popup_changed(self, event)
-
Expand source code
def on_filter_popup_changed(self, event: FilterPopup.Changed) -> None: # Only update the UI if the expression is different if event.value != self.events.filter_expr: self.filtered_count = self.events.filter(event.value) self.query_one(EventDetail).filter_expr = event.value self.populate_data_tables(focus=False) self.dismiss_popup(FilterPopup)
def on_export_popup_closed(self, _)
-
def on_export_popup_closed(self, _) -> None:
    self.dismiss_popup(ExportPopup)
def on_export_popup_changed(self, event)
-
def on_export_popup_changed(self, event: ExportPopup.Changed) -> None:
    try:
        with open(event.value, "w") as out:
            json.dump(
                [e.event for e in self.events.all()],
                default=str,
                indent=4,
                fp=out,
            )
        self.dismiss_popup(ExportPopup)
    except Exception:
        self.bell()
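The export handler passes default=str to json.dump so that values the JSON encoder cannot handle natively are written as strings. A minimal sketch of that behavior follows. The sample event dictionary is an assumption made for illustration (the real event structure is not shown here), and an in-memory buffer stands in for the output file.

```python
import io
import json
from datetime import datetime, timezone

# Hypothetical event record; the field names are illustrative only.
events = [
    {
        "EventName": "CreateLogGroup",
        "EventTime": datetime(2020, 4, 8, 5, 43, 43, tzinfo=timezone.utc),
    }
]

buf = io.StringIO()
# Without default=str, the datetime value would raise TypeError.
json.dump(events, default=str, indent=4, fp=buf)

# Reading the export back shows the timestamp stored as a plain string.
exported = json.loads(buf.getvalue())
```

One consequence of this design is that an exported file does not round-trip: timestamps come back as strings, not datetime objects.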
def on_user_table_selection_changed(self, message)
-
def on_user_table_selection_changed(self, message: UserTable.SelectionChanged):
    et = self.query_one(EventTable)
    user = message.row_key
    if user:
        et.contents = self.events.by_user(user)
def on_event_table_selection_changed(self, message)
-
def on_event_table_selection_changed(self, message: EventTable.SelectionChanged):
    ed = self.query_one(EventDetail)
    event_key = message.row_key
    if event_key:
        event = self.events.by_id(event_key)
        ed.event = event
def watch_total_count(self)
-
def watch_total_count(self):
    self.update_sub_title()
def watch_filtered_count(self)
-
def watch_filtered_count(self):
    self.update_sub_title()
def update_sub_title(self)
-
def update_sub_title(self):
    s = f"{self.total_count} events loaded"
    if self.filtered_count != self.total_count:
        s += f", {self.filtered_count} matched"
    self.sub_title = s
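The subtitle logic can be tried outside the app. The sketch below restates it as a free function; sub_title is a hypothetical name used only here, and the counts are passed in as arguments where the method reads them from the app.

```python
def sub_title(total_count: int, filtered_count: int) -> str:
    """Build the subtitle text the same way update_sub_title does."""
    s = f"{total_count} events loaded"
    # The matched count is mentioned only when a filter hides some events.
    if filtered_count != total_count:
        s += f", {filtered_count} matched"
    return s


unfiltered = sub_title(1000, 1000)
filtered = sub_title(1000, 42)
```

With no active filter the text is "1000 events loaded"; with a filter that matches 42 events it becomes "1000 events loaded, 42 matched".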
class EventDetail
-
Simple container widget, with vertical layout.
Initialize a Widget.
Args
*children
- Child widgets.
name
- The name of the widget.
id
- The ID of the widget in the DOM.
classes
- The CSS classes for the widget.
disabled
- Whether the widget is disabled or not.
class EventDetail(Container, can_focus=True):
    DEFAULT_CSS = """
    EventDetail {
        width: 1fr;
        overflow: auto scroll;
        border: solid $accent-lighten-2;
        border-title-align: left;
    }
    EventDetail:focus {
        border: solid $secondary;
    }
    .vertical EventDetail {
        height: 2fr;
    }
    EventDetail > Static {
        width: auto;
        min-width: 1fr;
    }
    """

    # We need always update for toggling of themes to work.
    event: reactive[Optional[_UserIdentityType]] = reactive[
        Optional[_UserIdentityType]
    ](None, always_update=True)

    def __init__(self):
        super().__init__()
        self.filter_expr = ""

    def on_mount(self):
        self.border_title = "Event Detail"

    def compose(self) -> ComposeResult:
        yield Static()

    def watch_event(self, event):
        content = event.to_json() if event else ""
        matching_lines = self._find_matching_lines(content)
        self.query_one(Static).update(
            Syntax(
                content,
                "json",
                theme=EventViewer.theme["dark" if self.app.dark else "light"],
                highlight_lines=matching_lines,
                line_numbers=True,
            )
        )

    def _find_matching_lines(self, content):
        terms = [t.lower() for t in self.filter_expr.split() if not t.startswith("-")]
        matching_lines = set()
        for n, line in enumerate(content.split("\n"), start=1):
            for term in terms:
                if term in line.lower():
                    matching_lines.add(n)
        return matching_lines
Ancestors
- textual.containers.Container
- textual.widget.Widget
- textual.dom.DOMNode
- textual.message_pump.MessagePump
Class variables
var DEFAULT_CSS
var can_focus
var can_focus_children
Instance variables
var event
-
Create a reactive attribute.
Args
default
- A default value or callable that returns a default.
layout
- Perform a layout on change.
repaint
- Perform a repaint on change.
init
- Call watchers on initialize (post mount).
always_update
- Call watchers even when the new value equals the old value.
def __get__(self, obj: Reactable, obj_type: type[object]) -> ReactiveType:
    internal_name = self.internal_name
    if not hasattr(obj, internal_name):
        self._initialize_reactive(obj, self.name)

    if hasattr(obj, self.compute_name):
        value: ReactiveType
        old_value = getattr(obj, internal_name)
        _rich_traceback_omit = True
        value = getattr(obj, self.compute_name)()
        setattr(obj, internal_name, value)
        self._check_watchers(obj, self.name, old_value)
        return value
    else:
        return getattr(obj, internal_name)
Methods
def on_mount(self)
-
def on_mount(self):
    self.border_title = "Event Detail"
def compose(self)
-
Called by Textual to create child widgets.
Extend this to build a UI.
Example
def compose(self) -> ComposeResult:
    yield Header()
    yield Container(
        Tree(), Viewer()
    )
    yield Footer()
def compose(self) -> ComposeResult:
    yield Static()
def watch_event(self, event)
-
def watch_event(self, event):
    content = event.to_json() if event else ""
    matching_lines = self._find_matching_lines(content)
    self.query_one(Static).update(
        Syntax(
            content,
            "json",
            theme=EventViewer.theme["dark" if self.app.dark else "light"],
            highlight_lines=matching_lines,
            line_numbers=True,
        )
    )
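watch_event highlights the lines returned by _find_matching_lines. A standalone sketch of that matching step is shown below, assuming the same rules as the class source: terms are whitespace-separated, compared case-insensitively, and any term starting with "-" is skipped. The free function find_matching_lines and the sample JSON are made up for this example.

```python
from typing import Set


def find_matching_lines(content: str, filter_expr: str) -> Set[int]:
    """Return 1-based numbers of lines containing any non-negated term."""
    # Terms prefixed with "-" are ignored when choosing lines to highlight.
    terms = [t.lower() for t in filter_expr.split() if not t.startswith("-")]
    matching = set()
    for n, line in enumerate(content.split("\n"), start=1):
        lowered = line.lower()
        if any(term in lowered for term in terms):
            matching.add(n)
    return matching


# Hypothetical event JSON, one key per line as an indented dump would give.
sample = "\n".join(
    [
        "{",
        '    "eventName": "CreateLogGroup",',
        '    "eventSource": "logs.amazonaws.com"',
        "}",
    ]
)

by_name = find_matching_lines(sample, "createloggroup -ec2")
by_source = find_matching_lines(sample, "LOGS")
no_filter = find_matching_lines(sample, "")
```

Here by_name contains only line 2, by_source contains only line 3, and an empty filter expression highlights nothing.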