Source code for boto_session_manager.manager

# -*- coding: utf-8 -*-

"""
Manage the underlying boto3 session and client.
"""

# standard library
import typing as T
import os
import json
import uuid
import contextlib
from pathlib import Path
from datetime import datetime, timezone, timedelta

# boto3 is NOT declared as a hard dependency in pyproject.toml on purpose.
# It releases almost daily, so pinning a range here would frequently conflict
# with whatever version the user (or their AWS runtime environment) already has.
# Instead we treat it as an implicit peer dependency and import defensively.
try:
    import boto3
    import boto3.session
    import botocore.session
except ImportError as e:  # pragma: no cover
    print("You probably need to install 'boto3' first.")

try:
    from botocore.credentials import (
        AssumeRoleCredentialFetcher,
        DeferredRefreshableCredentials,
    )
except ImportError as e:  # pragma: no cover
    print("The auto refreshable assume role session would not work.")

from .vendor.aws_sts import (
    mask_user_id,
    mask_aws_account_id,
    mask_iam_principal_arn,
    get_caller_identity,
    get_account_alias,
)

# modules from this project
from .services import AwsServiceEnum
from .clients import ClientMixin
from .sentinel import NOTHING, resolve_kwargs
from .exc import NoBotocoreCredentialError


if T.TYPE_CHECKING:  # pragma: no cover
    from botocore.client import BaseClient
    from boto3.resources.base import ServiceResource

try:
    PATH_DEFAULT_SNAPSHOT = Path.home().joinpath(".bsm-snapshot.json")
except Exception as e:  # pragma: no cover
    PATH_DEFAULT_SNAPSHOT = None


