Skip to content

response

scrapli_netconf.response

NetconfResponse

Bases: Response

Source code in scrapli_netconf/response.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
class NetconfResponse(Response):
    """
    Response object for a single netconf operation

    Extends the scrapli `Response` with the netconf version in use, the xml element that was sent,
    the raw bytes that were received (`raw_result`), the parsed lxml result (`xml_result`) and any
    rpc error messages found in that result (`error_messages`).

    """

    # intentionally overriding base class' list of strings for failed when contains
    failed_when_contains: List[bytes]  # type: ignore[assignment]

    def __init__(
        self,
        netconf_version: NetconfVersion,
        xml_input: Element,
        strip_namespaces: bool = True,
        failed_when_contains: Optional[Union[bytes, List[bytes]]] = None,
        **kwargs: Any,
    ):
        """
        Scrapli Netconf NetconfResponse

        Store channel_input, resulting output, and start/end/elapsed time information. Attempt to
        determine if command was successful or not and reflect that in a failed attribute.

        Args:
            netconf_version: NetconfVersion of the session; one of `VERSION_1_0`|`VERSION_1_1`
            xml_input: lxml Element of input to be sent to device
            strip_namespaces: strip out all namespaces if True, otherwise ignore them
            failed_when_contains: bytes (or list of bytes) that, if present in final output,
                represent a failed command/interaction -- should generally be left alone for
                netconf. Note that this differs from the base scrapli Response object as we want
                to be parsing/checking for these strings in raw byte strings we get back from the
                device
            kwargs: kwargs for instantiation of scrapli Response object supertype

        Returns:
            N/A  # noqa: DAR202

        Raises:
            ValueError: if `netconf_version` is not one of the two supported versions

        """
        if netconf_version not in (NetconfVersion.VERSION_1_0, NetconfVersion.VERSION_1_1):
            raise ValueError(f"`netconf_version` should be one of 1.0|1.1, got `{netconf_version}`")

        self.netconf_version = netconf_version
        self.xml_input = xml_input
        self.strip_namespaces = strip_namespaces
        # declared here, populated by `record_response`
        self.xml_result: Element

        super().__init__(**kwargs)

        if failed_when_contains is None:
            # match on both opening and closing tags too so we never have to think about/compare
            # things with namespaces (the closing tags wont have namespaces)
            failed_when_contains = [
                b"</rpc-error>",
                b"</rpc-errors>",
                b"<rpc-error>",
                b"<rpc-errors>",
            ]
        if isinstance(failed_when_contains, bytes):
            # a single bytes value is accepted as shorthand for a one element list
            failed_when_contains = [failed_when_contains]
        self.failed_when_contains = failed_when_contains

        self.error_messages: List[str] = []

    def record_response(self, result: bytes) -> None:
        """
        Record channel_input results and elapsed time of channel input/reading output

        Stores the raw bytes, clears `failed` when none of the `failed_when_contains` markers are
        present, parses the result per the netconf version, and finally collects rpc error
        messages if the response is (still) marked as failed.

        Args:
            result: bytes result of channel_input

        Returns:
            N/A

        Raises:
            N/A

        """
        self.finish_time = datetime.now()
        self.elapsed_time = (self.finish_time - self.start_time).total_seconds()
        self.raw_result = result

        # `failed` is only ever cleared here, never set -- NOTE(review): presumably the base
        # Response initializes it to True; confirm against scrapli core
        if not self.failed_when_contains:
            self.failed = False
        elif not any(err in self.raw_result for err in self.failed_when_contains):
            self.failed = False

        if self.netconf_version == NetconfVersion.VERSION_1_0:
            self._record_response_netconf_1_0()
        else:
            self._record_response_netconf_1_1()

        if self.failed:
            self._fetch_error_messages()

    @classmethod
    def _parse_raw_result(cls, raw_result: bytes) -> Optional[Element]:
        """
        Parse raw netconf 1.0 bytes into an lxml element

        Args:
            raw_result: raw bytes received from the device

        Returns:
            Optional[Element]: the parsed element, or None if both parse attempts yielded nothing

        Raises:
            N/A

        """
        # remove the message end characters and xml document header see:
        # https://github.com/scrapli/scrapli_netconf/issues/1
        _raw_result = raw_result.replace(b"]]>]]>", b"").replace(
            b'<?xml version="1.0" encoding="UTF-8"?>', b""
        )

        parsed_result: Optional[Element] = etree.fromstring(
            _raw_result,
            parser=PARSER,
        )

        if parsed_result is None:
            # if we failed to parse, try again after stripping out control chars, if we still
            # end up with None, oh well, raise an exception later on down the road
            parsed_result = etree.fromstring(
                CONTROL_CHARS.sub(b"", _raw_result),
                parser=PARSER,
            )

        return parsed_result

    def _record_response_netconf_1_0(self) -> None:
        """
        Record response for netconf version 1.0

        Sets `xml_result` (namespaces removed when `strip_namespaces` is True) and the pretty
        printed string `result`.

        Args:
            N/A

        Returns:
            N/A  # noqa: DAR202

        Raises:
            N/A

        """
        self.xml_result = self._parse_raw_result(self.raw_result)

        if self.strip_namespaces:
            self.xml_result = remove_namespaces(self.xml_result)

        # serialization is the same regardless of whether namespaces were stripped
        self.result = etree.tostring(self.xml_result, pretty_print=True).decode()

    def _validate_chunk_size_netconf_1_1(self, size: int, chunk: bytes) -> None:
        """
        Validate individual chunk size; handle parsing trailing new lines for chunk sizes

        It seems that some platforms behave slightly differently than others (looking at you IOSXE)
        in the way they count chunk sizes with respect to trailing whitespace. Per my reading of the
        RFC, the response for a netconf 1.1 response should look like this:

        ```
        ##XYZ
        <somexml>
        ##
        ```

        Where "XYZ" is an integer number of the count of chars in the following chunk (the chars up
        to the next "##" symbols), then the actual XML response, then a new line(!!!!) and a pair of
        hash symbols to indicate the chunk is complete.

        IOSXE seems to *not* want to see the newline between the XML payload and the double hash
        symbols... instead when it sees that newline it immediately returns the response. This
        breaks the core behavior of scrapli in that scrapli always writes the input, then reads the
        written inputs off the channel *before* sending a return character. This ensures that we
        never have to deal with stripping out the inputs and such because it has already been read.
        With IOSXE Behaving this way, we have to instead use `send_input` with the `eager` flag set
        -- this means that we do *not* read the inputs, we simply send a return. We then have to do
        a little extra parsing to strip out the inputs, but that's no big deal...

        Where this finally gets to "spacing" -- IOSXE seems to include trailing newlines *sometimes*
        but not other times, whereas IOSXR (for example) *always* counts a single trailing newline
        (after the XML). SO.... long story long... (the above chunk stuff doesn't necessarily matter
        for this, but felt like as good a place to document it as any...) this method deals w/
        newline counts -- we check the expected chunk length against the actual char count, the char
        count with all trailing whitespace stripped, and the count of the chunk + a *single*
        trailing newline character...

        If none of the accepted lengths match, a critical message is logged and `failed` is set to
        True; nothing is raised.

        FIN

        Args:
            size: the expected size of the chunk contents
            chunk: the chunk contents

        Returns:
            N/A

        Raises:
            N/A

        """
        actual_size = len(chunk)
        rstripped_size = len(chunk.rstrip())

        # number of trailing whitespace bytes (rstrip removes all whitespace, not only newlines)
        trailing_newline_count = actual_size - rstripped_size
        if trailing_newline_count > 1:
            # keep exactly one trailing whitespace byte in the "trimmed" length
            extraneous_trailing_newline_count = trailing_newline_count - 1
        else:
            extraneous_trailing_newline_count = 1
        trimmed_newline_len = actual_size - extraneous_trailing_newline_count

        if rstripped_size == 0:
            # at least nokia tends to have itty bitty chunks of one element, and/or chunks that have
            # *only* whitespace and our regex ignores this, so if there was/is nothing in the result
            # section we can assume it was just whitespace and move on w/our lives
            actual_size = size

        if size == actual_size:
            return
        if size == rstripped_size:
            return
        if size == trimmed_newline_len:
            return

        LOG.critical(
            f"Return element length invalid, expected {size} got {actual_size} for "
            f"element: {repr(chunk)}"
        )
        self.failed = True

    def _record_response_netconf_1_1(self) -> None:
        """
        Record response for netconf version 1.1

        Splits `raw_result` into chunks with `CHUNK_MATCH_1_1`, validates each chunk's advertised
        size, joins the chunks and parses them into `xml_result`, then sets the pretty printed
        string `result`.

        Args:
            N/A

        Returns:
            N/A

        Raises:
            N/A

        """
        chunk_matches = re.finditer(pattern=CHUNK_MATCH_1_1, string=self.raw_result)

        chunks: List[bytes] = []

        for chunk_match in chunk_matches:
            groups = chunk_match.groupdict()
            size = int(groups.get("size", 0))
            # the pattern is applied to bytes, so the fallback must be bytes as well
            chunk = groups.get("content", b"")

            self._validate_chunk_size_netconf_1_1(size=size, chunk=chunk)

            # drop the final byte of each chunk -- presumably the newline that precedes the next
            # chunk marker; TODO confirm against CHUNK_MATCH_1_1
            chunks.append(chunk[:-1])

        self.xml_result = etree.fromstring(
            b"\n".join(
                [
                    # remove the xml document header see:
                    # https://github.com/scrapli/scrapli_netconf/issues/1
                    chunk.replace(b'<?xml version="1.0" encoding="UTF-8"?>', b"")
                    for chunk in chunks
                ]
            ),
            parser=PARSER,
        )

        if self.strip_namespaces:
            self.xml_result = remove_namespaces(self.xml_result)

        # serialization is the same regardless of whether namespaces were stripped
        self.result = etree.tostring(self.xml_result, pretty_print=True).decode()

    def _fetch_error_messages(self) -> None:
        """
        Fetch all error messages (if any)

        RFC states that there MAY be more than one rpc-error so we just xpath for all
        "error-message" tags and pull out the text of those elements. The strip is just to remove
        leading/trailing white space to make things look a bit nicer.

        Elements are matched by local name so that messages are also found when namespaces were
        not stripped from the result. An "error-message" element without any text is recorded as
        an empty string.

        Args:
            N/A

        Returns:
            N/A

        Raises:
            N/A

        """
        err_messages = self.xml_result.xpath(
            "//*[local-name()='rpc-error']/*[local-name()='error-message']"
        )
        # `text` is None for an empty element; guard so we never call strip on None
        self.error_messages = [(err.text or "").strip() for err in err_messages]

    def get_xml_elements(self) -> Dict[str, Element]:
        """
        Parse each section under "data" into a dict of {tag: Element} for easy viewing/parsing

        Keys are the local (namespace-less) tag names; if several children share a local name the
        last one wins.

        Args:
            N/A

        Returns:
            xml_elements: dictionary of tag: Element; empty if there is no "data" element

        Raises:
            N/A

        """
        xml_elements = {}
        data_element = self.xml_result.find("data", namespaces=self.xml_result.nsmap)

        # juniper doesn't return data in a "data" element for bare rpc calls, guard against that
        # breaking the iterchildren()
        if data_element is not None:
            for child in data_element.iterchildren():
                _tag = etree.QName(child.tag).localname
                xml_elements[_tag] = child
        return xml_elements

    def textfsm_parse_output(
        self, template: Union[str, TextIO, None] = None, to_dict: bool = True
    ) -> Union[Dict[str, Any], List[Any]]:
        """
        Override scrapli Response `textfsm_parse_output` method; not applicable for netconf

        The signature matches the base class method, but nothing is ever parsed or returned.

        Args:
            template: unused; string path to textfsm template or opened textfsm template file
            to_dict: unused; convert textfsm output from list of lists to list of dicts

        Returns:
            N/A

        Raises:
            NotImplementedError: always!

        """
        raise NotImplementedError("No textfsm parsing for netconf output!")

    def genie_parse_output(self) -> Union[Dict[str, Any], List[Any]]:
        """
        Override scrapli Response `genie_parse_output` method; not applicable for netconf

        Args:
            N/A

        Returns:
            N/A

        Raises:
            NotImplementedError: always

        """
        raise NotImplementedError("No genie parsing for netconf output!")

    def raise_for_status(self) -> None:
        """
        Raise a `ScrapliCommandFailure` if any elements are failed

        Overrides scrapli core Response.raise_for_status to include rpc error message(s).

        Args:
            N/A

        Returns:
            None

        Raises:
            ScrapliCommandFailure: if any elements are failed

        """
        if self.failed:
            raise ScrapliCommandFailure(
                f"operation failed, reported rpc errors: {self.error_messages}"
            )

