boto, the esteemed Python SDK for the AWS API, is being retired in favor of boto3, which has been deemed “stable and recommended for general use.” There are at least two big enhancements in boto3:

  1. Interfaces to AWS are driven automatically by JSON service descriptions rather than hand-coded. As a result, the features provided by the SDK are both more current and more portable since SDKs in many languages can use the same JSON descriptions.
  2. Resources, collections and related high-level interfaces offer a more pythonic and object-oriented experience for interacting with AWS.

In August, AWS released the Extensibility Guide for boto3. Quoting the guide:

All of Boto3’s resource and client classes are generated at runtime. This means that you cannot directly inherit and then extend the functionality of these classes because they do not exist until the program actually starts running. However it is still possible to extend the functionality of classes through Boto3’s event system.

The documentation goes on to give some trivial examples, but it still wasn’t immediately obvious to me, a relative Python neophyte, how to use this in my own development. I recently had a project where the ability to extend boto3 would simplify my code, and I’d like to share what I did.

Problem Statement

Many of my clients have complex applications with several AWS VPCs, external networks, and a lot of moving parts with many teams managing the various systems. I am often hired to help with the design and the security of these environments, and in that role I end up doing a lot of vulnerability assessment work.

Assessments require a lot of data collection about the network architecture, security group structure, permissions, and other settings in the environment. Previously I spent a lot of time clicking through the console and making notes, a tedious process that was both time consuming and error prone.

I set about coding up a tool to collect as much of the data as possible automatically, and also to draw some conclusions about the security of the system based on the data. I learned how simple it is to extend boto3 using the event system.

An Example: Security Group Rules

The ec2.SecurityGroup class in boto3 contains an ip_permissions attribute (IpPermissions in the API response) that represents the group’s inbound rules. The attribute is a list of dictionaries, one per rule, looking something like this:

IpPermissions=[
    {
        'IpProtocol': 'string',
        'FromPort': 123,
        'ToPort': 123,
        'UserIdGroupPairs': [
            {
                'UserId': 'string',
                'GroupName': 'string',
                'GroupId': 'string'
            },
        ],
        'IpRanges': [
            {
                'CidrIp': 'string'
            },
        ],
        'PrefixListIds': [
            {
                'PrefixListId': 'string'
            },
        ]
    }
]

I wanted to analyze the rules, and converting this JSON blob to a proper Python object would allow me to work with individual rules more easily.

To do this, I extend boto3 with two classes: SecurityGroupRules, used to add an attribute called rules to SecurityGroup, and then SecurityGroupRule (note singular vs plural) to represent a single rule. The rules attribute can be created when the object is initialized. Something like this:

class SecurityGroupRules(object):
    def __init__(self, *args, **kwargs):
        """Adds the attribute `rules` to SecurityGroup. `rules` is a list of
        SecurityGroupRule objects.
        """
        super(SecurityGroupRules, self).__init__(*args, **kwargs)
        # Accessing ip_permissions makes boto3 load the group's data if needed
        self.rules = self.ip_permissions

    @property
    def rules(self):
        return self._rules

    @rules.setter
    def rules(self, ip_permissions):
        if not isinstance(ip_permissions, list):
            raise TypeError("Expected list, found %s" % type(ip_permissions))
        self._rules = []
        for rule in ip_permissions:
            protocol = rule.get('IpProtocol')
            from_port = rule.get('FromPort')
            to_port = rule.get('ToPort')
            source_groups = []
            # Default to an empty list so a missing key does not break the loop
            for source_group in rule.get('UserIdGroupPairs', []):
                source_groups.append(SourceGroup(user_id=source_group.get('UserId'),
                                                 group_name=source_group.get('GroupName'),
                                                 group_id=source_group.get('GroupId')))
            cidr_ranges = []
            for ip_range in rule.get('IpRanges', []):
                cidr_ranges.append(ip_range['CidrIp'])
            prefix_lists = []
            for pfx in rule.get('PrefixListIds', []):
                # SecurityGroupRule expects PrefixList named tuples, not bare strings
                prefix_lists.append(PrefixList(prefix_list=pfx['PrefixListId']))
            self._rules.append(SecurityGroupRule(protocol=protocol,
                                                 from_port=from_port,
                                                 to_port=to_port,
                                                 source_groups=source_groups,
                                                 cidr_ranges=cidr_ranges,
                                                 prefix_lists=prefix_lists))

The setter method extracts the fields from the ip_permissions JSON blob and creates a SecurityGroupRule object for every rule.
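
To see the conversion in isolation, here is a minimal sketch that runs without AWS credentials. It assumes a hand-written list in the same shape as IpPermissions, and the names Rule and parse_rules are hypothetical, simplified stand-ins for SecurityGroupRule and the setter; they are not part of boto3.

```python
from collections import namedtuple

# Hypothetical, simplified stand-in for SecurityGroupRule
Rule = namedtuple('Rule', 'protocol from_port to_port group_ids cidr_ranges')

def parse_rules(ip_permissions):
    """Convert a list of IpPermissions dictionaries into Rule tuples."""
    rules = []
    for perm in ip_permissions:
        rules.append(Rule(
            protocol=perm.get('IpProtocol'),
            from_port=perm.get('FromPort'),
            to_port=perm.get('ToPort'),
            group_ids=[g.get('GroupId') for g in perm.get('UserIdGroupPairs', [])],
            cidr_ranges=[r['CidrIp'] for r in perm.get('IpRanges', [])],
        ))
    return rules

# Made-up sample data in the same shape as the attribute
sample = [
    {'IpProtocol': 'tcp', 'FromPort': 443, 'ToPort': 443,
     'UserIdGroupPairs': [], 'IpRanges': [{'CidrIp': '0.0.0.0/0'}]},
    {'IpProtocol': 'tcp', 'FromPort': 22, 'ToPort': 22,
     'UserIdGroupPairs': [{'UserId': '123456789012', 'GroupId': 'sg-11111111'}],
     'IpRanges': []},
]

rules = parse_rules(sample)
for rule in rules:
    print(rule.protocol, rule.from_port, rule.to_port, rule.cidr_ranges, rule.group_ids)
```

Once the data is in this form, a rule's fields are plain attributes (rule.from_port) rather than dictionary lookups that may or may not have the key.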

SecurityGroupRule looks like:

import re
from collections import namedtuple
SourceGroup = namedtuple('SourceGroup', 'user_id group_name group_id')
CidrRange = namedtuple('CidrRange', 'cidr')
PrefixList = namedtuple('PrefixList', 'prefix_list')
PROTOCOLS = [
    '-1',
    'tcp',
    'udp',
    'icmp'
]

class SecurityGroupRule(object):
    def __init__(self, protocol, from_port=None, to_port=None, source_groups=None, cidr_ranges=None, prefix_lists=None):
        """See also http://boto3.readthedocs.org/en/latest/reference/services/ec2.html#securitygroup
        """
        self.protocol = protocol
        self.from_port = from_port
        self.to_port = to_port
        self.source_groups = source_groups
        self.cidr_ranges = cidr_ranges
        self.prefix_lists = prefix_lists

    @property
    def protocol(self):
        return self._protocol

    @protocol.setter
    def protocol(self, value):
        if value not in PROTOCOLS:
            raise TypeError('Encountered an unknown protocol %s' % value)
        self._protocol = value

    @property
    def from_port(self):
        return self._from_port

    @from_port.setter
    def from_port(self, value=None):
        if not value or port_is_valid(value):
            self._from_port = value
        else:
            raise TypeError('Encountered an unknown port %s' % value)

    @property
    def to_port(self):
        return self._to_port

    @to_port.setter
    def to_port(self, value=None):
        if not value or port_is_valid(value):
            self._to_port = value
        else:
            raise TypeError('Encountered an unknown port %s' % value)

    @property
    def source_groups(self):
        return self._source_groups

    @source_groups.setter
    def source_groups(self, values):
        """List of SourceGroup() named tuples
        """
        if values:
            for v in values:
                if not isinstance(v, SourceGroup):
                    raise TypeError('Expect a list of source groups but found %s' % v)
        self._source_groups = values

    @property
    def cidr_ranges(self):
        return self._cidr_ranges

    @cidr_ranges.setter
    def cidr_ranges(self, values):
        """List of IPv4 CIDR strings, e.g. '10.0.0.0/16'
        """
        # Raw string so the backslashes reach the regex engine unchanged
        valid_cidr = r"^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])(\/([0-9]|[1-2][0-9]|3[0-2]))$"
        if values:
            for v in values:
                if not re.match(valid_cidr, v):
                    raise TypeError('Expect a valid CIDR IP range but found %s' % v)
        self._cidr_ranges = values

    @property
    def prefix_lists(self):
        return self._prefix_lists

    @prefix_lists.setter
    def prefix_lists(self, values):
        """List of PrefixList() named tuples
        """
        if values:
            for v in values:
                if not isinstance(v, PrefixList):
                    raise TypeError('Expect a list of PrefixList() but found %s' % str(v))
        self._prefix_lists = values

def port_is_valid(port):
    return isinstance(port, int) and (port == -1 or 1 <= port <= 65535)
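
The regular expression in the cidr_ranges setter only matches IPv4 ranges. As an alternative sketch (not what the class above does), the standard library's ipaddress module in Python 3 can perform roughly the same check and also accepts IPv6. The helper name cidr_is_valid is hypothetical.

```python
import ipaddress

def cidr_is_valid(value):
    """Return True if value parses as an IPv4 or IPv6 network in CIDR notation."""
    # ip_network() would accept a bare address, so insist on an explicit mask
    if '/' not in value:
        return False
    try:
        # strict=False accepts ranges with host bits set, e.g. 10.0.0.1/24,
        # which the regular expression also lets through
        ipaddress.ip_network(value, strict=False)
    except ValueError:
        return False
    return True

print(cidr_is_valid('10.0.0.0/16'))
print(cidr_is_valid('10.0.0.0/33'))
print(cidr_is_valid('2001:db8::/32'))
```

One difference to be aware of: ipaddress also accepts a dotted netmask after the slash (10.0.0.0/255.255.0.0), which the regular expression rejects.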

Great, so how do we use these classes? Enter the event system magic: register the SecurityGroupRules class to be added when boto3 instantiates a SecurityGroup.

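from boto3.session import Session

def add_custom_sg_class(base_classes, **kwargs):
    # boto3 passes the list of base classes it is about to use; putting
    # SecurityGroupRules first places it ahead of them in the MRO
    base_classes.insert(0, SecurityGroupRules)

def get_ec2_handle():
    session = Session()
    # Run the handler whenever boto3 builds the ec2.SecurityGroup class
    session.events.register('creating-resource-class.ec2.SecurityGroup', add_custom_sg_class)
    return session.resource('ec2')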
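
To make the mechanism less magical, here is a toy model that runs without boto3. It is not boto3's implementation: create_resource_class, Base and RulesMixin are all hypothetical names. It only illustrates the idea that a handler receives the list of base classes before the class is built with type(), so anything inserted at the front ends up first in the method resolution order.

```python
class Base(object):
    """Stands in for the resource class boto3 would generate."""
    def __init__(self, group_id):
        self.id = group_id
        self.ip_permissions = [{'IpProtocol': 'tcp'}]

class RulesMixin(object):
    """Stands in for SecurityGroupRules."""
    def __init__(self, *args, **kwargs):
        # Let the rest of the class hierarchy initialize first
        super(RulesMixin, self).__init__(*args, **kwargs)
        self.rules = [p['IpProtocol'] for p in self.ip_permissions]

def add_custom_class(base_classes, **kwargs):
    base_classes.insert(0, RulesMixin)

def create_resource_class(name, handlers):
    """Build a class at runtime, letting each handler edit the base list."""
    base_classes = [Base]
    for handler in handlers:
        handler(base_classes=base_classes)
    return type(name, tuple(base_classes), {})

SecurityGroup = create_resource_class('SecurityGroup', [add_custom_class])
sg = SecurityGroup('sg-11111111')
print([cls.__name__ for cls in SecurityGroup.__mro__])
print(sg.rules)
```

Because the mixin calls super().__init__() before touching ip_permissions, the generated class is fully set up by the time the rules attribute is built.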

Whenever a SecurityGroup object is created using the handle returned from get_ec2_handle(), SecurityGroupRules is one of its base classes, so its __init__ runs and the rules attribute is populated. As a trivial example:

ec2_handle = get_ec2_handle()

# Get all the instances in an account
# Use list() to force boto3 to make calls to AWS now
# May take some time if there are many instances!
instances = list(ec2_handle.instances.all())

# Get a unique set of security group IDs from all in-scope instances
# security_groups on an instance is a list of dicts, each with a 'GroupId' key
all_groups = set()
for i in instances:
    for sg in i.security_groups:
        all_groups.add(sg['GroupId'])

security_groups = []
for sg_id in all_groups:
    security_groups.append(ec2_handle.SecurityGroup(sg_id))

security_groups is a list of SecurityGroup objects, each of which has a rules attribute that is a list of SecurityGroupRule objects. It’s now trivial to list all rules in all groups:

for sg in security_groups:
    for rule in sg.rules:
        print(rule.protocol)

We can also easily search the rules for insecure configurations, though that goes into more detail than I planned for this post. So what’s considered an insecure configuration? Well, that’s also not really the point of the post, but since you’ve asked my opinion…

The idea is that even internal to a VPC, security groups should be as restrictive as possible.
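
As one hedged illustration of such a search, the sketch below flags rules that accept traffic from any IPv4 address. Treating 0.0.0.0/0 as the only red flag is a simplifying assumption for this example, not a complete policy, and Rule and find_world_open are hypothetical names standing in for SecurityGroupRule and whatever check you actually need.

```python
from collections import namedtuple

# Hypothetical stand-in exposing the same attributes as SecurityGroupRule
Rule = namedtuple('Rule', 'protocol from_port to_port cidr_ranges')

ANYWHERE = '0.0.0.0/0'

def find_world_open(rules_by_group):
    """Yield (group_id, rule) for every rule reachable from any IPv4 address."""
    for group_id, rules in sorted(rules_by_group.items()):
        for rule in rules:
            # cidr_ranges may be None or empty when a rule only names groups
            if ANYWHERE in (rule.cidr_ranges or []):
                yield group_id, rule

# Made-up sample data
rules_by_group = {
    'sg-aaaa1111': [Rule('tcp', 22, 22, ['0.0.0.0/0']),
                    Rule('tcp', 443, 443, ['10.0.0.0/16'])],
    'sg-bbbb2222': [Rule('-1', None, None, [])],
}

findings = list(find_world_open(rules_by_group))
for group_id, rule in findings:
    print(group_id, rule.protocol, rule.from_port, rule.to_port)
```

With the extended classes from this post, the input could be built from the real objects, for example a dictionary mapping each sg.id to sg.rules for the groups in security_groups.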

Wrapping Up

The boto3 event system is a novel approach to allow more pythonic interactions with AWS resources. This example showed how to essentially subclass ec2.SecurityGroup to allow better analysis of rules. I also subclass ec2.Instance to extract additional instance properties which aren’t immediately available, like IAM policies and instance userdata.

Please hassle me about something in the comments or on Twitter if you want. Thanks for reading.