Skip to content

sync_driver

scrapli_community.fortinet.fortios.sync_driver

FortinetFortiOSDriver

Bases: GenericDriver

Fortinet FortiOS platform class

Attributes:

Name Type Description
_vdoms_enabled bool

True when device is in multi-VDOM mode

_vdom_list List[str]

list of VDOMs read from device when needed

_original_console str

more|standard, read from device in order to restore it by cleanup

Source code in fortinet/fortios/sync_driver.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
class FortinetFortiOSDriver(GenericDriver):
    """Fortinet FortiOS platform class

    Attributes:
        _vdoms_enabled (bool): True when device is in multi-VDOM mode
        _vdom_list (List[str]): list of VDOMs read from device when needed
        _original_console (str): more|standard, read from device in order to restore it by cleanup;
            empty until it has been read
    """

    def __init__(self, **kwargs: Any):
        self._vdoms_enabled: bool = False
        self._vdom_list: List[str] = []
        # empty string means the console output mode has not been read from the device
        self._original_console: str = ""
        super().__init__(**kwargs)

    def _vdoms_status(self) -> bool:
        """Determine whether virtual domains are enabled or not

        Returns:
            True if device is configured with multi VDOM mode
        """
        output = self.send_command('get system status | grep "Virtual domain configuration"')

        # NOTE(review): other VDOM modes (e.g. a "split-task" value) are not matched here;
        # confirm against the FortiOS versions that need to be supported
        return bool(re.search(r"Virtual domain configuration: (multiple|enable)", output.result))

    def prepare_session(self) -> None:
        """Prepare session

        Accepts the post-login banner when the device asks for it, detects multi-VDOM mode and
        switches the console output to ``standard``, remembering the original value in
        ``_original_console`` so that ``cleanup_session`` can restore it.
        """

        # check if user configured post-login banner which requires acceptance
        # config system global
        #     set post-login-banner enable
        # end
        initial = self.channel.read()
        # decode instead of str(): str() on bytes produces the repr, which escapes the single
        # quotes of the marker below whenever the banner also contains double quotes
        if isinstance(initial, bytes):
            banner = initial.decode(errors="ignore")
        else:
            banner = str(initial)
        if "(Press 'a' to accept):" in banner:
            self.channel.write("a")
        self.get_prompt()
        self._vdoms_enabled = self._vdoms_status()
        if self._vdoms_enabled:
            self.context("global")
        response = self.send_command("get system console | grep ^output")
        console_mode = re.search(r".*: (\w+)", response.result)
        if console_mode is None:
            # the mode could not be read: leave the console untouched and record "standard"
            # so that cleanup_session does not try to restore a value we never saw
            self.logger.warning(
                "Couldn't determine console output mode, leaving console settings untouched"
            )
            self._original_console = "standard"
            return
        self._original_console = console_mode.group(1)
        if self._original_console == "standard":
            # NOTE(review): in multi-VDOM mode this path stays in the global context while the
            # path below leaves it - confirm which one callers rely on
            return
        disable_paging = ["config system console", "set output standard", "end"]
        if self._vdoms_enabled:  # we exit from global too
            disable_paging.append("end")
        self.send_commands(disable_paging)

    def cleanup_session(self) -> None:
        """Restore paging if necessary

        Nothing is sent when the original console output was already ``standard`` or when it
        was never read (``_original_console`` is still empty), because there is no valid value
        to restore in either case.
        """
        if self._original_console in ("", "standard"):
            return
        if self._vdoms_enabled:
            self.context("global")
        else:
            self._to_system()
        self.send_commands(
            ["config system console", f"set output {self._original_console}", "end"]
        )

    def _to_system(self) -> None:
        """Abort everything and go to the root prompt

        Note:
            This can't handle all cases, user needs to ensure all command blocks are closed.
            This won't exit deeply nested config blocks!
        """
        prompt = self.get_prompt()
        # a "(" in the prompt is taken as the sign of being inside a config block / VDOM
        if "(" in prompt:
            self.send_commands(["abort", "end"])

    def gather_vdoms(self) -> Union[None, List[str]]:
        """Gather list of VDOMs

        Returns:
            None: if device is not in multi VDOM mode
            List[str]: list of VDOM names configured, without duplicates, in the order they
                first appear in the device output
        """

        if not self._vdoms_enabled:
            # device is not in multi VDOM mode
            return None
        self._to_system()
        output = self.send_command('show | grep "config vdom" -f -A1')
        # """
        # FIREWALL # show | grep "config vdom" -f -A1
        # config vdom
        # edit root
        # --
        # config vdom
        # edit root
        # --
        # config vdom
        # edit test1
        # """
        # [\w-] keeps names containing hyphens, which a plain \w+ silently dropped; the optional
        # quotes are tolerated in case the device prints the name quoted
        names = re.findall(r'^edit "?([\w-]+)"?$', output.result, re.M)
        # dict.fromkeys removes the duplicates while keeping a deterministic order
        self._vdom_list = list(dict.fromkeys(names))
        return self._vdom_list

    def context(self, context: str) -> Union[None, str]:
        """Change context / VDOM

        This method will abort any config block / VDOM and change to the specified context.
        If device is not in multi-VDOM mode, do nothing. If specified VDOM is not pre-defined,
        query device for available VDOMs to check if it's possible.

        Pre-defined contexts:
            * system : root prompt, not in any config block or VDOM
            * global : in global context
            * root : default VDOM which cannot be renamed

        Args:
            context (str): VDOM name or pre-defined context name

        Returns:
            None if the device is not in multi-VDOM mode, otherwise the context name

        Raises:
            ScrapliCommandFailure: on context changing errors

        """
        self.logger.debug("Changing to context %s", context)
        if not self._vdoms_enabled:
            return None
        # if context is a non-predefined one, gather VDOM list from device
        if not self._vdom_list and context not in ["system", "global", "root"]:
            # gather list of VDOMs (this also returns to the root prompt first)
            self.gather_vdoms()  # slow :(
        else:
            self._to_system()
        if context == "system":
            # we are already here
            pass
        elif context == "global":
            response = self.send_command("config global")
            if response.failed:
                raise ScrapliCommandFailure(f"Couldn't change to {context} context!")
        elif context in self._vdom_list or context == "root":
            responses = self.send_commands(["config vdom", f"edit {context}"])
            if responses[-1].failed:
                raise ScrapliCommandFailure(f"Couldn't change to {context} context!")
        else:
            raise ScrapliCommandFailure(f"Tried to change to {context}, but it doesn't exists!")

        return context

    def send_config(self, config: str, **kwargs: Any) -> None:
        """Not implemented on FortiOS

        Args:
            config (str): config text
            kwargs: other arguments

        Raises:
            NotImplementedError: this function is not implemented
        """
        raise NotImplementedError("send_config not implemented for FortiOS")

    def send_configs(self, configs: List[str], **kwargs: Any) -> None:
        """Not implemented on FortiOS

        Args:
            configs (List[str]): config text
            kwargs: other arguments

        Raises:
            NotImplementedError: this function is not implemented
        """
        raise NotImplementedError("send_configs not implemented for FortiOS")

    # pylama:ignore=C901
    def send_commands(
        self,
        commands: List[str],
        *,
        batch_mode: bool = False,
        strip_prompt: bool = True,
        failed_when_contains: Optional[Union[str, List[str]]] = None,
        stop_on_failed: bool = False,
        eager: bool = False,
        timeout_ops: Optional[float] = None,
    ) -> MultiResponse:
        """Send multiple commands to device

        This method adds capability to use FortiOS batch mode which applies commands after all
        commands are sent. This is useful for configuration where we want to apply more things
        at once to avoid losing mgmt connectivity. E.g. when changing mgmt IP and default gw.

        If device is in multi-VDOM mode, please make sure you select full scope of your commands.

        Example of multi-VDOM batch commands::

            config vdom
                edit test1
                    config system interface
                        edit mgmt
                            set ip 1.1.1.1/30
                        next
                    end
                end
            end

        Example device output with batch result::

            Code: sent command

            0: config system global
            -61: unset set post-login-banner enable
            -61: unset post-login-banner enable
            0: unset post-login-banner
            0: end

            Code is error code. 0 means ok. The above example shows that mistyped commands gave
            error with code -61.

        Args:
            commands: list of strings to send to device in config mode
            batch_mode: True/False to indicate we want batch mode processing
            strip_prompt: True/False strip prompt from returned output
            failed_when_contains: string or list of strings indicating failure if found in response
            stop_on_failed: True/False stop executing commands if a command fails, returns results
                as of current execution
            eager: if eager is True we do not read until prompt is seen at each command sent to the
                channel. Do *not* use this unless you know what you are doing as it is possible that
                it can make scrapli less reliable!
            timeout_ops: timeout ops value for this operation; only sets the timeout_ops value for
                the duration of the operation, value is reset to initial value after operation is
                completed. Note that this is the timeout value PER CONFIG sent, not for the total
                of the configs being sent! In batch mode it is also used as the limit (in
                seconds, 30 when unset) for waiting on the batch run to finish.

        Returns:
            MultiResponse: Scrapli MultiResponse object

        Raises:
            ScrapliCommandFailure: on errors entering/exiting batch mode
            ScrapliTimeout: on batch execution timeout
        """
        # sanity check, we need more than 1 valid lines to make batch_mode sane
        if batch_mode and len([config for config in commands if config.strip()]) < 2:
            batch_mode = False

        if batch_mode:
            # enable batch run
            if self._vdoms_enabled:
                self.context("global")
            response = self.send_command(
                "execute batch start", failed_when_contains="Unknown", timeout_ops=5
            )
            if response.failed:
                raise ScrapliCommandFailure(
                    "Couldn't enter batch mode, check context! (only single mode or "
                    "global VDOM supports batch mode)"
                )

        responses = super().send_commands(
            commands,
            strip_prompt=strip_prompt,
            failed_when_contains=failed_when_contains,
            stop_on_failed=stop_on_failed,
            eager=eager,
            timeout_ops=timeout_ops,
        )

        if batch_mode:
            # stop batch run
            response = self.send_command(
                "execute batch end", failed_when_contains="Unknown", timeout_ops=5
            )
            if response.failed:
                raise ScrapliCommandFailure("Couldn't stop batch mode")
            # check batch status, polling once per `wait` seconds until the limit is reached
            check_no = 0
            wait = 1.0
            while check_no * wait < (timeout_ops or 30):
                response = self.send_command("execute batch status", failed_when_contains="Unknown")
                if "batch mode is stopped" in response.result:
                    break
                check_no += 1
                sleep(wait)
            else:
                raise ScrapliTimeout("Batch run timed out")
            # check batch results
            response = self.send_command(
                "execute batch lastlog", failed_when_contains="Unknown", timeout_ops=5
            )
            if response.failed:
                raise ScrapliCommandFailure("Couldn't list batch results")
            # keep only "<code>: <command>" lines; blank or header lines carry no result and
            # must neither flag a command nor shift the ones that follow
            codes = []
            for line in response.result.splitlines():
                code, separator, _ = line.partition(": ")
                code = code.strip()
                if separator and code.lstrip("-").isdigit():
                    codes.append(code)
            # set config line failed where batch returned non-zero error code
            # assumes the log lists the sent commands in order - TODO confirm; zip() stops at
            # the shorter side so extra log lines can't index past the collected responses
            for command_response, code in zip(responses, codes):
                if code != "0":
                    command_response.failed = True

        return responses

