COMCOMClient
COMCOMClient is the main class utilized to interface with a COMCOMClient instance. It provides several helper functions and API calls to aid in data input, agent configuration, and debugging.
Quickstart
- class ia.gaius.experimental.comcom_client.COMCOMClient(comcom_info: dict, verify=True)
Bases:
object
Interface for interacting with COMCOM. Creating/interacting with input_slots and connecting agents to COMCOM
- __init__(comcom_info: dict, verify=True)
- add_function_to_comcom(function_name: str, function_input_types: Dict[str, str], function_return_types: Dict[str, str], source: str, source_type: str = 'block') dict | Tuple[dict, str]
Function to add a new preprocessing function from a source code ‘block’ or ‘file’. This function will be run in a restricted manner, to protect COMCOM from nefarious functions.
- Parameters:
function_name (str) – _description_
function_input_types (Dict[str, str]) – A key-value pair stating the expected input type of parameters
function_return_types (Dict[str, str]) – A key-value pair stating the expected return type of variables
source (str) – Either a block of code in text format, or path to a source code file
source_type (str, optional) – Type of source. Either ‘file’, or ‘block’. Defaults to “block”.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- call_agent_command(agent_name: str, command: str, command_parameters: Dict) dict | Tuple[dict, str]
Function call to pass data through comcom and call command on agent
- Parameters:
agent_name (str) – The alias of the agent in COMCOM
command (str) – The command to call on the agent
command_parameters (Dict) – Any parameters/values needed for command(varies for each command)
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- clear_agents() dict | Tuple[dict, str]
Function call that removes all connected agents in COMCOM, and in its output_slots.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- clear_comcom() dict | Tuple[dict, str]
Function to clear COMCOM of everyting. Literally scorched earth mode. (VERY DANGEROUS)
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- clear_input_slots() dict | Tuple[dict, str]
Function call that tells COMCOM to clear all input slots from its system.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- clear_outputslot_command_queue(output_name: str) dict | Tuple[dict, str]
Function to clear the pending data in an output_slot with passed output_name.
- Parameters:
output_name (str) – Name of output_slot whose work queue should be cleared.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- comcom_to_cytoscape() dict | Tuple[dict, str]
Get cytoscape representation of all of comcom.
- Returns:
Cytoscape json data
- Return type:
Union[dict, Tuple[dict, str]]
- connect() Dict
Establishes initial connection to COMCOM, then allows calling of other function to
- connect_input_slot(input_name: str, input_type: str, pipeline_slots: List[str] = [], input_slot_data_mapping: Dict = {}, max_command_queue_size: int = 100, queue_block_on_full: bool = True, queue_block_timeout: float = 1, **kwargs) dict | Tuple[dict, str]
Function call which takes in data to create a new input slot in COMCOM.
- Parameters:
input_name (str) – The name that this input slot will be known by.
input_type (str) – The type of the input_slot.
pipeline_slots (List[str], optional) – The pipelines to be run when data is received. Defaults to [].
input_slot_data_mapping (Dict, optional) – Special mapping of data from one set of key:value pairs to another(useful for pipelines). Defaults to {}.
max_command_queue_size (int, optional) – Max number of commands that can be in command_queue before data is dropped. Defaults to 100.
queue_block_on_full (bool, optional) – Bool flag controlling if input_slot should wait for free slot in queue up to queue_block_timeout. Defaults to True.
queue_block_timeout (float, optional) – Timeout for waiting for free slot in command_queue.. Defaults to 1.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- connect_output_slot(output_name: str, output_type: str, output_slot_data_mapping: Dict = {}, max_command_queue_size: int = 100, queue_block_on_full: bool = True, queue_block_timeout: float = 1, **kwargs) dict | Tuple[dict, str]
Function call which takes in data and constructs a new output slot in COMCOM.
- Parameters:
output_name (str) – The name that this output slot will be known by.
output_type (str) – The type of the output_slot.
output_slot_data_mapping (Dict, optional) – Special mapping of data from one set of key:value pairs to another(useful to get translate data from pipelines into relevant mappings). Defaults to {}.
max_command_queue_size (int, optional) – Max number of commands that can be in command_queue before data is dropped. Defaults to 100.
queue_block_on_full (bool, optional) – Bool flag controlling if output_slot should wait for free slot in queue up to queue_block_timeout. Defaults to True.
queue_block_timeout (float, optional) – Timeout for waiting for free slot in command_queue. Defaults to 1.
- Returns:
_description_
- Return type:
Union[dict, Tuple[dict, str]]
- connect_to_agent(api_key: str, domain: str, agent_name: str, agent_type: str, **kwargs) dict | Tuple[dict, str]
- Function to attempt to connect to an existing/accessible agent on the network to COMCOM,
which can then be later used for other purposes
- Parameters:
api_key (str) – API key for the agent
domain (str) – Domain of the agent
agent_name (str) – The agent_name; which will also be used as an alias in COMCOM to refer to the agent
agent_type (str) – The type of agent
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- create_pipeline(pipeline_name: str, pipeline_function_parameters: Dict, pipeline_source_fields: Dict[str, Dict[str, str]], pipeline_destination_fields: Dict[str, Dict[str, str]], pipeline_connections: Dict[str, List[str]], pipeline_starting_functions: List[str], **kwargs) dict | Tuple[dict, str]
Function call to create a pipeline that can be used by an input_slot.
- Parameters:
pipeline_name (str) – Name that the pipeline will be known by in COMCOM.
pipeline_function_parameters (Dict) – Dictionary of static functions parameters for each function that is in pipeline.
pipeline_source_fields (Dict[str, Dict[str, str]]) – Dictionary of data fields that are expected to be in the output if it is a dict. Ignored if output is anything else.
pipeline_destination_fields (Dict[str, Dict[str, str]]) – Dictionary of data fields where function output data will be mapped to in the general data dictionary.
pipeline_connections (Dict[str, List[str]]) – Dictionary of connections between pipeline functions which maps flow of data during pipeline execution.
pipeline_starting_functions (List[str]) – List of functions that the pipeline will start with.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- delete_function_from_comcom(function_name: str = 'Craig') dict | Tuple[dict, str]
Deletes a function from COMCOM
- Parameters:
function_name (str, optional) – Name of function to delete. Defaults to “Craig”.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- delete_pipeline(pipeline_name: str) dict | Tuple[dict, str]
Function call to delete a pipeline in COMCOM.
- Parameters:
pipeline_name (str) – Name of pipeline to delete.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- desynchronize_input_slots(inputs_to_desync: List[str]) dict | Tuple[dict, str]
Function call to desynchronize two or more input_slots.
- Parameters:
inputs_to_desync (List[str]) – List of input_slots to remove from synchronization pools.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- disconnect_agent(agent_name: str) dict | Tuple[dict, str]
- Function call that disconnects from agent with specific agent_name, and removes it
from all input_slots
- Parameters:
agent_name (str) – The alias of the agent in COMCOM
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- disconnect_input_slot(input_name: str) dict | Tuple[dict, str]
Function that takes in unique input_name, and deletes it from COMCOM.
- Parameters:
input_name (str) – Name of input_slot to delete.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- disconnect_output_slot(output_name: str) dict | Tuple[dict, str]
Function that takes in unique input_name, and deletes it from COMCOM.
- Parameters:
output_name (str) – Name of output_slot to delete.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- get_agent_data(agent_name: str) dict | Tuple[dict, str]
Function call to get detailed information about connection with passed agent_name.
- Parameters:
agent_name (str) – Name of agent to get info about.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- get_config_as_json() dict | Tuple[dict, str]
Gets the json configuration to reload current comcom config at a later date
- Returns:
Json object with relevant information to rebuild comcom
- Return type:
json
- get_dds_message_types() dict | Tuple[dict, str]
Function call to get all available DDS message types
- get_debug_topic_data_single(pipeline_name: str, function_name: str, input_slot_name: str, display_result=True) Any
Function call to get a singulr debug message from function, in a pipeline, that was run by the input_slot
- Parameters:
pipeline_name (str) – Name of pipeline containing said function
function_name (str) – Name of function whose debug message you wish to get
input_slot_name (str) – Name of input slot which ran the pipeline
display_result (bool, optional) – whether to display result in a general way or just return the formatted result(if applicable). Defaults to True.
- Returns:
_description_
- Return type:
Any
- get_debug_topic_data_stream(pipeline_name: str, function_name: str, input_slot_name: str) dict | Tuple[dict, str]
Function call to get a stream of debug messages from function, in a pipeline, that was run by the input_slot
- Parameters:
pipeline_name (str) – Name of pipeline containing said function
function_name (str) – Name of function whose debug message you wish to get
input_slot_name (str) – Name of input slot which ran the pipeline
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- get_input_slot_data(input_name: str) dict | Tuple[dict, str]
Function call to get detailed information about input_slot with passed input_name.
- Parameters:
input_name (str) – Name of input_slot to get info about.
- Returns:
Name of agent to get info about.
- Return type:
Union[dict, Tuple[dict, str]]
- get_output_slot_data(output_name: str) dict | Tuple[dict, str]
Function call to get detailed information about output_slot with passed output_name.
- Parameters:
output_name (str) – Name of output_slot to get info about.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- get_pipeline_data(pipeline_name: str) dict | Tuple[dict, str]
Function call to get detailed pipeline information
- Parameters:
pipeline_name (str) – Name of pipeline to get details about.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- input_slot_to_cytoscape(input_slot_name: str) dict | Tuple[dict, str]
Gets the cytoscape representation of the input_slot with passed name input_slot_name.
- Parameters:
input_slot_name (str) – The name of the input_slot to get representation of.
- Returns:
Cytoscape json data
- Return type:
Union[dict, Tuple[dict, str]]
- list_agent_connections() dict | Tuple[dict, str]
Function call to list all unique agent connections in COMCOM.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- list_comcom() dict | Tuple[dict, str]
Function which lists all data about constructed objects in COMCOM.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- list_input_slots() dict | Tuple[dict, str]
Function call to list all input_slots in COMCOM.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- list_output_slots() dict | Tuple[dict, str]
Function call to list all output_slots in COMCOM.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- list_pipelines() dict | Tuple[dict, str]
Function call to list all pipelines in COMCOM.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- list_preprocessing_functions() dict | Tuple[dict, str]
Function call to list all unique pipelines in COMCOM.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- load_comcom_config(data: Dict) dict | Tuple[dict, str]
Function call that configures COMCOM according to the passed json. This includes construction of Agent connections, Input_Slots, Output_Slots, and Pipelines.
- Parameters:
data (Dict) – JSON data containing the fields [Agents, Input_Slots, Output_Slots, Pipelines]
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- modify_input_slot(input_name: str, modification_type: str, **kwargs) dict | Tuple[dict, str]
Complex function call which can modify the input_slot according to the modification type.
- Parameters:
input_name (str) – Name of input_slot to modify.
modification_type (str) – Name of modification to perform.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- modify_output_slot(output_name: str, modification_type: str, **kwargs) dict | Tuple[dict, str]
Complex function call which can modify the output_slot according to the modification_type.
- Parameters:
output_name (str) – Name of output_slot to modify.
modification_type (str) – Name of modification to perform.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- modify_pipeline(pipeline_name: str, modification_type: str, **kwargs) dict | Tuple[dict, str]
Function call to modify a pipeline
- Parameters:
pipeline_name (str) – Name of pipeline to modify
modification_type (str) – Name of modification to perform.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- pipeline_to_cytoscape(pipeline_name: str) dict | Tuple[dict, str]
Gets the cytoscape representation of the pipeline with passed name pipeline_name.
- Parameters:
pipeline_name (str) – The name of the pipeline to get representation of.
- Returns:
Cytoscape json data
- Return type:
Union[dict, Tuple[dict, str]]
- query_db(db_name: str, lookup_config: Dict, **kwargs) dict | Tuple[dict, str]
Function call to query COMCOM for information from its internal Database.
- Parameters:
db_name (str) – Name of database collection/table to query.
lookup_config (Dict) – Configuration data in format acceptable by MongoDB.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- query_input_slot(input_name: str) dict | Tuple[dict, str]
Function to have an input_slot retrieve a singular piece of data. Currently only a COMCOMDarkAgent can perform this action.
- Parameters:
input_name (str) – Name of input_slot to query.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- synchronize_input_slots(inputs_to_sync_to: List[str], timeout_duration: float) dict | Tuple[dict, str]
Function call to synchronize two or more input_slots.
- Parameters:
inputs_to_sync_to (List[str]) – List of input_slots to synchronization together.
timeout_duration (float) – The max amount of time that a source will wait before discregarding data.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- toggle_input_slot(input_name: str) dict | Tuple[dict, str]
Function which negates the active status of the input_slot with passed name.
- Parameters:
input_name (str) – Name of input_slot to toggle.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- toggle_output_slot(output_name: str) dict | Tuple[dict, str]
Function which negates the active status of the output_slot with passed name.
- Parameters:
output_name (str) – Name of output_slot to toggle.
- Returns:
response from COMCOM
- Return type:
Union[dict, Tuple[dict, str]]
- visualize_comcom() None
Function to visualize all of comcom using plotly
- visualize_input_slot(input_slot_name: str) None
Function to visualize a pipeline with passed pipeline_name using plotly.
- Parameters:
input_slot_name (str) – The name of the input_slot to visualize.
- visualize_pipeline(pipeline_name: str) None
Function to visualize a pipeline with passed pipeline_name using plotly.
- Parameters:
pipeline_name (str) – The name of the pipeline to visualize.