COMCOMClient

COMCOMClient is the main class utilized to interface with a COMCOMClient instance. It provides several helper functions and API calls to aid in data input, agent configuration, and debugging.

Quickstart

class ia.gaius.experimental.comcom_client.COMCOMClient(comcom_info: dict, verify=True)

Bases: object

Interface for interacting with COMCOM. Creating/interacting with input_slots and connecting agents to COMCOM

__init__(comcom_info: dict, verify=True)
add_function_to_comcom(function_name: str, function_input_types: Dict[str, str], function_return_types: Dict[str, str], source: str, source_type: str = 'block') dict | Tuple[dict, str]

Function to add a new preprocessing function from a source code ‘block’ or ‘file’. This function will be run in a restricted manner, to protect COMCOM from nefarious functions.

Parameters:
  • function_name (str) – _description_

  • function_input_types (Dict[str, str]) – A key-value pair stating the expected input type of parameters

  • function_return_types (Dict[str, str]) – A key-value pair stating the expected return type of variables

  • source (str) – Either a block of code in text format, or path to a source code file

  • source_type (str, optional) – Type of source. Either ‘file’, or ‘block’. Defaults to “block”.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

call_agent_command(agent_name: str, command: str, command_parameters: Dict) dict | Tuple[dict, str]

Function call to pass data through comcom and call command on agent

Parameters:
  • agent_name (str) – The alias of the agent in COMCOM

  • command (str) – The command to call on the agent

  • command_parameters (Dict) – Any parameters/values needed for command(varies for each command)

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

clear_agents() dict | Tuple[dict, str]

Function call that removes all connected agents in COMCOM, and in its output_slots.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

clear_comcom() dict | Tuple[dict, str]

Function to clear COMCOM of everyting. Literally scorched earth mode. (VERY DANGEROUS)

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

clear_input_slots() dict | Tuple[dict, str]

Function call that tells COMCOM to clear all input slots from its system.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

clear_outputslot_command_queue(output_name: str) dict | Tuple[dict, str]

Function to clear the pending data in an output_slot with passed output_name.

Parameters:

output_name (str) – Name of output_slot whose work queue should be cleared.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

comcom_to_cytoscape() dict | Tuple[dict, str]

Get cytoscape representation of all of comcom.

Returns:

Cytoscape json data

Return type:

Union[dict, Tuple[dict, str]]

connect() Dict

Establishes initial connection to COMCOM, then allows calling of other function to

connect_input_slot(input_name: str, input_type: str, pipeline_slots: List[str] = [], input_slot_data_mapping: Dict = {}, max_command_queue_size: int = 100, queue_block_on_full: bool = True, queue_block_timeout: float = 1, **kwargs) dict | Tuple[dict, str]

Function call which takes in data to create a new input slot in COMCOM.

Parameters:
  • input_name (str) – The name that this input slot will be known by.

  • input_type (str) – The type of the input_slot.

  • pipeline_slots (List[str], optional) – The pipelines to be run when data is received. Defaults to [].

  • input_slot_data_mapping (Dict, optional) – Special mapping of data from one set of key:value pairs to another(useful for pipelines). Defaults to {}.

  • max_command_queue_size (int, optional) – Max number of commands that can be in command_queue before data is dropped. Defaults to 100.

  • queue_block_on_full (bool, optional) – Bool flag controlling if input_slot should wait for free slot in queue up to queue_block_timeout. Defaults to True.

  • queue_block_timeout (float, optional) – Timeout for waiting for free slot in command_queue.. Defaults to 1.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

connect_output_slot(output_name: str, output_type: str, output_slot_data_mapping: Dict = {}, max_command_queue_size: int = 100, queue_block_on_full: bool = True, queue_block_timeout: float = 1, **kwargs) dict | Tuple[dict, str]

Function call which takes in data and constructs a new output slot in COMCOM.

Parameters:
  • output_name (str) – The name that this output slot will be known by.

  • output_type (str) – The type of the output_slot.

  • output_slot_data_mapping (Dict, optional) – Special mapping of data from one set of key:value pairs to another(useful to get translate data from pipelines into relevant mappings). Defaults to {}.

  • max_command_queue_size (int, optional) – Max number of commands that can be in command_queue before data is dropped. Defaults to 100.

  • queue_block_on_full (bool, optional) – Bool flag controlling if output_slot should wait for free slot in queue up to queue_block_timeout. Defaults to True.

  • queue_block_timeout (float, optional) – Timeout for waiting for free slot in command_queue. Defaults to 1.

Returns:

_description_

Return type:

Union[dict, Tuple[dict, str]]

