diff --git a/capirca/aclgen.py b/capirca/aclgen.py index 743709c3..6c864a71 100644 --- a/capirca/aclgen.py +++ b/capirca/aclgen.py @@ -37,6 +37,7 @@ from capirca.lib import ciscoasa from capirca.lib import ciscoxr from capirca.lib import cloudarmor +from capirca.lib import fortigate from capirca.lib import gce from capirca.lib import ipset from capirca.lib import iptables @@ -174,6 +175,7 @@ def RenderFile(input_file, output_directory, definitions, win_afw = False xacl = False paloalto = False + fcl = False try: conf = open(input_file).read() @@ -238,6 +240,8 @@ def RenderFile(input_file, output_directory, definitions, paloalto = copy.deepcopy(pol) if 'cloudarmor' in platforms: gca = copy.deepcopy(pol) + if 'fortigate' in platforms: + fcl = copy.deepcopy(pol) if not output_directory.endswith('/'): output_directory += '/' @@ -327,11 +331,15 @@ def RenderFile(input_file, output_directory, definitions, acl_obj = cloudarmor.CloudArmor(gca, exp_info) RenderACL(str(acl_obj), acl_obj.SUFFIX, output_directory, input_file, write_files) + if fcl: + acl_obj = fortigate.Fortigate(fcl, exp_info) + RenderACL(str(acl_obj), acl_obj.SUFFIX, output_directory, + input_file, write_files) # TODO(robankeny) add additional errors. except (juniper.Error, junipersrx.Error, cisco.Error, ipset.Error, iptables.Error, speedway.Error, pcap.Error, aclgenerator.Error, aruba.Error, nftables.Error, gce.Error, - cloudarmor.Error) as e: + cloudarmor.Error, fortigate.Error) as e: raise ACLGeneratorError( 'Error generating target ACL for %s:\n%s' % (input_file, e)) diff --git a/capirca/lib/fortigate.py b/capirca/lib/fortigate.py new file mode 100644 index 00000000..ba67eedd --- /dev/null +++ b/capirca/lib/fortigate.py @@ -0,0 +1,397 @@ +# Copyright 2019 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Fortigate generator.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +import datetime + +from absl import logging +from capirca.lib import aclgenerator +from capirca.lib import nacaddr +import six + + +_ACTION_TABLE = { + 'accept': 'accept', + 'deny': 'deny', + 'reject': 'deny', + 'reject-with-tcp-rst': 'deny', # tcp rst not supported +} + + +class Error(Exception): + """Generic error class.""" + pass + + +class FilterError(Error): + """Generic pol Filter class.""" + pass + + +class FortiGateValueError(Error): + """Raised when invalid values provided.""" + pass + + +class FortiGateFindServiceError(Error): + """Raised when unable to get the service name.""" + pass + + +class FortiGateDuplicateTermError(Error): + """Raised when duplicate term found.""" + pass + + +class FortiGatePortDoesNotExistError(Error): + """Raised when port is not found in ports list.""" + pass + + +class FortigatePortMap(object): + """Map port numbers to service names.""" + _PORTS_TCP = { + 179: 'BGP', + 53: 'DNS', + 7: 'PING', + 79: 'FINGER', + 21: 'FTP', + 70: 'GOPHER', + 443: 'HTTPS', + 194: 'IRC', + 2049: 'NFS', + 119: 'NNTP', + 110: 'POP3', + 1723: 'PPTP', + 25: 'SMTP', + 22: 'SSH', + 517: 'TALK', + 23: 'TELNET', + 540: 'UUCP', + 80: 'HTTP', + 993: 'IMAPS', + 3389: 'RDP', + 3306: 'MYSQL', + 1433: 'MS-SQL', + 1812: 'RADIUS', + 995: 'POP3S', + 465: 'SMTPS', + 389: 'LDAP', + 69: 'TFTP' + } + _PORTS_UDP = { + 53: 'DNS', + 7: 'PING', + 500: 'IKE', + 2049: 'NFS', + 123: 'NTP', + 520: 'RIP', + 161: 'SNMP', + 162: 'snmptrap', + 514: 'SYSLOG', + 517: 'TALK', + 69: 'TFTP', + 37: 'TIMESTAMP', + 1812: 'RADIUS', + 67: 'DHCP' + } + _PROTO_MAP = { + 'icmp': 'ALL_ICMP', + 'gre': 'GRE', + 'ip': 'ALL', + 'tcp': _PORTS_TCP, + 'udp': _PORTS_UDP + } + + @staticmethod + def get_protocol(protocol, port=None): + """Converts a port number to a service name. + + Args: + protocol: string representing protocol (tcp, udp, etc) + port: integer representing the port number + + Returns: + string + + Raises: + FortiGateValueError: When unsupported protocol is used. + FortiGatePortDoesNotExistError: if the port does not exist. + FortiGateFindServiceError: when unable to find the requested service. + """ + f_proto = FortigatePortMap._PROTO_MAP.get(protocol, None) + if f_proto is None: + raise FortiGateValueError( + '%r protocol is unsupported, supported protocols = %r' % ( + protocol, FortigatePortMap._PROTO_MAP.keys())) + + if isinstance(f_proto, six.string_types): + return f_proto + elif port: + try: + return f_proto[port] + except KeyError: + raise FortiGatePortDoesNotExistError + else: + raise FortiGateFindServiceError( + 'service not found from %r protocol and %r port' % (protocol, port)) + + +class ObjectsContainer(object): + """A Container that holds service and network objects.""" + + def __init__(self): + self._fw_addresses = [] + self._fw_services = [] + + self._fe_dup_check = set() + + def get_fw_addresses(self): + """Returns the collected addresses.""" + self._fw_addresses.extend([' ', 'end', ' ']) + return self._fw_addresses + + def get_fw_services(self): + """Returns the collected services.""" + self._fw_services.extend([' ', 'end', ' ']) + return self._fw_services + + def add_address_to_fw_addresses(self, addr): + """Add address to address store.""" + if addr in self._fe_dup_check: + return + self._fw_addresses.extend(['\tedit %s' % addr, + '\t\tset subnet %s' % addr, + '\tnext']) + self._fe_dup_check.add(addr) + + def add_service_to_fw_services(self, protocol, service): + """Add service to services store.""" + if service in self._fe_dup_check: + return + + self._fw_services.extend( + ['\tedit %s' % service, + '\t\tset protocol TCP/UDP', + '\t\tset %s-portrange %s' % (protocol.lower(), service), + '\tnext']) + + self._fe_dup_check.add(service) + + +class Term(aclgenerator.Term): + """Single Firewall Policy.""" + ALLOWED_PROTO_STRINGS = ['gre', 'icmp', 'ip', 'tcp', 'udp'] + COMMENT_MAX_WIDTH = 70 + + CURRENT_ID = 0 + + def __init__(self, term, object_container): + super(Term, self).__init__(term) + self._term = term + self._obj_container = object_container + + self.id_ = type(self).CURRENT_ID + type(self).CURRENT_ID += 1 + + @staticmethod + def _get_addresses_name(addresses): + """Returns the addresses or 'all' if no addresses specified.""" + v4_addresses = [x.with_prefixlen for x in addresses if + not isinstance(x, nacaddr.IPv6)] + addresses = ' '.join(v4_addresses) + return addresses or 'all' + + @staticmethod + def clean_ports(src_ports, dest_ports): + """Returns a set() of src and dest ports.""" + all_ports = [] + if src_ports: + all_ports += src_ports + if dest_ports: + all_ports += dest_ports + return set(all_ports) + + def _get_services_string(self, protocols, ports): + """Get the service name, if not exist create it. + + Args: + protocols: list of protocols + ports: list of ports + + Returns: + string (all services separated by spaces. + """ + services = [] + if protocols and not ports: + services.append(FortigatePortMap.get_protocol(protocols[0])) + for port in ports: + try: + service = FortigatePortMap.get_protocol(protocols[0], port[0]) + except FortiGatePortDoesNotExistError: + self._obj_container.add_service_to_fw_services(protocols[0], port[0]) + service = str(port[0]) + services.append(service) + + return ' '.join(services) or 'ALL' + + def _generate_address_names(self, *addresses): + """Generate the addresses names (object-network names).""" + for group in addresses: + for addr in group: + if addr and not isinstance(addr, nacaddr.IPv6): + self._obj_container.add_address_to_fw_addresses(addr.with_prefixlen) + + def __str__(self): + lines = [] + + self._generate_address_names(self._term.destination_address, + self._term.source_address) + + dest_addresses = self._get_addresses_name(self._term.destination_address) + src_addresses = self._get_addresses_name(self._term.source_address) + all_ports = self.clean_ports(self._term.source_port, + self._term.destination_port) + + services = self._get_services_string(self._term.protocol, + all_ports) + + lines.append('\t\tset comments %s' % self._term.name) + lines.append('\t\tset srcintf %s' % (self._term.source_interface or 'any')) + lines.append( + '\t\tset dstintf %s' % (self._term.destination_interface or 'any')) + lines.append('\t\tset dstaddr %s' % dest_addresses) + lines.append('\t\tset srcaddr %s' % src_addresses) + lines.append('\t\tset action %s' % _ACTION_TABLE.get(self._term.action[0])) + lines.append('\t\tset service %s' % services) + lines.append('\t\tset schedule always') + if self._term.logging: + lines.append('\t\tset logtraffic all') + + return '\n'.join(lines) + + +class Fortigate(aclgenerator.ACLGenerator): + """A Fortigate policy object.""" + + _PLATFORM = 'fortigate' + _DEFAULT_PROTOCOL = 'ALL' + SUFFIX = '.fcl' + # Protocols should be emitted as numbers. + _PROTO_INT = True + _TERM_REMARK = True + _TERM_MAX_LENGTH = 1023 + + def __init__(self, *args, **kwargs): + self._obj_container = ObjectsContainer() + super(Fortigate, self).__init__(*args, **kwargs) + + def _BuildTokens(self): + """Build supported tokens for platform. + + Returns: + tuple containing both supported tokens and sub tokens. + """ + supported_tokens, supported_sub_tokens = super(Fortigate, + self)._BuildTokens() + + supported_tokens |= {'source_interface', + 'destination_interface', + 'logging'} + + supported_sub_tokens.update({'option': {'from_id'}, + # Warning, some of these are mapped + # differently. See _ACTION_TABLE + 'action': {'accept', 'deny', 'reject', + 'reject-with-tcp-rst'}}) + return supported_tokens, supported_sub_tokens + + def _TranslatePolicy(self, pol, exp_info): + """Translate Capirca pol to fortigate pol.""" + self.fortigate_policies = [] + current_date = datetime.datetime.utcnow().date() + exp_info_date = current_date + datetime.timedelta(weeks=exp_info) + + # a mixed filter outputs both ipv4 and ipv6 acls in the same output file + + for header, terms in pol.filters: + if self._PLATFORM not in header.platforms: + continue + + filter_options = header.FilterOptions(self._PLATFORM) + + if (len(filter_options) < 2 or filter_options[0] != 'from-id'): + raise FilterError( + 'Fortigate Firewall filter arguments must specify from_id') + + from_id = filter_options[1] + Term.CURRENT_ID = int(from_id) + + term_dup_check = set() + + for term in terms: + filter_name = header.FilterName(self._PLATFORM) + if term.expiration: + if term.expiration <= exp_info_date: + logging.info('INFO: Term %s in policy %s expires ' + 'in less than two weeks.', term.name, filter_name) + if term.expiration <= current_date: + logging.warn('WARNING: Term %s in policy %s is expired and ' + 'will not be rendered.', term.name, filter_name) + continue + if term.name in term_dup_check: + raise FortiGateDuplicateTermError('You have a duplicate term: %s' % + term.name) + term_dup_check.add(term.name) + + new_term = Term(term, self._obj_container) + + self.fortigate_policies.append((header, term.name, new_term)) + + def _GetTargetByPolicyID(self, id_): + return '\tedit %s' % id_ + + def __str__(self): + start_addresses = ['config firewall address'] + start_services = ['config firewall service custom'] + start_policies = ['config firewall policy'] + end = ['end'] + target_addresses = [] + target_services = [] + target_policies = [] + + for (_, _, term) in self.fortigate_policies: + target_policies.append(self._GetTargetByPolicyID(term.id_)) + + term_str = str(term) + + target_policies.append(term_str) + + target_policies += ['\tnext', ''] + target_addresses.extend(self._obj_container.get_fw_addresses()) + target_services.extend(self._obj_container.get_fw_services()) + + fw_addresses = start_addresses + target_addresses + fw_services = start_services + target_services + + target = fw_addresses + fw_services + start_policies + target_policies + end + + return '\n'.join(target) diff --git a/tests/lib/fortigate_test.py b/tests/lib/fortigate_test.py new file mode 100644 index 00000000..ed9bacdd --- /dev/null +++ b/tests/lib/fortigate_test.py @@ -0,0 +1,483 @@ +# Copyright 2019 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unittest for fortigate policy rendering module.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +import re +import string +import unittest + +from capirca.lib import fortigate +from capirca.lib import nacaddr +from capirca.lib import naming +from capirca.lib import policy +import mock + + +GOOD_HEADER = """ +header { + comment:: "this is a test acl" + target:: fortigate from-id 2 +} +""" + +BAD_HEADER = """ +header { + comment:: "this is a test acl" + target:: fortigate edge-filter +} +""" + +TERM_TEMPLATE = """ +term good-term-2 {{ + source-interface:: {src_interface} + destination-interface:: {dest_interface} + protocol:: {protocol} + destination-address:: {dest_addr} + destination-port:: {dest_port} + source-address:: {src_addr} + source-port:: {src_port} + action:: {action} + logging:: {logging} +}} +""" + +SUPPORTED_TOKENS = { + 'action', + 'comment', + 'destination_address', + 'destination_address_exclude', + 'destination_port', + 'expiration', + 'icmp_type', + 'stateless_reply', + 'logging', + 'name', + 'option', + 'platform', + 'platform_exclude', + 'protocol', + 'source_interface', + 'destination_interface', + 'source_address', + 'source_address_exclude', + 'source_port', + 'translated', + 'verbatim', +} + +SUPPORTED_SUB_TOKENS = { + 'action': {'accept', 'deny', 'reject', + 'reject-with-tcp-rst'}, + 'icmp_type': { + 'alternate-address', + 'certification-path-advertisement', + 'certification-path-solicitation', + 'conversion-error', + 'destination-unreachable', + 'echo-reply', + 'echo-request', + 'mobile-redirect', + 'home-agent-address-discovery-reply', + 'home-agent-address-discovery-request', + 'icmp-node-information-query', + 'icmp-node-information-response', + 'information-request', + 'inverse-neighbor-discovery-advertisement', + 'inverse-neighbor-discovery-solicitation', + 'mask-reply', + 'mask-request', + 'information-reply', + 'mobile-prefix-advertisement', + 'mobile-prefix-solicitation', + 'multicast-listener-done', + 'multicast-listener-query', + 'multicast-listener-report', + 'multicast-router-advertisement', + 'multicast-router-solicitation', + 'multicast-router-termination', + 'neighbor-advertisement', + 'neighbor-solicit', + 'packet-too-big', + 'parameter-problem', + 'redirect', + 'redirect-message', + 'router-advertisement', + 'router-renumbering', + 'router-solicit', + 'router-solicitation', + 'source-quench', + 'time-exceeded', + 'timestamp-reply', + 'timestamp-request', + 'unreachable', + 'version-2-multicast-listener-report', + }, + 'option': {'from_id'} +} + +EXP_INFO = 2 + + +class CustomFormatter(string.Formatter): + DEFAULT_VALUES = { + 'src_interface': 'wan1', + 'dest_interface': 'wan2', + 'protocol': 'tcp', + 'src_addr': 'SOME_HOST', + 'dest_addr': 'SOME_HOST', + 'src_port': 'HTTP', + 'dest_port': 'HTTP', + 'action': 'accept', + 'logging': 'true' + } + + def format(*args, **kwargs): + if 'remove_fields' in kwargs: + args = list(args) + for field in kwargs['remove_fields']: + remove_regex = '.*' + field + '.*' + args[1] = re.sub(remove_regex, '', args[1]) + + return string.Formatter.format(*args, **kwargs) + return string.Formatter.format(*args, **kwargs) + + def get_value(self, key, args, kwds): + try: + return kwds[key] + except KeyError: + return self.DEFAULT_VALUES[key] + + +class FortigateTest(unittest.TestCase): + def setUp(self): + self.naming = mock.create_autospec(naming.Naming) + + def get_addr_side_eff(host): + hosts = { + 'SOME_HOST': [nacaddr.IP('10.0.0.0/8')], + 'SOME_HOST2': [nacaddr.IP('20.0.0.0/8')] + } + return hosts[host] + + def get_port_side_eff(*args): + hosts = { + 'HTTP': ['80'], + 'HTTPS': ['443'], + 'SSH': ['22'], + 'WHOIS': ['43'] + } + return hosts[args[0]] + + self.naming.GetNetAddr.side_effect = get_addr_side_eff + self.naming.GetServiceByProto.side_effect = get_port_side_eff + self.fmt = CustomFormatter() + + def testGoodHeader(self): + term = self.fmt.format(TERM_TEMPLATE) + acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + term, + self.naming), EXP_INFO) + + expected_sig = 'edit 2' + + get_net_calls = [mock.call('SOME_HOST')] * 2 + get_server_by_proto_calls = [mock.call('HTTP', 'tcp')] * 2 + + self.assertIn(expected_sig, str(acl), '[%s]' % str(acl)) + self.naming.GetNetAddr.assert_has_calls(get_net_calls) + self.naming.GetServiceByProto.assert_has_calls(get_server_by_proto_calls) + + def testBadHeader(self): + term = self.fmt.format(TERM_TEMPLATE) + parsed_p = policy.ParsePolicy(BAD_HEADER + term, + self.naming) + + self.assertRaises(fortigate.FilterError, + fortigate.Fortigate, + parsed_p, + EXP_INFO) + + def testBuildTokens(self): + term = self.fmt.format(TERM_TEMPLATE) + pol1 = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + term, + self.naming), EXP_INFO) + st, sst = pol1._BuildTokens() + self.assertEqual(st, SUPPORTED_TOKENS) + self.assertEqual(sst, SUPPORTED_SUB_TOKENS) + + def testActionAccept(self): + accept_term = self.fmt.format(TERM_TEMPLATE, action='accept') + + accept_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + accept_term, + self.naming), EXP_INFO) + + accept_sig = 'set action accept' + self.assertIn( + accept_sig, str(accept_acl), '[%s]' % str(accept_acl)) + + def testActionDeny(self): + deny_term = self.fmt.format(TERM_TEMPLATE, action='deny') + reject_term = self.fmt.format(TERM_TEMPLATE, action='reject') + + deny_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + deny_term, + self.naming), EXP_INFO) + reject_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + reject_term, + self.naming), EXP_INFO) + + deny_sig = 'set action deny' + self.assertIn( + deny_sig, str(deny_sig), '[%s]' % str(deny_acl)) + self.assertIn( + deny_sig, str(reject_acl), '[%s]' % str(reject_acl)) + + def testAddressDiff(self): + diff_addr_term = self.fmt.format(TERM_TEMPLATE, + src_addr='SOME_HOST', + dest_addr='SOME_HOST2') + + diff_addr_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + diff_addr_term, + self.naming), EXP_INFO) + + src_sig = 'set srcaddr 10.0.0.0/8' + dest_sig = 'set dstaddr 20.0.0.0/8' + + self.assertTrue( + src_sig in str(diff_addr_acl) and dest_sig in str(diff_addr_acl), + '[%s]' % str(diff_addr_acl)) + + def testAddressSame(self): + same_addr_term = self.fmt.format(TERM_TEMPLATE, + src_addr='SOME_HOST2', + dest_addr='SOME_HOST2') + + same_addr_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + same_addr_term, + self.naming), EXP_INFO) + + self.assertEqual( + str(same_addr_acl).count('set subnet'), 1) + + def testAddressAny(self): + any_src_term = self.fmt.format(TERM_TEMPLATE, + remove_fields=('src_addr',)) + any_dest_term = self.fmt.format(TERM_TEMPLATE, + remove_fields=('dest_addr',)) + + any_src_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + any_src_term, + self.naming), EXP_INFO) + + any_dest_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + any_dest_term, + self.naming), EXP_INFO) + + any_dest_sig = 'set dstaddr all' + any_src_sig = 'set srcaddr all' + + self.assertIn( + any_src_sig, str(any_src_acl), '[%s]' % str(any_src_acl)) + self.assertIn( + any_dest_sig, str(any_dest_acl), '[%s]' % str(any_dest_acl)) + + def testServiceDupPort(self): + dup_port_term = self.fmt.format(TERM_TEMPLATE, + src_port='HTTP', + dest_port='HTTP') + + dup_acl = fortigate.Fortigate(policy.ParsePolicy( + GOOD_HEADER + dup_port_term, + self.naming), EXP_INFO) + + dup_sig = 'set service HTTP\n' + + self.assertIn( + dup_sig, str(dup_acl), '[%s]' % str(dup_acl)) + + def testServiceDiffPort(self): + diff_port_term = self.fmt.format(TERM_TEMPLATE, + src_port='HTTP', + dest_port='HTTPS') + + diff_acl = fortigate.Fortigate(policy.ParsePolicy( + GOOD_HEADER + diff_port_term, + self.naming), EXP_INFO) + + diff_sig = 'set service HTTP HTTPS\n' + + self.assertIn( + diff_sig, str(diff_acl), '[%s]' % str(diff_acl)) + + def testServiceSrcOnly(self): + src_only_term = self.fmt.format(TERM_TEMPLATE, + src_port='HTTP', + remove_fields=('dest_port',)) + + src_only_acl = fortigate.Fortigate(policy.ParsePolicy( + GOOD_HEADER + src_only_term, + self.naming), EXP_INFO) + + src_only_sig = 'set service HTTP\n' + + self.assertIn( + src_only_sig, str(src_only_acl), '[%s]' % str(src_only_acl)) + + def testServiceIp(self): + icmp_term = self.fmt.format(TERM_TEMPLATE, + protocol='icmp', + remove_fields=('dest_port', 'src_port')) + ip_term = self.fmt.format(TERM_TEMPLATE, + remove_fields=('dest_port', + 'src_port', 'protocol')) + + icmp_acl = fortigate.Fortigate(policy.ParsePolicy( + GOOD_HEADER + icmp_term, + self.naming), EXP_INFO) + ip_acl = fortigate.Fortigate(policy.ParsePolicy( + GOOD_HEADER + ip_term, + self.naming), EXP_INFO) + + icmp_sig = 'set service ALL_ICMP\n' + ip_sig = 'set service ALL\n' + + self.assertIn( + icmp_sig, str(icmp_acl), '[%s]' % str(icmp_acl)) + self.assertIn( + ip_sig, str(ip_acl), '[%s]' % str(ip_acl)) + + def testServiceCustomPort(self): + custom_port_term = self.fmt.format(TERM_TEMPLATE, src_port='WHOIS') + + custom_port_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + custom_port_term, + self.naming), EXP_INFO) + + custom_port_sig = ('config firewall service custom\n\tedit 43\n\t\t' + 'set protocol TCP/UDP\n\t\tset tcp-portrange 43\n\tnext') + + self.assertIn( + custom_port_sig, str(custom_port_acl), '[%s]' % str(custom_port_acl)) + + def testInterfaceNone(self): + no_interfaces_term = self.fmt.format(TERM_TEMPLATE, + remove_fields=('src_interface', + 'dest_interface')) + + no_interfaces_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + no_interfaces_term, + self.naming), EXP_INFO) + + no_interfaces_sig = 'set srcintf any\n\t\tset dstintf any' + + self.assertIn( + no_interfaces_sig, str(no_interfaces_acl), + '[%s]' % str(no_interfaces_acl)) + + def testInterfaceSrcOnly(self): + src_only_int_term = self.fmt.format(TERM_TEMPLATE, + src_interface='wan1', + remove_fields=('dest_interface',)) + + src_only_int_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + src_only_int_term, + self.naming), EXP_INFO) + + src_int_only_sig = 'set srcintf wan1\n\t\tset dstintf any' + + self.assertIn( + src_int_only_sig, str(src_only_int_acl), + '[%s]' % str(src_only_int_acl)) + + def testInterfaceDestOnly(self): + dest_only_int_term = self.fmt.format(TERM_TEMPLATE, + dest_interface='wan2', + remove_fields=('src_interface',)) + + dest_only_int_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + dest_only_int_term, + self.naming), EXP_INFO) + + dest_int_only_sig = 'set srcintf any\n\t\tset dstintf wan2' + + self.assertIn( + dest_int_only_sig, str(dest_only_int_acl), + '[%s]' % str(dest_only_int_acl)) + + def testInterfaceBoth(self): + both_interfaces_term = self.fmt.format(TERM_TEMPLATE, + src_interface='wan1', + dest_interface='wan2') + + both_interfaces_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + both_interfaces_term, + self.naming), EXP_INFO) + + both_interfaces_sig = 'set srcintf wan1\n\t\tset dstintf wan2' + + self.assertIn( + both_interfaces_sig, str(both_interfaces_acl), + '[%s]' % str(both_interfaces_acl)) + + def testLogging(self): + log_term = self.fmt.format(TERM_TEMPLATE, + logging='true') + no_log_term = self.fmt.format(TERM_TEMPLATE, + remove_fields=('logging',)) + + log_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + log_term, + self.naming), EXP_INFO) + no_log_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + no_log_term, + self.naming), EXP_INFO) + + log_sig = 'set logtraffic all' + + self.assertIn( + log_sig, str(log_acl), '[%s]' % str(log_acl)) + self.assertNotIn( + log_sig, str(no_log_term), '[%s]' % str(no_log_acl)) + + def testDuplicateTermError(self): + term = self.fmt.format(TERM_TEMPLATE, logging='true') + duplicate_terms = term + term + parsed_p = policy.ParsePolicy(GOOD_HEADER + duplicate_terms, + self.naming) + + self.assertRaises(fortigate.FortiGateDuplicateTermError, + fortigate.Fortigate, + parsed_p, + EXP_INFO) + + def testPortMap(self): + port_map = fortigate.FortigatePortMap() + self.assertEqual('SSH', port_map.get_protocol('tcp', 22)) + self.assertRaises(fortigate.FortiGatePortDoesNotExistError, + port_map.get_protocol, + 'tcp', 5000) + self.assertRaises(fortigate.FortiGateValueError, + port_map.get_protocol, + 'bad_proto', 22) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file