__init__(netconf_version: NetconfVersion, xml_input: Element, strip_namespaces: bool = True, failed_when_contains: Optional[Union[bytes, List[bytes]]] = None, **kwargs: Any)

Scrapli Netconf NetconfResponse

Store channel_input, resulting output, and start/end/elapsed time information. Attempt to determine if command was successful or not and reflect that in a failed attribute.

Parameters:

Name Type Description Default
netconf_version NetconfVersion

string of netconf version; 1.0|1.1

required
xml_input Element

lxml Element of input to be sent to device

required
strip_namespaces bool

strip out all namespaces if True, otherwise ignore them

True
failed_when_contains Optional[Union[bytes, List[bytes]]]

list of bytes that, if present in final output, represent a failed command/interaction -- should generally be left alone for netconf. Note that this differs from the base scrapli Response object as we want to be parsing/checking for these strings in raw byte strings we get back from the device

None
kwargs

kwargs for instantiation of scrapli Response object supertype

required

Returns:

Type Description

N/A

Raises:

Type Description
ValueError

if invalid netconf_version string

Source code in scrapli_netconf/response.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def __init__(
    self,
    netconf_version: NetconfVersion,
    xml_input: Element,
    strip_namespaces: bool = True,
    failed_when_contains: Optional[Union[bytes, List[bytes]]] = None,
    **kwargs: Any,
):
    """
    Scrapli Netconf NetconfResponse

    Validate the netconf version, store the netconf specific attributes, initialize the scrapli
    Response supertype and normalize the byte markers that flag a failed operation.

    Args:
        netconf_version: NetconfVersion of the session; one of `VERSION_1_0`|`VERSION_1_1`
        xml_input: lxml Element of input to be sent to device
        strip_namespaces: strip out all namespaces if True, otherwise ignore them
        failed_when_contains: bytes (or list of bytes) that, if present in final output,
            represent a failed command/interaction -- should generally be left alone for
            netconf. These are checked against the raw bytes we get back from the device,
            which is why they are bytes rather than strings as in the base Response
        kwargs: kwargs for instantiation of scrapli Response object supertype

    Returns:
        N/A  # noqa: DAR202

    Raises:
        ValueError: if `netconf_version` is not one of the two supported versions

    """
    supported_versions = (NetconfVersion.VERSION_1_0, NetconfVersion.VERSION_1_1)
    if netconf_version not in supported_versions:
        raise ValueError(f"`netconf_version` should be one of 1.0|1.1, got `{netconf_version}`")

    self.netconf_version = netconf_version
    self.xml_input = xml_input
    self.strip_namespaces = strip_namespaces
    # only declared here; assigned once a response has been recorded
    self.xml_result: Element

    super().__init__(**kwargs)

    if failed_when_contains is None:
        # default markers: opening *and* closing tags, so namespaces never need comparing
        # (closing tags do not carry namespaces)
        failed_when_contains = [
            b"</rpc-error>",
            b"</rpc-errors>",
            b"<rpc-error>",
            b"<rpc-errors>",
        ]
    elif isinstance(failed_when_contains, bytes):
        # a single marker is shorthand for a one element list
        failed_when_contains = [failed_when_contains]
    self.failed_when_contains = failed_when_contains

    self.error_messages: List[str] = []