connect_to_agent(api_key: str, domain: str, agent_name: str, agent_type: str, **kwargs) dict | Tuple[dict, str]
Function to attempt to connect to an existing/accessible agent on the network to COMCOM,

which can then be later used for other purposes

Parameters:
  • api_key (str) – API key for the agent

  • domain (str) – Domain of the agent

  • agent_name (str) – The agent_name; which will also be used as an alias in COMCOM to refer to the agent

  • agent_type (str) – The type of agent

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

create_pipeline(pipeline_name: str, pipeline_function_parameters: Dict, pipeline_source_fields: Dict[str, Dict[str, str]], pipeline_destination_fields: Dict[str, Dict[str, str]], pipeline_connections: Dict[str, List[str]], pipeline_starting_functions: List[str], **kwargs) dict | Tuple[dict, str]

Function call to create a pipeline that can be used by an input_slot.

Parameters:
  • pipeline_name (str) – Name that the pipeline will be known by in COMCOM.

  • pipeline_function_parameters (Dict) – Dictionary of static functions parameters for each function that is in pipeline.

  • pipeline_source_fields (Dict[str, Dict[str, str]]) – Dictionary of data fields that are expected to be in the output if it is a dict. Ignored if output is anything else.

  • pipeline_destination_fields (Dict[str, Dict[str, str]]) – Dictionary of data fields where function output data will be mapped to in the general data dictionary.

  • pipeline_connections (Dict[str, List[str]]) – Dictionary of connections between pipeline functions which maps flow of data during pipeline execution.

  • pipeline_starting_functions (List[str]) – List of functions that the pipeline will start with.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

delete_function_from_comcom(function_name: str = 'Craig') dict | Tuple[dict, str]

Deletes a function from COMCOM

Parameters:

function_name (str, optional) – Name of function to delete. Defaults to “Craig”.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

delete_pipeline(pipeline_name: str) dict | Tuple[dict, str]

Function call to delete a pipeline in COMCOM.

Parameters:

pipeline_name (str) – Name of pipeline to delete.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

desynchronize_input_slots(inputs_to_desync: List[str]) dict | Tuple[dict, str]

Function call to desynchronize two or more input_slots.

Parameters:

inputs_to_desync (List[str]) – List of input_slots to remove from synchronization pools.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

disconnect_agent(agent_name: str) dict | Tuple[dict, str]
Function call that disconnects from agent with specific agent_name, and removes it

from all input_slots

Parameters:

agent_name (str) – The alias of the agent in COMCOM

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

disconnect_input_slot(input_name: str) dict | Tuple[dict, str]

Function that takes in unique input_name, and deletes it from COMCOM.

Parameters:

input_name (str) – Name of input_slot to delete.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

disconnect_output_slot(output_name: str) dict | Tuple[dict, str]

Function that takes in unique input_name, and deletes it from COMCOM.

Parameters:

output_name (str) – Name of output_slot to delete.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

get_agent_data(agent_name: str) dict | Tuple[dict, str]

Function call to get detailed information about connection with passed agent_name.

Parameters:

agent_name (str) – Name of agent to get info about.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

get_config_as_json() dict | Tuple[dict, str]

Gets the json configuration to reload current comcom config at a later date

Returns:

Json object with relevant information to rebuild comcom

Return type:

json

get_dds_message_types() dict | Tuple[dict, str]

Function call to get all available DDS message types

get_debug_topic_data_single(pipeline_name: str, function_name: str, input_slot_name: str, display_result=True) Any

Function call to get a singulr debug message from function, in a pipeline, that was run by the input_slot

Parameters:
  • pipeline_name (str) – Name of pipeline containing said function

  • function_name (str) – Name of function whose debug message you wish to get

  • input_slot_name (str) – Name of input slot which ran the pipeline

  • display_result (bool, optional) – whether to display result in a general way or just return the formatted result(if applicable). Defaults to True.

Returns:

_description_

Return type:

Any

get_debug_topic_data_stream(pipeline_name: str, function_name: str, input_slot_name: str) dict | Tuple[dict, str]

Function call to get a stream of debug messages from function, in a pipeline, that was run by the input_slot

Parameters:
  • pipeline_name (str) – Name of pipeline containing said function

  • function_name (str) – Name of function whose debug message you wish to get

  • input_slot_name (str) – Name of input slot which ran the pipeline

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

get_input_slot_data(input_name: str) dict | Tuple[dict, str]

Function call to get detailed information about input_slot with passed input_name.

Parameters:

input_name (str) – Name of input_slot to get info about.

Returns:

Name of agent to get info about.

Return type:

Union[dict, Tuple[dict, str]]

