scrapli_netconf.channel.base_channel
BaseNetconfChannel
Bases: BaseChannel
Source code in channel/base_channel.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160 | class BaseNetconfChannel(BaseChannel):
_netconf_base_channel_args: NetconfBaseChannelArgs
def _process_capabilities_exchange(self, raw_server_capabilities: bytes) -> None:
    """
    Settle on a netconf version from the server hello and configure the channel for it

    The parsed server capabilities are stored on the netconf channel args, then the negotiated
    version, the matching prompt pattern and the matching client capabilities are recorded.

    Args:
        raw_server_capabilities: raw bytes containing server capabilities

    Returns:
        None

    Raises:
        CapabilityNotSupported: if user has provided a preferred netconf version but it is not
            available in servers offered capabilities

    """
    netconf_args = self._netconf_base_channel_args

    offered = self._parse_server_capabilities(raw_server_capabilities=raw_server_capabilities)
    netconf_args.server_capabilities = offered

    # default: take 1.1 whenever the server advertises it, otherwise fall back to 1.0
    use_version_1_1 = "urn:ietf:params:netconf:base:1.1" in offered

    # an explicit user preference wins over the default, but only if the server offers it
    requested_version = netconf_args.netconf_version
    if requested_version == NetconfVersion.VERSION_1_0:
        if "urn:ietf:params:netconf:base:1.0" not in offered:
            raise CapabilityNotSupported(
                "user requested netconf version 1.0 but capability not offered"
            )
        use_version_1_1 = False
    elif requested_version == NetconfVersion.VERSION_1_1:
        if not use_version_1_1:
            raise CapabilityNotSupported(
                "user requested netconf version 1.1 but capability not offered"
            )

    if use_version_1_1:
        netconf_args.netconf_version = NetconfVersion.VERSION_1_1
        self._base_channel_args.comms_prompt_pattern = r"^##$"
        netconf_args.client_capabilities = NetconfClientCapabilities.CAPABILITIES_1_1
        return

    netconf_args.netconf_version = NetconfVersion.VERSION_1_0
    self._base_channel_args.comms_prompt_pattern = "]]>]]>"
    netconf_args.client_capabilities = NetconfClientCapabilities.CAPABILITIES_1_0
def _parse_server_capabilities(self, raw_server_capabilities: bytes) -> List[str]:
    """
    Parse netconf server capabilities

    Locates the server hello message in the raw channel output, parses it as xml and collects
    the text of every element whose tag contains "capability".

    Args:
        raw_server_capabilities: raw bytes containing server capabilities

    Returns:
        List[str]: capability strings from the server hello, surrounding whitespace stripped;
            capability elements without any text are skipped

    Raises:
        CouldNotExchangeCapabilities: if no hello message can be found in the raw output

    """
    # matches hello with or without namespace
    hello_match = re.search(
        pattern=rb"(<(\w+\:){0,1}hello.*<\/(\w+\:){0,1}hello>)",
        string=raw_server_capabilities,
        flags=re.I | re.S,
    )

    if hello_match is None:
        msg = "failed to parse server capabilities"
        raise CouldNotExchangeCapabilities(msg)

    # IOSXR/XR7 7.3.1 returns corrupt '<capabil\n\nity>' property on call-home line, so replace
    # newlines to have a parsable read
    server_capabilities_xml = etree.fromstring(hello_match.group(1).replace(b"\n", b""))

    server_capabilities: List[str] = []
    for elem in server_capabilities_xml.iter():
        tag = elem.tag
        # non-element nodes (e.g. xml comments under lxml) carry a non-string tag; a substring
        # test against those would raise TypeError, so skip them outright
        if not isinstance(tag, str) or "capability" not in tag:
            continue

        # an empty element such as <capability/> has text None; nothing useful to record
        capability = (elem.text or "").strip()
        if capability:
            server_capabilities.append(capability)

    self.logger.info(f"server capabilities received and parsed: {server_capabilities}")
    return server_capabilities
def _process_output(self, buf: bytes, strip_prompt: bool) -> bytes:
"""
Override scrapli _process_output as this is unnecessary for scrapli_netconf
Args:
buf: bytes output from the device
strip_prompt: ignored in this base class; for LSP reasons for subclasses
Returns:
bytes: output of joined output lines optionally with prompt removed
Raises:
N/A
"""
_ = strip_prompt
return buf
def _pre_send_client_capabilities(
    self, client_capabilities: NetconfClientCapabilities
) -> bytes:
    """
    Shared preamble for "_send_client_capabilities" so sync/async versions behave the same

    Logs the attempt, writes the capabilities value to the channel and hands back the encoded
    form of that same value.

    Args:
        client_capabilities: client netconf capabilities whose `value` is sent to the server

    Returns:
        bytes: encoded client capabilities value that was written to the channel

    Raises:
        N/A

    """
    self.logger.info("sending client capabilities")

    capabilities_payload = client_capabilities.value
    encoded_payload: bytes = capabilities_payload.encode()

    self.logger.debug(f"attempting to send capabilities: {client_capabilities}")
    self.write(capabilities_payload)

    return encoded_payload
|