genie_parse_output() -> Union[Dict[str, Any], List[Any]]

Override scrapli Response genie_parse_output method; not applicable for netconf

Returns:

Type Description
Union[Dict[str, Any], List[Any]]

N/A

Raises:

Type Description
NotImplementedError

always

Source code in scrapli_netconf/response.py
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
def genie_parse_output(self) -> Union[Dict[str, Any], List[Any]]:
    """
    Reject genie parsing; it does not apply to netconf output

    Overrides the scrapli Response `genie_parse_output` method.

    Args:
        N/A

    Returns:
        N/A

    Raises:
        NotImplementedError: always

    """
    unsupported_msg = "No genie parsing for netconf output!"
    raise NotImplementedError(unsupported_msg)

get_xml_elements() -> Dict[str, Element]

Parse each section under "data" into a dict of {tag: Element} for easy viewing/parsing

Returns:

Name Type Description
xml_elements Dict[str, Element]

dictionary of tag: Element

Source code in scrapli_netconf/response.py
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
def get_xml_elements(self) -> Dict[str, Element]:
    """
    Map each child of the "data" element to its local tag name

    Args:
        N/A

    Returns:
        xml_elements: dictionary of tag: Element; empty when there is no "data" element

    Raises:
        N/A

    """
    data_element = self.xml_result.find("data", namespaces=self.xml_result.nsmap)

    # juniper doesn't return data in a "data" element for bare rpc calls, so there may be
    # nothing to iterate over
    if data_element is None:
        return {}

    # keyed on the namespace-less tag; a later child with the same local name replaces an
    # earlier one
    return {etree.QName(child.tag).localname: child for child in data_element.iterchildren()}

