boto, the esteemed Python SDK for the AWS API, is being retired in favor of boto3, which has been deemed “stable and recommended for general use.” There are at least two big enhancements in boto3:

  1. Interfaces to AWS are driven automatically by JSON service descriptions rather than hand-coded. As a result, the features provided by the SDK are both more current and more portable since SDKs in many languages can use the same JSON descriptions.
  2. Resources, collections and related high-level interfaces offer a more pythonic and object-oriented experience for interacting with AWS.

In August, AWS released the Extensibility Guide for boto3. Quoting the guide:

All of Boto3’s resource and client classes are generated at runtime. This means that you cannot directly inherit and then extend the functionality of these classes because they do not exist until the program actually starts running. However it is still possible to extend the functionality of classes through Boto3’s event system.

The documentation goes on to give some trivial examples, but it still wasn’t immediately obvious to me, a relative Python neophyte, how to use this in my own development. I recently had a project where the ability to extend boto3 would simplify my code, and I’d like to share what I did.

Problem Statement

Many of my clients have complex applications with several AWS VPCs, external networks, and a lot of moving parts with many teams managing the various systems. I am often hired to help with the design and the security of these environments, and in that role I end up doing a lot of vulnerability assessment work.

Assessments require a lot of data collection about the network architecture, security group structure, permissions, and other settings in the environment. Previously I spent a lot of time clicking through the console and making notes, a tedious process that was both time consuming and error prone.

I set about coding up a tool to collect as much of the data as possible automatically, and also to draw some conclusions about the security of the system based on the data. I learned how simple it is to extend boto3 using the event system.

An Example: Security Group Rules

The ec2.SecurityGroup class in boto3 contains an ip_permissions attribute (IpPermissions in the API response) that represents the group’s inbound rules. The attribute is a list of rules, each one a dictionary with a JSON-like structure, looking something like this:

IpPermissions=[
    {
        'IpProtocol': 'string',
        'FromPort': 123,
        'ToPort': 123,
        'UserIdGroupPairs': [
            {
                'UserId': 'string',
                'GroupName': 'string',
                'GroupId': 'string'
            },
        ],
        'IpRanges': [
            {
                'CidrIp': 'string'
            },
        ],
        'PrefixListIds': [
            {
                'PrefixListId': 'string'
            },
        ]
    }
]

I wanted to analyze the rules, and converting this JSON blob to a proper Python object would allow me to work with individual rules more easily.

To do this, I extend boto3 with two classes: SecurityGroupRules, used to add an attribute called rules to SecurityGroup, and then SecurityGroupRule (note singular vs plural) to represent a single rule. The rules attribute can be created when the object is initialized. Something like this:

class SecurityGroupRules(object):
    def __init__(self, *args, **kwargs):
        """Adds the attribute `rules` to SecurityGroup. `rules` is a list of
        SecurityGroupRule objects.
        """
        super(SecurityGroupRules, self).__init__(*args, **kwargs)
        # Reading ip_permissions makes boto3 load the group from AWS
        self.rules = self.ip_permissions

    @property
    def rules(self):
        return self._rules

    @rules.setter
    def rules(self, ip_permissions):
        if not isinstance(ip_permissions, list):
            raise TypeError("Expected list, found %s" % type(ip_permissions))
        self._rules = []
        for rule in ip_permissions:
            protocol = rule.get('IpProtocol')
            from_port = rule.get('FromPort')
            to_port = rule.get('ToPort')
            # Default to an empty list so a missing key does not break the loop
            source_groups = []
            for source_group in rule.get('UserIdGroupPairs', []):
                source_groups.append(SourceGroup(user_id=source_group.get('UserId'),
                                                 group_name=source_group.get('GroupName'),
                                                 group_id=source_group.get('GroupId')))
            cidr_ranges = []
            for ip_range in rule.get('IpRanges', []):
                cidr_ranges.append(ip_range['CidrIp'])
            # Wrap each ID in PrefixList, which the SecurityGroupRule setter expects
            prefix_lists = []
            for pfx in rule.get('PrefixListIds', []):
                prefix_lists.append(PrefixList(prefix_list=pfx['PrefixListId']))
            self._rules.append(SecurityGroupRule(protocol=protocol,
                                                 from_port=from_port,
                                                 to_port=to_port,
                                                 source_groups=source_groups,
                                                 cidr_ranges=cidr_ranges,
                                                 prefix_lists=prefix_lists))

The setter method extracts the fields from the ip_permissions JSON blob and creates a SecurityGroupRule object for every rule.

SecurityGroupRule looks like:

import re
from collections import namedtuple

SourceGroup = namedtuple('SourceGroup', 'user_id group_name group_id')
CidrRange = namedtuple('CidrRange', 'cidr')
PrefixList = namedtuple('PrefixList', 'prefix_list')
PROTOCOLS = [
    '-1',
    'tcp',
    'udp',
    'icmp'
]

class SecurityGroupRule(object):
    def __init__(self, protocol, from_port=None, to_port=None, source_groups=None, cidr_ranges=None, prefix_lists=None):
        """See also http://boto3.readthedocs.org/en/latest/reference/services/ec2.html#securitygroup
        """
        # Each assignment goes through the validating setter below
        self.protocol = protocol
        self.from_port = from_port
        self.to_port = to_port
        self.source_groups = source_groups
        self.cidr_ranges = cidr_ranges
        self.prefix_lists = prefix_lists

    @property
    def protocol(self):
        return self._protocol

    @protocol.setter
    def protocol(self, value):
        if value not in PROTOCOLS:
            raise TypeError('Encountered an unknown protocol %s' % value)
        self._protocol = value

    @property
    def from_port(self):
        return self._from_port

    @from_port.setter
    def from_port(self, value=None):
        # None and 0 are stored as-is; anything else must pass port_is_valid()
        if not value or port_is_valid(value):
            self._from_port = value
        else:
            raise TypeError('Encountered an unknown port %s' % value)

    @property
    def to_port(self):
        return self._to_port

    @to_port.setter
    def to_port(self, value=None):
        if not value or port_is_valid(value):
            self._to_port = value
        else:
            raise TypeError('Encountered an unknown port %s' % value)

    @property
    def source_groups(self):
        return self._source_groups

    @source_groups.setter
    def source_groups(self, values):
        """List of SourceGroup() named tuples
        """
        if values:
            for v in values:
                if not isinstance(v, SourceGroup):
                    raise TypeError('Expect a list of source groups but found %s' % v)
        self._source_groups = values

    @property
    def cidr_ranges(self):
        return self._cidr_ranges

    @cidr_ranges.setter
    def cidr_ranges(self, values):
        """List of IPv4 CIDR strings such as '10.0.0.0/16'
        """
        valid_cidr = r"^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])(\/([0-9]|[1-2][0-9]|3[0-2]))$"
        if values:
            for v in values:
                if not re.match(valid_cidr, v):
                    raise TypeError('Expect a valid CIDR IP range but found %s' % v)
        self._cidr_ranges = values

    @property
    def prefix_lists(self):
        return self._prefix_lists

    @prefix_lists.setter
    def prefix_lists(self, values):
        """List of PrefixList() named tuples
        """
        if values:
            for v in values:
                if not isinstance(v, PrefixList):
                    raise TypeError('Expect a list of PrefixList() but found %s' % str(v))
        self._prefix_lists = values

def port_is_valid(port):
    # -1 is what AWS reports for "all" (for example, all ICMP types)
    return isinstance(port, int) and (port == -1 or 1 <= port <= 65535)
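The CIDR regular expression above only handles IPv4 and is hard to read. As an alternative sketch (not what the original tool does), the standard library's ipaddress module can perform a similar check. The helper name cidr_is_valid is hypothetical, and the module requires Python 3.3 or later. Note that it accepts somewhat more than the regular expression does, for example IPv6 blocks and netmask notation.

```python
import ipaddress


def cidr_is_valid(value):
    """Return True if value looks like an IPv4 or IPv6 CIDR block.

    The explicit '/' check rejects bare addresses, which ip_network()
    would otherwise accept as single-host networks. strict=False allows
    blocks with host bits set, such as 10.0.0.1/24.
    """
    if not isinstance(value, str) or '/' not in value:
        return False
    try:
        ipaddress.ip_network(value, strict=False)
    except ValueError:
        return False
    return True


for candidate in ['10.0.0.0/8', '10.0.0.0/33', '2001:db8::/32', '10.0.0.0']:
    print(candidate, cidr_is_valid(candidate))
```

A setter could call a helper like this in place of re.match and keep raising the same error on failure.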

Great, so how do we use these classes? Enter the event system magic: register the SecurityGroupRules class to be added when boto3 instantiates a SecurityGroup.

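from boto3.session import Session

def get_ec2_handle():
    session = Session()
    # The handler runs while boto3 builds the SecurityGroup resource class
    session.events.register('creating-resource-class.ec2.SecurityGroup', add_custom_sg_class)
    return session.resource('ec2')

def add_custom_sg_class(base_classes, **kwargs):
    # Put our class first so its __init__ runs and then calls up via super()
    base_classes.insert(0, SecurityGroupRules)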
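To see why inserting a class into base_classes behaves like subclassing, it helps to imitate the flow in plain Python with no AWS calls. The sketch below assumes the factory passes a mutable list of base classes to each handler and then builds the class from that list. FakeResourceBase, RulesMixin, add_mixin, and build_resource_class are hypothetical stand-ins, not boto3 names.

```python
class FakeResourceBase(object):
    """Stand-in for the base class of a generated resource (hypothetical)."""

    def __init__(self, group_id):
        self.id = group_id
        # Canned data instead of a call to AWS.
        self.ip_permissions = [{'IpProtocol': 'tcp', 'FromPort': 22, 'ToPort': 22}]


class RulesMixin(object):
    """Adds a `protocols` attribute derived from ip_permissions."""

    def __init__(self, *args, **kwargs):
        super(RulesMixin, self).__init__(*args, **kwargs)
        self.protocols = [p['IpProtocol'] for p in self.ip_permissions]


def add_mixin(base_classes, **kwargs):
    # Same shape as the handler above: mutate the list in place.
    base_classes.insert(0, RulesMixin)


def build_resource_class(name, handlers):
    """Let each handler edit the list of bases, then build the class."""
    base_classes = [FakeResourceBase]
    for handler in handlers:
        handler(base_classes=base_classes, class_attributes={})
    return type(name, tuple(base_classes), {})


SecurityGroup = build_resource_class('SecurityGroup', [add_mixin])
sg = SecurityGroup('sg-12345678')
print([cls.__name__ for cls in SecurityGroup.__mro__])
print(sg.protocols)
```

Because the mixin sits first in the method resolution order, its __init__ runs first, hands off to the base class through super(), and only then derives the extra attribute.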

boto3 now builds the SecurityGroup class with SecurityGroupRules as a base class, so whenever a SecurityGroup object is created using the handle returned from get_ec2_handle(), the SecurityGroupRules initializer runs as well. As a trivial example:

ec2_handle = get_ec2_handle()

# Get all the instances in an account
# Use list() to force boto3 to make calls to AWS now
# May take some time if there are many instances!
instances = list(ec2_handle.instances.all())

# Get a unique set of security group IDs from all in-scope instances
# Instance.security_groups is a list of dicts with GroupName and GroupId
all_groups = set()
for i in instances:
    for sg in i.security_groups:
        all_groups.add(sg['GroupId'])

security_groups = []
for sg_id in all_groups:
    security_groups.append(ec2_handle.SecurityGroup(sg_id))

security_groups is a list of SecurityGroup objects, each of which has a rules attribute that is a list of SecurityGroupRule objects. It’s now trivial to list all rules in all groups:

for sg in security_groups:
    for rule in sg.rules:
        print(rule.protocol)

We can also easily search the rules for insecure configurations, which is more detail than I planned for this post. So what’s considered an insecure configuration? Well that’s also not really the point of the post, but since you’ve asked my opinion…

  • All ports to the zero network (0.0.0.0/0) (obvious)
  • Any ports to the zero network should at least be noted
  • Access to all or many ports for a large CIDR range, where I somewhat arbitrarily define large as /24 or larger.

The idea is that even internal to a VPC, security groups should be as restrictive as possible.
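As a rough illustration of those three checks, here is a sketch that works on simplified rule tuples rather than the classes above. The names Rule, port_count, and find_issues, and the threshold of 100 for "many" ports, are illustrative assumptions and not part of the original tool. It uses the ipaddress module (Python 3.3 or later), and the /24 cutoff only makes sense for IPv4 ranges.

```python
import ipaddress
from collections import namedtuple

# Simplified stand-in for a parsed rule (hypothetical, for illustration only).
Rule = namedtuple('Rule', 'protocol from_port to_port cidr_ranges')

LARGE_PREFIX = 24   # /24 or larger counts as a "large" range
MANY_PORTS = 100    # assumed threshold for "many" ports
ALL_PORTS = 65536


def port_count(rule):
    """Number of ports a rule opens; protocol -1 or missing ports means all."""
    if rule.protocol == '-1' or rule.from_port is None or rule.to_port is None:
        return ALL_PORTS
    if rule.from_port == -1 or rule.to_port == -1:
        return ALL_PORTS
    return rule.to_port - rule.from_port + 1


def find_issues(rule):
    """Return a list of human-readable findings for one rule."""
    issues = []
    ports = port_count(rule)
    for cidr in rule.cidr_ranges:
        network = ipaddress.ip_network(cidr, strict=False)
        if network.prefixlen == 0:
            # Anything open to the zero network is at least worth noting.
            if ports >= ALL_PORTS - 1:
                issues.append('all ports open to %s' % cidr)
            else:
                issues.append('%d port(s) open to %s' % (ports, cidr))
        elif network.prefixlen <= LARGE_PREFIX and ports >= MANY_PORTS:
            issues.append('%d ports open to large range %s' % (ports, cidr))
    return issues


print(find_issues(Rule('-1', None, None, ['0.0.0.0/0'])))
print(find_issues(Rule('tcp', 443, 443, ['10.0.0.0/16'])))
```

The thresholds are deliberately kept as module-level constants so they are easy to tune for a given environment.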

Wrapping Up

The boto3 event system is a novel approach to allow more pythonic interactions with AWS resources. This example showed how to essentially subclass ec2.SecurityGroup to allow better analysis of rules. I also subclass ec2.Instance to extract additional instance properties which aren’t immediately available, like IAM policies and instance userdata.

Please hassle me about something in the comments or on Twitter if you want. Thanks for reading.