From 9b77271d647c395d1148d4d1edf05a0807b36573 Mon Sep 17 00:00:00 2001 From: Ali-aqrabawi Date: Tue, 28 May 2019 19:24:41 +0300 Subject: [PATCH 01/13] fortigate firewall policy support --- capirca/aclgen.py | 9 +- capirca/lib/fortigate.py | 330 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 338 insertions(+), 1 deletion(-) create mode 100644 capirca/lib/fortigate.py diff --git a/capirca/aclgen.py b/capirca/aclgen.py index 743709c3..0e9416b4 100644 --- a/capirca/aclgen.py +++ b/capirca/aclgen.py @@ -52,7 +52,7 @@ from capirca.lib import speedway from capirca.lib import srxlo from capirca.lib import windows_advfirewall - +from capirca.lib import fortigate FLAGS = flags.FLAGS flags.DEFINE_string( @@ -174,6 +174,7 @@ def RenderFile(input_file, output_directory, definitions, win_afw = False xacl = False paloalto = False + fcl = False try: conf = open(input_file).read() @@ -238,6 +239,8 @@ def RenderFile(input_file, output_directory, definitions, paloalto = copy.deepcopy(pol) if 'cloudarmor' in platforms: gca = copy.deepcopy(pol) + if 'fortigate' in platforms: + fcl = copy.deepcopy(pol) if not output_directory.endswith('/'): output_directory += '/' @@ -327,6 +330,10 @@ def RenderFile(input_file, output_directory, definitions, acl_obj = cloudarmor.CloudArmor(gca, exp_info) RenderACL(str(acl_obj), acl_obj.SUFFIX, output_directory, input_file, write_files) + if fcl: + acl_obj = fortigate.Fortigate(fcl, exp_info) + RenderACL(str(acl_obj), acl_obj.SUFFIX, output_directory, + input_file, write_files) # TODO(robankeny) add additional errors. except (juniper.Error, junipersrx.Error, cisco.Error, ipset.Error, iptables.Error, speedway.Error, pcap.Error, diff --git a/capirca/lib/fortigate.py b/capirca/lib/fortigate.py new file mode 100644 index 00000000..b9a63aca --- /dev/null +++ b/capirca/lib/fortigate.py @@ -0,0 +1,330 @@ +# Copyright 2011 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Fortigate generator.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +import datetime + +from capirca.lib import nacaddr +from absl import logging +from capirca.lib import aclgenerator + +_ACTION_TABLE = { + 'accept': 'accept', + 'deny': 'deny', + 'reject': 'deny', + 'reject-with-tcp-rst': 'deny', # tcp rst not supported +} + + +class UnsupportedFilterError(Exception): + pass + + +class FortiGateValueError(Exception): + pass + + +class FortiGateFindServiceError(Exception): + pass + + +class FortiGateDuplicateTermError(Exception): + pass + + +class FortigatePortMap(object): + _PORTS_TCP = { + 179: 'BGP', + 53: 'DNS', + 7: 'PING', + 79: 'FINGER', + 21: 'FTP', + 70: 'GOPHER', + 443: 'HTTPS', + 194: 'IRC', + 2049: 'NFS', + 119: 'NNTP', + 110: 'POP3', + 1723: 'PPTP', + 25: 'SMTP', + 22: 'SSH', + 517: 'TALK', + 23: 'TELNET', + 540: 'UUCP', + 80: 'HTTP', + 993: 'IMAPS', + 3389: 'RDP', + 3306: 'MYSQL', + 1433: 'MS-SQL', + 1812: 'RADIUS', + 995: 'POP3S', + 465: 'SMTPS', + 389: 'LDAP', + 69: 'TFTP' + + } + + _PORTS_UDP = { + 53: 'DNS', + 7: 'PING', + 500: 'IKE', + 2049: 'NFS', + 123: 'NTP', + 520: 'RIP', + 161: 'SNMP', + 162: 'snmptrap', + 514: 'SYSLOG', + 517: 'TALK', + 69: 'TFTP', + 37: 'TIMESTAMP', + 1812: 'RADIUS', + 67: 'DHCP' + + } + + _PROTO_MAP = { + 'icmp': 'ALL_ICMP', + 'gre': 'GRE', + 'ip': 'ALL', + 'tcp': _PORTS_TCP, + 'udp': _PORTS_UDP + } + + @staticmethod + def GetProtocol(protocol, port=None): + f_proto = FortigatePortMap._PROTO_MAP.get(protocol, None) + if f_proto is None: + raise FortiGateValueError('%r protocol is not supported by Fortigate, supported protocols = %r' % ( + protocol, FortigatePortMap._PROTO_MAP.keys())) + + if isinstance(f_proto, str): + return f_proto + elif port: + return f_proto[port] + + else: + raise FortiGateFindServiceError('failed to get service from %r protocol and %r port' % (protocol, port)) + + +class Term(aclgenerator.Term): + ALLOWED_PROTO_STRINGS = ['gre', 'icmp', 'ip', 'tcp', 'udp'] + COMMENT_MAX_WIDTH = 70 + + FW_ADDRESSES = [] + FW_SERVICES = [] + + _FW_DUP_CHECK = set() + + CURRENT_ID = 0 + + def __init__(self, term): + super(Term, self).__init__(term) + self._term = term + + self.id = type(self).CURRENT_ID + type(self).CURRENT_ID += 1 + + @staticmethod + def get_fw_addresses(): + Term.FW_ADDRESSES.extend([' ', 'end', ' ']) + return Term.FW_ADDRESSES + + @staticmethod + def get_fw_services(): + Term.FW_SERVICES.extend([' ', 'end', ' ']) + return Term.FW_SERVICES + + @staticmethod + def _get_addresses_name(addresses): + v4_addresses = [x.with_prefixlen for x in addresses if + not isinstance(x, nacaddr.IPv6)] + addresses = ' '.join(v4_addresses) + return addresses or 'all' + + def _get_services_string(self, protocol, ports): + + services = [] + if protocol and not ports: + services.append(FortigatePortMap.GetProtocol(protocol[0])) + for port in ports: + try: + service = FortigatePortMap.GetProtocol(protocol[0], port[0]) + except KeyError: + self._add_service_to_fw_services(protocol[0], port[0]) + service = str(port[0]) + services.append(service) + + return ' '.join(services) or 'ALL' + + def _add_address_to_fw_addresses(self, addr): + if addr in type(self)._FW_DUP_CHECK: + return + type(self).FW_ADDRESSES.extend(['\tedit %s' % addr, + '\t\tset subnet %s' % addr, + '\tnext']) + type(self)._FW_DUP_CHECK.add(addr) + + def _add_service_to_fw_services(self, protocol, service): + if service in type(self)._FW_DUP_CHECK: + return + + type(self).FW_SERVICES.extend(['\tedit %s' % service, + '\t\tset protocol TCP/UDP', + '\t\tset %s-portrange %s' % (protocol.lower(), service), + '\tnext']) + + type(self)._FW_DUP_CHECK.add(service) + + def _generate_address_names(self, *addresses): + for group in addresses: + for addr in group: + if addr and not isinstance(addr, nacaddr.IPv6): + self._add_address_to_fw_addresses(addr.with_prefixlen) + + def __str__(self): + lines = [] + + self._generate_address_names(self._term.destination_address, self._term.source_address) + # lines.extend(self.firewall_addresses) + + dest_addresses = self._get_addresses_name(self._term.destination_address) + src_addresses = self._get_addresses_name(self._term.source_address) + services = self._get_services_string(self._term.protocol, self._term.destination_port) + + lines.append('\t\tset comments %s' % self._term.name) + lines.append('\t\tset srcintf %s' % (self._term.source_interface or 'any')) + lines.append('\t\tset dstintf %s' % (self._term.destination_interface or 'any')) + lines.append('\t\tset dstaddr %s' % dest_addresses) + lines.append('\t\tset srcaddr %s' % src_addresses) + lines.append('\t\tset action %s' % _ACTION_TABLE.get(self._term.action[0])) + lines.append('\t\tset service %s' % services) + lines.append('\t\tset schedule always') + if self._term.logging: + lines.append('\t\tset logtraffic all') + + return '\n'.join(lines) + + +class Fortigate(aclgenerator.ACLGenerator): + """A cisco policy object.""" + + _PLATFORM = 'fortigate' + _DEFAULT_PROTOCOL = 'ALL' + SUFFIX = '.fcl' + # Protocols should be emitted as numbers. + _PROTO_INT = True + _TERM_REMARK = True + + def _BuildTokens(self): + """Build supported tokens for platform. + + Returns: + tuple containing both supported tokens and sub tokens + """ + supported_tokens, supported_sub_tokens = super(Fortigate, self)._BuildTokens() + + supported_tokens |= {'source_interface', + 'destination_interface', + 'logging'} + + supported_sub_tokens.update({'option': {'from_id'}, + # Warning, some of these are mapped + # differently. See _ACTION_TABLE + 'action': {'accept', 'deny', 'reject', + 'reject-with-tcp-rst'}}) + return supported_tokens, supported_sub_tokens + + def _TranslatePolicy(self, pol, exp_info): + self.fortigate_policies = [] + current_date = datetime.datetime.utcnow().date() + exp_info_date = current_date + datetime.timedelta(weeks=exp_info) + + # a mixed filter outputs both ipv4 and ipv6 acls in the same output file + + for header, terms in pol.filters: + if self._PLATFORM not in header.platforms: + continue + + filter_options = header.FilterOptions(self._PLATFORM) + + if (len(filter_options) < 2 or filter_options[0] != "from-id"): + raise UnsupportedFilterError( + "Fortigate Firewall filter arguments must specify from_id") + + from_id = filter_options[1] + Term.CURRENT_ID = int(from_id) + + self.verbose = True + if 'noverbose' in filter_options: + filter_options.remove('noverbose') + self.verbose = False + + term_dup_check = set() + + for term in terms: + filter_name = header.FilterName(self._PLATFORM) + if term.expiration: + if term.expiration <= exp_info_date: + logging.info('INFO: Term %s in policy %s expires ' + 'in less than two weeks.', term.name, filter_name) + if term.expiration <= current_date: + logging.warn('WARNING: Term %s in policy %s is expired and ' + 'will not be rendered.', term.name, filter_name) + continue + if term.name in term_dup_check: + raise FortiGateDuplicateTermError('You have a duplicate term: %s' % + term.name) + term_dup_check.add(term.name) + + term.name = self.FixTermLength(term.name) + new_term = Term(term) + + self.fortigate_policies.append((header, term.name, new_term)) + + def _GetTargetByPolicyID(self, id): + return '\tedit %s' % id + + def __str__(self): + start_addresses = ['config firewall address'] + start_services = ['config firewall service custom'] + start_policies = ['config firewall policy'] + end = ['end'] + target_addresses = [] + target_services = [] + target_policies = [] + + for (header, filter_name, term) in self.fortigate_policies: + target_policies.append(self._GetTargetByPolicyID(term.id)) + + term_str = str(term) + + target_policies.append(term_str) + + target_policies += ['\tnext', ''] + target_addresses.extend(Term.get_fw_addresses()) + target_services.extend(Term.get_fw_services()) + + fw_addresses = start_addresses + target_addresses + fw_services = start_services + target_services + + target = fw_addresses + fw_services + start_policies + target_policies + end + + return '\n'.join(target) From 904448b9ffcb14dc5e7710306f1d4c8aa864a59d Mon Sep 17 00:00:00 2001 From: Ali-aqrabawi Date: Tue, 28 May 2019 19:30:38 +0300 Subject: [PATCH 02/13] fix spacing after imports --- capirca/aclgen.py | 1 + 1 file changed, 1 insertion(+) diff --git a/capirca/aclgen.py b/capirca/aclgen.py index 0e9416b4..cc5e66c3 100644 --- a/capirca/aclgen.py +++ b/capirca/aclgen.py @@ -54,6 +54,7 @@ from capirca.lib import windows_advfirewall from capirca.lib import fortigate + FLAGS = flags.FLAGS flags.DEFINE_string( 'base_directory', From 7cb320e3c4ad70f73b9e1a3cd090bd75a24a3102 Mon Sep 17 00:00:00 2001 From: Ali-aqrabawi Date: Thu, 13 Jun 2019 01:06:12 +0300 Subject: [PATCH 03/13] added unittest and resolved code reviews --- capirca/aclgen.py | 2 +- capirca/lib/fortigate.py | 190 +++++++++++++++--------- capirca/policies.acl | 37 +++++ capirca/policies.fcl | 96 ++++++++++++ capirca/policies.jcl | 83 +++++++++++ capirca/policies.xml | 210 ++++++++++++++++++++++++++ tests/lib/fortigate_test.py | 287 ++++++++++++++++++++++++++++++++++++ 7 files changed, 832 insertions(+), 73 deletions(-) create mode 100644 capirca/policies.acl create mode 100644 capirca/policies.fcl create mode 100644 capirca/policies.jcl create mode 100644 capirca/policies.xml create mode 100644 tests/lib/fortigate_test.py diff --git a/capirca/aclgen.py b/capirca/aclgen.py index cc5e66c3..c63fbaea 100644 --- a/capirca/aclgen.py +++ b/capirca/aclgen.py @@ -339,7 +339,7 @@ def RenderFile(input_file, output_directory, definitions, except (juniper.Error, junipersrx.Error, cisco.Error, ipset.Error, iptables.Error, speedway.Error, pcap.Error, aclgenerator.Error, aruba.Error, nftables.Error, gce.Error, - cloudarmor.Error) as e: + cloudarmor.Error, fortigate.Error) as e: raise ACLGeneratorError( 'Error generating target ACL for %s:\n%s' % (input_file, e)) diff --git a/capirca/lib/fortigate.py b/capirca/lib/fortigate.py index b9a63aca..e73853ca 100644 --- a/capirca/lib/fortigate.py +++ b/capirca/lib/fortigate.py @@ -1,4 +1,4 @@ -# Copyright 2011 Google Inc. All Rights Reserved. +# Copyright 2019 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,10 +21,11 @@ from __future__ import unicode_literals import datetime +import six +from capirca.lib import aclgenerator from capirca.lib import nacaddr from absl import logging -from capirca.lib import aclgenerator _ACTION_TABLE = { 'accept': 'accept', @@ -34,23 +35,32 @@ } -class UnsupportedFilterError(Exception): +class Error(Exception): + pass + + +class FilterError(Error): + pass + + +class FortiGateValueError(Error): pass -class FortiGateValueError(Exception): +class FortiGateFindServiceError(Error): pass -class FortiGateFindServiceError(Exception): +class FortiGateDuplicateTermError(Error): pass -class FortiGateDuplicateTermError(Exception): +class FortiGatePortDoesNotExist(Error): pass class FortigatePortMap(object): + """Map port numbers to service names""" _PORTS_TCP = { 179: 'BGP', 53: 'DNS', @@ -79,9 +89,7 @@ class FortigatePortMap(object): 465: 'SMTPS', 389: 'LDAP', 69: 'TFTP' - } - _PORTS_UDP = { 53: 'DNS', 7: 'PING', @@ -97,9 +105,7 @@ class FortigatePortMap(object): 37: 'TIMESTAMP', 1812: 'RADIUS', 67: 'DHCP' - } - _PROTO_MAP = { 'icmp': 'ALL_ICMP', 'gre': 'GRE', @@ -110,48 +116,87 @@ class FortigatePortMap(object): @staticmethod def GetProtocol(protocol, port=None): + """ + Converts a port number to a service name. + :param protocol: string representing protocol (tcp, udp, etc). + :param port: integer representing the port number. + :return: the service name of provided port-protocol + """ f_proto = FortigatePortMap._PROTO_MAP.get(protocol, None) if f_proto is None: - raise FortiGateValueError('%r protocol is not supported by Fortigate, supported protocols = %r' % ( - protocol, FortigatePortMap._PROTO_MAP.keys())) + raise FortiGateValueError( + '%r protocol is not supported by Fortigate, supported protocols = %r' % ( + protocol, FortigatePortMap._PROTO_MAP.keys() + ) + ) - if isinstance(f_proto, str): + if isinstance(f_proto, six.string_types): return f_proto elif port: - return f_proto[port] - + try: + return f_proto[port] + except KeyError: + raise FortiGatePortDoesNotExist else: - raise FortiGateFindServiceError('failed to get service from %r protocol and %r port' % (protocol, port)) + raise FortiGateFindServiceError( + 'failed to get service from %r protocol and %r port' % (protocol, port) + ) + + +class ObjectsContainer: + """a Container that holds service and network objects""" + + def __init__(self): + self._FW_ADDRESSES = [] + self._FW_SERVICES = [] + + self._FW_DUP_CHECK = set() + + def get_fw_addresses(self): + self._FW_ADDRESSES.extend([' ', 'end', ' ']) + return self._FW_ADDRESSES + + def get_fw_services(self): + self._FW_SERVICES.extend([' ', 'end', ' ']) + return self._FW_SERVICES + + def _add_address_to_fw_addresses(self, addr): + if addr in self._FW_DUP_CHECK: + return + self._FW_ADDRESSES.extend(['\tedit %s' % addr, + '\t\tset subnet %s' % addr, + '\tnext']) + self._FW_DUP_CHECK.add(addr) + + def _add_service_to_fw_services(self, protocol, service): + if service in self._FW_DUP_CHECK: + return + + self._FW_SERVICES.extend( + ['\tedit %s' % service, + '\t\tset protocol TCP/UDP', + '\t\tset %s-portrange %s' % (protocol.lower(), service), + '\tnext'] + ) + + self._FW_DUP_CHECK.add(service) class Term(aclgenerator.Term): + """Single Firewall Policy""" ALLOWED_PROTO_STRINGS = ['gre', 'icmp', 'ip', 'tcp', 'udp'] COMMENT_MAX_WIDTH = 70 - FW_ADDRESSES = [] - FW_SERVICES = [] - - _FW_DUP_CHECK = set() - CURRENT_ID = 0 - def __init__(self, term): + def __init__(self, term, object_container): super(Term, self).__init__(term) self._term = term + self._obj_container = object_container self.id = type(self).CURRENT_ID type(self).CURRENT_ID += 1 - @staticmethod - def get_fw_addresses(): - Term.FW_ADDRESSES.extend([' ', 'end', ' ']) - return Term.FW_ADDRESSES - - @staticmethod - def get_fw_services(): - Term.FW_SERVICES.extend([' ', 'end', ' ']) - return Term.FW_SERVICES - @staticmethod def _get_addresses_name(addresses): v4_addresses = [x.with_prefixlen for x in addresses if @@ -159,7 +204,22 @@ def _get_addresses_name(addresses): addresses = ' '.join(v4_addresses) return addresses or 'all' + @staticmethod + def clean_ports(src_ports, dest_ports): + all_ports = [] + if src_ports: + all_ports += src_ports + if dest_ports: + all_ports += dest_ports + return set(all_ports) + def _get_services_string(self, protocol, ports): + """ + get the service name if exist, if not create a service object and return the name + :param protocol: list of protocols + :param ports: list of ports + :return: + """ services = [] if protocol and not ports: @@ -167,47 +227,32 @@ def _get_services_string(self, protocol, ports): for port in ports: try: service = FortigatePortMap.GetProtocol(protocol[0], port[0]) - except KeyError: - self._add_service_to_fw_services(protocol[0], port[0]) + except FortiGatePortDoesNotExist: + self._obj_container._add_service_to_fw_services(protocol[0], port[0]) service = str(port[0]) services.append(service) return ' '.join(services) or 'ALL' - def _add_address_to_fw_addresses(self, addr): - if addr in type(self)._FW_DUP_CHECK: - return - type(self).FW_ADDRESSES.extend(['\tedit %s' % addr, - '\t\tset subnet %s' % addr, - '\tnext']) - type(self)._FW_DUP_CHECK.add(addr) - - def _add_service_to_fw_services(self, protocol, service): - if service in type(self)._FW_DUP_CHECK: - return - - type(self).FW_SERVICES.extend(['\tedit %s' % service, - '\t\tset protocol TCP/UDP', - '\t\tset %s-portrange %s' % (protocol.lower(), service), - '\tnext']) - - type(self)._FW_DUP_CHECK.add(service) - def _generate_address_names(self, *addresses): for group in addresses: for addr in group: if addr and not isinstance(addr, nacaddr.IPv6): - self._add_address_to_fw_addresses(addr.with_prefixlen) + self._obj_container._add_address_to_fw_addresses(addr.with_prefixlen) def __str__(self): lines = [] - self._generate_address_names(self._term.destination_address, self._term.source_address) + self._generate_address_names(self._term.destination_address, + self._term.source_address) # lines.extend(self.firewall_addresses) dest_addresses = self._get_addresses_name(self._term.destination_address) src_addresses = self._get_addresses_name(self._term.source_address) - services = self._get_services_string(self._term.protocol, self._term.destination_port) + all_ports = self.clean_ports(self._term.source_port, self._term.destination_port) + + services = self._get_services_string(self._term.protocol, + all_ports) lines.append('\t\tset comments %s' % self._term.name) lines.append('\t\tset srcintf %s' % (self._term.source_interface or 'any')) @@ -232,6 +277,11 @@ class Fortigate(aclgenerator.ACLGenerator): # Protocols should be emitted as numbers. _PROTO_INT = True _TERM_REMARK = True + _TERM_MAX_LENGTH = 1023 + + def __init__(self, *args, **kwargs): + self._obj_container = ObjectsContainer() + super(Fortigate, self).__init__(*args, **kwargs) def _BuildTokens(self): """Build supported tokens for platform. @@ -239,7 +289,8 @@ def _BuildTokens(self): Returns: tuple containing both supported tokens and sub tokens """ - supported_tokens, supported_sub_tokens = super(Fortigate, self)._BuildTokens() + supported_tokens, supported_sub_tokens = super(Fortigate, + self)._BuildTokens() supported_tokens |= {'source_interface', 'destination_interface', @@ -253,6 +304,7 @@ def _BuildTokens(self): return supported_tokens, supported_sub_tokens def _TranslatePolicy(self, pol, exp_info): + """Translate Capirca pol to fortigate pol""" self.fortigate_policies = [] current_date = datetime.datetime.utcnow().date() exp_info_date = current_date + datetime.timedelta(weeks=exp_info) @@ -265,18 +317,13 @@ def _TranslatePolicy(self, pol, exp_info): filter_options = header.FilterOptions(self._PLATFORM) - if (len(filter_options) < 2 or filter_options[0] != "from-id"): - raise UnsupportedFilterError( - "Fortigate Firewall filter arguments must specify from_id") + if (len(filter_options) < 2 or filter_options[0] != 'from-id'): + raise FilterError( + 'Fortigate Firewall filter arguments must specify from_id') from_id = filter_options[1] Term.CURRENT_ID = int(from_id) - self.verbose = True - if 'noverbose' in filter_options: - filter_options.remove('noverbose') - self.verbose = False - term_dup_check = set() for term in terms: @@ -294,13 +341,12 @@ def _TranslatePolicy(self, pol, exp_info): term.name) term_dup_check.add(term.name) - term.name = self.FixTermLength(term.name) - new_term = Term(term) + new_term = Term(term, self._obj_container) self.fortigate_policies.append((header, term.name, new_term)) - def _GetTargetByPolicyID(self, id): - return '\tedit %s' % id + def _GetTargetByPolicyID(self, id_): + return '\tedit %s' % id_ def __str__(self): start_addresses = ['config firewall address'] @@ -311,7 +357,7 @@ def __str__(self): target_services = [] target_policies = [] - for (header, filter_name, term) in self.fortigate_policies: + for (_, filter_name, term) in self.fortigate_policies: target_policies.append(self._GetTargetByPolicyID(term.id)) term_str = str(term) @@ -319,8 +365,8 @@ def __str__(self): target_policies.append(term_str) target_policies += ['\tnext', ''] - target_addresses.extend(Term.get_fw_addresses()) - target_services.extend(Term.get_fw_services()) + target_addresses.extend(self._obj_container.get_fw_addresses()) + target_services.extend(self._obj_container.get_fw_services()) fw_addresses = start_addresses + target_addresses fw_services = start_services + target_services diff --git a/capirca/policies.acl b/capirca/policies.acl new file mode 100644 index 00000000..e2e096f7 --- /dev/null +++ b/capirca/policies.acl @@ -0,0 +1,37 @@ +! $Id:$ +! $Date:$ +! $Revision:$ +no ip access-list extended edge-filter +ip access-list extended edge-filter + remark $Id:$ + + + remark allow-https-web + permit tcp any host 200.1.1.1 eq 80 + permit tcp any host 200.1.1.1 eq 443 + permit tcp any host 200.1.1.2 eq 80 + permit tcp any host 200.1.1.2 eq 443 + + + remark customers-policy + deny tcp host 4.71.113.2 host 4.71.113.2 eq 80 + deny tcp host 4.71.113.2 host 4.71.113.2 eq 443 + + + remark good-term-2 + permit tcp any eq 993 host 9.9.9.9 + + + remark customers-policy2 + deny tcp host 9.9.9.9 192.168.1.0 0.0.0.255 eq 80 log + deny tcp host 9.9.9.9 192.168.1.0 0.0.0.255 eq 443 log + + + remark deny-any-any + deny icmp any any + + + remark accept-any-any + permit udp any any eq 43 + +exit diff --git a/capirca/policies.fcl b/capirca/policies.fcl new file mode 100644 index 00000000..6ad5d91d --- /dev/null +++ b/capirca/policies.fcl @@ -0,0 +1,96 @@ +config firewall address + edit 200.1.1.1/32 + set subnet 200.1.1.1/32 + next + edit 200.1.1.2/32 + set subnet 200.1.1.2/32 + next + edit 4.71.113.2/32 + set subnet 4.71.113.2/32 + next + edit 9.9.9.9/32 + set subnet 9.9.9.9/32 + next + edit 192.168.1.0/24 + set subnet 192.168.1.0/24 + next + +end + +config firewall service custom + edit 43 + set protocol TCP/UDP + set udp-portrange 43 + next + +end + +config firewall policy + edit 2 + set comments allow-https-web + set srcintf any + set dstintf any + set dstaddr 200.1.1.1/32 200.1.1.2/32 + set srcaddr all + set action accept + set service HTTP HTTPS + set schedule always + next + + edit 3 + set comments customers-policy + set srcintf any + set dstintf any + set dstaddr 4.71.113.2/32 + set srcaddr 4.71.113.2/32 + set action deny + set service HTTP HTTPS + set schedule always + next + + edit 4 + set comments good-term-2 + set srcintf any + set dstintf any + set dstaddr 9.9.9.9/32 + set srcaddr all + set action accept + set service IMAPS + set schedule always + next + + edit 5 + set comments customers-policy2 + set srcintf any + set dstintf any + set dstaddr 192.168.1.0/24 + set srcaddr 9.9.9.9/32 + set action deny + set service HTTP HTTPS + set schedule always + set logtraffic all + next + + edit 6 + set comments deny-any-any + set srcintf any + set dstintf any + set dstaddr all + set srcaddr all + set action deny + set service ALL_ICMP + set schedule always + next + + edit 7 + set comments accept-any-any + set srcintf any + set dstintf any + set dstaddr all + set srcaddr all + set action accept + set service 43 + set schedule always + next + +end \ No newline at end of file diff --git a/capirca/policies.jcl b/capirca/policies.jcl new file mode 100644 index 00000000..175c0adb --- /dev/null +++ b/capirca/policies.jcl @@ -0,0 +1,83 @@ +firewall { + family inet { + replace: + /* + ** $Id:$ + ** $Date:$ + ** $Revision:$ + ** + */ + filter edge-filter { + interface-specific; + term allow-https-web { + from { + destination-address { + /* Example web server 1 */ + 200.1.1.1/32; + /* Example web server 2 */ + 200.1.1.2/32; + } + protocol tcp; + destination-port [ 80 443 ]; + } + then accept; + } + term customers-policy { + from { + source-address { + 4.71.113.2/32; + } + destination-address { + 4.71.113.2/32; + } + protocol tcp; + destination-port [ 80 443 ]; + } + then { + discard; + } + } + term good-term-2 { + from { + destination-address { + 9.9.9.9/32; + } + protocol tcp; + source-port 993; + } + then accept; + } + term customers-policy2 { + from { + source-address { + 9.9.9.9/32; + } + destination-address { + 192.168.1.0/24; + } + protocol tcp; + destination-port [ 80 443 ]; + } + then { + syslog; + discard; + } + } + term deny-any-any { + from { + protocol icmp; + } + then { + discard; + } + } + term accept-any-any { + from { + protocol udp; + destination-port 43; + } + then accept; + } + } + } +} diff --git a/capirca/policies.xml b/capirca/policies.xml new file mode 100644 index 00000000..cb33ac57 --- /dev/null +++ b/capirca/policies.xml @@ -0,0 +1,210 @@ + + + + + + + + + + + + + + + 80, 443 + + + + + + + 43 + + + + + + + + + + + + external + + + internal + + + any + + + WEB_SERVERS + + + service-allow-https-web-tcp + + allow + + any + + + + + external + + + internal + + + CUSTOMERS + + + CUSTOMERS + + + service-allow-https-web-tcp + + deny + + any + + + + + external + + + internal + + + any + + + ALI_HOME + + + any + + allow + + any + + + + + external + + + internal + + + ALI_HOME + + + CRM + + + service-allow-https-web-tcp + + deny + + any + + + + + external + + + internal + + + any + + + any + + + application-default + + deny + + ping + + + + + external + + + internal + + + any + + + any + + + service-accept-any-any-udp + + allow + + any + + + + + + + + + ALI_HOME_0 + + + + + CRM_0 + + + + + CUSTOMERS_0 + + + + + WEB_SERVERS_0 + WEB_SERVERS_1 + + + +
+ + ALI_HOME_0 + 9.9.9.9/32 + + + CRM_0 + 192.168.1.0/24 + + + CUSTOMERS_0 + 4.71.113.2/32 + + + WEB_SERVERS_0 + 200.1.1.1/32 + + + WEB_SERVERS_1 + 200.1.1.2/32 + +
+
+
+
+
+
diff --git a/tests/lib/fortigate_test.py b/tests/lib/fortigate_test.py new file mode 100644 index 00000000..98b5d307 --- /dev/null +++ b/tests/lib/fortigate_test.py @@ -0,0 +1,287 @@ +# Copyright 2019 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unittest for fortigate policy rendering module.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +import string +import re +import unittest +import six + +from capirca.lib import fortigate +from capirca.lib import nacaddr +from capirca.lib import naming +from capirca.lib import policy +import mock + +GOOD_HEADER = """ +header { + comment:: "this is a test acl" + target:: fortigate from-id 2 +} +""" + +BAD_HEADER = """ +header { + comment:: "this is a test acl" + target:: fortigate edge-filter +} +""" + +TERM_TEMPLATE = """ +term good-term-2 {{ + source-interface:: {src_interface} + destination-interface:: {dest_interface} + protocol:: {protocol} + destination-address:: {dest_addr} + destination-port:: {dest_port} + source-address:: {src_addr} + source-port:: {src_port} + action:: {action} + logging:: {logging} +}} +""" + + +class CustomFormatter(string.Formatter): + DEFAULT_VALUES = { + 'src_interface': 'wan1', + 'dest_interface': 'wan2', + 'protocol': 'tcp', + 'src_addr': 'SOME_HOST', + 'dest_addr': 'SOME_HOST', + 'src_port': 'HTTP', + 'dest_port': 'HTTP', + 'action': 'accept', + 'logging': 'true' + } + + def format(*args, **kwargs): + if 'remove_fields' in kwargs: + args = list(args) + for field in kwargs['remove_fields']: + remove_regex = '.*' + field + '.*' + args[1] = re.sub(remove_regex, '', args[1]) + + return string.Formatter.format(*args, **kwargs) + return string.Formatter.format(*args, **kwargs) + + def get_value(self, key, args, kwds): + try: + return kwds[key] + except KeyError: + return self.DEFAULT_VALUES[key] + + +EXP_INFO = 2 + + +class FortigateTest(unittest.TestCase): + def setUp(self): + self.naming = mock.create_autospec(naming.Naming) + + def get_addr_side_eff(host): + HOSTS = { + 'SOME_HOST': [nacaddr.IP('10.0.0.0/8')], + 'SOME_HOST2': [nacaddr.IP('20.0.0.0/8')] + } + return HOSTS[host] + + def get_port_side_eff(port, protocol): + HOSTS = { + 'HTTP': ['80'], + 'HTTPS': ['443'], + 'SSH': ['22'], + 'WHOIS': ['43'] + } + return HOSTS[port] + + self.naming.GetNetAddr.side_effect = get_addr_side_eff + self.naming.GetServiceByProto.side_effect = get_port_side_eff + self.fmt = CustomFormatter() + + def testGoodHeader(self): + term = self.fmt.format(TERM_TEMPLATE) + acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + term, + self.naming), EXP_INFO) + + expected_sig = 'edit 2' + + get_net_calls = [mock.call('SOME_HOST')] * 2 + get_server_by_proto_calls = [mock.call('HTTP', 'tcp')] * 2 + + self.assertTrue(expected_sig in str(acl), '[%s]' % str(acl)) + self.naming.GetNetAddr.assert_has_calls(get_net_calls) + self.naming.GetServiceByProto.assert_has_calls(get_server_by_proto_calls) + + def testBadHeader(self): + term = self.fmt.format(TERM_TEMPLATE) + parsed_p = policy.ParsePolicy(BAD_HEADER + term, + self.naming) + + self.assertRaises(fortigate.FilterError, + fortigate.Fortigate, + parsed_p, + EXP_INFO) + + def testAction(self): + accept_term = self.fmt.format(TERM_TEMPLATE, action='accept') + deny_term = self.fmt.format(TERM_TEMPLATE, action='deny') + reject_term = self.fmt.format(TERM_TEMPLATE, action='reject') + + accept_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + accept_term, + self.naming), EXP_INFO) + deny_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + deny_term, + self.naming), EXP_INFO) + reject_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + reject_term, + self.naming), EXP_INFO) + + accept_sig = 'set action accept' + deny_sig = 'set action deny' + self.assertTrue(accept_sig in str(accept_acl), '[%s]' % str(accept_acl)) + self.assertTrue(deny_sig in str(deny_sig), '[%s]' % str(deny_acl)) + self.assertTrue(deny_sig in str(reject_acl), '[%s]' % str(reject_acl)) + + def testAddresses(self): + diff_addr_term = self.fmt.format(TERM_TEMPLATE, src_addr='SOME_HOST', + dest_addr='SOME_HOST2') + same_addr_term = self.fmt.format(TERM_TEMPLATE, src_addr='SOME_HOST2', + dest_addr='SOME_HOST2') + any_src_term = self.fmt.format(TERM_TEMPLATE, remove_fields=('src_addr',)) + any_dest_term = self.fmt.format(TERM_TEMPLATE, remove_fields=('dest_addr',)) + + diff_addr_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + diff_addr_term, + self.naming), EXP_INFO) + + same_addr_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + same_addr_term, + self.naming), EXP_INFO) + + any_src_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + any_src_term, + self.naming), EXP_INFO) + + any_dest_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + any_dest_term, + self.naming), EXP_INFO) + + src_sig = 'set srcaddr 10.0.0.0/8' + dest_sig = 'set dstaddr 20.0.0.0/8' + any_dest_sig = 'set dstaddr all' + any_src_sig = 'set srcaddr all' + + self.assertTrue(src_sig in str(diff_addr_acl) and dest_sig in str(diff_addr_acl), + '[%s]' % str(diff_addr_acl)) + # [] check acl generate one 'set subnet' for dup addresses + self.assertEqual(str(same_addr_acl).count('set subnet'), 1) + self.assertTrue(any_src_sig in str(any_src_acl), '[%s]' % str(any_src_acl)) + self.assertTrue(any_dest_sig in str(any_dest_acl), '[%s]' % str(any_dest_acl)) + + def testServices(self): + dup_port_term = self.fmt.format(TERM_TEMPLATE, src_port='HTTP', dest_port='HTTP') + diff_port_term = self.fmt.format(TERM_TEMPLATE, src_port='HTTP', dest_port='HTTPS') + src_only_term = self.fmt.format(TERM_TEMPLATE, src_port='HTTP', remove_fields=('dest_port',)) + icmp_term = self.fmt.format(TERM_TEMPLATE, protocol='icmp', remove_fields=('dest_port', 'src_port')) + ip_term = self.fmt.format(TERM_TEMPLATE, remove_fields=('dest_port', 'src_port', 'protocol')) + custom_port_term = self.fmt.format(TERM_TEMPLATE, src_port='WHOIS') + + dup_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + dup_port_term, + self.naming), EXP_INFO) + diff_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + diff_port_term, + self.naming), EXP_INFO) + src_only_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + src_only_term, + self.naming), EXP_INFO) + icmp_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + icmp_term, + self.naming), EXP_INFO) + ip_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + ip_term, + self.naming), EXP_INFO) + custom_port_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + custom_port_term, + self.naming), EXP_INFO) + + dup_sig = 'set service HTTP\n' + diff_sig = 'set service HTTP HTTPS\n' + src_only_sig = dup_sig + icmp_sig = 'set service ALL_ICMP\n' + ip_sig = 'set service ALL\n' + custom_port_sig = 'config firewall service custom\n\tedit 43\n\t\t' \ + 'set protocol TCP/UDP\n\t\tset tcp-portrange 43\n\tnext' + + self.assertTrue(dup_sig in str(dup_acl), '[%s]' % str(dup_acl)) + self.assertTrue(diff_sig in str(diff_acl), '[%s]' % str(diff_acl)) + self.assertTrue(src_only_sig in str(src_only_acl), '[%s]' % str(src_only_acl)) + self.assertTrue(icmp_sig in str(icmp_acl), '[%s]' % str(icmp_acl)) + self.assertTrue(ip_sig in str(ip_acl), '[%s]' % str(ip_acl)) + self.assertTrue(custom_port_sig in str(custom_port_acl), '[%s]' % str(custom_port_acl)) + + def testInterfaces(self): + no_interfaces_term = self.fmt.format(TERM_TEMPLATE, remove_fields=('src_interface', 'dest_interface')) + src_only_int_term = self.fmt.format(TERM_TEMPLATE, src_interface='wan1', remove_fields=('dest_interface',)) + dest_only_int_term = self.fmt.format(TERM_TEMPLATE, dest_interface='wan2', remove_fields=('src_interface',)) + both_interfaces_term = self.fmt.format(TERM_TEMPLATE, src_interface='wan1', dest_interface='wan2', ) + + no_interfaces_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + no_interfaces_term, + self.naming), EXP_INFO) + src_only_int_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + src_only_int_term, + self.naming), EXP_INFO) + dest_only_int_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + dest_only_int_term, + self.naming), EXP_INFO) + both_interfaces_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + both_interfaces_term, + self.naming), EXP_INFO) + + no_interfaces_sig = 'set srcintf any\n\t\tset dstintf any' + src_int_only_sig = 'set srcintf wan1\n\t\tset dstintf any' + dest_int_only_sig = 'set srcintf any\n\t\tset dstintf wan2' + both_interfaces_sig = 'set srcintf wan1\n\t\tset dstintf wan2' + + self.assertTrue(no_interfaces_sig in str(no_interfaces_acl), '[%s]' % str(no_interfaces_acl)) + self.assertTrue(src_int_only_sig in str(src_only_int_acl), '[%s]' % str(src_only_int_acl)) + self.assertTrue(dest_int_only_sig in str(dest_only_int_acl), '[%s]' % str(dest_only_int_acl)) + self.assertTrue(both_interfaces_sig in str(both_interfaces_acl), '[%s]' % str(both_interfaces_acl)) + + def testLogging(self): + log_term = self.fmt.format(TERM_TEMPLATE, logging='true') + no_log_term = self.fmt.format(TERM_TEMPLATE, remove_fields=('logging',)) + + log_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + log_term, + self.naming), EXP_INFO) + no_log_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + no_log_term, + self.naming), EXP_INFO) + + log_sig = 'set logtraffic all' + + self.assertTrue(log_sig in str(log_acl), '[%s]' % str(log_acl)) + self.assertTrue(log_sig not in str(no_log_term), '[%s]' % str(no_log_acl)) + + def testDuplicateTermError(self): + term = self.fmt.format(TERM_TEMPLATE, logging='true') + duplicate_terms = term + term + parsed_p = policy.ParsePolicy(GOOD_HEADER + duplicate_terms, + self.naming) + + self.assertRaises(fortigate.FortiGateDuplicateTermError, + fortigate.Fortigate, + parsed_p, + EXP_INFO) + + def testPortMap(self): + port_map = fortigate.FortigatePortMap() + self.assertEqual('SSH', port_map.GetProtocol('tcp', 22)) + self.assertRaises(fortigate.FortiGatePortDoesNotExist, + port_map.GetProtocol, + 'tcp', 5000) + self.assertRaises(fortigate.FortiGateValueError, + port_map.GetProtocol, + 'bad_proto', 22) From 90c224c4dbae2a2ffde88bfd1bec1f41706a0b57 Mon Sep 17 00:00:00 2001 From: Ali-aqrabawi Date: Thu, 13 Jun 2019 01:06:32 +0300 Subject: [PATCH 04/13] added unittest and resolved code reviews --- capirca/policies.acl | 37 -------- capirca/policies.fcl | 96 -------------------- capirca/policies.jcl | 83 ----------------- capirca/policies.xml | 210 ------------------------------------------- 4 files changed, 426 deletions(-) delete mode 100644 capirca/policies.acl delete mode 100644 capirca/policies.fcl delete mode 100644 capirca/policies.jcl delete mode 100644 capirca/policies.xml diff --git a/capirca/policies.acl b/capirca/policies.acl deleted file mode 100644 index e2e096f7..00000000 --- a/capirca/policies.acl +++ /dev/null @@ -1,37 +0,0 @@ -! $Id:$ -! $Date:$ -! $Revision:$ -no ip access-list extended edge-filter -ip access-list extended edge-filter - remark $Id:$ - - - remark allow-https-web - permit tcp any host 200.1.1.1 eq 80 - permit tcp any host 200.1.1.1 eq 443 - permit tcp any host 200.1.1.2 eq 80 - permit tcp any host 200.1.1.2 eq 443 - - - remark customers-policy - deny tcp host 4.71.113.2 host 4.71.113.2 eq 80 - deny tcp host 4.71.113.2 host 4.71.113.2 eq 443 - - - remark good-term-2 - permit tcp any eq 993 host 9.9.9.9 - - - remark customers-policy2 - deny tcp host 9.9.9.9 192.168.1.0 0.0.0.255 eq 80 log - deny tcp host 9.9.9.9 192.168.1.0 0.0.0.255 eq 443 log - - - remark deny-any-any - deny icmp any any - - - remark accept-any-any - permit udp any any eq 43 - -exit diff --git a/capirca/policies.fcl b/capirca/policies.fcl deleted file mode 100644 index 6ad5d91d..00000000 --- a/capirca/policies.fcl +++ /dev/null @@ -1,96 +0,0 @@ -config firewall address - edit 200.1.1.1/32 - set subnet 200.1.1.1/32 - next - edit 200.1.1.2/32 - set subnet 200.1.1.2/32 - next - edit 4.71.113.2/32 - set subnet 4.71.113.2/32 - next - edit 9.9.9.9/32 - set subnet 9.9.9.9/32 - next - edit 192.168.1.0/24 - set subnet 192.168.1.0/24 - next - -end - -config firewall service custom - edit 43 - set protocol TCP/UDP - set udp-portrange 43 - next - -end - -config firewall policy - edit 2 - set comments allow-https-web - set srcintf any - set dstintf any - set dstaddr 200.1.1.1/32 200.1.1.2/32 - set srcaddr all - set action accept - set service HTTP HTTPS - set schedule always - next - - edit 3 - set comments customers-policy - set srcintf any - set dstintf any - set dstaddr 4.71.113.2/32 - set srcaddr 4.71.113.2/32 - set action deny - set service HTTP HTTPS - set schedule always - next - - edit 4 - set comments good-term-2 - set srcintf any - set dstintf any - set dstaddr 9.9.9.9/32 - set srcaddr all - set action accept - set service IMAPS - set schedule always - next - - edit 5 - set comments customers-policy2 - set srcintf any - set dstintf any - set dstaddr 192.168.1.0/24 - set srcaddr 9.9.9.9/32 - set action deny - set service HTTP HTTPS - set schedule always - set logtraffic all - next - - edit 6 - set comments deny-any-any - set srcintf any - set dstintf any - set dstaddr all - set srcaddr all - set action deny - set service ALL_ICMP - set schedule always - next - - edit 7 - set comments accept-any-any - set srcintf any - set dstintf any - set dstaddr all - set srcaddr all - set action accept - set service 43 - set schedule always - next - -end \ No newline at end of file diff --git a/capirca/policies.jcl b/capirca/policies.jcl deleted file mode 100644 index 175c0adb..00000000 --- a/capirca/policies.jcl +++ /dev/null @@ -1,83 +0,0 @@ -firewall { - family inet { - replace: - /* - ** $Id:$ - ** $Date:$ - ** $Revision:$ - ** - */ - filter edge-filter { - interface-specific; - term allow-https-web { - from { - destination-address { - /* Example web server 1 */ - 200.1.1.1/32; - /* Example web server 2 */ - 200.1.1.2/32; - } - protocol tcp; - destination-port [ 80 443 ]; - } - then accept; - } - term customers-policy { - from { - source-address { - 4.71.113.2/32; - } - destination-address { - 4.71.113.2/32; - } - protocol tcp; - destination-port [ 80 443 ]; - } - then { - discard; - } - } - term good-term-2 { - from { - destination-address { - 9.9.9.9/32; - } - protocol tcp; - source-port 993; - } - then accept; - } - term customers-policy2 { - from { - source-address { - 9.9.9.9/32; - } - destination-address { - 192.168.1.0/24; - } - protocol tcp; - destination-port [ 80 443 ]; - } - then { - syslog; - discard; - } - } - term deny-any-any { - from { - protocol icmp; - } - then { - discard; - } - } - term accept-any-any { - from { - protocol udp; - destination-port 43; - } - then accept; - } - } - } -} diff --git a/capirca/policies.xml b/capirca/policies.xml deleted file mode 100644 index cb33ac57..00000000 --- a/capirca/policies.xml +++ /dev/null @@ -1,210 +0,0 @@ - - - - - - - - - - - - - - - 80, 443 - - - - - - - 43 - - - - - - - - - - - - external - - - internal - - - any - - - WEB_SERVERS - - - service-allow-https-web-tcp - - allow - - any - - - - - external - - - internal - - - CUSTOMERS - - - CUSTOMERS - - - service-allow-https-web-tcp - - deny - - any - - - - - external - - - internal - - - any - - - ALI_HOME - - - any - - allow - - any - - - - - external - - - internal - - - ALI_HOME - - - CRM - - - service-allow-https-web-tcp - - deny - - any - - - - - external - - - internal - - - any - - - any - - - application-default - - deny - - ping - - - - - external - - - internal - - - any - - - any - - - service-accept-any-any-udp - - allow - - any - - - - - - - - - ALI_HOME_0 - - - - - CRM_0 - - - - - CUSTOMERS_0 - - - - - WEB_SERVERS_0 - WEB_SERVERS_1 - - - -
- - ALI_HOME_0 - 9.9.9.9/32 - - - CRM_0 - 192.168.1.0/24 - - - CUSTOMERS_0 - 4.71.113.2/32 - - - WEB_SERVERS_0 - 200.1.1.1/32 - - - WEB_SERVERS_1 - 200.1.1.2/32 - -
-
-
-
-
-
From 6224973402a08b06d8af9c684bb6d30749971a33 Mon Sep 17 00:00:00 2001 From: Ali-aqrabawi Date: Thu, 13 Jun 2019 01:08:43 +0300 Subject: [PATCH 05/13] fix code style --- capirca/lib/fortigate.py | 93 +++++++++++++++++++------------------ tests/lib/fortigate_test.py | 32 ++++++------- 2 files changed, 63 insertions(+), 62 deletions(-) diff --git a/capirca/lib/fortigate.py b/capirca/lib/fortigate.py index e73853ca..69fcf1ed 100644 --- a/capirca/lib/fortigate.py +++ b/capirca/lib/fortigate.py @@ -27,6 +27,7 @@ from capirca.lib import nacaddr from absl import logging + _ACTION_TABLE = { 'accept': 'accept', 'deny': 'deny', @@ -62,56 +63,56 @@ class FortiGatePortDoesNotExist(Error): class FortigatePortMap(object): """Map port numbers to service names""" _PORTS_TCP = { - 179: 'BGP', - 53: 'DNS', - 7: 'PING', - 79: 'FINGER', - 21: 'FTP', - 70: 'GOPHER', - 443: 'HTTPS', - 194: 'IRC', - 2049: 'NFS', - 119: 'NNTP', - 110: 'POP3', - 1723: 'PPTP', - 25: 'SMTP', - 22: 'SSH', - 517: 'TALK', - 23: 'TELNET', - 540: 'UUCP', - 80: 'HTTP', - 993: 'IMAPS', - 3389: 'RDP', - 3306: 'MYSQL', - 1433: 'MS-SQL', - 1812: 'RADIUS', - 995: 'POP3S', - 465: 'SMTPS', - 389: 'LDAP', - 69: 'TFTP' + 179: 'BGP', + 53: 'DNS', + 7: 'PING', + 79: 'FINGER', + 21: 'FTP', + 70: 'GOPHER', + 443: 'HTTPS', + 194: 'IRC', + 2049: 'NFS', + 119: 'NNTP', + 110: 'POP3', + 1723: 'PPTP', + 25: 'SMTP', + 22: 'SSH', + 517: 'TALK', + 23: 'TELNET', + 540: 'UUCP', + 80: 'HTTP', + 993: 'IMAPS', + 3389: 'RDP', + 3306: 'MYSQL', + 1433: 'MS-SQL', + 1812: 'RADIUS', + 995: 'POP3S', + 465: 'SMTPS', + 389: 'LDAP', + 69: 'TFTP' } _PORTS_UDP = { - 53: 'DNS', - 7: 'PING', - 500: 'IKE', - 2049: 'NFS', - 123: 'NTP', - 520: 'RIP', - 161: 'SNMP', - 162: 'snmptrap', - 514: 'SYSLOG', - 517: 'TALK', - 69: 'TFTP', - 37: 'TIMESTAMP', - 1812: 'RADIUS', - 67: 'DHCP' + 53: 'DNS', + 7: 'PING', + 500: 'IKE', + 2049: 'NFS', + 123: 'NTP', + 520: 'RIP', + 161: 'SNMP', + 162: 'snmptrap', + 514: 'SYSLOG', + 517: 'TALK', + 69: 'TFTP', + 37: 'TIMESTAMP', + 1812: 'RADIUS', + 67: 'DHCP' } _PROTO_MAP = { - 'icmp': 'ALL_ICMP', - 'gre': 'GRE', - 'ip': 'ALL', - 'tcp': _PORTS_TCP, - 'udp': _PORTS_UDP + 'icmp': 'ALL_ICMP', + 'gre': 'GRE', + 'ip': 'ALL', + 'tcp': _PORTS_TCP, + 'udp': _PORTS_UDP } @staticmethod diff --git a/tests/lib/fortigate_test.py b/tests/lib/fortigate_test.py index 98b5d307..801d53db 100644 --- a/tests/lib/fortigate_test.py +++ b/tests/lib/fortigate_test.py @@ -22,7 +22,6 @@ import string import re import unittest -import six from capirca.lib import fortigate from capirca.lib import nacaddr @@ -30,6 +29,7 @@ from capirca.lib import policy import mock + GOOD_HEADER = """ header { comment:: "this is a test acl" @@ -61,15 +61,15 @@ class CustomFormatter(string.Formatter): DEFAULT_VALUES = { - 'src_interface': 'wan1', - 'dest_interface': 'wan2', - 'protocol': 'tcp', - 'src_addr': 'SOME_HOST', - 'dest_addr': 'SOME_HOST', - 'src_port': 'HTTP', - 'dest_port': 'HTTP', - 'action': 'accept', - 'logging': 'true' + 'src_interface': 'wan1', + 'dest_interface': 'wan2', + 'protocol': 'tcp', + 'src_addr': 'SOME_HOST', + 'dest_addr': 'SOME_HOST', + 'src_port': 'HTTP', + 'dest_port': 'HTTP', + 'action': 'accept', + 'logging': 'true' } def format(*args, **kwargs): @@ -98,17 +98,17 @@ def setUp(self): def get_addr_side_eff(host): HOSTS = { - 'SOME_HOST': [nacaddr.IP('10.0.0.0/8')], - 'SOME_HOST2': [nacaddr.IP('20.0.0.0/8')] + 'SOME_HOST': [nacaddr.IP('10.0.0.0/8')], + 'SOME_HOST2': [nacaddr.IP('20.0.0.0/8')] } return HOSTS[host] def get_port_side_eff(port, protocol): HOSTS = { - 'HTTP': ['80'], - 'HTTPS': ['443'], - 'SSH': ['22'], - 'WHOIS': ['43'] + 'HTTP': ['80'], + 'HTTPS': ['443'], + 'SSH': ['22'], + 'WHOIS': ['43'] } return HOSTS[port] From 0662b9c49b3e5b29ac7573f2be4521fb39b18a89 Mon Sep 17 00:00:00 2001 From: Ali-aqrabawi Date: Fri, 21 Jun 2019 23:26:54 +0300 Subject: [PATCH 06/13] eleminate long lines --- capirca/aclgen.py | 2 +- capirca/lib/fortigate.py | 16 ++- tests/lib/fortigate_test.py | 273 +++++++++++++++++++++++++----------- 3 files changed, 200 insertions(+), 91 deletions(-) diff --git a/capirca/aclgen.py b/capirca/aclgen.py index c63fbaea..6c864a71 100644 --- a/capirca/aclgen.py +++ b/capirca/aclgen.py @@ -37,6 +37,7 @@ from capirca.lib import ciscoasa from capirca.lib import ciscoxr from capirca.lib import cloudarmor +from capirca.lib import fortigate from capirca.lib import gce from capirca.lib import ipset from capirca.lib import iptables @@ -52,7 +53,6 @@ from capirca.lib import speedway from capirca.lib import srxlo from capirca.lib import windows_advfirewall -from capirca.lib import fortigate FLAGS = flags.FLAGS diff --git a/capirca/lib/fortigate.py b/capirca/lib/fortigate.py index 69fcf1ed..d28057fe 100644 --- a/capirca/lib/fortigate.py +++ b/capirca/lib/fortigate.py @@ -23,9 +23,9 @@ import datetime import six +from absl import logging from capirca.lib import aclgenerator from capirca.lib import nacaddr -from absl import logging _ACTION_TABLE = { @@ -154,14 +154,17 @@ def __init__(self): self._FW_DUP_CHECK = set() def get_fw_addresses(self): + """return the collected addresses""" self._FW_ADDRESSES.extend([' ', 'end', ' ']) return self._FW_ADDRESSES def get_fw_services(self): + """return the collected services""" self._FW_SERVICES.extend([' ', 'end', ' ']) return self._FW_SERVICES def _add_address_to_fw_addresses(self, addr): + """add address to address store""" if addr in self._FW_DUP_CHECK: return self._FW_ADDRESSES.extend(['\tedit %s' % addr, @@ -170,6 +173,7 @@ def _add_address_to_fw_addresses(self, addr): self._FW_DUP_CHECK.add(addr) def _add_service_to_fw_services(self, protocol, service): + """add service to services store""" if service in self._FW_DUP_CHECK: return @@ -195,11 +199,12 @@ def __init__(self, term, object_container): self._term = term self._obj_container = object_container - self.id = type(self).CURRENT_ID + self.id_ = type(self).CURRENT_ID type(self).CURRENT_ID += 1 @staticmethod def _get_addresses_name(addresses): + """return the addresses or 'all' if no addresses specified""" v4_addresses = [x.with_prefixlen for x in addresses if not isinstance(x, nacaddr.IPv6)] addresses = ' '.join(v4_addresses) @@ -207,6 +212,7 @@ def _get_addresses_name(addresses): @staticmethod def clean_ports(src_ports, dest_ports): + """return a set() of src and dest ports""" all_ports = [] if src_ports: all_ports += src_ports @@ -236,6 +242,7 @@ def _get_services_string(self, protocol, ports): return ' '.join(services) or 'ALL' def _generate_address_names(self, *addresses): + """this will generate the addresses names (object-network names)""" for group in addresses: for addr in group: if addr and not isinstance(addr, nacaddr.IPv6): @@ -246,7 +253,6 @@ def __str__(self): self._generate_address_names(self._term.destination_address, self._term.source_address) - # lines.extend(self.firewall_addresses) dest_addresses = self._get_addresses_name(self._term.destination_address) src_addresses = self._get_addresses_name(self._term.source_address) @@ -358,8 +364,8 @@ def __str__(self): target_services = [] target_policies = [] - for (_, filter_name, term) in self.fortigate_policies: - target_policies.append(self._GetTargetByPolicyID(term.id)) + for (_, _, term) in self.fortigate_policies: + target_policies.append(self._GetTargetByPolicyID(term.id_)) term_str = str(term) diff --git a/tests/lib/fortigate_test.py b/tests/lib/fortigate_test.py index 801d53db..ef576361 100644 --- a/tests/lib/fortigate_test.py +++ b/tests/lib/fortigate_test.py @@ -97,20 +97,20 @@ def setUp(self): self.naming = mock.create_autospec(naming.Naming) def get_addr_side_eff(host): - HOSTS = { + hosts = { 'SOME_HOST': [nacaddr.IP('10.0.0.0/8')], 'SOME_HOST2': [nacaddr.IP('20.0.0.0/8')] } - return HOSTS[host] + return hosts[host] def get_port_side_eff(port, protocol): - HOSTS = { + hosts = { 'HTTP': ['80'], 'HTTPS': ['443'], 'SSH': ['22'], 'WHOIS': ['43'] } - return HOSTS[port] + return hosts[port] self.naming.GetNetAddr.side_effect = get_addr_side_eff self.naming.GetServiceByProto.side_effect = get_port_side_eff @@ -145,71 +145,128 @@ def testAction(self): deny_term = self.fmt.format(TERM_TEMPLATE, action='deny') reject_term = self.fmt.format(TERM_TEMPLATE, action='reject') - accept_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + accept_term, - self.naming), EXP_INFO) - deny_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + deny_term, - self.naming), EXP_INFO) - reject_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + reject_term, - self.naming), EXP_INFO) + accept_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + accept_term, + self.naming), EXP_INFO + ) + deny_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + deny_term, + self.naming), EXP_INFO + ) + reject_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + reject_term, + self.naming), EXP_INFO + ) accept_sig = 'set action accept' deny_sig = 'set action deny' - self.assertTrue(accept_sig in str(accept_acl), '[%s]' % str(accept_acl)) - self.assertTrue(deny_sig in str(deny_sig), '[%s]' % str(deny_acl)) - self.assertTrue(deny_sig in str(reject_acl), '[%s]' % str(reject_acl)) + self.assertTrue( + accept_sig in str(accept_acl), '[%s]' % str(accept_acl) + ) + self.assertTrue( + deny_sig in str(deny_sig), '[%s]' % str(deny_acl) + ) + self.assertTrue( + deny_sig in str(reject_acl), '[%s]' % str(reject_acl) + ) def testAddresses(self): - diff_addr_term = self.fmt.format(TERM_TEMPLATE, src_addr='SOME_HOST', - dest_addr='SOME_HOST2') - same_addr_term = self.fmt.format(TERM_TEMPLATE, src_addr='SOME_HOST2', - dest_addr='SOME_HOST2') - any_src_term = self.fmt.format(TERM_TEMPLATE, remove_fields=('src_addr',)) - any_dest_term = self.fmt.format(TERM_TEMPLATE, remove_fields=('dest_addr',)) - - diff_addr_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + diff_addr_term, - self.naming), EXP_INFO) - - same_addr_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + same_addr_term, - self.naming), EXP_INFO) - - any_src_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + any_src_term, - self.naming), EXP_INFO) - - any_dest_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + any_dest_term, - self.naming), EXP_INFO) + diff_addr_term = self.fmt.format( + TERM_TEMPLATE, src_addr='SOME_HOST', + dest_addr='SOME_HOST2' + ) + same_addr_term = self.fmt.format( + TERM_TEMPLATE, src_addr='SOME_HOST2', + dest_addr='SOME_HOST2' + ) + any_src_term = self.fmt.format( + TERM_TEMPLATE, remove_fields=('src_addr',) + ) + any_dest_term = self.fmt.format( + TERM_TEMPLATE, remove_fields=('dest_addr',) + ) + + diff_addr_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + diff_addr_term, + self.naming), EXP_INFO + ) + + same_addr_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + same_addr_term, + self.naming), EXP_INFO + ) + + any_src_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + any_src_term, + self.naming), EXP_INFO + ) + + any_dest_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + any_dest_term, + self.naming), EXP_INFO + ) src_sig = 'set srcaddr 10.0.0.0/8' dest_sig = 'set dstaddr 20.0.0.0/8' any_dest_sig = 'set dstaddr all' any_src_sig = 'set srcaddr all' - self.assertTrue(src_sig in str(diff_addr_acl) and dest_sig in str(diff_addr_acl), - '[%s]' % str(diff_addr_acl)) + self.assertTrue( + src_sig in str(diff_addr_acl) and dest_sig in str(diff_addr_acl), + '[%s]' % str(diff_addr_acl) + ) # [] check acl generate one 'set subnet' for dup addresses - self.assertEqual(str(same_addr_acl).count('set subnet'), 1) - self.assertTrue(any_src_sig in str(any_src_acl), '[%s]' % str(any_src_acl)) - self.assertTrue(any_dest_sig in str(any_dest_acl), '[%s]' % str(any_dest_acl)) + self.assertEqual( + str(same_addr_acl).count('set subnet'), 1 + ) + self.assertTrue( + any_src_sig in str(any_src_acl), '[%s]' % str(any_src_acl) + ) + self.assertTrue( + any_dest_sig in str(any_dest_acl), '[%s]' % str(any_dest_acl) + ) def testServices(self): - dup_port_term = self.fmt.format(TERM_TEMPLATE, src_port='HTTP', dest_port='HTTP') - diff_port_term = self.fmt.format(TERM_TEMPLATE, src_port='HTTP', dest_port='HTTPS') - src_only_term = self.fmt.format(TERM_TEMPLATE, src_port='HTTP', remove_fields=('dest_port',)) - icmp_term = self.fmt.format(TERM_TEMPLATE, protocol='icmp', remove_fields=('dest_port', 'src_port')) - ip_term = self.fmt.format(TERM_TEMPLATE, remove_fields=('dest_port', 'src_port', 'protocol')) + dup_port_term = self.fmt.format(TERM_TEMPLATE, + src_port='HTTP', + dest_port='HTTP') + diff_port_term = self.fmt.format(TERM_TEMPLATE, + src_port='HTTP', + dest_port='HTTPS') + src_only_term = self.fmt.format(TERM_TEMPLATE, + src_port='HTTP', + remove_fields=('dest_port',)) + icmp_term = self.fmt.format(TERM_TEMPLATE, + protocol='icmp', + remove_fields=('dest_port', 'src_port')) + ip_term = self.fmt.format(TERM_TEMPLATE, + remove_fields=('dest_port', 'src_port', 'protocol')) custom_port_term = self.fmt.format(TERM_TEMPLATE, src_port='WHOIS') - dup_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + dup_port_term, - self.naming), EXP_INFO) - diff_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + diff_port_term, - self.naming), EXP_INFO) - src_only_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + src_only_term, - self.naming), EXP_INFO) - icmp_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + icmp_term, - self.naming), EXP_INFO) - ip_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + ip_term, - self.naming), EXP_INFO) - custom_port_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + custom_port_term, - self.naming), EXP_INFO) + dup_acl = fortigate.Fortigate(policy.ParsePolicy( + GOOD_HEADER + dup_port_term, + self.naming), EXP_INFO + ) + diff_acl = fortigate.Fortigate(policy.ParsePolicy( + GOOD_HEADER + diff_port_term, + self.naming), EXP_INFO + ) + src_only_acl = fortigate.Fortigate(policy.ParsePolicy( + GOOD_HEADER + src_only_term, + self.naming), EXP_INFO + ) + icmp_acl = fortigate.Fortigate(policy.ParsePolicy( + GOOD_HEADER + icmp_term, + self.naming), EXP_INFO + ) + ip_acl = fortigate.Fortigate(policy.ParsePolicy( + GOOD_HEADER + ip_term, + self.naming), EXP_INFO + ) + custom_port_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + custom_port_term, + self.naming), EXP_INFO + ) dup_sig = 'set service HTTP\n' diff_sig = 'set service HTTP HTTPS\n' @@ -219,51 +276,97 @@ def testServices(self): custom_port_sig = 'config firewall service custom\n\tedit 43\n\t\t' \ 'set protocol TCP/UDP\n\t\tset tcp-portrange 43\n\tnext' - self.assertTrue(dup_sig in str(dup_acl), '[%s]' % str(dup_acl)) - self.assertTrue(diff_sig in str(diff_acl), '[%s]' % str(diff_acl)) - self.assertTrue(src_only_sig in str(src_only_acl), '[%s]' % str(src_only_acl)) - self.assertTrue(icmp_sig in str(icmp_acl), '[%s]' % str(icmp_acl)) - self.assertTrue(ip_sig in str(ip_acl), '[%s]' % str(ip_acl)) - self.assertTrue(custom_port_sig in str(custom_port_acl), '[%s]' % str(custom_port_acl)) + self.assertTrue( + dup_sig in str(dup_acl), '[%s]' % str(dup_acl) + ) + self.assertTrue( + diff_sig in str(diff_acl), '[%s]' % str(diff_acl) + ) + self.assertTrue( + src_only_sig in str(src_only_acl), '[%s]' % str(src_only_acl) + ) + self.assertTrue( + icmp_sig in str(icmp_acl), '[%s]' % str(icmp_acl) + ) + self.assertTrue( + ip_sig in str(ip_acl), '[%s]' % str(ip_acl) + ) + self.assertTrue( + custom_port_sig in str(custom_port_acl), '[%s]' % str(custom_port_acl) + ) def testInterfaces(self): - no_interfaces_term = self.fmt.format(TERM_TEMPLATE, remove_fields=('src_interface', 'dest_interface')) - src_only_int_term = self.fmt.format(TERM_TEMPLATE, src_interface='wan1', remove_fields=('dest_interface',)) - dest_only_int_term = self.fmt.format(TERM_TEMPLATE, dest_interface='wan2', remove_fields=('src_interface',)) - both_interfaces_term = self.fmt.format(TERM_TEMPLATE, src_interface='wan1', dest_interface='wan2', ) - - no_interfaces_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + no_interfaces_term, - self.naming), EXP_INFO) - src_only_int_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + src_only_int_term, - self.naming), EXP_INFO) - dest_only_int_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + dest_only_int_term, - self.naming), EXP_INFO) - both_interfaces_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + both_interfaces_term, - self.naming), EXP_INFO) + no_interfaces_term = self.fmt.format(TERM_TEMPLATE, + remove_fields=('src_interface', + 'dest_interface')) + src_only_int_term = self.fmt.format(TERM_TEMPLATE, + src_interface='wan1', + remove_fields=('dest_interface',)) + dest_only_int_term = self.fmt.format(TERM_TEMPLATE, + dest_interface='wan2', + remove_fields=('src_interface',)) + both_interfaces_term = self.fmt.format(TERM_TEMPLATE, + src_interface='wan1', + dest_interface='wan2', ) + + no_interfaces_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + no_interfaces_term, + self.naming), EXP_INFO + ) + src_only_int_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + src_only_int_term, + self.naming), EXP_INFO + ) + dest_only_int_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + dest_only_int_term, + self.naming), EXP_INFO + ) + both_interfaces_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + both_interfaces_term, + self.naming), EXP_INFO + ) no_interfaces_sig = 'set srcintf any\n\t\tset dstintf any' src_int_only_sig = 'set srcintf wan1\n\t\tset dstintf any' dest_int_only_sig = 'set srcintf any\n\t\tset dstintf wan2' both_interfaces_sig = 'set srcintf wan1\n\t\tset dstintf wan2' - self.assertTrue(no_interfaces_sig in str(no_interfaces_acl), '[%s]' % str(no_interfaces_acl)) - self.assertTrue(src_int_only_sig in str(src_only_int_acl), '[%s]' % str(src_only_int_acl)) - self.assertTrue(dest_int_only_sig in str(dest_only_int_acl), '[%s]' % str(dest_only_int_acl)) - self.assertTrue(both_interfaces_sig in str(both_interfaces_acl), '[%s]' % str(both_interfaces_acl)) + self.assertTrue( + no_interfaces_sig in str(no_interfaces_acl), '[%s]' % str(no_interfaces_acl) + ) + self.assertTrue( + src_int_only_sig in str(src_only_int_acl), '[%s]' % str(src_only_int_acl) + ) + self.assertTrue( + dest_int_only_sig in str(dest_only_int_acl), '[%s]' % str(dest_only_int_acl) + ) + self.assertTrue( + both_interfaces_sig in str(both_interfaces_acl), '[%s]' % str(both_interfaces_acl) + ) def testLogging(self): - log_term = self.fmt.format(TERM_TEMPLATE, logging='true') - no_log_term = self.fmt.format(TERM_TEMPLATE, remove_fields=('logging',)) - - log_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + log_term, - self.naming), EXP_INFO) - no_log_acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + no_log_term, - self.naming), EXP_INFO) + log_term = self.fmt.format(TERM_TEMPLATE, + logging='true') + no_log_term = self.fmt.format(TERM_TEMPLATE, + remove_fields=('logging',)) + + log_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + log_term, + self.naming), EXP_INFO + ) + no_log_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + no_log_term, + self.naming), EXP_INFO + ) log_sig = 'set logtraffic all' - self.assertTrue(log_sig in str(log_acl), '[%s]' % str(log_acl)) - self.assertTrue(log_sig not in str(no_log_term), '[%s]' % str(no_log_acl)) + self.assertTrue( + log_sig in str(log_acl), '[%s]' % str(log_acl) + ) + self.assertTrue( + log_sig not in str(no_log_term), '[%s]' % str(no_log_acl) + ) def testDuplicateTermError(self): term = self.fmt.format(TERM_TEMPLATE, logging='true') From cf91480d58e4a96d9f5e45f2609755d2c62a6c9c Mon Sep 17 00:00:00 2001 From: Ali-aqrabawi Date: Tue, 19 May 2020 21:57:49 +0300 Subject: [PATCH 07/13] code style fixes --- capirca/lib/fortigate.py | 117 ++++++++++-------- tests/lib/fortigate_test.py | 233 +++++++++++++++--------------------- 2 files changed, 162 insertions(+), 188 deletions(-) diff --git a/capirca/lib/fortigate.py b/capirca/lib/fortigate.py index d28057fe..d573b15e 100644 --- a/capirca/lib/fortigate.py +++ b/capirca/lib/fortigate.py @@ -21,47 +21,53 @@ from __future__ import unicode_literals import datetime -import six -from absl import logging from capirca.lib import aclgenerator from capirca.lib import nacaddr +import six +from absl import logging _ACTION_TABLE = { - 'accept': 'accept', - 'deny': 'deny', - 'reject': 'deny', - 'reject-with-tcp-rst': 'deny', # tcp rst not supported + 'accept': 'accept', + 'deny': 'deny', + 'reject': 'deny', + 'reject-with-tcp-rst': 'deny', # tcp rst not supported } class Error(Exception): + """Generic error class.""" pass class FilterError(Error): + """Generic pol Filter class.""" pass class FortiGateValueError(Error): + """Raised when invalid values provided.""" pass class FortiGateFindServiceError(Error): + """Raised when unable to get the service name.""" pass class FortiGateDuplicateTermError(Error): + """Raised when duplicate term found.""" pass -class FortiGatePortDoesNotExist(Error): +class FortiGatePortDoesNotExistError(Error): + """Raised when port is not found in ports list.""" pass class FortigatePortMap(object): - """Map port numbers to service names""" + """Map port numbers to service names.""" _PORTS_TCP = { 179: 'BGP', 53: 'DNS', @@ -117,19 +123,23 @@ class FortigatePortMap(object): @staticmethod def GetProtocol(protocol, port=None): - """ - Converts a port number to a service name. - :param protocol: string representing protocol (tcp, udp, etc). - :param port: integer representing the port number. - :return: the service name of provided port-protocol + """Converts a port number to a service name. + + Args: + protocol: string representing protocol (tcp, udp, etc) + port: integer representing the port number + + Returns: + string + + Raises: + FortiGateValueError: When unsupported protocol is used. """ f_proto = FortigatePortMap._PROTO_MAP.get(protocol, None) if f_proto is None: raise FortiGateValueError( - '%r protocol is not supported by Fortigate, supported protocols = %r' % ( - protocol, FortigatePortMap._PROTO_MAP.keys() - ) - ) + '%r protocol is unsupported, supported protocols = %r' % ( + protocol, FortigatePortMap._PROTO_MAP.keys())) if isinstance(f_proto, six.string_types): return f_proto @@ -137,15 +147,13 @@ def GetProtocol(protocol, port=None): try: return f_proto[port] except KeyError: - raise FortiGatePortDoesNotExist + raise FortiGatePortDoesNotExistError else: raise FortiGateFindServiceError( - 'failed to get service from %r protocol and %r port' % (protocol, port) - ) - + 'service not found from %r protocol and %r port' % (protocol, port)) class ObjectsContainer: - """a Container that holds service and network objects""" + """A Container that holds service and network objects.""" def __init__(self): self._FW_ADDRESSES = [] @@ -154,17 +162,17 @@ def __init__(self): self._FW_DUP_CHECK = set() def get_fw_addresses(self): - """return the collected addresses""" + """Returns the collected addresses.""" self._FW_ADDRESSES.extend([' ', 'end', ' ']) return self._FW_ADDRESSES def get_fw_services(self): - """return the collected services""" + """Returns the collected services.""" self._FW_SERVICES.extend([' ', 'end', ' ']) return self._FW_SERVICES def _add_address_to_fw_addresses(self, addr): - """add address to address store""" + """Add address to address store.""" if addr in self._FW_DUP_CHECK: return self._FW_ADDRESSES.extend(['\tedit %s' % addr, @@ -173,22 +181,21 @@ def _add_address_to_fw_addresses(self, addr): self._FW_DUP_CHECK.add(addr) def _add_service_to_fw_services(self, protocol, service): - """add service to services store""" + """Add service to services store.""" if service in self._FW_DUP_CHECK: return self._FW_SERVICES.extend( - ['\tedit %s' % service, - '\t\tset protocol TCP/UDP', - '\t\tset %s-portrange %s' % (protocol.lower(), service), - '\tnext'] - ) + ['\tedit %s' % service, + '\t\tset protocol TCP/UDP', + '\t\tset %s-portrange %s' % (protocol.lower(), service), + '\tnext']) self._FW_DUP_CHECK.add(service) class Term(aclgenerator.Term): - """Single Firewall Policy""" + """Single Firewall Policy.""" ALLOWED_PROTO_STRINGS = ['gre', 'icmp', 'ip', 'tcp', 'udp'] COMMENT_MAX_WIDTH = 70 @@ -204,7 +211,7 @@ def __init__(self, term, object_container): @staticmethod def _get_addresses_name(addresses): - """return the addresses or 'all' if no addresses specified""" + """Returns the addresses or 'all' if no addresses specified.""" v4_addresses = [x.with_prefixlen for x in addresses if not isinstance(x, nacaddr.IPv6)] addresses = ' '.join(v4_addresses) @@ -212,7 +219,7 @@ def _get_addresses_name(addresses): @staticmethod def clean_ports(src_ports, dest_ports): - """return a set() of src and dest ports""" + """Returns a set() of src and dest ports.""" all_ports = [] if src_ports: all_ports += src_ports @@ -220,29 +227,31 @@ def clean_ports(src_ports, dest_ports): all_ports += dest_ports return set(all_ports) - def _get_services_string(self, protocol, ports): - """ - get the service name if exist, if not create a service object and return the name - :param protocol: list of protocols - :param ports: list of ports - :return: - """ + def _get_services_string(self, protocols, ports): + """Get the service name, if not exist create it. + Args: + protocol: list of protocols + port: list of ports + + Returns: + string (all services separated by spaces. + """ services = [] - if protocol and not ports: - services.append(FortigatePortMap.GetProtocol(protocol[0])) + if protocols and not ports: + services.append(FortigatePortMap.GetProtocol(protocols[0])) for port in ports: try: - service = FortigatePortMap.GetProtocol(protocol[0], port[0]) - except FortiGatePortDoesNotExist: - self._obj_container._add_service_to_fw_services(protocol[0], port[0]) + service = FortigatePortMap.GetProtocol(protocols[0], port[0]) + except FortiGatePortDoesNotExistError: + self._obj_container._add_service_to_fw_services(protocols[0], port[0]) service = str(port[0]) services.append(service) return ' '.join(services) or 'ALL' def _generate_address_names(self, *addresses): - """this will generate the addresses names (object-network names)""" + """Generate the addresses names (object-network names).""" for group in addresses: for addr in group: if addr and not isinstance(addr, nacaddr.IPv6): @@ -256,14 +265,16 @@ def __str__(self): dest_addresses = self._get_addresses_name(self._term.destination_address) src_addresses = self._get_addresses_name(self._term.source_address) - all_ports = self.clean_ports(self._term.source_port, self._term.destination_port) + all_ports = self.clean_ports(self._term.source_port, + self._term.destination_port) services = self._get_services_string(self._term.protocol, all_ports) lines.append('\t\tset comments %s' % self._term.name) lines.append('\t\tset srcintf %s' % (self._term.source_interface or 'any')) - lines.append('\t\tset dstintf %s' % (self._term.destination_interface or 'any')) + lines.append( + '\t\tset dstintf %s' % (self._term.destination_interface or 'any')) lines.append('\t\tset dstaddr %s' % dest_addresses) lines.append('\t\tset srcaddr %s' % src_addresses) lines.append('\t\tset action %s' % _ACTION_TABLE.get(self._term.action[0])) @@ -276,7 +287,7 @@ def __str__(self): class Fortigate(aclgenerator.ACLGenerator): - """A cisco policy object.""" + """A Fortigate policy object.""" _PLATFORM = 'fortigate' _DEFAULT_PROTOCOL = 'ALL' @@ -294,7 +305,7 @@ def _BuildTokens(self): """Build supported tokens for platform. Returns: - tuple containing both supported tokens and sub tokens + tuple containing both supported tokens and sub tokens. """ supported_tokens, supported_sub_tokens = super(Fortigate, self)._BuildTokens() @@ -311,7 +322,7 @@ def _BuildTokens(self): return supported_tokens, supported_sub_tokens def _TranslatePolicy(self, pol, exp_info): - """Translate Capirca pol to fortigate pol""" + """Translate Capirca pol to fortigate pol.""" self.fortigate_policies = [] current_date = datetime.datetime.utcnow().date() exp_info_date = current_date + datetime.timedelta(weeks=exp_info) @@ -326,7 +337,7 @@ def _TranslatePolicy(self, pol, exp_info): if (len(filter_options) < 2 or filter_options[0] != 'from-id'): raise FilterError( - 'Fortigate Firewall filter arguments must specify from_id') + 'Fortigate Firewall filter arguments must specify from_id') from_id = filter_options[1] Term.CURRENT_ID = int(from_id) diff --git a/tests/lib/fortigate_test.py b/tests/lib/fortigate_test.py index ef576361..4d8db43d 100644 --- a/tests/lib/fortigate_test.py +++ b/tests/lib/fortigate_test.py @@ -103,14 +103,14 @@ def get_addr_side_eff(host): } return hosts[host] - def get_port_side_eff(port, protocol): + def get_port_side_eff(*args): hosts = { 'HTTP': ['80'], 'HTTPS': ['443'], 'SSH': ['22'], 'WHOIS': ['43'] } - return hosts[port] + return hosts[args[0]] self.naming.GetNetAddr.side_effect = get_addr_side_eff self.naming.GetServiceByProto.side_effect = get_port_side_eff @@ -126,7 +126,7 @@ def testGoodHeader(self): get_net_calls = [mock.call('SOME_HOST')] * 2 get_server_by_proto_calls = [mock.call('HTTP', 'tcp')] * 2 - self.assertTrue(expected_sig in str(acl), '[%s]' % str(acl)) + self.assertIn(expected_sig, str(acl), '[%s]' % str(acl)) self.naming.GetNetAddr.assert_has_calls(get_net_calls) self.naming.GetServiceByProto.assert_has_calls(get_server_by_proto_calls) @@ -146,65 +146,51 @@ def testAction(self): reject_term = self.fmt.format(TERM_TEMPLATE, action='reject') accept_acl = fortigate.Fortigate( - policy.ParsePolicy(GOOD_HEADER + accept_term, - self.naming), EXP_INFO - ) + policy.ParsePolicy(GOOD_HEADER + accept_term, + self.naming), EXP_INFO) deny_acl = fortigate.Fortigate( - policy.ParsePolicy(GOOD_HEADER + deny_term, - self.naming), EXP_INFO - ) + policy.ParsePolicy(GOOD_HEADER + deny_term, + self.naming), EXP_INFO) reject_acl = fortigate.Fortigate( - policy.ParsePolicy(GOOD_HEADER + reject_term, - self.naming), EXP_INFO - ) + policy.ParsePolicy(GOOD_HEADER + reject_term, + self.naming), EXP_INFO) accept_sig = 'set action accept' deny_sig = 'set action deny' - self.assertTrue( - accept_sig in str(accept_acl), '[%s]' % str(accept_acl) - ) - self.assertTrue( - deny_sig in str(deny_sig), '[%s]' % str(deny_acl) - ) - self.assertTrue( - deny_sig in str(reject_acl), '[%s]' % str(reject_acl) - ) + self.assertIn( + accept_sig, str(accept_acl), '[%s]' % str(accept_acl)) + self.assertIn( + deny_sig, str(deny_sig), '[%s]' % str(deny_acl)) + self.assertIn( + deny_sig, str(reject_acl), '[%s]' % str(reject_acl)) def testAddresses(self): - diff_addr_term = self.fmt.format( - TERM_TEMPLATE, src_addr='SOME_HOST', - dest_addr='SOME_HOST2' - ) - same_addr_term = self.fmt.format( - TERM_TEMPLATE, src_addr='SOME_HOST2', - dest_addr='SOME_HOST2' - ) - any_src_term = self.fmt.format( - TERM_TEMPLATE, remove_fields=('src_addr',) - ) - any_dest_term = self.fmt.format( - TERM_TEMPLATE, remove_fields=('dest_addr',) - ) + diff_addr_term = self.fmt.format(TERM_TEMPLATE, + src_addr='SOME_HOST', + dest_addr='SOME_HOST2') + same_addr_term = self.fmt.format(TERM_TEMPLATE, + src_addr='SOME_HOST2', + dest_addr='SOME_HOST2') + any_src_term = self.fmt.format(TERM_TEMPLATE, + remove_fields=('src_addr',)) + any_dest_term = self.fmt.format(TERM_TEMPLATE, + remove_fields=('dest_addr',)) diff_addr_acl = fortigate.Fortigate( - policy.ParsePolicy(GOOD_HEADER + diff_addr_term, - self.naming), EXP_INFO - ) + policy.ParsePolicy(GOOD_HEADER + diff_addr_term, + self.naming), EXP_INFO) same_addr_acl = fortigate.Fortigate( - policy.ParsePolicy(GOOD_HEADER + same_addr_term, - self.naming), EXP_INFO - ) + policy.ParsePolicy(GOOD_HEADER + same_addr_term, + self.naming), EXP_INFO) any_src_acl = fortigate.Fortigate( - policy.ParsePolicy(GOOD_HEADER + any_src_term, - self.naming), EXP_INFO - ) + policy.ParsePolicy(GOOD_HEADER + any_src_term, + self.naming), EXP_INFO) any_dest_acl = fortigate.Fortigate( - policy.ParsePolicy(GOOD_HEADER + any_dest_term, - self.naming), EXP_INFO - ) + policy.ParsePolicy(GOOD_HEADER + any_dest_term, + self.naming), EXP_INFO) src_sig = 'set srcaddr 10.0.0.0/8' dest_sig = 'set dstaddr 20.0.0.0/8' @@ -212,19 +198,15 @@ def testAddresses(self): any_src_sig = 'set srcaddr all' self.assertTrue( - src_sig in str(diff_addr_acl) and dest_sig in str(diff_addr_acl), - '[%s]' % str(diff_addr_acl) - ) + src_sig in str(diff_addr_acl) and dest_sig in str(diff_addr_acl), + '[%s]' % str(diff_addr_acl)) # [] check acl generate one 'set subnet' for dup addresses self.assertEqual( - str(same_addr_acl).count('set subnet'), 1 - ) - self.assertTrue( - any_src_sig in str(any_src_acl), '[%s]' % str(any_src_acl) - ) - self.assertTrue( - any_dest_sig in str(any_dest_acl), '[%s]' % str(any_dest_acl) - ) + str(same_addr_acl).count('set subnet'), 1) + self.assertIn( + any_src_sig, str(any_src_acl), '[%s]' % str(any_src_acl)) + self.assertIn( + any_dest_sig, str(any_dest_acl), '[%s]' % str(any_dest_acl)) def testServices(self): dup_port_term = self.fmt.format(TERM_TEMPLATE, @@ -240,60 +222,49 @@ def testServices(self): protocol='icmp', remove_fields=('dest_port', 'src_port')) ip_term = self.fmt.format(TERM_TEMPLATE, - remove_fields=('dest_port', 'src_port', 'protocol')) + remove_fields=('dest_port', + 'src_port', 'protocol')) custom_port_term = self.fmt.format(TERM_TEMPLATE, src_port='WHOIS') dup_acl = fortigate.Fortigate(policy.ParsePolicy( - GOOD_HEADER + dup_port_term, - self.naming), EXP_INFO - ) + GOOD_HEADER + dup_port_term, + self.naming), EXP_INFO) diff_acl = fortigate.Fortigate(policy.ParsePolicy( - GOOD_HEADER + diff_port_term, - self.naming), EXP_INFO - ) + GOOD_HEADER + diff_port_term, + self.naming), EXP_INFO) src_only_acl = fortigate.Fortigate(policy.ParsePolicy( - GOOD_HEADER + src_only_term, - self.naming), EXP_INFO - ) + GOOD_HEADER + src_only_term, + self.naming), EXP_INFO) icmp_acl = fortigate.Fortigate(policy.ParsePolicy( - GOOD_HEADER + icmp_term, - self.naming), EXP_INFO - ) + GOOD_HEADER + icmp_term, + self.naming), EXP_INFO) ip_acl = fortigate.Fortigate(policy.ParsePolicy( - GOOD_HEADER + ip_term, - self.naming), EXP_INFO - ) + GOOD_HEADER + ip_term, + self.naming), EXP_INFO) custom_port_acl = fortigate.Fortigate( - policy.ParsePolicy(GOOD_HEADER + custom_port_term, - self.naming), EXP_INFO - ) + policy.ParsePolicy(GOOD_HEADER + custom_port_term, + self.naming), EXP_INFO) dup_sig = 'set service HTTP\n' diff_sig = 'set service HTTP HTTPS\n' src_only_sig = dup_sig icmp_sig = 'set service ALL_ICMP\n' ip_sig = 'set service ALL\n' - custom_port_sig = 'config firewall service custom\n\tedit 43\n\t\t' \ - 'set protocol TCP/UDP\n\t\tset tcp-portrange 43\n\tnext' - - self.assertTrue( - dup_sig in str(dup_acl), '[%s]' % str(dup_acl) - ) - self.assertTrue( - diff_sig in str(diff_acl), '[%s]' % str(diff_acl) - ) - self.assertTrue( - src_only_sig in str(src_only_acl), '[%s]' % str(src_only_acl) - ) - self.assertTrue( - icmp_sig in str(icmp_acl), '[%s]' % str(icmp_acl) - ) - self.assertTrue( - ip_sig in str(ip_acl), '[%s]' % str(ip_acl) - ) - self.assertTrue( - custom_port_sig in str(custom_port_acl), '[%s]' % str(custom_port_acl) - ) + custom_port_sig = ('config firewall service custom\n\tedit 43\n\t\t' + 'set protocol TCP/UDP\n\t\tset tcp-portrange 43\n\tnext') + + self.assertIn( + dup_sig, str(dup_acl), '[%s]' % str(dup_acl)) + self.assertIn( + diff_sig, str(diff_acl), '[%s]' % str(diff_acl)) + self.assertIn( + src_only_sig, str(src_only_acl), '[%s]' % str(src_only_acl)) + self.assertIn( + icmp_sig, str(icmp_acl), '[%s]' % str(icmp_acl)) + self.assertIn( + ip_sig , str(ip_acl), '[%s]' % str(ip_acl)) + self.assertIn( + custom_port_sig, str(custom_port_acl), '[%s]' % str(custom_port_acl)) def testInterfaces(self): no_interfaces_term = self.fmt.format(TERM_TEMPLATE, @@ -310,39 +281,35 @@ def testInterfaces(self): dest_interface='wan2', ) no_interfaces_acl = fortigate.Fortigate( - policy.ParsePolicy(GOOD_HEADER + no_interfaces_term, - self.naming), EXP_INFO - ) + policy.ParsePolicy(GOOD_HEADER + no_interfaces_term, + self.naming), EXP_INFO) src_only_int_acl = fortigate.Fortigate( - policy.ParsePolicy(GOOD_HEADER + src_only_int_term, - self.naming), EXP_INFO - ) + policy.ParsePolicy(GOOD_HEADER + src_only_int_term, + self.naming), EXP_INFO) dest_only_int_acl = fortigate.Fortigate( - policy.ParsePolicy(GOOD_HEADER + dest_only_int_term, - self.naming), EXP_INFO - ) + policy.ParsePolicy(GOOD_HEADER + dest_only_int_term, + self.naming), EXP_INFO) both_interfaces_acl = fortigate.Fortigate( - policy.ParsePolicy(GOOD_HEADER + both_interfaces_term, - self.naming), EXP_INFO - ) + policy.ParsePolicy(GOOD_HEADER + both_interfaces_term, + self.naming), EXP_INFO) no_interfaces_sig = 'set srcintf any\n\t\tset dstintf any' src_int_only_sig = 'set srcintf wan1\n\t\tset dstintf any' dest_int_only_sig = 'set srcintf any\n\t\tset dstintf wan2' both_interfaces_sig = 'set srcintf wan1\n\t\tset dstintf wan2' - self.assertTrue( - no_interfaces_sig in str(no_interfaces_acl), '[%s]' % str(no_interfaces_acl) - ) - self.assertTrue( - src_int_only_sig in str(src_only_int_acl), '[%s]' % str(src_only_int_acl) - ) - self.assertTrue( - dest_int_only_sig in str(dest_only_int_acl), '[%s]' % str(dest_only_int_acl) - ) - self.assertTrue( - both_interfaces_sig in str(both_interfaces_acl), '[%s]' % str(both_interfaces_acl) - ) + self.assertIn( + no_interfaces_sig, str(no_interfaces_acl), + '[%s]' % str(no_interfaces_acl)) + self.assertIn( + src_int_only_sig, str(src_only_int_acl), + '[%s]' % str(src_only_int_acl)) + self.assertIn( + dest_int_only_sig, str(dest_only_int_acl), + '[%s]' % str(dest_only_int_acl)) + self.assertIn( + both_interfaces_sig, str(both_interfaces_acl), + '[%s]' % str(both_interfaces_acl)) def testLogging(self): log_term = self.fmt.format(TERM_TEMPLATE, @@ -351,22 +318,18 @@ def testLogging(self): remove_fields=('logging',)) log_acl = fortigate.Fortigate( - policy.ParsePolicy(GOOD_HEADER + log_term, - self.naming), EXP_INFO - ) + policy.ParsePolicy(GOOD_HEADER + log_term, + self.naming), EXP_INFO) no_log_acl = fortigate.Fortigate( - policy.ParsePolicy(GOOD_HEADER + no_log_term, - self.naming), EXP_INFO - ) + policy.ParsePolicy(GOOD_HEADER + no_log_term, + self.naming), EXP_INFO) log_sig = 'set logtraffic all' - self.assertTrue( - log_sig in str(log_acl), '[%s]' % str(log_acl) - ) - self.assertTrue( - log_sig not in str(no_log_term), '[%s]' % str(no_log_acl) - ) + self.assertIn( + log_sig, str(log_acl), '[%s]' % str(log_acl)) + self.assertNotIn( + log_sig, str(no_log_term), '[%s]' % str(no_log_acl)) def testDuplicateTermError(self): term = self.fmt.format(TERM_TEMPLATE, logging='true') @@ -382,7 +345,7 @@ def testDuplicateTermError(self): def testPortMap(self): port_map = fortigate.FortigatePortMap() self.assertEqual('SSH', port_map.GetProtocol('tcp', 22)) - self.assertRaises(fortigate.FortiGatePortDoesNotExist, + self.assertRaises(fortigate.FortiGatePortDoesNotExistError, port_map.GetProtocol, 'tcp', 5000) self.assertRaises(fortigate.FortiGateValueError, From f60c9815e6bfe50eab2375d06d29d8a34072b391 Mon Sep 17 00:00:00 2001 From: Ali-aqrabawi Date: Wed, 20 May 2020 13:48:03 +0300 Subject: [PATCH 08/13] code convetion fixes --- capirca/lib/fortigate.py | 26 +++++++++++++------------- tests/lib/fortigate_test.py | 7 +++---- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/capirca/lib/fortigate.py b/capirca/lib/fortigate.py index d573b15e..2a1a4c7b 100644 --- a/capirca/lib/fortigate.py +++ b/capirca/lib/fortigate.py @@ -152,40 +152,40 @@ def GetProtocol(protocol, port=None): raise FortiGateFindServiceError( 'service not found from %r protocol and %r port' % (protocol, port)) -class ObjectsContainer: +class ObjectsContainer(object): """A Container that holds service and network objects.""" def __init__(self): - self._FW_ADDRESSES = [] - self._FW_SERVICES = [] + self._fw_addresses = [] + self._fw_services = [] self._FW_DUP_CHECK = set() def get_fw_addresses(self): """Returns the collected addresses.""" - self._FW_ADDRESSES.extend([' ', 'end', ' ']) - return self._FW_ADDRESSES + self._fw_addresses.extend([' ', 'end', ' ']) + return self._fw_addresses def get_fw_services(self): """Returns the collected services.""" - self._FW_SERVICES.extend([' ', 'end', ' ']) - return self._FW_SERVICES + self._fw_services.extend([' ', 'end', ' ']) + return self._fw_services - def _add_address_to_fw_addresses(self, addr): + def add_address_to_fw_addresses(self, addr): """Add address to address store.""" if addr in self._FW_DUP_CHECK: return - self._FW_ADDRESSES.extend(['\tedit %s' % addr, + self._fw_addresses.extend(['\tedit %s' % addr, '\t\tset subnet %s' % addr, '\tnext']) self._FW_DUP_CHECK.add(addr) - def _add_service_to_fw_services(self, protocol, service): + def add_service_to_fw_services(self, protocol, service): """Add service to services store.""" if service in self._FW_DUP_CHECK: return - self._FW_SERVICES.extend( + self._fw_services.extend( ['\tedit %s' % service, '\t\tset protocol TCP/UDP', '\t\tset %s-portrange %s' % (protocol.lower(), service), @@ -244,7 +244,7 @@ def _get_services_string(self, protocols, ports): try: service = FortigatePortMap.GetProtocol(protocols[0], port[0]) except FortiGatePortDoesNotExistError: - self._obj_container._add_service_to_fw_services(protocols[0], port[0]) + self._obj_container.add_service_to_fw_services(protocols[0], port[0]) service = str(port[0]) services.append(service) @@ -255,7 +255,7 @@ def _generate_address_names(self, *addresses): for group in addresses: for addr in group: if addr and not isinstance(addr, nacaddr.IPv6): - self._obj_container._add_address_to_fw_addresses(addr.with_prefixlen) + self._obj_container.add_address_to_fw_addresses(addr.with_prefixlen) def __str__(self): lines = [] diff --git a/tests/lib/fortigate_test.py b/tests/lib/fortigate_test.py index 4d8db43d..7b335bd7 100644 --- a/tests/lib/fortigate_test.py +++ b/tests/lib/fortigate_test.py @@ -19,8 +19,8 @@ from __future__ import print_function from __future__ import unicode_literals -import string import re +import string import unittest from capirca.lib import fortigate @@ -58,6 +58,8 @@ }} """ +EXP_INFO = 2 + class CustomFormatter(string.Formatter): DEFAULT_VALUES = { @@ -89,9 +91,6 @@ def get_value(self, key, args, kwds): return self.DEFAULT_VALUES[key] -EXP_INFO = 2 - - class FortigateTest(unittest.TestCase): def setUp(self): self.naming = mock.create_autospec(naming.Naming) From 2846f8ef21576de6d431c20d02440b15e2ad7a62 Mon Sep 17 00:00:00 2001 From: Ali-aqrabawi Date: Fri, 5 Jun 2020 00:52:25 +0300 Subject: [PATCH 09/13] code convention fixes --- capirca/lib/fortigate.py | 26 +++++++++++++++----------- tests/lib/fortigate_test.py | 6 +++--- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/capirca/lib/fortigate.py b/capirca/lib/fortigate.py index 2a1a4c7b..62004975 100644 --- a/capirca/lib/fortigate.py +++ b/capirca/lib/fortigate.py @@ -24,8 +24,9 @@ from capirca.lib import aclgenerator from capirca.lib import nacaddr -import six from absl import logging +import six + _ACTION_TABLE = { @@ -122,7 +123,7 @@ class FortigatePortMap(object): } @staticmethod - def GetProtocol(protocol, port=None): + def get_protocol(protocol, port=None): """Converts a port number to a service name. Args: @@ -134,6 +135,8 @@ def GetProtocol(protocol, port=None): Raises: FortiGateValueError: When unsupported protocol is used. + FortiGatePortDoesNotExistError: if the port does not exist. + FortiGateFindServiceError: when unable to find the requested service. """ f_proto = FortigatePortMap._PROTO_MAP.get(protocol, None) if f_proto is None: @@ -152,6 +155,7 @@ def GetProtocol(protocol, port=None): raise FortiGateFindServiceError( 'service not found from %r protocol and %r port' % (protocol, port)) + class ObjectsContainer(object): """A Container that holds service and network objects.""" @@ -159,7 +163,7 @@ def __init__(self): self._fw_addresses = [] self._fw_services = [] - self._FW_DUP_CHECK = set() + self._fe_dup_check = set() def get_fw_addresses(self): """Returns the collected addresses.""" @@ -173,16 +177,16 @@ def get_fw_services(self): def add_address_to_fw_addresses(self, addr): """Add address to address store.""" - if addr in self._FW_DUP_CHECK: + if addr in self._fe_dup_check: return self._fw_addresses.extend(['\tedit %s' % addr, '\t\tset subnet %s' % addr, '\tnext']) - self._FW_DUP_CHECK.add(addr) + self._fe_dup_check.add(addr) def add_service_to_fw_services(self, protocol, service): """Add service to services store.""" - if service in self._FW_DUP_CHECK: + if service in self._fe_dup_check: return self._fw_services.extend( @@ -191,7 +195,7 @@ def add_service_to_fw_services(self, protocol, service): '\t\tset %s-portrange %s' % (protocol.lower(), service), '\tnext']) - self._FW_DUP_CHECK.add(service) + self._fe_dup_check.add(service) class Term(aclgenerator.Term): @@ -231,18 +235,18 @@ def _get_services_string(self, protocols, ports): """Get the service name, if not exist create it. Args: - protocol: list of protocols - port: list of ports + protocols: list of protocols + ports: list of ports Returns: string (all services separated by spaces. """ services = [] if protocols and not ports: - services.append(FortigatePortMap.GetProtocol(protocols[0])) + services.append(FortigatePortMap.get_protocol(protocols[0])) for port in ports: try: - service = FortigatePortMap.GetProtocol(protocols[0], port[0]) + service = FortigatePortMap.get_protocol(protocols[0], port[0]) except FortiGatePortDoesNotExistError: self._obj_container.add_service_to_fw_services(protocols[0], port[0]) service = str(port[0]) diff --git a/tests/lib/fortigate_test.py b/tests/lib/fortigate_test.py index 7b335bd7..6693b99e 100644 --- a/tests/lib/fortigate_test.py +++ b/tests/lib/fortigate_test.py @@ -343,10 +343,10 @@ def testDuplicateTermError(self): def testPortMap(self): port_map = fortigate.FortigatePortMap() - self.assertEqual('SSH', port_map.GetProtocol('tcp', 22)) + self.assertEqual('SSH', port_map.get_protocol('tcp', 22)) self.assertRaises(fortigate.FortiGatePortDoesNotExistError, - port_map.GetProtocol, + port_map.get_protocol, 'tcp', 5000) self.assertRaises(fortigate.FortiGateValueError, - port_map.GetProtocol, + port_map.get_protocol, 'bad_proto', 22) From 916884a1656be3bcd0b06d9eb08f3d751be797d3 Mon Sep 17 00:00:00 2001 From: Ali-aqrabawi Date: Tue, 21 Jul 2020 22:45:28 +0300 Subject: [PATCH 10/13] code style fixes --- capirca/lib/fortigate.py | 3 +-- tests/lib/fortigate_test.py | 6 +++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/capirca/lib/fortigate.py b/capirca/lib/fortigate.py index 62004975..ba67eedd 100644 --- a/capirca/lib/fortigate.py +++ b/capirca/lib/fortigate.py @@ -22,13 +22,12 @@ import datetime +from absl import logging from capirca.lib import aclgenerator from capirca.lib import nacaddr -from absl import logging import six - _ACTION_TABLE = { 'accept': 'accept', 'deny': 'deny', diff --git a/tests/lib/fortigate_test.py b/tests/lib/fortigate_test.py index 6693b99e..99c082f6 100644 --- a/tests/lib/fortigate_test.py +++ b/tests/lib/fortigate_test.py @@ -32,7 +32,7 @@ GOOD_HEADER = """ header { - comment:: "this is a test acl" + comment:: "this is a test acl" target:: fortigate from-id 2 } """ @@ -261,7 +261,7 @@ def testServices(self): self.assertIn( icmp_sig, str(icmp_acl), '[%s]' % str(icmp_acl)) self.assertIn( - ip_sig , str(ip_acl), '[%s]' % str(ip_acl)) + ip_sig, str(ip_acl), '[%s]' % str(ip_acl)) self.assertIn( custom_port_sig, str(custom_port_acl), '[%s]' % str(custom_port_acl)) @@ -277,7 +277,7 @@ def testInterfaces(self): remove_fields=('src_interface',)) both_interfaces_term = self.fmt.format(TERM_TEMPLATE, src_interface='wan1', - dest_interface='wan2', ) + dest_interface='wan2') no_interfaces_acl = fortigate.Fortigate( policy.ParsePolicy(GOOD_HEADER + no_interfaces_term, From b5471d5c098ce1895eda4c3ecc4e0982582bffc6 Mon Sep 17 00:00:00 2001 From: Ali-aqrabawi Date: Tue, 21 Jul 2020 22:56:03 +0300 Subject: [PATCH 11/13] code style fixes --- tests/lib/fortigate_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/lib/fortigate_test.py b/tests/lib/fortigate_test.py index 99c082f6..00dc445f 100644 --- a/tests/lib/fortigate_test.py +++ b/tests/lib/fortigate_test.py @@ -249,7 +249,7 @@ def testServices(self): src_only_sig = dup_sig icmp_sig = 'set service ALL_ICMP\n' ip_sig = 'set service ALL\n' - custom_port_sig = ('config firewall service custom\n\tedit 43\n\t\t' + custom_port_sig = ('config firewall service custom\n\tedit 43\n\t\t' 'set protocol TCP/UDP\n\t\tset tcp-portrange 43\n\tnext') self.assertIn( From 5d450463ecbc0172a37f7fb29015f4b130a9e327 Mon Sep 17 00:00:00 2001 From: Ali-aqrabawi Date: Thu, 23 Jul 2020 23:17:31 +0300 Subject: [PATCH 12/13] unitest fixes --- tests/lib/fortigate_test.py | 318 +++++++++++++++++++++++++----------- 1 file changed, 224 insertions(+), 94 deletions(-) diff --git a/tests/lib/fortigate_test.py b/tests/lib/fortigate_test.py index 00dc445f..4f5f8c10 100644 --- a/tests/lib/fortigate_test.py +++ b/tests/lib/fortigate_test.py @@ -58,6 +58,80 @@ }} """ +SUPPORTED_TOKENS = { + 'action', + 'comment', + 'destination_address', + 'destination_address_exclude', + 'destination_port', + 'expiration', + 'icmp_type', + 'stateless_reply', + 'logging', + 'name', + 'option', + 'platform', + 'platform_exclude', + 'protocol', + 'source_interface', + 'destination_interface', + 'source_address', + 'source_address_exclude', + 'source_port', + 'translated', + 'verbatim', +} + +SUPPORTED_SUB_TOKENS = { + 'action': {'accept', 'deny', 'reject', + 'reject-with-tcp-rst'}, + 'icmp_type': { + 'alternate-address', + 'certification-path-advertisement', + 'certification-path-solicitation', + 'conversion-error', + 'destination-unreachable', + 'echo-reply', + 'echo-request', + 'mobile-redirect', + 'home-agent-address-discovery-reply', + 'home-agent-address-discovery-request', + 'icmp-node-information-query', + 'icmp-node-information-response', + 'information-request', + 'inverse-neighbor-discovery-advertisement', + 'inverse-neighbor-discovery-solicitation', + 'mask-reply', + 'mask-request', + 'information-reply', + 'mobile-prefix-advertisement', + 'mobile-prefix-solicitation', + 'multicast-listener-done', + 'multicast-listener-query', + 'multicast-listener-report', + 'multicast-router-advertisement', + 'multicast-router-solicitation', + 'multicast-router-termination', + 'neighbor-advertisement', + 'neighbor-solicit', + 'packet-too-big', + 'parameter-problem', + 'redirect', + 'redirect-message', + 'router-advertisement', + 'router-renumbering', + 'router-solicit', + 'router-solicitation', + 'source-quench', + 'time-exceeded', + 'timestamp-reply', + 'timestamp-request', + 'unreachable', + 'version-2-multicast-listener-report', + }, + 'option': {'from_id'} +} + EXP_INFO = 2 @@ -139,176 +213,229 @@ def testBadHeader(self): parsed_p, EXP_INFO) - def testAction(self): + def testBuildTokens(self): + term = self.fmt.format(TERM_TEMPLATE) + pol1 = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + term, + self.naming), EXP_INFO) + st, sst = pol1._BuildTokens() + self.assertEqual(st, SUPPORTED_TOKENS) + self.assertEqual(sst, SUPPORTED_SUB_TOKENS) + + def testActionAccept(self): accept_term = self.fmt.format(TERM_TEMPLATE, action='accept') + + accept_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + accept_term, + self.naming), EXP_INFO) + + accept_sig = 'set action accept' + self.assertIn( + accept_sig, str(accept_acl), '[%s]' % str(accept_acl)) + + def testActionDeny(self): deny_term = self.fmt.format(TERM_TEMPLATE, action='deny') reject_term = self.fmt.format(TERM_TEMPLATE, action='reject') - accept_acl = fortigate.Fortigate( - policy.ParsePolicy(GOOD_HEADER + accept_term, - self.naming), EXP_INFO) deny_acl = fortigate.Fortigate( - policy.ParsePolicy(GOOD_HEADER + deny_term, - self.naming), EXP_INFO) + policy.ParsePolicy(GOOD_HEADER + deny_term, + self.naming), EXP_INFO) reject_acl = fortigate.Fortigate( - policy.ParsePolicy(GOOD_HEADER + reject_term, - self.naming), EXP_INFO) + policy.ParsePolicy(GOOD_HEADER + reject_term, + self.naming), EXP_INFO) - accept_sig = 'set action accept' deny_sig = 'set action deny' self.assertIn( - accept_sig, str(accept_acl), '[%s]' % str(accept_acl)) + deny_sig, str(deny_sig), '[%s]' % str(deny_acl)) self.assertIn( - deny_sig, str(deny_sig), '[%s]' % str(deny_acl)) - self.assertIn( - deny_sig, str(reject_acl), '[%s]' % str(reject_acl)) + deny_sig, str(reject_acl), '[%s]' % str(reject_acl)) - def testAddresses(self): + def testAddressDiff(self): diff_addr_term = self.fmt.format(TERM_TEMPLATE, src_addr='SOME_HOST', dest_addr='SOME_HOST2') + + diff_addr_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + diff_addr_term, + self.naming), EXP_INFO) + + src_sig = 'set srcaddr 10.0.0.0/8' + dest_sig = 'set dstaddr 20.0.0.0/8' + + self.assertTrue( + src_sig in str(diff_addr_acl) and dest_sig in str(diff_addr_acl), + '[%s]' % str(diff_addr_acl)) + + def testAddressSame(self): same_addr_term = self.fmt.format(TERM_TEMPLATE, src_addr='SOME_HOST2', dest_addr='SOME_HOST2') + + same_addr_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + same_addr_term, + self.naming), EXP_INFO) + + self.assertEqual( + str(same_addr_acl).count('set subnet'), 1) + + def testAddressAny(self): any_src_term = self.fmt.format(TERM_TEMPLATE, remove_fields=('src_addr',)) any_dest_term = self.fmt.format(TERM_TEMPLATE, remove_fields=('dest_addr',)) - diff_addr_acl = fortigate.Fortigate( - policy.ParsePolicy(GOOD_HEADER + diff_addr_term, - self.naming), EXP_INFO) - - same_addr_acl = fortigate.Fortigate( - policy.ParsePolicy(GOOD_HEADER + same_addr_term, - self.naming), EXP_INFO) - any_src_acl = fortigate.Fortigate( - policy.ParsePolicy(GOOD_HEADER + any_src_term, - self.naming), EXP_INFO) + policy.ParsePolicy(GOOD_HEADER + any_src_term, + self.naming), EXP_INFO) any_dest_acl = fortigate.Fortigate( - policy.ParsePolicy(GOOD_HEADER + any_dest_term, - self.naming), EXP_INFO) + policy.ParsePolicy(GOOD_HEADER + any_dest_term, + self.naming), EXP_INFO) - src_sig = 'set srcaddr 10.0.0.0/8' - dest_sig = 'set dstaddr 20.0.0.0/8' any_dest_sig = 'set dstaddr all' any_src_sig = 'set srcaddr all' - self.assertTrue( - src_sig in str(diff_addr_acl) and dest_sig in str(diff_addr_acl), - '[%s]' % str(diff_addr_acl)) - # [] check acl generate one 'set subnet' for dup addresses - self.assertEqual( - str(same_addr_acl).count('set subnet'), 1) self.assertIn( - any_src_sig, str(any_src_acl), '[%s]' % str(any_src_acl)) + any_src_sig, str(any_src_acl), '[%s]' % str(any_src_acl)) self.assertIn( - any_dest_sig, str(any_dest_acl), '[%s]' % str(any_dest_acl)) + any_dest_sig, str(any_dest_acl), '[%s]' % str(any_dest_acl)) - def testServices(self): + def testServiceDupPort(self): dup_port_term = self.fmt.format(TERM_TEMPLATE, src_port='HTTP', dest_port='HTTP') + + dup_acl = fortigate.Fortigate(policy.ParsePolicy( + GOOD_HEADER + dup_port_term, + self.naming), EXP_INFO) + + dup_sig = 'set service HTTP\n' + + self.assertIn( + dup_sig, str(dup_acl), '[%s]' % str(dup_acl)) + + def testServiceDiffPort(self): diff_port_term = self.fmt.format(TERM_TEMPLATE, src_port='HTTP', dest_port='HTTPS') + + diff_acl = fortigate.Fortigate(policy.ParsePolicy( + GOOD_HEADER + diff_port_term, + self.naming), EXP_INFO) + + diff_sig = 'set service HTTP HTTPS\n' + + self.assertIn( + diff_sig, str(diff_acl), '[%s]' % str(diff_acl)) + + def testServiceSrcOnly(self): src_only_term = self.fmt.format(TERM_TEMPLATE, src_port='HTTP', remove_fields=('dest_port',)) + + src_only_acl = fortigate.Fortigate(policy.ParsePolicy( + GOOD_HEADER + src_only_term, + self.naming), EXP_INFO) + + src_only_sig = 'set service HTTP\n' + + self.assertIn( + src_only_sig, str(src_only_acl), '[%s]' % str(src_only_acl)) + + def testServiceIp(self): icmp_term = self.fmt.format(TERM_TEMPLATE, protocol='icmp', remove_fields=('dest_port', 'src_port')) ip_term = self.fmt.format(TERM_TEMPLATE, remove_fields=('dest_port', 'src_port', 'protocol')) - custom_port_term = self.fmt.format(TERM_TEMPLATE, src_port='WHOIS') - dup_acl = fortigate.Fortigate(policy.ParsePolicy( - GOOD_HEADER + dup_port_term, - self.naming), EXP_INFO) - diff_acl = fortigate.Fortigate(policy.ParsePolicy( - GOOD_HEADER + diff_port_term, - self.naming), EXP_INFO) - src_only_acl = fortigate.Fortigate(policy.ParsePolicy( - GOOD_HEADER + src_only_term, - self.naming), EXP_INFO) icmp_acl = fortigate.Fortigate(policy.ParsePolicy( - GOOD_HEADER + icmp_term, - self.naming), EXP_INFO) + GOOD_HEADER + icmp_term, + self.naming), EXP_INFO) ip_acl = fortigate.Fortigate(policy.ParsePolicy( - GOOD_HEADER + ip_term, - self.naming), EXP_INFO) - custom_port_acl = fortigate.Fortigate( - policy.ParsePolicy(GOOD_HEADER + custom_port_term, - self.naming), EXP_INFO) + GOOD_HEADER + ip_term, + self.naming), EXP_INFO) - dup_sig = 'set service HTTP\n' - diff_sig = 'set service HTTP HTTPS\n' - src_only_sig = dup_sig icmp_sig = 'set service ALL_ICMP\n' ip_sig = 'set service ALL\n' - custom_port_sig = ('config firewall service custom\n\tedit 43\n\t\t' - 'set protocol TCP/UDP\n\t\tset tcp-portrange 43\n\tnext') self.assertIn( - dup_sig, str(dup_acl), '[%s]' % str(dup_acl)) - self.assertIn( - diff_sig, str(diff_acl), '[%s]' % str(diff_acl)) - self.assertIn( - src_only_sig, str(src_only_acl), '[%s]' % str(src_only_acl)) + icmp_sig, str(icmp_acl), '[%s]' % str(icmp_acl)) self.assertIn( - icmp_sig, str(icmp_acl), '[%s]' % str(icmp_acl)) - self.assertIn( - ip_sig, str(ip_acl), '[%s]' % str(ip_acl)) + ip_sig, str(ip_acl), '[%s]' % str(ip_acl)) + + def testServiceCustomPort(self): + custom_port_term = self.fmt.format(TERM_TEMPLATE, src_port='WHOIS') + + custom_port_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + custom_port_term, + self.naming), EXP_INFO) + + custom_port_sig = ('config firewall service custom\n\tedit 43\n\t\t' + 'set protocol TCP/UDP\n\t\tset tcp-portrange 43\n\tnext') + self.assertIn( - custom_port_sig, str(custom_port_acl), '[%s]' % str(custom_port_acl)) + custom_port_sig, str(custom_port_acl), '[%s]' % str(custom_port_acl)) - def testInterfaces(self): + def testInterfaceNone(self): no_interfaces_term = self.fmt.format(TERM_TEMPLATE, remove_fields=('src_interface', 'dest_interface')) + + no_interfaces_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + no_interfaces_term, + self.naming), EXP_INFO) + + no_interfaces_sig = 'set srcintf any\n\t\tset dstintf any' + + self.assertIn( + no_interfaces_sig, str(no_interfaces_acl), + '[%s]' % str(no_interfaces_acl)) + + def testInterfaceSrcOnly(self): src_only_int_term = self.fmt.format(TERM_TEMPLATE, src_interface='wan1', remove_fields=('dest_interface',)) + + src_only_int_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + src_only_int_term, + self.naming), EXP_INFO) + + src_int_only_sig = 'set srcintf wan1\n\t\tset dstintf any' + + self.assertIn( + src_int_only_sig, str(src_only_int_acl), + '[%s]' % str(src_only_int_acl)) + + def testInterfaceDestOnly(self): dest_only_int_term = self.fmt.format(TERM_TEMPLATE, dest_interface='wan2', remove_fields=('src_interface',)) + + dest_only_int_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + dest_only_int_term, + self.naming), EXP_INFO) + + dest_int_only_sig = 'set srcintf any\n\t\tset dstintf wan2' + + self.assertIn( + dest_int_only_sig, str(dest_only_int_acl), + '[%s]' % str(dest_only_int_acl)) + + def testInterfaceBoth(self): both_interfaces_term = self.fmt.format(TERM_TEMPLATE, src_interface='wan1', dest_interface='wan2') - no_interfaces_acl = fortigate.Fortigate( - policy.ParsePolicy(GOOD_HEADER + no_interfaces_term, - self.naming), EXP_INFO) - src_only_int_acl = fortigate.Fortigate( - policy.ParsePolicy(GOOD_HEADER + src_only_int_term, - self.naming), EXP_INFO) - dest_only_int_acl = fortigate.Fortigate( - policy.ParsePolicy(GOOD_HEADER + dest_only_int_term, - self.naming), EXP_INFO) both_interfaces_acl = fortigate.Fortigate( - policy.ParsePolicy(GOOD_HEADER + both_interfaces_term, - self.naming), EXP_INFO) + policy.ParsePolicy(GOOD_HEADER + both_interfaces_term, + self.naming), EXP_INFO) - no_interfaces_sig = 'set srcintf any\n\t\tset dstintf any' - src_int_only_sig = 'set srcintf wan1\n\t\tset dstintf any' - dest_int_only_sig = 'set srcintf any\n\t\tset dstintf wan2' both_interfaces_sig = 'set srcintf wan1\n\t\tset dstintf wan2' self.assertIn( - no_interfaces_sig, str(no_interfaces_acl), - '[%s]' % str(no_interfaces_acl)) - self.assertIn( - src_int_only_sig, str(src_only_int_acl), - '[%s]' % str(src_only_int_acl)) - self.assertIn( - dest_int_only_sig, str(dest_only_int_acl), - '[%s]' % str(dest_only_int_acl)) - self.assertIn( - both_interfaces_sig, str(both_interfaces_acl), - '[%s]' % str(both_interfaces_acl)) + both_interfaces_sig, str(both_interfaces_acl), + '[%s]' % str(both_interfaces_acl)) def testLogging(self): log_term = self.fmt.format(TERM_TEMPLATE, @@ -350,3 +477,6 @@ def testPortMap(self): self.assertRaises(fortigate.FortiGateValueError, port_map.get_protocol, 'bad_proto', 22) + +if __name__ == '__main__': + unittest.main() \ No newline at end of file From f9920bd150ef259865f4a5a81062f0fc3f9d3fa8 Mon Sep 17 00:00:00 2001 From: Ali-aqrabawi Date: Thu, 23 Jul 2020 23:19:02 +0300 Subject: [PATCH 13/13] unitest fixes --- tests/lib/fortigate_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/lib/fortigate_test.py b/tests/lib/fortigate_test.py index 4f5f8c10..ed9bacdd 100644 --- a/tests/lib/fortigate_test.py +++ b/tests/lib/fortigate_test.py @@ -478,5 +478,6 @@ def testPortMap(self): port_map.get_protocol, 'bad_proto', 22) + if __name__ == '__main__': unittest.main() \ No newline at end of file