raise_for_status() -> None

Raise a ScrapliCommandFailure if any elements are failed

Overrides scrapli core Response.raise_for_status to include rpc error message(s).

Returns:

Type Description
None

None

Raises:

Type Description
ScrapliCommandFailure

if any elements are failed

Source code in scrapli_netconf/response.py
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
def raise_for_status(self) -> None:
    """
    Raise a `ScrapliCommandFailure` when the response is marked as failed

    Overrides scrapli core Response.raise_for_status so that the exception message carries the
    recorded rpc error message(s).

    Args:
        N/A

    Returns:
        None

    Raises:
        ScrapliCommandFailure: if the response is marked as failed

    """
    if not self.failed:
        return

    failure_msg = f"operation failed, reported rpc errors: {self.error_messages}"
    raise ScrapliCommandFailure(failure_msg)

record_response(result: bytes) -> None

Record channel_input results and elapsed time of channel input/reading output

Parameters:

Name Type Description Default
result bytes

bytes result of channel_input

required

Returns:

Type Description
None

N/A

Source code in scrapli_netconf/response.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def record_response(self, result: bytes) -> None:
    """
    Store the raw result and timing data, then parse it per the netconf version

    Args:
        result: bytes result of channel_input

    Returns:
        N/A

    Raises:
        N/A

    """
    finished_at = datetime.now()
    self.finish_time = finished_at
    self.elapsed_time = (finished_at - self.start_time).total_seconds()
    self.raw_result = result

    # the response is considered successful when there are no failure markers configured, or
    # when none of the configured markers appear in the raw bytes
    markers = self.failed_when_contains
    if not markers or not any(marker in self.raw_result for marker in markers):
        self.failed = False

    is_netconf_1_0 = self.netconf_version == NetconfVersion.VERSION_1_0
    if is_netconf_1_0:
        self._record_response_netconf_1_0()
    else:
        self._record_response_netconf_1_1()

    # checked after parsing on purpose: parsing may itself mark the response as failed
    if self.failed:
        self._fetch_error_messages()

