Run black on moto & test directories.

This commit is contained in:
Asher Foa 2019-10-31 08:44:26 -07:00
commit 96e5b1993d
507 changed files with 52541 additions and 47814 deletions

View file

@ -2,6 +2,6 @@ from __future__ import unicode_literals
from .models import elb_backends
from ..core.models import base_decorator, deprecated_base_decorator
elb_backend = elb_backends['us-east-1']
elb_backend = elb_backends["us-east-1"]
mock_elb = base_decorator(elb_backends)
mock_elb_deprecated = deprecated_base_decorator(elb_backends)

View file

@ -7,68 +7,66 @@ class ELBClientError(RESTError):
class DuplicateTagKeysError(ELBClientError):
def __init__(self, cidr):
super(DuplicateTagKeysError, self).__init__(
"DuplicateTagKeys",
"Tag key was specified more than once: {0}"
.format(cidr))
"DuplicateTagKeys", "Tag key was specified more than once: {0}".format(cidr)
)
class LoadBalancerNotFoundError(ELBClientError):
def __init__(self, cidr):
super(LoadBalancerNotFoundError, self).__init__(
"LoadBalancerNotFound",
"The specified load balancer does not exist: {0}"
.format(cidr))
"The specified load balancer does not exist: {0}".format(cidr),
)
class TooManyTagsError(ELBClientError):
def __init__(self):
super(TooManyTagsError, self).__init__(
"LoadBalancerNotFound",
"The quota for the number of tags that can be assigned to a load balancer has been reached")
"The quota for the number of tags that can be assigned to a load balancer has been reached",
)
class BadHealthCheckDefinition(ELBClientError):
def __init__(self):
super(BadHealthCheckDefinition, self).__init__(
"ValidationError",
"HealthCheck Target must begin with one of HTTP, TCP, HTTPS, SSL")
"HealthCheck Target must begin with one of HTTP, TCP, HTTPS, SSL",
)
class DuplicateListenerError(ELBClientError):
def __init__(self, name, port):
super(DuplicateListenerError, self).__init__(
"DuplicateListener",
"A listener already exists for {0} with LoadBalancerPort {1}, but with a different InstancePort, Protocol, or SSLCertificateId"
.format(name, port))
"A listener already exists for {0} with LoadBalancerPort {1}, but with a different InstancePort, Protocol, or SSLCertificateId".format(
name, port
),
)
class DuplicateLoadBalancerName(ELBClientError):
def __init__(self, name):
super(DuplicateLoadBalancerName, self).__init__(
"DuplicateLoadBalancerName",
"The specified load balancer name already exists for this account: {0}"
.format(name))
"The specified load balancer name already exists for this account: {0}".format(
name
),
)
class EmptyListenersError(ELBClientError):
def __init__(self):
super(EmptyListenersError, self).__init__(
"ValidationError",
"Listeners cannot be empty")
"ValidationError", "Listeners cannot be empty"
)
class InvalidSecurityGroupError(ELBClientError):
def __init__(self):
super(InvalidSecurityGroupError, self).__init__(
"ValidationError",
"One or more of the specified security groups do not exist.")
"One or more of the specified security groups do not exist.",
)

View file

