Skip to content

base_channel

scrapli_netconf.channel.base_channel

BaseNetconfChannel

Bases: BaseChannel

Source code in channel/base_channel.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
class BaseNetconfChannel(BaseChannel):
    """
    Base channel for netconf; shared (sync/async agnostic) capability and output handling

    """

    # netconf specific channel state read/written below: the netconf version, the parsed server
    # capabilities and the client capabilities selected to send back
    _netconf_base_channel_args: NetconfBaseChannelArgs

    def _process_capabilities_exchange(self, raw_server_capabilities: bytes) -> None:
        """
        Process received capabilities; select netconf version and client capabilities

        Parses and stores the server capabilities, decides which netconf version the channel will
        use, then stores that version, the matching prompt pattern, and the matching client
        capabilities on the channel args objects.

        Args:
            raw_server_capabilities: raw bytes containing server capabilities

        Returns:
            None

        Raises:
            CapabilityNotSupported: if user has provided a preferred netconf version but it is not
                available in servers offered capabilities

        """
        server_capabilities = self._parse_server_capabilities(
            raw_server_capabilities=raw_server_capabilities
        )
        # stored before any version validation so it is available even if we raise below
        self._netconf_base_channel_args.server_capabilities = server_capabilities

        base_1_0_offered = "urn:ietf:params:netconf:base:1.0" in server_capabilities
        base_1_1_offered = "urn:ietf:params:netconf:base:1.1" in server_capabilities
        requested_version = self._netconf_base_channel_args.netconf_version

        if requested_version == NetconfVersion.VERSION_1_0:
            # user pinned 1.0 -- the server must explicitly offer it
            if not base_1_0_offered:
                raise CapabilityNotSupported(
                    "user requested netconf version 1.0 but capability not offered"
                )
            final_channel_version = NetconfVersion.VERSION_1_0
        elif requested_version == NetconfVersion.VERSION_1_1:
            # user pinned 1.1 -- the server must explicitly offer it
            if not base_1_1_offered:
                raise CapabilityNotSupported(
                    "user requested netconf version 1.1 but capability not offered"
                )
            final_channel_version = NetconfVersion.VERSION_1_1
        elif base_1_1_offered:
            # no (recognized) user preference; prefer 1.1 whenever the server offers it
            final_channel_version = NetconfVersion.VERSION_1_1
        else:
            # no (recognized) user preference and no 1.1 on offer; fall back to 1.0
            final_channel_version = NetconfVersion.VERSION_1_0

        if final_channel_version == NetconfVersion.VERSION_1_0:
            self._netconf_base_channel_args.netconf_version = NetconfVersion.VERSION_1_0
            # netconf 1.0 messages are terminated by the end-of-message delimiter
            self._base_channel_args.comms_prompt_pattern = "]]>]]>"
            self._netconf_base_channel_args.client_capabilities = (
                NetconfClientCapabilities.CAPABILITIES_1_0
            )
        else:
            self._netconf_base_channel_args.netconf_version = NetconfVersion.VERSION_1_1
            # netconf 1.1 uses chunked framing; a line containing only "##" ends the message
            self._base_channel_args.comms_prompt_pattern = r"^##$"
            self._netconf_base_channel_args.client_capabilities = (
                NetconfClientCapabilities.CAPABILITIES_1_1
            )

    def _parse_server_capabilities(self, raw_server_capabilities: bytes) -> List[str]:
        """
        Parse netconf server capabilities

        Args:
            raw_server_capabilities: raw bytes containing server capabilities

        Returns:
            List[str]: whitespace-stripped text of every capability element in the server hello;
                elements without any text are skipped

        Raises:
            CouldNotExchangeCapabilities: if server capabilities cannot be parsed

        """
        server_capabilities: List[str] = []

        # matches hello with or without namespace
        filtered_raw_server_capabilities = re.search(
            pattern=rb"(<(\w+\:){0,1}hello.*<\/(\w+\:){0,1}hello>)",
            string=raw_server_capabilities,
            flags=re.I | re.S,
        )
        if filtered_raw_server_capabilities is None:
            msg = "failed to parse server capabilities"
            raise CouldNotExchangeCapabilities(msg)

        # IOSXR/XR7 7.3.1 returns corrupt '<capabil\n\nity>' property on call-home line, so replace
        # newlines to have a parsable read
        server_capabilities_xml = etree.fromstring(
            filtered_raw_server_capabilities.groups()[0].replace(b"\n", b"")
        )
        for elem in server_capabilities_xml.iter():
            # iter() also yields comments/processing instructions whose tag is not a string;
            # a substring test against those would raise TypeError, so skip them
            if not isinstance(elem.tag, str) or "capability" not in elem.tag:
                continue
            # an empty element (e.g. '<capability/>') has text of None; nothing to record
            capability = (elem.text or "").strip()
            if not capability:
                continue
            server_capabilities.append(capability)
        self.logger.info(f"server capabilities received and parsed: {server_capabilities}")
        return server_capabilities

    def _process_output(self, buf: bytes, strip_prompt: bool) -> bytes:
        """
        Override scrapli _process_output as this is unnecessary for scrapli_netconf

        Args:
            buf: bytes output from the device
            strip_prompt: ignored in this base class; for LSP reasons for subclasses

        Returns:
            bytes: the provided `buf`, returned unmodified

        Raises:
            N/A

        """
        _ = strip_prompt
        return buf

    def _pre_send_client_capabilities(
        self, client_capabilities: NetconfClientCapabilities
    ) -> bytes:
        """
        Handle pre "_send_client_capabilities" tasks for consistency between sync/async versions

        Logs the attempt and writes the capabilities to the channel.

        Args:
            client_capabilities: client netconf capabilities enum member; its `value` is the
                string that is written to the channel

        Returns:
            bytes: encoded client capabilities that were written to the channel

        Raises:
            N/A

        """
        self.logger.info("sending client capabilities")
        bytes_client_capabilities: bytes = client_capabilities.value.encode()
        self.logger.debug(f"attempting to send capabilities: {client_capabilities}")
        self.write(client_capabilities.value)
        return bytes_client_capabilities