textfsm_parse_output(template: Union[str, TextIO, None] = None, to_dict: bool = True) -> Union[Dict[str, Any], List[Any]]

Parse results with textfsm, always return structured data

Always raises NotImplementedError -- textfsm parsing is not applicable to netconf output, so nothing is ever returned.

Parameters:

Name Type Description Default
template Union[str, TextIO, None]

string path to textfsm template or opened textfsm template file

None
to_dict bool

convert textfsm output from list of lists to list of dicts -- basically create dict from header and row data so it is easier to read/parse the output

True

Returns:

Name Type Description
structured_result Union[Dict[str, Any], List[Any]]

empty list or parsed data from textfsm

Raises:

Type Description
NotImplementedError

always!

Source code in scrapli_netconf/response.py
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
def textfsm_parse_output(
    self, template: Union[str, TextIO, None] = None, to_dict: bool = True
) -> Union[Dict[str, Any], List[Any]]:
    """
    Reject textfsm parsing; it does not apply to netconf output

    The parameters are accepted but never used; nothing is parsed or returned.

    Args:
        template: unused; string path to textfsm template or opened textfsm template file
        to_dict: unused; convert textfsm output from list of lists to list of dicts

    Returns:
        N/A

    Raises:
        NotImplementedError: always!

    """
    unsupported_msg = "No textfsm parsing for netconf output!"
    raise NotImplementedError(unsupported_msg)