cleanup_session() -> None

Restore paging if necessary

Source code in fortinet/fortios/sync_driver.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def cleanup_session(self) -> None:
    """Restore paging if necessary

    Nothing is sent when the original console output was already ``standard`` or when it was
    never read (``_original_console`` is still empty), because there is no valid value to
    restore in either case.
    """
    if self._original_console in ("", "standard"):
        return
    # the console settings are changed from the global context in multi-VDOM mode,
    # otherwise from the root prompt
    if self._vdoms_enabled:
        self.context("global")
    else:
        self._to_system()
    self.send_commands(
        ["config system console", f"set output {self._original_console}", "end"]
    )

context(context: str) -> Union[None, str]

Change context / VDOM

This method will abort any config block / VDOM and change to the specified context. If device is not in multi-VDOM mode, do nothing. If specified VDOM is not pre-defined, query device for available VDOMs to check if it's possible.

Pre-defined contexts
  • system : root prompt, not in any config block or VDOM
  • global : in global context
  • root : default VDOM which cannot be renamed

Parameters:

Name Type Description Default
context str

VDOM name or pre-defined context name

required

Returns:

Type Description
Union[None, str]

None if context change is not possible or context name

Raises:

Type Description
ScrapliCommandFailure

on context changing errors

Source code in fortinet/fortios/sync_driver.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def context(self, context: str) -> Union[None, str]:
    """Switch the session to the requested context / VDOM

    Any open config block / VDOM is left first, then the requested context is entered.
    Devices that are not in multi-VDOM mode are left untouched. For a name that is not one of
    the pre-defined contexts the VDOM list is read from the device (once) to validate it.

    Pre-defined contexts:
        * system : root prompt, not in any config block or VDOM
        * global : in global context
        * root : default VDOM which cannot be renamed

    Args:
        context (str): VDOM name or pre-defined context name

    Returns:
        None when the device is not in multi-VDOM mode, otherwise the context name

    Raises:
        ScrapliCommandFailure: when the context is unknown or the device refuses the change

    """
    self.logger.debug("Changing to context %s", context)
    if not self._vdoms_enabled:
        return None

    predefined = ("system", "global", "root")
    must_query_device = context not in predefined and not self._vdom_list
    if must_query_device:
        # reading the VDOM list also brings us back to the root prompt
        self.gather_vdoms()  # slow :(
    else:
        self._to_system()

    if context == "system":
        # the root prompt is exactly where we are now
        return context

    if context == "global":
        entered = self.send_command("config global")
        if entered.failed:
            raise ScrapliCommandFailure(f"Couldn't change to {context} context!")
        return context

    if context == "root" or context in self._vdom_list:
        results = self.send_commands(["config vdom", f"edit {context}"])
        # only the outcome of the final "edit" decides whether the switch worked
        if results[-1].failed:
            raise ScrapliCommandFailure(f"Couldn't change to {context} context!")
        return context

    raise ScrapliCommandFailure(f"Tried to change to {context}, but it doesn't exists!")