@ -8,10 +8,7 @@ from boto.ec2.elb.attributes import (
AccessLogAttribute,
CrossZoneLoadBalancingAttribute,
)
from boto.ec2.elb.policies import (
Policies,
OtherPolicy,
)
from boto.ec2.elb.policies import Policies, OtherPolicy
from moto.compat import OrderedDict
from moto.core import BaseBackend, BaseModel
from moto.ec2.models import ec2_backends
@ -27,20 +24,19 @@ from .exceptions import (
class FakeHealthCheck(BaseModel):
def __init__(self, timeout, healthy_threshold, unhealthy_threshold,
interval, target):
def __init__(
self, timeout, healthy_threshold, unhealthy_threshold, interval, target
):
self.timeout = timeout
self.healthy_threshold = healthy_threshold
self.unhealthy_threshold = unhealthy_threshold
self.interval = interval
self.target = target
if not target.startswith(('HTTP', 'TCP', 'HTTPS', 'SSL')):
if not target.startswith(("HTTP", "TCP", "HTTPS", "SSL")):
raise BadHealthCheckDefinition
class FakeListener(BaseModel):
def __init__(self, load_balancer_port, instance_port, protocol, ssl_certificate_id):
self.load_balancer_port = load_balancer_port
self.instance_port = instance_port
@ -49,22 +45,38 @@ class FakeListener(BaseModel):
self.policy_names = []
def __repr__(self):
return "FakeListener(lbp: %s, inp: %s, pro: %s, cid: %s, policies: %s)" % (self.load_balancer_port, self.instance_port, self.protocol, self.ssl_certificate_id, self.policy_names)
return "FakeListener(lbp: %s, inp: %s, pro: %s, cid: %s, policies: %s)" % (
self.load_balancer_port,
self.instance_port,
self.protocol,
self.ssl_certificate_id,
self.policy_names,
)
class FakeBackend(BaseModel):
def __init__(self, instance_port):
self.instance_port = instance_port
self.policy_names = []
def __repr__(self):
return "FakeBackend(inp: %s, policies: %s)" % (self.instance_port, self.policy_names)
return "FakeBackend(inp: %s, policies: %s)" % (
self.instance_port,
self.policy_names,
)
class FakeLoadBalancer(BaseModel):
def __init__(self, name, zones, ports, scheme='internet-facing', vpc_id=None, subnets=None, security_groups=None):
def __init__(
self,
name,
zones,
ports,
scheme="internet-facing",
vpc_id=None,
subnets=None,
security_groups=None,
):
self.name = name
self.health_check = None
self.instance_ids = []
@ -80,47 +92,49 @@ class FakeLoadBalancer(BaseModel):
self.policies.lb_cookie_stickiness_policies = []
self.security_groups = security_groups or []
self.subnets = subnets or []
self.vpc_id = vpc_id or 'vpc-56e10e3d'
self.vpc_id = vpc_id or "vpc-56e10e3d"
self.tags = {}
self.dns_name = "%s.us-east-1.elb.amazonaws.com" % (name)
for port in ports:
listener = FakeListener(
protocol=(port.get('protocol') or port['Protocol']),
protocol=(port.get("protocol") or port["Protocol"]),
load_balancer_port=(
port.get('load_balancer_port') or port['LoadBalancerPort']),
instance_port=(
port.get('instance_port') or port['InstancePort']),
port.get("load_balancer_port") or port["LoadBalancerPort"]
),
instance_port=(port.get("instance_port") or port["InstancePort"]),
ssl_certificate_id=port.get(
'ssl_certificate_id', port.get('SSLCertificateId')),
"ssl_certificate_id", port.get("SSLCertificateId")
),
)
self.listeners.append(listener)
# it is unclear per the AWS documentation as to when or how backend
# information gets set, so let's guess and set it here *shrug*
backend = FakeBackend(
instance_port=(
port.get('instance_port') or port['InstancePort']),
instance_port=(port.get("instance_port") or port["InstancePort"])
)
self.backends.append(backend)
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
):
properties = cloudformation_json["Properties"]
elb_backend = elb_backends[region_name]
new_elb = elb_backend.create_load_balancer(
name=properties.get('LoadBalancerName', resource_name),
zones=properties.get('AvailabilityZones', []),
ports=properties['Listeners'],
scheme=properties.get('Scheme', 'internet-facing'),
name=properties.get("LoadBalancerName", resource_name),
zones=properties.get("AvailabilityZones", []),
ports=properties["Listeners"],
scheme=properties.get("Scheme", "internet-facing"),
)
instance_ids = properties.get('Instances', [])
instance_ids = properties.get("Instances", [])
for instance_id in instance_ids:
elb_backend.register_instances(new_elb.name, [instance_id])
policies = properties.get('Policies', [])
policies = properties.get("Policies", [])
port_policies = {}
for policy in policies:
policy_name = policy["PolicyName"]
@ -134,29 +148,37 @@ class FakeLoadBalancer(BaseModel):
for port, policies in port_policies.items():
elb_backend.set_load_balancer_policies_of_backend_server(
new_elb.name, port, list(policies))
new_elb.name, port, list(policies)
)
health_check = properties.get('HealthCheck')
health_check = properties.get("HealthCheck")
if health_check:
elb_backend.configure_health_check(
load_balancer_name=new_elb.name,
timeout=health_check['Timeout'],
healthy_threshold=health_check['HealthyThreshold'],
unhealthy_threshold=health_check['UnhealthyThreshold'],
interval=health_check['Interval'],
target=health_check['Target'],
timeout=health_check["Timeout"],
healthy_threshold=health_check["HealthyThreshold"],
unhealthy_threshold=health_check["UnhealthyThreshold"],
interval=health_check["Interval"],
target=health_check["Target"],
)
return new_elb
@classmethod
def update_from_cloudformation_json(cls, original_resource, new_resource_name, cloudformation_json, region_name):
def update_from_cloudformation_json(
cls, original_resource, new_resource_name, cloudformation_json, region_name
):
cls.delete_from_cloudformation_json(
original_resource.name, cloudformation_json, region_name)
return cls.create_from_cloudformation_json(new_resource_name, cloudformation_json, region_name)
original_resource.name, cloudformation_json, region_name
)
return cls.create_from_cloudformation_json(
new_resource_name, cloudformation_json, region_name
)
@classmethod
def delete_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
def delete_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
):
elb_backend = elb_backends[region_name]
try:
elb_backend.delete_load_balancer(resource_name)
@ -169,20 +191,25 @@ class FakeLoadBalancer(BaseModel):
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == 'CanonicalHostedZoneName':
if attribute_name == "CanonicalHostedZoneName":
raise NotImplementedError(
'"Fn::GetAtt" : [ "{0}" , "CanonicalHostedZoneName" ]"')
elif attribute_name == 'CanonicalHostedZoneNameID':
'"Fn::GetAtt" : [ "{0}" , "CanonicalHostedZoneName" ]"'
)
elif attribute_name == "CanonicalHostedZoneNameID":
raise NotImplementedError(
'"Fn::GetAtt" : [ "{0}" , "CanonicalHostedZoneNameID" ]"')
elif attribute_name == 'DNSName':
'"Fn::GetAtt" : [ "{0}" , "CanonicalHostedZoneNameID" ]"'
)
elif attribute_name == "DNSName":
return self.dns_name
elif attribute_name == 'SourceSecurityGroup.GroupName':
elif attribute_name == "SourceSecurityGroup.GroupName":
raise NotImplementedError(
'"Fn::GetAtt" : [ "{0}" , "SourceSecurityGroup.GroupName" ]"')
elif attribute_name == 'SourceSecurityGroup.OwnerAlias':
'"Fn::GetAtt" : [ "{0}" , "SourceSecurityGroup.GroupName" ]"'
)
elif attribute_name == "SourceSecurityGroup.OwnerAlias":
raise NotImplementedError(
'"Fn::GetAtt" : [ "{0}" , "SourceSecurityGroup.OwnerAlias" ]"')
'"Fn::GetAtt" : [ "{0}" , "SourceSecurityGroup.OwnerAlias" ]"'
)
raise UnformattedGetAttTemplateException()
@classmethod
@ -220,12 +247,11 @@ class FakeLoadBalancer(BaseModel):
del self.tags[key]
def delete(self, region):
''' Not exposed as part of the ELB API - used for CloudFormation. '''
""" Not exposed as part of the ELB API - used for CloudFormation. """
elb_backends[region].delete_load_balancer(self.name)
class ELBBackend(BaseBackend):
def __init__(self, region_name=None):
self.region_name = region_name
self.load_balancers = OrderedDict()
@ -235,7 +261,15 @@ class ELBBackend(BaseBackend):
self.__dict__ = {}
self.__init__(region_name)
def create_load_balancer(self, name, zones, ports, scheme='internet-facing', subnets=None, security_groups=None):
def create_load_balancer(
self,
name,
zones,
ports,
scheme="internet-facing",
subnets=None,
security_groups=None,
):
vpc_id = None
ec2_backend = ec2_backends[self.region_name]
if subnets:
@ -257,7 +291,8 @@ class ELBBackend(BaseBackend):
scheme=scheme,
subnets=subnets,
security_groups=security_groups,
vpc_id=vpc_id)
vpc_id=vpc_id,
)
self.load_balancers[name] = new_load_balancer
return new_load_balancer
@ -265,10 +300,10 @@ class ELBBackend(BaseBackend):
balancer = self.load_balancers.get(name, None)
if balancer:
for port in ports:
protocol = port['protocol']
instance_port = port['instance_port']
lb_port = port['load_balancer_port']
ssl_certificate_id = port.get('ssl_certificate_id')
protocol = port["protocol"]
instance_port = port["instance_port"]
lb_port = port["load_balancer_port"]
ssl_certificate_id = port.get("ssl_certificate_id")
for listener in balancer.listeners:
if lb_port == listener.load_balancer_port:
if protocol != listener.protocol:
@ -279,8 +314,11 @@ class ELBBackend(BaseBackend):
raise DuplicateListenerError(name, lb_port)
break
else:
balancer.listeners.append(FakeListener(
lb_port, instance_port, protocol, ssl_certificate_id))
balancer.listeners.append(
FakeListener(
lb_port, instance_port, protocol, ssl_certificate_id
)
)
return balancer
@ -288,7 +326,8 @@ class ELBBackend(BaseBackend):
balancers = self.load_balancers.values()
if names:
matched_balancers = [
balancer for balancer in balancers if balancer.name in names]
balancer for balancer in balancers if balancer.name in names
]
if len(names) != len(matched_balancers):
missing_elb = list(set(names) - set(matched_balancers))[0]
raise LoadBalancerNotFoundError(missing_elb)
@ -315,7 +354,9 @@ class ELBBackend(BaseBackend):
def get_load_balancer(self, load_balancer_name):
return self.load_balancers.get(load_balancer_name)
def apply_security_groups_to_load_balancer(self, load_balancer_name, security_group_ids):
def apply_security_groups_to_load_balancer(
self, load_balancer_name, security_group_ids
):
load_balancer = self.load_balancers.get(load_balancer_name)
ec2_backend = ec2_backends[self.region_name]
for security_group_id in security_group_ids:
@ -323,22 +364,30 @@ class ELBBackend(BaseBackend):
raise InvalidSecurityGroupError()
load_balancer.security_groups = security_group_ids
def configure_health_check(self, load_balancer_name, timeout,
healthy_threshold, unhealthy_threshold, interval,
target):
check = FakeHealthCheck(timeout, healthy_threshold, unhealthy_threshold,
interval, target)
def configure_health_check(
self,
load_balancer_name,
timeout,
healthy_threshold,
unhealthy_threshold,
interval,
target,
):
check = FakeHealthCheck(
timeout, healthy_threshold, unhealthy_threshold, interval, target
)
load_balancer = self.get_load_balancer(load_balancer_name)
load_balancer.health_check = check
return check
def set_load_balancer_listener_sslcertificate(self, name, lb_port, ssl_certificate_id):
def set_load_balancer_listener_sslcertificate(
self, name, lb_port, ssl_certificate_id
):
balancer = self.load_balancers.get(name, None)
if balancer:
for idx, listener in enumerate(balancer.listeners):
if lb_port == listener.load_balancer_port:
balancer.listeners[
idx].ssl_certificate_id = ssl_certificate_id
balancer.listeners[idx].ssl_certificate_id = ssl_certificate_id
return balancer
@ -350,7 +399,10 @@ class ELBBackend(BaseBackend):
def deregister_instances(self, load_balancer_name, instance_ids):
load_balancer = self.get_load_balancer(load_balancer_name)
new_instance_ids = [
instance_id for instance_id in load_balancer.instance_ids if instance_id not in instance_ids]
instance_id
for instance_id in load_balancer.instance_ids
if instance_id not in instance_ids
]
load_balancer.instance_ids = new_instance_ids
return load_balancer
@ -376,7 +428,9 @@ class ELBBackend(BaseBackend):
def create_lb_other_policy(self, load_balancer_name, other_policy):
load_balancer = self.get_load_balancer(load_balancer_name)
if other_policy.policy_name not in [p.policy_name for p in load_balancer.policies.other_policies]:
if other_policy.policy_name not in [
p.policy_name for p in load_balancer.policies.other_policies
]:
load_balancer.policies.other_policies.append(other_policy)
return load_balancer
@ -391,19 +445,27 @@ class ELBBackend(BaseBackend):
load_balancer.policies.lb_cookie_stickiness_policies.append(policy)
return load_balancer
def set_load_balancer_policies_of_backend_server(self, load_balancer_name, instance_port, policies):
def set_load_balancer_policies_of_backend_server(
self, load_balancer_name, instance_port, policies
):
load_balancer = self.get_load_balancer(load_balancer_name)
backend = [b for b in load_balancer.backends if int(
b.instance_port) == instance_port][0]
backend = [
b for b in load_balancer.backends if int(b.instance_port) == instance_port
][0]
backend_idx = load_balancer.backends.index(backend)
backend.policy_names = policies
load_balancer.backends[backend_idx] = backend
return load_balancer
def set_load_balancer_policies_of_listener(self, load_balancer_name, load_balancer_port, policies):
def set_load_balancer_policies_of_listener(
self, load_balancer_name, load_balancer_port, policies
):
load_balancer = self.get_load_balancer(load_balancer_name)
listener = [l for l in load_balancer.listeners if int(
l.load_balancer_port) == load_balancer_port][0]
listener = [
l
for l in load_balancer.listeners
if int(l.load_balancer_port) == load_balancer_port
][0]
listener_idx = load_balancer.listeners.index(listener)
listener.policy_names = policies
load_balancer.listeners[listener_idx] = listener

