Indeni Workflow Tutorial

Introduction

With Indeni-Workflow, the IKE can:

  • Write a workflow easily

  • Debug a workflow

  • Debug a single workflow block

  • Run a workflow on a real device

  • Run a workflow on mock data

  • View a workflow graphically

Prepare your environment 

1. Install indeni-parser

indeni-workflow uses indeni-parser methods (parse_data_as_list, etc.) to parse raw data into objects.

Use this link to install indeni-parser

2. Install indeni-workflow

Using the terminal, install the latest version of indeni-workflow:

pip3 install "indeni_workflow==0.0.0.*" --extra-index-url https://indeni.jfrog.io/indeni/api/pypi/indeni-pypi-develop/simple

On Ubuntu 18.x systems (other Debian-based systems may also be affected), the above command does not work because of the end of TLS 1.0 and 1.1 support in pip, which happened 1-2 years ago. In this case, pip must be updated before installing the package:

curl https://bootstrap.pypa.io/get-pip.py | sudo python3

To uninstall indeni-workflow, use:

pip3 uninstall indeni-workflow

3. Install PyCharm

Use this link

4. Define Project Interpreter

Use this link

Write your workflow

Files Architecture

Every workflow directory should contain the files below, under automation/workflows/<workflow_name>/<vendor_name>/

  • <workflow_name> should be the rulename

  • vendor_name

    • checkpoint

    • paloaltonetworks

Note: See cross_vendor_next_hop_router_inaccessible in https://bitbucket.org/indeni/indeni-knowledge/src/IKP-3685-ansible-to-slimsible-ate-migration/

File

Description

Details

Example

<workflow_name>_workflow.yaml
Mandatory

Contains the workflow structure

The workflow contains the following fields

  • id: textual identification string
    convention: <workflow_name>_<vendor_name>

  • start_block: the id of the block that starts the workflow

  • blocks: list of all available blocks in the workflow

 

Each block is responsible for routing to the next block.

The following fields are mandatory in each block:

  • type: defines the block type

  • name: friendly name of the block (shown in the UI)

According to the type of the block, more fields are required.

id: ntp_not_syncing_reason_checkpoint
friendly_name: NTP not syncing
start_block: get_device_tags
blocks:
  get_device_tags:
    type: device_tags
    name: Get device tags
    register_to: my_device_tags
    go_to: collect_ntp_servers_and_stats

  collect_ntp_servers_and_stats:
    type: device_task
    name: Get NTP servers and stats
    runner:
      type: SSH
      command: ntpq -np
    parser:
      method: identity_parser
      args: []
    register_to: ntp_result
    go_to: make_bool_is_ntp_up

  make_bool_is_ntp_up:
    type: logic
    name: Make bool str is NTP up
    args: [ntp_result]
    method: is_ntp_up
    register_to: bool_is_ntp_up
    go_to: get_single_issue_item
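The mandatory fields described in this section can be checked with a few lines of Python before a workflow is run. The sketch below is not part of indeni-workflow: validate_workflow is a hypothetical helper, and it works on a plain dict that stands in for the parsed YAML file.

```python
from typing import Any, Dict, List

WORKFLOW_FIELDS = ("id", "start_block", "blocks")
BLOCK_FIELDS = ("type", "name")


def validate_workflow(workflow: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: return the problems found (empty list if none)."""
    problems = [f"missing workflow field: {field}"
                for field in WORKFLOW_FIELDS if field not in workflow]
    blocks = workflow.get("blocks") or {}
    for block_id, block in blocks.items():
        for field in BLOCK_FIELDS:
            if field not in block:
                problems.append(f"block {block_id}: missing field {field}")
    start = workflow.get("start_block")
    if start is not None and start not in blocks:
        problems.append(f"start_block {start} is not a defined block")
    return problems


# Hand-written stand-in for a parsed <workflow_name>_workflow.yaml
example = {
    "id": "ntp_not_syncing_reason_checkpoint",
    "start_block": "get_device_tags",
    "blocks": {
        "get_device_tags": {
            "type": "device_tags",
            "name": "Get device tags",
            "register_to": "my_device_tags",
            "go_to": "collect_ntp_servers_and_stats",
        },
    },
}
print(validate_workflow(example))
```

The check only covers the fields that are mandatory for every block; type-specific fields such as runner or condition would need their own rules.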

 

<workflow_name>_parser.py
Mandatory

Contains supporting parser methods for <workflow_name>_workflow.yaml

 

 

<workflow_name>_mock_data.py
Mandatory

Contains mappings of parser input to expected output

Eg. GET_CPU_UTILIZATION_DATA_1 = ('cpu: usage 25%', 25.0)

The name should be uppercase and begin with the name of the parser it tests, followed by TUPLE_n, where n is the sequence number (1 for the first, and so on). See the file at the bottom of this page for more examples, including an example of how to write multi-line strings in Python.
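As a minimal sketch of how a mock-data tuple pairs with the parser it tests: the parser body below is hypothetical (a simple regular expression written for this illustration), and GET_CPU_UTILIZATION_DATA_2 is an invented second entry that shows a multi-line input.

```python
import re


def get_cpu_utilization(raw_data: str) -> float:
    """Hypothetical parser: extract the first percentage found in the text."""
    match = re.search(r"(\d+(?:\.\d+)?)\s*%", raw_data)
    if match is None:
        raise ValueError(f"no percentage found in {raw_data!r}")
    return float(match.group(1))


# Each entry is (parser input, expected output)
GET_CPU_UTILIZATION_DATA_1 = ('cpu: usage 25%', 25.0)

# Invented entry: multi-line input written as a triple-quoted string
GET_CPU_UTILIZATION_DATA_2 = ("""cpu summary
cpu: usage 12.5%
""", 12.5)

for raw, expected in (GET_CPU_UTILIZATION_DATA_1, GET_CPU_UTILIZATION_DATA_2):
    assert get_cpu_utilization(raw) == expected
```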

 

<workflow_name>_parser_test.py
Mandatory

Contains:

  • Parsing tests

  • Block unit tests

  • Workflow unit tests

This file should have one function named test_<parser_function_name> per parser function. Each test may have multiple asserts within it. An example is available towards the bottom of this page.

 

<workflow_name>_workflow_test.yaml

Contains information required to run WIT (workflow integration tool) for a particular workflow.

The file should define the workflow name, the tags of the device to simulate, and a list of “flows”. Each flow is an execution path through the workflow that terminates in a particular conclusion.

workflow: "all_devices_ntp_not_syncing_workflow.yaml"
tags:
  vendor: checkpoint
  os.name: gaia
  os.version: R80.30
flows:
  flow1:
    arrange:
      - ["SSH/ntpq_np.yaml", "output1"]
      - ["SSH/clish_c_show_configuration_ntp.yaml", "output1"]
    assert:
      conclusion: "conclusion_ntp_service_crashed"
    # NTP not Synced + NTP is actively configured in clish

  flow2:
    arrange:
      - ["SSH/ntpq_np.yaml", "output1"]
      - ["SSH/clish_c_show_configuration_ntp.yaml", "output2"]
    assert:
      conclusion: "conclusion_ntp_service_is_active_off"
    # NTP not Synced + NTP set to off in clish

  flow3:
    arrange:
      - ["SSH/ntpq_np.yaml", "output2"]
    assert:
      conclusion: "conclusion_ntp_server_in_sync"
      issue_items:
        - "216.239.35.4"
    # NTP is Synced, based on the ntpq output.

  flow4:
    arrange:
      - ["SSH/ntpq_np.yaml", "output3"]
      - ["SSH/clish_c_show_configuration_ntp.yaml", "output3"]
    assert:
      conclusion: "ntp_not_configured"
    # Validation that NTP has actual configuration

  flow5:
    arrange:
      - ["SSH/ntpq_np.yaml", "output3"]
      - ["SSH/clish_c_show_configuration_ntp.yaml", "output1"]
      - ["SSH/nslookup.yaml", "output3"]
    assert:
      conclusion: "ntp_server_name_not_resolved"
      issue_items:
        - "216.239.35.4"
    # DNS could not be resolved.

  flow6:
    arrange:
      - ["SSH/ntpq_np.yaml", "output3"]
      - ["SSH/clish_c_show_configuration_ntp.yaml", "output1"]
      - ["SSH/nslookup.yaml", "output1"]
      - ["SSH/ip_r_get.yaml", "output1"]
      - ["SSH/ifconfig.yaml", "output1"]
    assert:
      conclusion: "ntp_interface_is_admin_down"
      issue_items:
        - "216.239.35.4"
    # Checking the logical interface state which NTP traffic should be left from.

  flow7:
    arrange:
      - ["SSH/ntpq_np.yaml", "output3"]
      - ["SSH/clish_c_show_configuration_ntp.yaml", "output1"]
      - ["SSH/nslookup.yaml", "output1"]
      - ["SSH/ip_r_get.yaml", "output1"]
      - ["SSH/ifconfig.yaml", "output2"]
      - ["SSH/ethtool.yaml", "output1"]
    assert:
      conclusion: "ntp_interface_is_link_down"
      issue_items:
        - "216.239.35.4"
    # Checking the actual physical link state.

  flow8:
    arrange:
      - ["SSH/ntpq_np.yaml", "output3"]
      - ["SSH/clish_c_show_configuration_ntp.yaml", "output1"]
      - ["SSH/nslookup.yaml", "output1"]
      - ["SSH/ip_r_get.yaml", "output1"]
      - ["SSH/ifconfig.yaml", "output2"]
      - ["SSH/ethtool.yaml", "output2"]
      - ["SSH/ping_c.yaml", "output1"]
    assert:
      conclusion: "routing_problem"
      issue_items:
        - "216.239.35.4"
    # Check for routing problem, based on the packet loss percentages with ping.

  flow9:
    arrange:
      - ["SSH/ntpq_np.yaml", "output3"]
      - ["SSH/clish_c_show_configuration_ntp.yaml", "output1"]
      - ["SSH/nslookup.yaml", "output1"]
      - ["SSH/ip_r_get.yaml", "output1"]
      - ["SSH/ifconfig.yaml", "output2"]
      - ["SSH/ethtool.yaml", "output2"]
      - ["SSH/ping_c.yaml", "output2"]
      - ["SSH/ntpdate_d.yaml", "output1"]
    assert:
      conclusion: "ntp_server_not_responding"
      issue_items:
        - "216.239.35.4"
    # Verifying that NTP server respond / working.

  flow10:
    arrange:
      - ["SSH/ntpq_np.yaml", "output3"]
      - ["SSH/clish_c_show_configuration_ntp.yaml", "output1"]
      - ["SSH/nslookup.yaml", "output1"]
      - ["SSH/ip_r_get.yaml", "output1"]
      - ["SSH/ifconfig.yaml", "output2"]
      - ["SSH/ethtool.yaml", "output2"]
      - ["SSH/ping_c.yaml", "output2"]
      - ["SSH/ntpdate_d.yaml", "output2"]
    assert:
      conclusion: "detailed_analysis_needed"
      issue_items:
        - "216.239.35.4"

*.textfsm
Optional

Template files that workflow_name.py requires

 

 

__init__.py
Optional

A .py file points to a textfsm file using a relative path

 

 

 

workflow_catalog.yaml

cross_vendor_mac_table_limit:
  vendors:
    paloaltonetworks: high_mac_cache_paloaltonetworks

panw_logs_discarded:
  vendors:
    paloaltonetworks: discarded_logs_paloaltonetworks

cross_vendor_uptime_low:
  vendors:
    checkpoint: uptime_low_checkpoint
    paloaltonetworks: uptime_low_paloaltonetworks

CrossVendorRaidStatusRule:
  vendors:
    paloaltonetworks: raid_status_paloaltonetworks

cross_vendor_bgp_peer_down:
  vendors:
    paloaltonetworks: bgp_peer_down_paloaltonetworks

cross_vendor_log_servers_not_communicating:
  vendors:
    checkpoint: log_servers_not_communicating_checkpoint

cross_vendor_next_hop_router_inaccessible:
  vendors:
    checkpoint: next_hop_router_inaccessible_checkpoint

all_devices_ntp_not_syncing:
  vendors:
    checkpoint: ntp_not_syncing_checkpoint
    paloaltonetworks: ntp_not_syncing_paloaltonetworks

cross_vendor_connection_from_mgmt_to_device:
  vendors:
    paloaltonetworks: mgmt_device_not_connected_paloaltonetworks

cross_vendor_config_change_on_standby:
  vendors:
    paloaltonetworks: standby_config_change_paloaltonetworks

arp_neighbor_overflow:
  vendors:
    paloaltonetworks: high_arp_cache_usage_paloaltonetworks

high_per_core_cpu_use_by_device:
  vendors:
    checkpoint: high_per_core_cpu_use_checkpoint
    paloaltonetworks: high_per_core_cpu_use_paloaltonetworks
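The entries in workflow_catalog.yaml appear to map a rule name to one workflow id per vendor. Under that assumption, a lookup could be sketched as follows; find_workflow_id is a hypothetical helper, and the dict holds two entries from the catalog rewritten by hand as Python data.

```python
from typing import Dict, Optional

Catalog = Dict[str, Dict[str, Dict[str, str]]]

# Two catalog entries written as a Python dict
CATALOG: Catalog = {
    "cross_vendor_uptime_low": {
        "vendors": {
            "checkpoint": "uptime_low_checkpoint",
            "paloaltonetworks": "uptime_low_paloaltonetworks",
        }
    },
    "panw_logs_discarded": {
        "vendors": {"paloaltonetworks": "discarded_logs_paloaltonetworks"}
    },
}


def find_workflow_id(catalog: Catalog, rule_name: str, vendor: str) -> Optional[str]:
    """Hypothetical helper: workflow id for a rule and vendor, or None if absent."""
    return catalog.get(rule_name, {}).get("vendors", {}).get(vendor)


print(find_workflow_id(CATALOG, "cross_vendor_uptime_low", "checkpoint"))
print(find_workflow_id(CATALOG, "panw_logs_discarded", "checkpoint"))
```

The second call returns None because the catalog lists no checkpoint workflow for that rule.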

 

Block types

Type

Description

Required fields

Example

1

device_tags

Gets the device tags as dict[str, str]

  • register_to: name of the result parameter

  • go_to: id of the next block to go to

get_device_tags:
  type: device_tags
  name: get device tags
  register_to: my_device_tags
  go_to: collect_cpu_usage_statistics

 

2

device_task

Runs an SSH/HTTPS command on the device and parses the output

  • runner:

    • type: remote type (SSH/HTTPS)

    • command: remote command (dynamic)

  • parser:

    • args: additional args to pass to the parsing method

    • method: the parsing method name (located in workflow_name.py)

  • register_to: name of the result parameter

  • go_to: id of the next block to go to

collect_all_cores_for_device:
  type: device_task
  name: "Find all cores for device"
  runner:
    type: SSH
    command: cat /proc/cpuinfo | grep processor | wc -l
  parser:
    args: []
    method: parse_num_processors
  register_to: num_processors
  go_to: find_processes_with_highest_cpu_usage

 

3

if

Goes to the next block based on a condition

  • condition: the boolean condition to check. The condition can be any Python expression over the collected args. Unlike dynamic strings, the condition does not need to be wrapped in {{}}.

  • then_go_to: id of the next block to go to when the condition is True

  • else_go_to: id of the next block to go to when the condition is False

check_if_high_average_cpu_usage:
  type: if
  name: check if high average cpu usage
  condition: average_cpu_usage > 1
  then_go_to: find_all_physical_interfaces
  else_go_to: Retrieve_securexl_status
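To make the routing of an if block concrete, here is a minimal sketch of how a condition string could be evaluated against the collected args. It is an illustration only and not the indeni-workflow implementation: evaluate_condition and next_block are hypothetical names, and eval is used purely for brevity.

```python
from typing import Any, Dict


def evaluate_condition(condition: str, scope: Dict[str, Any]) -> bool:
    """Evaluate the condition with the collected args available as names."""
    # eval is for illustration only; never use it on untrusted input
    return bool(eval(condition, {}, dict(scope)))


def next_block(block: Dict[str, str], scope: Dict[str, Any]) -> str:
    """Pick then_go_to or else_go_to depending on the condition result."""
    key = "then_go_to" if evaluate_condition(block["condition"], scope) else "else_go_to"
    return block[key]


block = {
    "type": "if",
    "name": "check if high average cpu usage",
    "condition": "average_cpu_usage > 1",
    "then_go_to": "find_all_physical_interfaces",
    "else_go_to": "Retrieve_securexl_status",
}
print(next_block(block, {"average_cpu_usage": 3.2}))  # find_all_physical_interfaces
print(next_block(block, {"average_cpu_usage": 0.4}))  # Retrieve_securexl_status
```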

 

4

logic

Runs Python logic on args and returns the result

  • args: the args to pass to the method

  • method: the method name (located in workflow_name.py)

  • register_to: name of the result parameter

  • go_to: id of the next block to go to

get_latest_crashed_process:
  type: logic
  name: get latest crashed process
  method: get_latest_crashed_process
  args: [core_dump_entries]
  register_to: latest_crashed_process
  go_to: report_core_dump_detected

 

5

generate_panos_key

Generates a PAN-OS API key

  • register_to: name of the result parameter

  • go_to: id of the next block to go to

generate_panos_key:
  type: panos_key_generator
  name: generate panos key
  register_to: api_key
  go_to: get_admins

 

6

conclusion

The conclusion. The workflow should end after the conclusion block.

  • triage_conclusion: the conclusion text (dynamic)

  • triage_remediation_steps: the remediation steps (dynamic)

report_restart_logs:
  type: conclusion
  name: System reboot detected
  triage_conclusion: |
    This device was rebooted due a operation identified on logs: {{restart_entries}}
  triage_remediation_steps: sometimes reboots are needed regarding support or maintenance operations, please check if this is the case.

 

7

issue_items

Get the issue items as a list of strings

  • register_to: name of the result parameter

  • go_to: id of the next block to go to

get_issue_items:
  type: issue_items
  name: get issue items
  register_to: my_issue_items
  go_to: run_foreach_issue_item

 

8

foreach

Go over each issue item

  • issue_items_arg: argument to pass to each loop step

  • register_item_to: name of the result parameter

  • start_block: id of the first internal block to go to

  • blocks: list of the internal blocks

start_loop:
  type: foreach
  name: start loop
  register_item_to: my_item
  start_block: check_larger_than_10
  blocks:
    check_larger_than_10:
      type: if
      name: check_larger_than_10
      condition: my_item > 10
      then_go_to: report_more_than_10
      else_go_to: report_less_than_10

    report_more_than_10:
      type: conclusion
      name: report_more_than_10
      triage_conclusion: '{{ my_item }} report_more_than_10'
      triage_remediation_steps: report_more_than_10

    report_less_than_10:
      type: conclusion
      name: report_less_than_10
      triage_conclusion: '{{ my_item }} report_less_than_10'
      triage_remediation_steps: report_less_than_10

9

ping (not yet merged)

Ping a target server, returns bool (True/False) for success/failure.

Note: ping is from the Indeni server to the target server.

  • server: IP or address of server to ping

  • register_to: name of the result parameter

  • go_to: name of next block_id to go to

 

my_ping:
  type: ping
  name: my ping
  server: 8.8.8.8
  register_to: my_result
  go_to: my_next_block

10

port_check (not yet merged)

Probe a server’s port and get the status. Returns one of 'open', 'closed', 'filtered', 'unfiltered', 'open|filtered', 'closed|filtered'

Note: probe is from the Indeni server to the target server.

  • server: IP or address of server to probe

  • port: Port of server to check

  • register_to: name of the result parameter

  • go_to: name of next block_id to go to

my_port_check:
  type: port_check
  name: my port check
  server: 8.8.8.8
  port: 53
  register_to: my_port_result
  go_to: my_next_block

 

11

loop

Perform actions in a loop for each item in an iterable

  • start_block: The initial block that starts the iteration

  • go_to: Name of next block_id to go to after all iterations are over

  • blocks: The list of blocks that perform actions within the loop

  • iterable: An iterable variable, which the loop will iterate over

  • register_iterable_to: Name of the iterable-item

  • register_loop_results_to: The map (dictionary) containing the results.

start_loop:
  type: loop
  name: Loop through my_list items
  iterable: my_list
  register_iterable_to: my_iterable
  register_loop_results_to: loop_result_map
  start_block: loop1
  go_to: some_other_block
  blocks:
    loop1:
      type: logic
      name: multiply by 2
      method: multiply
      args: [my_iterable]
      register_to: multiplied
      go_to: end_loop_block

    end_loop_block:
      type: end_loop
      name: this is the end of this loop
      iteration_result: iteration_res + 2

12

end_loop

Marks the end of a loop block iteration and registers the final result

  • iteration_result: A python code that will be evaluated and saved as the iteration's result

end_loop_block:
  type: end_loop
  name: this is the end of this loop
  iteration_result: iteration_res + 2
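The interplay of loop and end_loop can be sketched in plain Python. This is only a model of the behaviour described above, with several assumptions: run_loop is a hypothetical function, the inner blocks are represented by a single callable, the result map is assumed to be keyed by the item, and the expression multiplied + 2 was chosen for this sketch.

```python
from typing import Any, Callable, Dict, Iterable


def run_loop(iterable: Iterable[Any],
             register_iterable_to: str,
             body: Callable[[Dict[str, Any]], None],
             iteration_result: str,
             scope: Dict[str, Any]) -> Dict[Any, Any]:
    """Run body once per item and collect each iteration's result in a dict."""
    results: Dict[Any, Any] = {}
    for item in iterable:
        iteration_scope = dict(scope)              # each iteration works on a copy
        iteration_scope[register_iterable_to] = item
        body(iteration_scope)                      # stands in for the inner blocks
        # end_loop: evaluate the iteration_result expression (eval for illustration only)
        results[item] = eval(iteration_result, {}, iteration_scope)
    return results


def multiply_block(iteration_scope: Dict[str, Any]) -> None:
    """Stand-in for the 'multiply by 2' logic block; registers 'multiplied'."""
    iteration_scope["multiplied"] = iteration_scope["my_iterable"] * 2


scope = {"my_list": [1, 2, 3]}
scope["loop_result_map"] = run_loop(
    scope["my_list"], "my_iterable", multiply_block, "multiplied + 2", scope)
print(scope["loop_result_map"])  # {1: 4, 2: 6, 3: 8}
```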

Args And Dynamic Text

Currently, each arg listed in the workflow must have the same name as the corresponding argument of the parser method. Example:

Block definition:

get_cpu_utilization:
  type: device_task
  name: Get CPU utilization
  runner:
    type: SSH
    command: mpstat -P ALL 1 1 | grep Average | sed 's/Average:\s*//'
  parser:
    method: get_cpu_utilization
    args: [cpu_id]
  register_to: float_cpu_utilization
  go_to: check_if_cpu_utilization_under_70_percent

Parser definition:

def get_cpu_utilization(raw_data: str, cpu_id: str) -> float:
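One way to see why the names must match is to pass the args to the parser as keyword arguments. The sketch below assumes that mechanism for illustration; how indeni-workflow passes args internally is not stated here. call_parser is a hypothetical helper, and the parser body is a toy that treats the last column of its input as %idle.

```python
from typing import Any, Callable, Dict, List


def get_cpu_utilization(raw_data: str, cpu_id: str) -> float:
    """Toy parser: 100 minus the last column of the row that starts with cpu_id."""
    for line in raw_data.splitlines():
        columns = line.split()
        if columns and columns[0] == cpu_id:
            return round(100.0 - float(columns[-1]), 2)
    raise ValueError(f"cpu {cpu_id} not found")


def call_parser(method: Callable[..., Any], raw_data: str,
                args: List[str], scope: Dict[str, Any]) -> Any:
    """Look up each arg name in the scope and pass it under the same name."""
    kwargs = {name: scope[name] for name in args}
    return method(raw_data, **kwargs)


raw = "all 0.50 97.23\n0 2.00 89.00\n"
scope = {"cpu_id": "0"}
print(call_parser(get_cpu_utilization, raw, ["cpu_id"], scope))  # 11.0
```

If the block listed an arg named cpu instead of cpu_id, the call would fail with a TypeError, because the parser has no parameter with that name.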

 

There are string fields in the yaml, where the args can be passed in {{}}, and the workflow will know to replace the expression with the value.

The current dynamic fields are:

  • device_task.runner.command

  • conclusion.triage_conclusion

  • conclusion.triage_remediation_steps

examples:

  • The device has {{len(num_processors)}} cores

  • This device was rebooted due a operation identified on logs: {{restart_entries}}
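The replacement of {{}} expressions can be pictured with a short sketch. It is not the indeni-workflow implementation: render is a hypothetical function, eval is used for illustration only, and the scope values are invented.

```python
import re
from typing import Any, Dict

PLACEHOLDER = re.compile(r"\{\{(.*?)\}\}")


def render(template: str, scope: Dict[str, Any]) -> str:
    """Replace every {{expression}} with the value of the expression in scope."""
    def substitute(match: "re.Match[str]") -> str:
        # eval is for illustration only; never use it on untrusted templates
        return str(eval(match.group(1).strip(), {}, dict(scope)))
    return PLACEHOLDER.sub(substitute, template)


# Invented scope values for the two example strings
scope = {
    "num_processors": ["0", "1", "2", "3"],
    "restart_entries": "syslogd 1.4.1: restart.",
}
print(render("The device has {{len(num_processors)}} cores", scope))
# The device has 4 cores
print(render("Restart entries found: {{ restart_entries }}", scope))
# Restart entries found: syslogd 1.4.1: restart.
```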

Testing your workflow

WorkflowTestTool has two main methods to run workflows:

  • run_workflow - runs a full workflow.
    Arguments:

    • workflow_path - the workflow path

    • device_data - the device IP and credentials

    • issue_items - a set of the issue item names (optional)

  • run_workflow_block - runs a specific workflow block.
    Arguments:

    • workflow_path - the workflow path

    • block_id - the block to run

    • device_data - the device IP and credentials

    • scope - the current arg scope

    • run_single_block - whether the workflow should stop after the block finishes running

    • issue_item - a set of the issue item names (optional)

 

Using WorkflowTestTool you can run several tests:

 

Real device

Mock data

 

Workflow testing

Real device:

  1. Create DeviceCredentials (can be SshCredentials/HttpsCredentials)

  2. Create DeviceData and pass the IP, device_tags and the created DeviceCredentials

  3. Run the relevant workflow using WorkflowTestTool.run_workflow

  4. The method returns a list of block records that you can print.

For example:

ATE = './get_uptime_low_reason.yaml'
my_device_data = DeviceData('10.11.94.200', {'vendor': 'checkpoint', 'os.name': 'gaia'},
                            [SshCredentials('indeni', 'indeni123')])
WorkflowTestTool.run_workflow(ATE, my_device_data)

Mock data:

  1. Create MockCredentials while passing:

    1. command_to_response: a dict[str,str] of command to response

  2. Create DeviceData and pass the IP, device_tags and the created MockCredentials

  3. Run the relevant workflow using WorkflowTestTool.run_workflow

  4. The method returns a list of block records that you can print.

For example:

ATE = './get_uptime_low_reason.yaml'
mock_data = MockCredentials([
    MockInput('uptime', '11:24:02 up 1 day, 20:43, 25 users, load average: 0.21, 0.16, 0.20'),
    MockInput('cat /var/log/messages* | grep restart | cat',
              'Dec 31 14:40:41 2019 ENG-CP-R80 syslogd 1.4.1: restart.\nDec 31 14:40:44 2019 ENG-CP-R80 syslogd 1.4.1: restart.')])
my_device_data = DeviceData('10.11.94.200', {'vendor': 'checkpoint', 'os.name': 'gaia'}, [mock_data])
WorkflowTestTool.run_workflow(ATE, my_device_data)

     

Block testing

Real device:

  1. Create DeviceCredentials (can be SshCredentials/HttpsCredentials)

  2. Create DeviceData and pass the IP, device_tags and the created DeviceCredentials

  3. Create a scope with the current args as dict[str,str]

  4. Run the relevant workflow using WorkflowTestTool.run_workflow_block.

  5. The last argument to pass to the method is run_single_block: whether the workflow should run only the single block or continue to the next one.

  6. The method returns a list of block records that you can print.

For example:

ATE = './get_uptime_low_reason.yaml'
my_device_data = DeviceData('10.11.94.200', {'vendor': 'checkpoint', 'os.name': 'gaia'},
                            [SshCredentials('indeni', 'indeni123')])
block_id = 'collect_restart_logs'
scope = {'uptime_result': '11:22'}
WorkflowTestTool.run_workflow_block(ATE, block_id, my_device_data, scope, True)

Mock data:

For example:

ATE = './get_uptime_low_reason.yaml'
mock_data = MockCredentials([
    MockInput('uptime', '11:24:02 up 1 day, 20:43, 25 users, load average: 0.21, 0.16, 0.20'),
    MockInput('cat /var/log/messages* | grep restart | cat',
              'Dec 31 14:40:41 2019 ENG-CP-R80 syslogd 1.4.1: restart.\nDec 31 14:40:44 2019 ENG-CP-R80 syslogd 1.4.1: restart.')])
my_device_data = DeviceData('10.11.94.200', {'vendor': 'checkpoint', 'os.name': 'gaia'}, [mock_data])
block_id = 'collect_restart_logs'
scope = {'uptime_result': '11:22'}
WorkflowTestTool.run_workflow_block(ATE, block_id, my_device_data, scope, True)

Enable debug logs

By default, the WorkflowTestTool shows only Info logs. To see debug logs, run:

WorkflowTestTool.toggle_debug_logs(True)

Draw workflow

Using the dot language, we can draw a graphical view of the workflow. This should be used as part of the PR, to show that the workflow matches the requirements:

Call WorkflowTestTool.draw_workflow (requires internet connection). Example:

from indeni_workflow.workflow_test_tool import WorkflowTestTool

ATE = './get_uptime_low_reason.yaml'
WorkflowTestTool.draw_workflow(ATE)
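For readers unfamiliar with the dot language, the sketch below shows how the routing fields of a workflow translate into a DOT graph. It does not reproduce what draw_workflow does internally: workflow_to_dot is a hypothetical function, and the block names in the example are invented.

```python
from typing import Any, Dict, List

# Fields that point from one block to the next
EDGE_FIELDS = ("go_to", "then_go_to", "else_go_to")


def workflow_to_dot(workflow: Dict[str, Any]) -> str:
    """Build DOT text with one node per block and one edge per routing field."""
    lines: List[str] = ["digraph workflow {"]
    for block_id, block in workflow["blocks"].items():
        lines.append(f'  "{block_id}" [label="{block["name"]}"];')
        for field in EDGE_FIELDS:
            if field in block:
                lines.append(f'  "{block_id}" -> "{block[field]}" [label="{field}"];')
    lines.append("}")
    return "\n".join(lines)


# Invented three-block workflow, written as a plain dict
workflow = {
    "blocks": {
        "check_uptime": {
            "type": "if",
            "name": "Check uptime",
            "condition": "uptime < 60",
            "then_go_to": "report_reboot",
            "else_go_to": "report_ok",
        },
        "report_reboot": {"type": "conclusion", "name": "Reboot detected"},
        "report_ok": {"type": "conclusion", "name": "No reboot"},
    }
}
print(workflow_to_dot(workflow))
```

The printed text can be rendered by any Graphviz-compatible viewer.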

 

Guidelines for PR submission

  1. Required files:

    1. <workflow_name>_workflow.yaml

    2. <workflow_name>_parser.py

    3. <workflow_name>_mock_data.py

    4. <workflow_name>_parser_test.py

    5. <workflow_name>_workflow_test.yaml

  2. ATE folder name should be the same as the rule name

  3. Add a link to the server Knowledge Explorer workflow

  4. Capitalize the first letter of name

  5. Use capitals for acronyms (e.g. DNS and not dns)

  6. Fields that are used by the UI must give meaningful information and be descriptive (as short as possible, but descriptive). Some comments:

    1. If the conclusion is that the problem is now resolved, leave triage_remediation_steps empty in the following manner:

       triage_remediation_steps: ""

       If the field is left empty without the empty string shown above, the WorkflowTestTool crashes when run.

    2. Do not state “N/A” in triage_remediation_steps or triage_conclusion

    UI fields are:

    1. triage_remediation_steps

    2. triage_conclusion

    3. name

  7. Tests
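Some of the UI-field guidelines above can be checked mechanically before a PR is opened. The following is a minimal sketch under stated assumptions: check_conclusion_block is a hypothetical helper, and it receives a conclusion block as a plain dict, the way a YAML parser would typically load it.

```python
from typing import Any, Dict, List

UI_TEXT_FIELDS = ("triage_conclusion", "triage_remediation_steps")


def check_conclusion_block(block_id: str, block: Dict[str, Any]) -> List[str]:
    """Hypothetical lint: return guideline violations for one conclusion block."""
    problems: List[str] = []
    for field in UI_TEXT_FIELDS:
        value = block.get(field)
        if value is None:
            # A YAML key with no value is typically loaded as None
            problems.append(f'{block_id}: {field} has no value, use "" if it should be empty')
        elif str(value).strip().upper() == "N/A":
            problems.append(f'{block_id}: {field} must not be "N/A"')
    name = str(block.get("name", ""))
    if not name or not name[0].isupper():
        problems.append(f"{block_id}: name should start with a capital letter")
    return problems


good = {
    "type": "conclusion",
    "name": "System reboot detected",
    "triage_conclusion": "The problem is now resolved",
    "triage_remediation_steps": "",
}
bad = {
    "type": "conclusion",
    "name": "system reboot",
    "triage_conclusion": "N/A",
    "triage_remediation_steps": None,
}
print(check_conclusion_block("report_ok", good))        # []
print(len(check_conclusion_block("report_bad", bad)))   # 3
```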

 

An example of a <workflow_name>_parser_test.py

import unittest
import automation.workflows.high_per_core_cpu_use.check_point.high_per_core_cpu_use_mock_data as mock
from automation.workflows.high_per_core_cpu_use.check_point.high_per_core_cpu_use_parser import *


class HighPerCoreCPUParserTests(unittest.TestCase):

    def test_get_cpu_utilization(self):
        pair1 = mock.GET_CPU_UTILIZATION_PAIR_1
        self.assertAlmostEqual(pair1[1], get_cpu_utilization(pair1[0]), 3)

    def test_bool_is_utilization_across_all_cpus(self):
        pair1 = mock.IS_UTILIZATION_ACROSS_ALL_CPUS_PAIR_1
        pair2 = mock.IS_UTILIZATION_ACROSS_ALL_CPUS_PAIR_2
        self.assertEqual(pair1[1], bool_is_utilization_across_all_cpus(pair1[0]))
        self.assertEqual(pair2[1], bool_is_utilization_across_all_cpus(pair2[0]))

    def test_get_securexl_and_templates_enabled(self):
        pair1 = mock.GET_SECUREXL_AND_TEMPLATES_ENABLED_PAIR_1
        pair2 = mock.GET_SECUREXL_AND_TEMPLATES_ENABLED_PAIR_2
        self.assertEqual(pair1[1], get_securexl_and_templates_enabled(pair1[0]))
        self.assertEqual(pair2[1], get_securexl_and_templates_enabled(pair2[0]))

    def test_is_redistribution_automatic(self):
        pair1 = mock.IS_REDISTRIBUTION_AUTOMATIC_PAIR_1
        pair2 = mock.IS_REDISTRIBUTION_AUTOMATIC_PAIR_2
        self.assertEqual(pair1[1], is_redistribution_automatic(pair1[0]))
        self.assertEqual(pair2[1], is_redistribution_automatic(pair2[0]))

    def test_is_corexl_running(self):
        pair1 = mock.IS_COREXL_RUNNING_PAIR_1
        pair2 = mock.IS_COREXL_RUNNING_PAIR_2
        pair3 = mock.IS_COREXL_RUNNING_PAIR_3
        self.assertEqual(pair1[1], is_corexl_running(pair1[0]))
        self.assertEqual(pair2[1], is_corexl_running(pair2[0]))
        self.assertEqual(pair3[1], is_corexl_running(pair3[0]))

    def test_is_corexl_license_match_num_cores(self):
        pair1 = mock.IS_COREXL_LICENSE_MATCH_NUM_CORES_PAIR_1
        pair2 = mock.IS_COREXL_LICENSE_MATCH_NUM_CORES_PAIR_2
        self.assertEqual(pair1[1], is_corexl_license_match_num_cores(pair1[0]))
        self.assertEqual(pair2[1], is_corexl_license_match_num_cores(pair2[0]))

    def test_is_some_process_consuming_high_cpu(self):
        pair1 = mock.IS_SOME_PROCESS_CONSUMING_HIGH_CPU_PAIR_1
        pair2 = mock.IS_SOME_PROCESS_CONSUMING_HIGH_CPU_PAIR_2
        self.assertEqual(pair1[1], is_some_process_consuming_high_cpu(pair1[0]))
        self.assertEqual(pair2[1], is_some_process_consuming_high_cpu(pair2[0]))


if __name__ == '__main__':
    unittest.main()

A partial example of <workflow_name>_mock_data.py

GET_CPU_UTILIZATION_DATA_1 = ("""CPU %user %nice %sys %iowait %irq %soft %steal %idle intr/s
all 0.50 0.76 0.76 0.00 0.25 0.50 0.00 97.23 1990.00
0 2.00 3.00 3.00 0.00 1.00 2.00 0.00 89.00 1989.00
1 0.00 0.00 0.00 0.00 0.00 0.00 0.00 99.00 0.00
2 0.00 0.00 0.00 0.00 0.00 0.00 0.00 99.00 0.00
3 0.00 0.00 0.00 0.00 0.00 0.00 0.00 99.00 0.00
""", 2.77)

IS_UTILIZATION_ACROSS_ALL_CPUS_DATA_1 = ("""
Processors load
---------------------------------------------------------------------------------
|CPU#|User Time(%)|System Time(%)|Idle Time(%)|Usage(%)|Run queue|Interrupts/sec|
---------------------------------------------------------------------------------
| 1| 5| 4| 92| 8| ?| 2674|
| 2| 0| 0| 100| 0| ?| 2674|
| 3| 0| 0| 100| 0| ?| 2674|
| 4| 0| 0| 100| 0| ?| 2674|
---------------------------------------------------------------------------------
""", False)

IS_UTILIZATION_ACROSS_ALL_CPUS_DATA_2 = ("""
Processors load
---------------------------------------------------------------------------------
|CPU#|User Time(%)|System Time(%)|Idle Time(%)|Usage(%)|Run queue|Interrupts/sec|
---------------------------------------------------------------------------------
| 1| 5| 4| 0| 8| ?| 2674|
| 2| 0| 0| 0| 0| ?| 2674|
| 3| 0| 0| 0| 0| ?| 2674|
| 4| 0| 0| 0| 0| ?| 2674|
---------------------------------------------------------------------------------
""", True)