gather_vdoms() -> Union[None, List[str]]

Gather list of VDOMs

Returns:

Name Type Description
None Union[None, List[str]]

if device is not in multi VDOM mode

Union[None, List[str]]

List[str]: list of VDOM names configured

Source code in fortinet/fortios/sync_driver.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def gather_vdoms(self) -> Union[None, List[str]]:
    """Gather list of VDOMs

    The result is also stored in ``_vdom_list``.

    Returns:
        None: if device is not in multi VDOM mode
        List[str]: list of VDOM names configured, without duplicates, in the order they first
            appear in the device output
    """

    if not self._vdoms_enabled:
        # device is not in multi VDOM mode
        return None
    self._to_system()
    output = self.send_command('show | grep "config vdom" -f -A1')
    # """
    # FIREWALL # show | grep "config vdom" -f -A1
    # config vdom
    # edit root
    # --
    # config vdom
    # edit root
    # --
    # config vdom
    # edit test1
    # """
    # [\w-] keeps names containing hyphens, which a plain \w+ silently dropped; the optional
    # quotes are tolerated in case the device prints the name quoted
    names = re.findall(r'^edit "?([\w-]+)"?$', output.result, re.M)
    # dict.fromkeys removes the duplicates while keeping a deterministic order
    self._vdom_list = list(dict.fromkeys(names))
    return self._vdom_list