View file

@ -5,10 +5,7 @@ from boto.ec2.elb.attributes import (
AccessLogAttribute,
CrossZoneLoadBalancingAttribute,
)
from boto.ec2.elb.policies import (
AppCookieStickinessPolicy,
OtherPolicy,
)
from boto.ec2.elb.policies import AppCookieStickinessPolicy, OtherPolicy
from moto.core.responses import BaseResponse
from .models import elb_backends
@ -16,16 +13,15 @@ from .exceptions import DuplicateTagKeysError, LoadBalancerNotFoundError
class ELBResponse(BaseResponse):
@property
def elb_backend(self):
return elb_backends[self.region]
def create_load_balancer(self):
load_balancer_name = self._get_param('LoadBalancerName')
load_balancer_name = self._get_param("LoadBalancerName")
availability_zones = self._get_multi_param("AvailabilityZones.member")
ports = self._get_list_prefix("Listeners.member")
scheme = self._get_param('Scheme')
scheme = self._get_param("Scheme")
subnets = self._get_multi_param("Subnets.member")
security_groups = self._get_multi_param("SecurityGroups.member")
@ -42,27 +38,29 @@ class ELBResponse(BaseResponse):
return template.render(load_balancer=load_balancer)
def create_load_balancer_listeners(self):
load_balancer_name = self._get_param('LoadBalancerName')
load_balancer_name = self._get_param("LoadBalancerName")
ports = self._get_list_prefix("Listeners.member")
self.elb_backend.create_load_balancer_listeners(
name=load_balancer_name, ports=ports)
name=load_balancer_name, ports=ports
)
template = self.response_template(
CREATE_LOAD_BALANCER_LISTENERS_TEMPLATE)
template = self.response_template(CREATE_LOAD_BALANCER_LISTENERS_TEMPLATE)
return template.render()
def describe_load_balancers(self):
names = self._get_multi_param("LoadBalancerNames.member")
all_load_balancers = list(self.elb_backend.describe_load_balancers(names))
marker = self._get_param('Marker')
marker = self._get_param("Marker")
all_names = [balancer.name for balancer in all_load_balancers]
if marker:
start = all_names.index(marker) + 1
else:
start = 0
page_size = self._get_int_param('PageSize', 50) # the default is 400, but using 50 to make testing easier
load_balancers_resp = all_load_balancers[start:start + page_size]
page_size = self._get_int_param(
"PageSize", 50
) # the default is 400, but using 50 to make testing easier
load_balancers_resp = all_load_balancers[start : start + page_size]
next_marker = None
if len(all_load_balancers) > start + page_size:
next_marker = load_balancers_resp[-1].name
@ -71,143 +69,158 @@ class ELBResponse(BaseResponse):
return template.render(load_balancers=load_balancers_resp, marker=next_marker)
def delete_load_balancer_listeners(self):
load_balancer_name = self._get_param('LoadBalancerName')
load_balancer_name = self._get_param("LoadBalancerName")
ports = self._get_multi_param("LoadBalancerPorts.member")
ports = [int(port) for port in ports]
self.elb_backend.delete_load_balancer_listeners(
load_balancer_name, ports)
self.elb_backend.delete_load_balancer_listeners(load_balancer_name, ports)
template = self.response_template(DELETE_LOAD_BALANCER_LISTENERS)
return template.render()
def delete_load_balancer(self):
load_balancer_name = self._get_param('LoadBalancerName')
load_balancer_name = self._get_param("LoadBalancerName")
self.elb_backend.delete_load_balancer(load_balancer_name)
template = self.response_template(DELETE_LOAD_BALANCER_TEMPLATE)
return template.render()
def apply_security_groups_to_load_balancer(self):
load_balancer_name = self._get_param('LoadBalancerName')
load_balancer_name = self._get_param("LoadBalancerName")
security_group_ids = self._get_multi_param("SecurityGroups.member")
self.elb_backend.apply_security_groups_to_load_balancer(load_balancer_name, security_group_ids)
self.elb_backend.apply_security_groups_to_load_balancer(
load_balancer_name, security_group_ids
)
template = self.response_template(APPLY_SECURITY_GROUPS_TEMPLATE)
return template.render(security_group_ids=security_group_ids)
def configure_health_check(self):
check = self.elb_backend.configure_health_check(
load_balancer_name=self._get_param('LoadBalancerName'),
timeout=self._get_param('HealthCheck.Timeout'),
healthy_threshold=self._get_param('HealthCheck.HealthyThreshold'),
unhealthy_threshold=self._get_param(
'HealthCheck.UnhealthyThreshold'),
interval=self._get_param('HealthCheck.Interval'),
target=self._get_param('HealthCheck.Target'),
load_balancer_name=self._get_param("LoadBalancerName"),
timeout=self._get_param("HealthCheck.Timeout"),
healthy_threshold=self._get_param("HealthCheck.HealthyThreshold"),
unhealthy_threshold=self._get_param("HealthCheck.UnhealthyThreshold"),
interval=self._get_param("HealthCheck.Interval"),
target=self._get_param("HealthCheck.Target"),
)
template = self.response_template(CONFIGURE_HEALTH_CHECK_TEMPLATE)
return template.render(check=check)
def register_instances_with_load_balancer(self):
load_balancer_name = self._get_param('LoadBalancerName')
instance_ids = [list(param.values())[0] for param in self._get_list_prefix('Instances.member')]
load_balancer_name = self._get_param("LoadBalancerName")
instance_ids = [
list(param.values())[0]
for param in self._get_list_prefix("Instances.member")
]
template = self.response_template(REGISTER_INSTANCES_TEMPLATE)
load_balancer = self.elb_backend.register_instances(
load_balancer_name, instance_ids)
load_balancer_name, instance_ids
)
return template.render(load_balancer=load_balancer)
def set_load_balancer_listener_ssl_certificate(self):
load_balancer_name = self._get_param('LoadBalancerName')
ssl_certificate_id = self.querystring['SSLCertificateId'][0]
lb_port = self.querystring['LoadBalancerPort'][0]
load_balancer_name = self._get_param("LoadBalancerName")
ssl_certificate_id = self.querystring["SSLCertificateId"][0]
lb_port = self.querystring["LoadBalancerPort"][0]
self.elb_backend.set_load_balancer_listener_sslcertificate(
load_balancer_name, lb_port, ssl_certificate_id)
load_balancer_name, lb_port, ssl_certificate_id
)
template = self.response_template(SET_LOAD_BALANCER_SSL_CERTIFICATE)
return template.render()
def deregister_instances_from_load_balancer(self):
load_balancer_name = self._get_param('LoadBalancerName')
instance_ids = [list(param.values())[0] for param in self._get_list_prefix('Instances.member')]
load_balancer_name = self._get_param("LoadBalancerName")
instance_ids = [
list(param.values())[0]
for param in self._get_list_prefix("Instances.member")
]
template = self.response_template(DEREGISTER_INSTANCES_TEMPLATE)
load_balancer = self.elb_backend.deregister_instances(
load_balancer_name, instance_ids)
load_balancer_name, instance_ids
)
return template.render(load_balancer=load_balancer)
def describe_load_balancer_attributes(self):
load_balancer_name = self._get_param('LoadBalancerName')
load_balancer_name = self._get_param("LoadBalancerName")
load_balancer = self.elb_backend.get_load_balancer(load_balancer_name)
template = self.response_template(DESCRIBE_ATTRIBUTES_TEMPLATE)
return template.render(attributes=load_balancer.attributes)
def modify_load_balancer_attributes(self):
load_balancer_name = self._get_param('LoadBalancerName')
load_balancer_name = self._get_param("LoadBalancerName")
load_balancer = self.elb_backend.get_load_balancer(load_balancer_name)
cross_zone = self._get_dict_param(
"LoadBalancerAttributes.CrossZoneLoadBalancing.")
"LoadBalancerAttributes.CrossZoneLoadBalancing."
)
if cross_zone:
attribute = CrossZoneLoadBalancingAttribute()
attribute.enabled = cross_zone["enabled"] == "true"
self.elb_backend.set_cross_zone_load_balancing_attribute(
load_balancer_name, attribute)
load_balancer_name, attribute
)
access_log = self._get_dict_param("LoadBalancerAttributes.AccessLog.")
if access_log:
attribute = AccessLogAttribute()
attribute.enabled = access_log["enabled"] == "true"
attribute.s3_bucket_name = access_log['s3_bucket_name']
attribute.s3_bucket_prefix = access_log['s3_bucket_prefix']
attribute.s3_bucket_name = access_log["s3_bucket_name"]
attribute.s3_bucket_prefix = access_log["s3_bucket_prefix"]
attribute.emit_interval = access_log["emit_interval"]
self.elb_backend.set_access_log_attribute(
load_balancer_name, attribute)
self.elb_backend.set_access_log_attribute(load_balancer_name, attribute)
connection_draining = self._get_dict_param(
"LoadBalancerAttributes.ConnectionDraining.")
"LoadBalancerAttributes.ConnectionDraining."
)
if connection_draining:
attribute = ConnectionDrainingAttribute()
attribute.enabled = connection_draining["enabled"] == "true"
attribute.timeout = connection_draining.get("timeout", 300)
self.elb_backend.set_connection_draining_attribute(load_balancer_name, attribute)
self.elb_backend.set_connection_draining_attribute(
load_balancer_name, attribute
)
connection_settings = self._get_dict_param(
"LoadBalancerAttributes.ConnectionSettings.")
"LoadBalancerAttributes.ConnectionSettings."
)
if connection_settings:
attribute = ConnectionSettingAttribute()
attribute.idle_timeout = connection_settings["idle_timeout"]
self.elb_backend.set_connection_settings_attribute(
load_balancer_name, attribute)
load_balancer_name, attribute
)
template = self.response_template(MODIFY_ATTRIBUTES_TEMPLATE)
return template.render(load_balancer=load_balancer, attributes=load_balancer.attributes)
return template.render(
load_balancer=load_balancer, attributes=load_balancer.attributes
)
def create_load_balancer_policy(self):
load_balancer_name = self._get_param('LoadBalancerName')
load_balancer_name = self._get_param("LoadBalancerName")
other_policy = OtherPolicy()
policy_name = self._get_param("PolicyName")
other_policy.policy_name = policy_name
self.elb_backend.create_lb_other_policy(
load_balancer_name, other_policy)
self.elb_backend.create_lb_other_policy(load_balancer_name, other_policy)
template = self.response_template(CREATE_LOAD_BALANCER_POLICY_TEMPLATE)
return template.render()
def create_app_cookie_stickiness_policy(self):
load_balancer_name = self._get_param('LoadBalancerName')
load_balancer_name = self._get_param("LoadBalancerName")
policy = AppCookieStickinessPolicy()
policy.policy_name = self._get_param("PolicyName")
policy.cookie_name = self._get_param("CookieName")
self.elb_backend.create_app_cookie_stickiness_policy(
load_balancer_name, policy)
self.elb_backend.create_app_cookie_stickiness_policy(load_balancer_name, policy)
template = self.response_template(CREATE_LOAD_BALANCER_POLICY_TEMPLATE)
return template.render()
def create_lb_cookie_stickiness_policy(self):
load_balancer_name = self._get_param('LoadBalancerName')
load_balancer_name = self._get_param("LoadBalancerName")
policy = AppCookieStickinessPolicy()
policy.policy_name = self._get_param("PolicyName")
@ -217,62 +230,68 @@ class ELBResponse(BaseResponse):
else:
policy.cookie_expiration_period = None
self.elb_backend.create_lb_cookie_stickiness_policy(
load_balancer_name, policy)
self.elb_backend.create_lb_cookie_stickiness_policy(load_balancer_name, policy)
template = self.response_template(CREATE_LOAD_BALANCER_POLICY_TEMPLATE)
return template.render()
def set_load_balancer_policies_of_listener(self):
load_balancer_name = self._get_param('LoadBalancerName')
load_balancer_name = self._get_param("LoadBalancerName")
load_balancer = self.elb_backend.get_load_balancer(load_balancer_name)
load_balancer_port = int(self._get_param('LoadBalancerPort'))
load_balancer_port = int(self._get_param("LoadBalancerPort"))
mb_listener = [l for l in load_balancer.listeners if int(
l.load_balancer_port) == load_balancer_port]
mb_listener = [
l
for l in load_balancer.listeners
if int(l.load_balancer_port) == load_balancer_port
]
if mb_listener:
policies = self._get_multi_param("PolicyNames.member")
self.elb_backend.set_load_balancer_policies_of_listener(
load_balancer_name, load_balancer_port, policies)
load_balancer_name, load_balancer_port, policies
)
# else: explode?
template = self.response_template(
SET_LOAD_BALANCER_POLICIES_OF_LISTENER_TEMPLATE)
SET_LOAD_BALANCER_POLICIES_OF_LISTENER_TEMPLATE
)
return template.render()
def set_load_balancer_policies_for_backend_server(self):
load_balancer_name = self.querystring.get('LoadBalancerName')[0]
load_balancer_name = self.querystring.get("LoadBalancerName")[0]
load_balancer = self.elb_backend.get_load_balancer(load_balancer_name)
instance_port = int(self.querystring.get('InstancePort')[0])
instance_port = int(self.querystring.get("InstancePort")[0])
mb_backend = [b for b in load_balancer.backends if int(
b.instance_port) == instance_port]
mb_backend = [
b for b in load_balancer.backends if int(b.instance_port) == instance_port
]
if mb_backend:
policies = self._get_multi_param('PolicyNames.member')
policies = self._get_multi_param("PolicyNames.member")
self.elb_backend.set_load_balancer_policies_of_backend_server(
load_balancer_name, instance_port, policies)
load_balancer_name, instance_port, policies
)
# else: explode?
template = self.response_template(
SET_LOAD_BALANCER_POLICIES_FOR_BACKEND_SERVER_TEMPLATE)
SET_LOAD_BALANCER_POLICIES_FOR_BACKEND_SERVER_TEMPLATE
)
return template.render()
def describe_instance_health(self):
load_balancer_name = self._get_param('LoadBalancerName')
load_balancer_name = self._get_param("LoadBalancerName")
provided_instance_ids = [
list(param.values())[0]
for param in self._get_list_prefix('Instances.member')
for param in self._get_list_prefix("Instances.member")
]
registered_instances_id = self.elb_backend.get_load_balancer(
load_balancer_name).instance_ids
load_balancer_name
).instance_ids
if len(provided_instance_ids) == 0:
provided_instance_ids = registered_instances_id
template = self.response_template(DESCRIBE_INSTANCE_HEALTH_TEMPLATE)
instances = []
for instance_id in provided_instance_ids:
state = "InService" \
if instance_id in registered_instances_id\
else "Unknown"
state = "InService" if instance_id in registered_instances_id else "Unknown"
instances.append({"InstanceId": instance_id, "State": state})
return template.render(instances=instances)
@ -293,17 +312,18 @@ class ELBResponse(BaseResponse):
def remove_tags(self):
for key, value in self.querystring.items():
if "LoadBalancerNames.member" in key:
number = key.split('.')[2]
number = key.split(".")[2]
load_balancer_name = self._get_param(
'LoadBalancerNames.member.{0}'.format(number))
"LoadBalancerNames.member.{0}".format(number)
)
elb = self.elb_backend.get_load_balancer(load_balancer_name)
if not elb:
raise LoadBalancerNotFoundError(load_balancer_name)
key = 'Tag.member.{0}.Key'.format(number)
key = "Tag.member.{0}.Key".format(number)
for t_key, t_val in self.querystring.items():
if t_key.startswith('Tags.member.'):
if t_key.split('.')[3] == 'Key':
if t_key.startswith("Tags.member."):
if t_key.split(".")[3] == "Key":
elb.remove_tag(t_val[0])
template = self.response_template(REMOVE_TAGS_TEMPLATE)
@ -313,9 +333,10 @@ class ELBResponse(BaseResponse):
elbs = []
for key, value in self.querystring.items():
if "LoadBalancerNames.member" in key:
number = key.split('.')[2]
number = key.split(".")[2]
load_balancer_name = self._get_param(
'LoadBalancerNames.member.{0}'.format(number))
"LoadBalancerNames.member.{0}".format(number)
)
elb = self.elb_backend.get_load_balancer(load_balancer_name)
if not elb:
raise LoadBalancerNotFoundError(load_balancer_name)
@ -329,10 +350,10 @@ class ELBResponse(BaseResponse):
tag_keys = []
for t_key, t_val in sorted(self.querystring.items()):
if t_key.startswith('Tags.member.'):
if t_key.split('.')[3] == 'Key':
if t_key.startswith("Tags.member."):
if t_key.split(".")[3] == "Key":
tag_keys.extend(t_val)
elif t_key.split('.')[3] == 'Value':
elif t_key.split(".")[3] == "Value":
tag_values.extend(t_val)
counts = {}

View file

@ -16,29 +16,25 @@ def api_version_elb_backend(*args, **kwargs):
"""
request = args[0]
if hasattr(request, 'values'):
if hasattr(request, "values"):
# boto3
version = request.values.get('Version')
version = request.values.get("Version")
elif isinstance(request, AWSPreparedRequest):
# boto in-memory
version = parse_qs(request.body).get('Version')[0]
version = parse_qs(request.body).get("Version")[0]
else:
# boto in server mode
request.parse_request()
version = request.querystring.get('Version')[0]
version = request.querystring.get("Version")[0]
if '2012-06-01' == version:
if "2012-06-01" == version:
return ELBResponse.dispatch(*args, **kwargs)
elif '2015-12-01' == version:
elif "2015-12-01" == version:
return ELBV2Response.dispatch(*args, **kwargs)
else:
raise Exception("Unknown ELB API version: {}".format(version))
url_bases = [
"https?://elasticloadbalancing.(.+).amazonaws.com",
]
url_bases = ["https?://elasticloadbalancing.(.+).amazonaws.com"]
url_paths = {
'{0}/$': api_version_elb_backend,
}
url_paths = {"{0}/$": api_version_elb_backend}