[docs] class BotoSesManager(ClientMixin): """ Boto3 session and client manager that use cache to create low level client. .. note:: boto3.session.Session is a static object that won't talk to AWS endpoint. also session.client("s3") won't talk to AWS endpoint right away. The authentication only happen when a concrete API request called. """ def __init__( self, aws_access_key_id: str | None = NOTHING, aws_secret_access_key: str | None = NOTHING, aws_session_token: str | None = NOTHING, region_name: str | None = NOTHING, botocore_session: T.Optional["botocore.session.Session"] = NOTHING, profile_name: str = NOTHING, default_client_kwargs: dict = NOTHING, expiration_time: datetime = NOTHING, ): self.aws_access_key_id = aws_access_key_id self.aws_secret_access_key = aws_secret_access_key self.aws_session_token = aws_session_token self.region_name = region_name if botocore_session is not NOTHING: # pragma: no cover if not isinstance(botocore_session, botocore.session.Session): raise TypeError self.botocore_session: T.Optional["botocore.session.Session"] = botocore_session self.profile_name = profile_name self.expiration_time: datetime if expiration_time is NOTHING: self.expiration_time = datetime.now(timezone.utc) + timedelta(days=365) else: self.expiration_time = expiration_time if default_client_kwargs is NOTHING: default_client_kwargs = dict() self.default_client_kwargs = default_client_kwargs self._boto_ses_cache: T.Optional["boto3.session.Session"] = NOTHING self._client_cache: dict[str, "BaseClient"] = dict() self._resource_cache: dict[str, "ServiceResource"] = dict() self._aws_user_id_cache: str | None = NOTHING self._aws_account_id_cache: str | None = NOTHING self._principal_arn_cache: str | None = NOTHING self._aws_account_alias_cache: str | None = NOTHING self._aws_region_cache: str | None = NOTHING
[docs] def create_boto_ses(self) -> "boto3.session.Session": """ Create a new boto3 session object from the :class:`BotoSesManager`. """ return boto3.session.Session( **resolve_kwargs( aws_access_key_id=self.aws_access_key_id, aws_secret_access_key=self.aws_secret_access_key, aws_session_token=self.aws_session_token, region_name=self.region_name, botocore_session=self.botocore_session, profile_name=self.profile_name, ) )
@property def boto_ses(self) -> "boto3.session.Session": """ Get boto3 session from metadata. This is a cached property. """ if self._boto_ses_cache is NOTHING: self._boto_ses_cache = self.create_boto_ses() return self._boto_ses_cache def _get_caller_identity(self): sts_client = self.get_client(AwsServiceEnum.STS) user_id, aws_account_id, arn = get_caller_identity(sts_client) self._aws_user_id_cache = user_id self._aws_account_id_cache = aws_account_id self._principal_arn_cache = arn @property def aws_account_user_id(self) -> str: """ Get current aws account user id of the boto session. This is a cached property. """ if self._aws_user_id_cache is NOTHING: # pragma: no cover self._get_caller_identity() return self._aws_user_id_cache @property def masked_aws_account_user_id(self) -> str: """ Get the masked current aws account user id of the boto session. """ return mask_user_id(self.aws_account_user_id) @property def aws_account_id(self) -> str: """ Get current aws account id of the boto session. This is a cached property. """ if self._aws_account_id_cache is NOTHING: # pragma: no cover self._get_caller_identity() return self._aws_account_id_cache @property def masked_aws_account_id(self) -> str: """ Get the masked current aws account id of the boto session. """ return mask_aws_account_id(self.aws_account_id) @property def principal_arn(self) -> str: """ Get current principal arn of the boto session. This is a cached property. """ if self._principal_arn_cache is NOTHING: # pragma: no cover self._get_caller_identity() return self._principal_arn_cache @property def masked_principal_arn(self) -> str: """ Get the masked principal arn of the boto session. """ return mask_iam_principal_arn(self.principal_arn) @property def aws_region(self) -> str: """ Get current aws region of the boto session. This is a cached property. """ if self._aws_region_cache is NOTHING: self._aws_region_cache = self.boto_ses.region_name return self._aws_region_cache @property def aws_account_alias(self) -> T.Optional[str]: """ Get the first aws account alias of the boto session. This is a cached property. """ if self._aws_account_alias_cache is NOTHING: self._aws_account_alias_cache = get_account_alias( self.get_client(AwsServiceEnum.IAM) ) return self._aws_account_alias_cache
[docs] def print_who_am_i(self, masked: bool = True): # pragma: no cover """ Print the boto session AWS Account and IAM principal information. """ if masked: print(f"User Id = {self.masked_aws_account_user_id}") print(f"AWS Account Id = {self.masked_aws_account_id}") print(f"Principal Arn = {self.masked_principal_arn}") else: print(f"User Id = {self.aws_account_user_id}") print(f"AWS Account Id = {self.aws_account_id}") print(f"Principal Arn = {self.principal_arn}") print(f"AWS Account Alias = {self.aws_account_alias}") print(f"AWS Region = {self.aws_region}")
[docs] def get_client( self, service_name: str, region_name: str = NOTHING, api_version: str = NOTHING, use_ssl: bool = True, verify: bool | str = NOTHING, endpoint_url: str = NOTHING, aws_access_key_id: str = NOTHING, aws_secret_access_key: str = NOTHING, aws_session_token: str = NOTHING, config=NOTHING, ) -> "BaseClient": """ Get aws boto client using cache. """ try: return self._client_cache[service_name] except KeyError: client_kwargs = resolve_kwargs( region_name=region_name, api_version=api_version, use_ssl=use_ssl, verify=verify, endpoint_url=endpoint_url, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, aws_session_token=aws_session_token, config=config, ) kwargs = dict(self.default_client_kwargs) kwargs.update(client_kwargs) client = self.boto_ses.client(service_name, **kwargs) self._client_cache[service_name] = client return client
[docs] def get_resource( self, service_name: str, region_name: str = NOTHING, api_version: str = NOTHING, use_ssl: bool = True, verify: bool | str = NOTHING, endpoint_url: str = NOTHING, aws_access_key_id: str = NOTHING, aws_secret_access_key: str = NOTHING, aws_session_token: str = NOTHING, config=NOTHING, ) -> "ServiceResource": """ Get aws boto service resource using cache. """ try: return self._resource_cache[service_name] except KeyError: resource_kwargs = resolve_kwargs( region_name=region_name, api_version=api_version, use_ssl=use_ssl, verify=verify, endpoint_url=endpoint_url, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, aws_session_token=aws_session_token, config=config, ) kwargs = dict(self.default_client_kwargs) kwargs.update(resource_kwargs) resource = self.boto_ses.resource(service_name, **kwargs) self._resource_cache[service_name] = resource return resource
[docs] def assume_role( self, role_arn: str, role_session_name: str = NOTHING, duration_seconds: int = 3600, tags: T.Optional[list[dict[str, str]]] = NOTHING, transitive_tag_keys: T.Optional[list[str]] = NOTHING, external_id: str = NOTHING, mfa_serial_number: str = NOTHING, mfa_token: str = NOTHING, source_identity: str = NOTHING, region_name: str = NOTHING, auto_refresh: bool = False, ) -> "BotoSesManager": """ Assume an IAM role, create another :class:`BotoSesManager` and return. :param auto_refresh: if True, the assumed role will be refreshed automatically. Note: this uses ``AssumeRoleCredentialFetcher`` and ``DeferredRefreshableCredentials`` from botocore, which are not public API officially supported by botocore. """ if role_session_name is NOTHING: role_session_name = uuid.uuid4().hex # if region_name is not specified, use the same region as the current session if region_name is NOTHING: region_name = self.aws_region # this branch cannot be tested regularly # it is tested in a separate integration test environment. if auto_refresh: # pragma: no cover botocore_session = self.boto_ses._session credentials = botocore_session.get_credentials() # the get_credentials() method can return None # raise error explicitly if not credentials: raise NoBotocoreCredentialError credential_fetcher = AssumeRoleCredentialFetcher( client_creator=botocore_session.create_client, source_credentials=credentials, role_arn=role_arn, extra_args=resolve_kwargs( RoleSessionName=role_session_name, DurationSeconds=duration_seconds, Tags=tags, TransitiveTagKeys=transitive_tag_keys, ExternalId=external_id, SerialNumber=mfa_serial_number, TokenCode=mfa_token, SourceIdentity=source_identity, ), ) assumed_role_credentials = DeferredRefreshableCredentials( refresh_using=credential_fetcher.fetch_credentials, method="assume-role", ) assumed_role_botocore_session: "botocore.session.Session" = ( botocore.session.get_session() ) assumed_role_botocore_session._credentials = assumed_role_credentials return BotoSesManager( botocore_session=assumed_role_botocore_session, region_name=region_name, expiration_time=datetime(2099, 12, 31, 23, 59, 59, tzinfo=timezone.utc), default_client_kwargs=self.default_client_kwargs, ) else: assume_role_kwargs = resolve_kwargs( RoleArn=role_arn, RoleSessionName=role_session_name, DurationSeconds=duration_seconds, Tags=tags, TransitiveTagKeys=transitive_tag_keys, external_id=external_id, SerialNumber=mfa_serial_number, TokenCode=mfa_token, SourceIdentity=source_identity, ) sts_client = self.get_client(AwsServiceEnum.STS) res = sts_client.assume_role(**assume_role_kwargs) expiration_time = res["Credentials"]["Expiration"] bsm = self.__class__( aws_access_key_id=res["Credentials"]["AccessKeyId"], aws_secret_access_key=res["Credentials"]["SecretAccessKey"], aws_session_token=res["Credentials"]["SessionToken"], region_name=region_name, expiration_time=expiration_time, default_client_kwargs=self.default_client_kwargs, ) return bsm
[docs] def is_expired(self, delta: int = 0) -> bool: """ Check if this boto session is expired. """ return ( datetime.now(timezone.utc) + timedelta(seconds=delta) >= self.expiration_time )
[docs] @contextlib.contextmanager def awscli(self): """ Temporarily set up environment variables to pass the boto session credential to AWS CLI. On exit the original environment is restored. Example:: import subprocess bsm = BotoSesManager(...) with bsm.awscli(): subprocess.run(["aws", "sts", "get-caller-identity"]) Reference: - https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html """ # save the existing env var state, and disable the existing env var mapper = { "AWS_ACCESS_KEY_ID": None, "AWS_SECRET_ACCESS_KEY": None, "AWS_SESSION_TOKEN": None, "AWS_REGION": self.aws_region, "AWS_PROFILE": None, } cred = self.boto_ses.get_credentials() # set environment variable for aws cli when you create this # boto session manager explicitly with ACCESS KEY and SECRET KEY if self.profile_name is not NOTHING: mapper["AWS_PROFILE"] = self.profile_name elif cred.token is None: mapper["AWS_ACCESS_KEY_ID"] = cred.access_key mapper["AWS_SECRET_ACCESS_KEY"] = cred.secret_key else: mapper["AWS_ACCESS_KEY_ID"] = cred.access_key mapper["AWS_SECRET_ACCESS_KEY"] = cred.secret_key mapper["AWS_SESSION_TOKEN"] = cred.token # get existing env var existing = {} for k, v in mapper.items(): existing[k] = os.environ.get(k) try: # set new env var for k, v in mapper.items(): # v = None means delete this env var if v is None: if k in os.environ: os.environ.pop(k) else: os.environ[k] = v yield self finally: # recover the original env var for k, v in existing.items(): # v = None means this env var not exists at begin if v is None: if k in os.environ: os.environ.pop(k) else: os.environ[k] = v
[docs] def to_snapshot(self) -> dict: """ Serialize the current session credentials (access key, secret key, optional session token, region) into a plain dict that can be persisted to disk with :meth:`temp_snapshot` and later restored with :meth:`from_snapshot` or :meth:`from_snapshot_file`. """ cred = self.boto_ses.get_credentials() snapshot = dict( region_name=self.aws_region, aws_access_key_id=cred.access_key, aws_secret_access_key=cred.secret_key, ) if cred.token: snapshot["aws_session_token"] = cred.token return snapshot
[docs] @classmethod def from_snapshot(cls, snapshot: dict): """ Create a :class:`BotoSesManager` from a snapshot dict previously produced by :meth:`to_snapshot`. """ return cls(**snapshot)
[docs] @classmethod def from_snapshot_file( cls, path: str | Path | None = PATH_DEFAULT_SNAPSHOT, ): """ Read a JSON snapshot file from *path* (default ``~/.bsm-snapshot.json``) and reconstruct a :class:`BotoSesManager`. Pair this with :meth:`temp_snapshot` to hand credentials across process boundaries. """ if path is None: # pragma: no cover raise EnvironmentError("your system may not support $HOME directory") path = Path(path) if not path.exists(): raise FileNotFoundError(f"Snapshot file not found: {path}") return cls.from_snapshot(json.loads(path.read_text()))
[docs] @contextlib.contextmanager def temp_snapshot( self, path: str | Path | None = PATH_DEFAULT_SNAPSHOT, ): """ Context manager that writes the current credentials to a JSON file (default ``~/.bsm-snapshot.json``) and deletes it on exit. **Why this exists:** when you use :meth:`awscli` to switch the environment to a *different* AWS account, child processes (scripts, CLI tools) lose access to the *original* session. By saving a snapshot first, those child processes can call :meth:`from_snapshot_file` to recover the original credentials. Example:: import subprocess bsm_default = BotoSesManager() bsm_acc_b = BotoSesManager(profile_name="acc_b") with bsm_default.temp_snapshot(): with bsm_acc_b.awscli(): # env now points to account B subprocess.run(["aws", "sts", "get-caller-identity"]) # my_script.py can call BotoSesManager.from_snapshot_file() # to get the original (account A) session back subprocess.run(["python", "my_script.py"]) """ if path is None: # pragma: no cover raise EnvironmentError("your system may not support $HOME directory") path = Path(path) snapshot = self.to_snapshot() try: path.write_text(json.dumps(snapshot)) yield self finally: if path.exists(): path.unlink()
[docs] def clear_cache(self): """ Clear all the boto session and boto client cache. """ self._boto_ses_cache = NOTHING self._client_cache.clear() self._resource_cache.clear() self._aws_user_id_cache = NOTHING self._aws_account_id_cache = NOTHING self._principal_arn_cache = NOTHING self._aws_account_alias_cache = NOTHING self._aws_region_cache = NOTHING