prepare_session() -> None

Prepare session

Source code in fortinet/fortios/sync_driver.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def prepare_session(self) -> None:
    """Prepare session

    Accepts the post-login banner when the device asks for it, detects multi-VDOM mode and
    switches the console output to ``standard``, remembering the original value in
    ``_original_console`` so that ``cleanup_session`` can restore it.
    """

    # check if user configured post-login banner which requires acceptance
    # config system global
    #     set post-login-banner enable
    # end
    initial = self.channel.read()
    # decode instead of str(): str() on bytes produces the repr, which escapes the single
    # quotes of the marker below whenever the banner also contains double quotes
    if isinstance(initial, bytes):
        banner = initial.decode(errors="ignore")
    else:
        banner = str(initial)
    if "(Press 'a' to accept):" in banner:
        self.channel.write("a")
    self.get_prompt()
    self._vdoms_enabled = self._vdoms_status()
    if self._vdoms_enabled:
        self.context("global")
    response = self.send_command("get system console | grep ^output")
    console_mode = re.search(r".*: (\w+)", response.result)
    if console_mode is None:
        # the mode could not be read: leave the console untouched and record "standard" so
        # that cleanup_session does not try to restore a value we never saw
        self.logger.warning(
            "Couldn't determine console output mode, leaving console settings untouched"
        )
        self._original_console = "standard"
        return
    self._original_console = console_mode.group(1)
    if self._original_console == "standard":
        # NOTE(review): in multi-VDOM mode this path stays in the global context while the
        # path below leaves it - confirm which one callers rely on
        return
    disable_paging = ["config system console", "set output standard", "end"]
    if self._vdoms_enabled:  # we exit from global too
        disable_paging.append("end")
    self.send_commands(disable_paging)