get_output_slot_data(output_name: str) dict | Tuple[dict, str]

Function call to get detailed information about output_slot with passed output_name.

Parameters:

output_name (str) – Name of output_slot to get info about.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

get_pipeline_data(pipeline_name: str) dict | Tuple[dict, str]

Function call to get detailed pipeline information

Parameters:

pipeline_name (str) – Name of pipeline to get details about.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

input_slot_to_cytoscape(input_slot_name: str) dict | Tuple[dict, str]

Gets the cytoscape representation of the input_slot with passed name input_slot_name.

Parameters:

input_slot_name (str) – The name of the input_slot to get representation of.

Returns:

Cytoscape json data

Return type:

Union[dict, Tuple[dict, str]]

list_agent_connections() dict | Tuple[dict, str]

Function call to list all unique agent connections in COMCOM.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

list_comcom() dict | Tuple[dict, str]

Function which lists all data about constructed objects in COMCOM.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

list_input_slots() dict | Tuple[dict, str]

Function call to list all input_slots in COMCOM.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

list_output_slots() dict | Tuple[dict, str]

Function call to list all output_slots in COMCOM.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

list_pipelines() dict | Tuple[dict, str]

Function call to list all pipelines in COMCOM.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

list_preprocessing_functions() dict | Tuple[dict, str]

Function call to list all unique pipelines in COMCOM.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

load_comcom_config(data: Dict) dict | Tuple[dict, str]

Function call that configures COMCOM according to the passed json. This includes construction of Agent connections, Input_Slots, Output_Slots, and Pipelines.

Parameters:

data (Dict) – JSON data containing the fields [Agents, Input_Slots, Output_Slots, Pipelines]

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

modify_input_slot(input_name: str, modification_type: str, **kwargs) dict | Tuple[dict, str]

Complex function call which can modify the input_slot according to the modification type.

Parameters:
  • input_name (str) – Name of input_slot to modify.

  • modification_type (str) – Name of modification to perform.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

modify_output_slot(output_name: str, modification_type: str, **kwargs) dict | Tuple[dict, str]

Complex function call which can modify the output_slot according to the modification_type.

Parameters:
  • output_name (str) – Name of output_slot to modify.

  • modification_type (str) – Name of modification to perform.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

modify_pipeline(pipeline_name: str, modification_type: str, **kwargs) dict | Tuple[dict, str]

Function call to modify a pipeline

Parameters:
  • pipeline_name (str) – Name of pipeline to modify

  • modification_type (str) – Name of modification to perform.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

pipeline_to_cytoscape(pipeline_name: str) dict | Tuple[dict, str]

Gets the cytoscape representation of the pipeline with passed name pipeline_name.

Parameters:

pipeline_name (str) – The name of the pipeline to get representation of.

Returns:

Cytoscape json data

Return type:

Union[dict, Tuple[dict, str]]

query_db(db_name: str, lookup_config: Dict, **kwargs) dict | Tuple[dict, str]

Function call to query COMCOM for information from its internal Database.

Parameters:
  • db_name (str) – Name of database collection/table to query.

  • lookup_config (Dict) – Configuration data in format acceptable by MongoDB.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

query_input_slot(input_name: str) dict | Tuple[dict, str]

Function to have an input_slot retrieve a singular piece of data. Currently only a COMCOMDarkAgent can perform this action.

Parameters:

input_name (str) – Name of input_slot to query.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

synchronize_input_slots(inputs_to_sync_to: List[str], timeout_duration: float) dict | Tuple[dict, str]

Function call to synchronize two or more input_slots.

Parameters:
  • inputs_to_sync_to (List[str]) – List of input_slots to synchronization together.

  • timeout_duration (float) – The max amount of time that a source will wait before discregarding data.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

toggle_input_slot(input_name: str) dict | Tuple[dict, str]

Function which negates the active status of the input_slot with passed name.

Parameters:

input_name (str) – Name of input_slot to toggle.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

toggle_output_slot(output_name: str) dict | Tuple[dict, str]

Function which negates the active status of the output_slot with passed name.

Parameters:

output_name (str) – Name of output_slot to toggle.

Returns:

response from COMCOM

Return type:

Union[dict, Tuple[dict, str]]

visualize_comcom() None

Function to visualize all of comcom using plotly

visualize_input_slot(input_slot_name: str) None

Function to visualize a pipeline with passed pipeline_name using plotly.

Parameters:

input_slot_name (str) – The name of the input_slot to visualize.

visualize_pipeline(pipeline_name: str) None

Function to visualize a pipeline with passed pipeline_name using plotly.

Parameters:

pipeline_name (str) – The name of the pipeline to visualize.