send_commands(commands: List[str], *, batch_mode: bool = False, strip_prompt: bool = True, failed_when_contains: Optional[Union[str, List[str]]] = None, stop_on_failed: bool = False, eager: bool = False, timeout_ops: Optional[float] = None) -> MultiResponse

Send multiple commands to device

This method adds capability to use FortiOS batch mode which applies commands after all commands are sent. This is useful for configuration where we want to apply more things at once to avoid losing mgmt connectivity. E.g. when changing mgmt IP and default gw.

If device is in multi-VDOM mode, please make sure you select full scope of your commands.

Example of multi-VDOM batch commands:: config vdom edit test1 config system interface edit mgmt set ip 1.1.1.1/30 next end end end

Example device output with batch result::

Code: sent command

0: config system global
-61: unset set post-login-banner enable
-61: unset post-login-banner enable
0: unset post-login-banner
0: end

Code is error code. 0 means ok. The above example shows that mistyped commands gave
error with code -61.

Parameters:

Name Type Description Default
commands List[str]

list of strings to send to device in config mode

required
batch_mode bool

True/False to indicate we want batch mode processing

False
strip_prompt bool

True/False strip prompt from returned output

True
failed_when_contains Optional[Union[str, List[str]]]

string or list of strings indicating failure if found in response

None
stop_on_failed bool

True/False stop executing commands if a command fails, returns results as of current execution

False
eager bool

if eager is True we do not read until prompt is seen at each command sent to the channel. Do not use this unless you know what you are doing as it is possible that it can make scrapli less reliable!

False
timeout_ops Optional[float]

timeout ops value for this operation; only sets the timeout_ops value for the duration of the operation, value is reset to initial value after operation is completed. Note that this is the timeout value PER CONFIG sent, not for the total of the configs being sent!

None

Returns:

Name Type Description
MultiResponse MultiResponse

Scrapli MultiResponse object

Raises:

Type Description
ScrapliCommandFailure

on errors entering/exiting batch mode

ScrapliTimeout

on batch execution timeout

Source code in fortinet/fortios/sync_driver.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
def send_commands(
    self,
    commands: List[str],
    *,
    batch_mode: bool = False,
    strip_prompt: bool = True,
    failed_when_contains: Optional[Union[str, List[str]]] = None,
    stop_on_failed: bool = False,
    eager: bool = False,
    timeout_ops: Optional[float] = None,
) -> MultiResponse:
    """Send multiple commands to device

    This method adds capability to use FortiOS batch mode which applies commands after all
    commands are sent. This is useful for configuration where we want to apply more things
    at once to avoid losing mgmt connectivity. E.g. when changing mgmt IP and default gw.

    If device is in multi-VDOM mode, please make sure you select full scope of your commands.

    Example of multi-VDOM batch commands::

        config vdom
            edit test1
                config system interface
                    edit mgmt
                        set ip 1.1.1.1/30
                    next
                end
            end
        end

    Example device output with batch result::

        Code: sent command

        0: config system global
        -61: unset set post-login-banner enable
        -61: unset post-login-banner enable
        0: unset post-login-banner
        0: end

        Code is error code. 0 means ok. The above example shows that mistyped commands gave
        error with code -61.

    Args:
        commands: list of strings to send to device in config mode
        batch_mode: True/False to indicate we want batch mode processing
        strip_prompt: True/False strip prompt from returned output
        failed_when_contains: string or list of strings indicating failure if found in response
        stop_on_failed: True/False stop executing commands if a command fails, returns results
            as of current execution
        eager: if eager is True we do not read until prompt is seen at each command sent to the
            channel. Do *not* use this unless you know what you are doing as it is possible that
            it can make scrapli less reliable!
        timeout_ops: timeout ops value for this operation; only sets the timeout_ops value for
            the duration of the operation, value is reset to initial value after operation is
            completed. Note that this is the timeout value PER CONFIG sent, not for the total
            of the configs being sent! In batch mode it is also used as the limit (in seconds,
            30 when unset) for waiting on the batch run to finish.

    Returns:
        MultiResponse: Scrapli MultiResponse object

    Raises:
        ScrapliCommandFailure: on errors entering/exiting batch mode
        ScrapliTimeout: on batch execution timeout
    """
    # sanity check, we need more than 1 valid lines to make batch_mode sane
    if batch_mode and len([config for config in commands if config.strip()]) < 2:
        batch_mode = False

    if batch_mode:
        # enable batch run
        if self._vdoms_enabled:
            self.context("global")
        response = self.send_command(
            "execute batch start", failed_when_contains="Unknown", timeout_ops=5
        )
        if response.failed:
            raise ScrapliCommandFailure(
                "Couldn't enter batch mode, check context! (only single mode or "
                "global VDOM supports batch mode)"
            )

    responses = super().send_commands(
        commands,
        strip_prompt=strip_prompt,
        failed_when_contains=failed_when_contains,
        stop_on_failed=stop_on_failed,
        eager=eager,
        timeout_ops=timeout_ops,
    )

    if batch_mode:
        # stop batch run
        response = self.send_command(
            "execute batch end", failed_when_contains="Unknown", timeout_ops=5
        )
        if response.failed:
            raise ScrapliCommandFailure("Couldn't stop batch mode")
        # check batch status, polling once per `wait` seconds until the limit is reached
        check_no = 0
        wait = 1.0
        while check_no * wait < (timeout_ops or 30):
            response = self.send_command("execute batch status", failed_when_contains="Unknown")
            if "batch mode is stopped" in response.result:
                break
            check_no += 1
            sleep(wait)
        else:
            raise ScrapliTimeout("Batch run timed out")
        # check batch results
        response = self.send_command(
            "execute batch lastlog", failed_when_contains="Unknown", timeout_ops=5
        )
        if response.failed:
            raise ScrapliCommandFailure("Couldn't list batch results")
        # keep only "<code>: <command>" lines; blank or header lines carry no result and must
        # neither flag a command nor shift the ones that follow
        codes = []
        for line in response.result.splitlines():
            code, separator, _ = line.partition(": ")
            code = code.strip()
            if separator and code.lstrip("-").isdigit():
                codes.append(code)
        # set config line failed where batch returned non-zero error code
        # assumes the log lists the sent commands in order - TODO confirm; zip() stops at the
        # shorter side so extra log lines can't index past the collected responses
        for command_response, code in zip(responses, codes):
            if code != "0":
                command_response.failed = True

    return responses

send_config(config: str, **kwargs: Any) -> None

Not implemented on FortiOS

Parameters:

Name Type Description Default
config str

config text

required
kwargs

other arguments

required

Raises:

Type Description
NotImplementedError

this function is not implemented

Source code in fortinet/fortios/sync_driver.py
164
165
166
167
168
169
170
171
172
173
174
def send_config(self, config: str, **kwargs: Any) -> None:
    """Placeholder: this operation is not available for FortiOS

    Args:
        config (str): config text (ignored)
        kwargs: other arguments (ignored)

    Raises:
        NotImplementedError: always
    """
    reason = "send_config not implemented for FortiOS"
    raise NotImplementedError(reason)

send_configs(configs: List[str], **kwargs: Any) -> None

Not implemented on FortiOS

Parameters:

Name Type Description Default
configs List[str]

config text

required
kwargs

other arguments

required

Raises:

Type Description
NotImplementedError

this function is not implemented

Source code in fortinet/fortios/sync_driver.py
176
177
178
179
180
181
182
183
184
185
186
def send_configs(self, configs: List[str], **kwargs: Any) -> None:
    """Not implemented on FortiOS

    Args:
        configs (List[str]): config lines
        kwargs: other arguments

    Raises:
        NotImplementedError: this function is not implemented
    """
    # name the method that was actually called, not its singular sibling
    raise NotImplementedError("send_configs not implemented for FortiOS")

default_sync_on_close(conn: FortinetFortiOSDriver) -> None

Sync fortinet_fortios default on_close callable

Parameters:

Name Type Description Default
conn FortinetFortiOSDriver

FortinetFortiOSDriver object

required

Returns:

Type Description
None

N/A

Source code in fortinet/fortios/sync_driver.py
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
def default_sync_on_close(conn: FortinetFortiOSDriver) -> None:
    """
    Sync fortinet_fortios default on_close callable

    Restores the console settings through ``cleanup_session`` and then sends ``exit``. The
    ``exit`` is sent even when the cleanup raises, so a failed restore does not leave the CLI
    session logged in; the original exception still propagates.

    Args:
        conn: FortinetFortiOSDriver object

    Returns:
        N/A

    Raises:
        N/A
    """
    try:
        conn.cleanup_session()
    finally:
        conn.channel.write(channel_input="exit")
        conn.channel.send_return()

default_sync_on_open(conn: FortinetFortiOSDriver) -> None

Sync fortinet_fortios default on_open callable

Parameters:

Name Type Description Default
conn FortinetFortiOSDriver

FortinetFortiOSDriver object

required

Returns:

Type Description
None

N/A

Source code in fortinet/fortios/sync_driver.py
313
314
315
316
317
318
319
320
321
322
323
324
325
326
def default_sync_on_open(conn: FortinetFortiOSDriver) -> None:
    """
    Sync fortinet_fortios default on_open callable

    Hands the freshly opened connection to the driver's own ``prepare_session``.

    Args:
        conn: FortinetFortiOSDriver object

    Returns:
        N/A

    Raises:
        N/A
    """
    # all session set-up logic lives on the driver itself
    conn.prepare_session()