The Custom Synapse framework also provides the ability to ingest webhook requests. These are handled by a routing method, Extract_webhook_identifier, and a processing method, Process_webhook. The first method routes the payload to the correct Syncari instance using a field called webhookIdentifier. The value of webhookIdentifier should always be stored in the metaConfig object (link to reference). It can be set either from the UI, through the configured fields list, so that end users configure the unique identifier themselves, or from the test method, if the webhook is registered automatically.
The endpoint for webhooks consists of the base URL with the synapse_name at the end. Example: https://app.syncari.com/arcade/api/v1/webhooks/{synapse_name}. When importing the synapse, it’s important to set both the Synapse name and the API name to the same string value.
Both methods receive a WebhookRequest object as an argument, which includes the stringified body of the webhook along with the headers and parameters of the request.
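To make the shape of the request concrete, here is a minimal sketch. The WebhookRequest class below is a hypothetical stand-in, not the SDK class: the body and params attribute names follow the examples in this section, while the headers attribute name and the dictionary types are assumptions. The parse_body helper is also hypothetical.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class WebhookRequest:
    """Hypothetical stand-in for the SDK's WebhookRequest (illustration only)."""
    body: str = ''                                          # stringified webhook body
    headers: Dict[str, str] = field(default_factory=dict)   # assumed attribute name
    params: Dict[str, Any] = field(default_factory=dict)    # URL query parameters


def parse_body(webhook_request: WebhookRequest) -> dict:
    """Decode the stringified body; return an empty dict for an empty body."""
    if not webhook_request.body:
        return {}
    return json.loads(webhook_request.body)


sample_request = WebhookRequest(
    body='{"meta": {"webhook_id": "abc-123"}}',
    headers={'Content-Type': 'application/json'},
    params={'webhookIdentifier': ['abc-123']},
)
parsed = parse_body(sample_request)
```

Because the body arrives as a string, it has to be decoded before any field can be read. Guarding against an empty body avoids a JSON decoding error on requests that carry no payload.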
When using webhooks, it is important to always include at minimum the following in the read method, to ensure the pipeline runs smoothly:
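def read(self, sync_request: SyncRequest) -> ReadResponse:
    # Minimal read implementation: returns no polled records.
    # `watermark` is assumed to be defined before this return.
    return ReadResponse(watermark=watermark, data=[], offsetType=OffsetType.NONE)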
Extract_webhook_identifier Method
The Extract_webhook_identifier method handles the routing of webhook payloads to the correct Syncari instance. It is expected to return a string value that matches one of the configured webhookIdentifier values in the metaConfig (link to reference).
The identifier can be parsed from the payload itself, the headers, or the parameters, depending on where the unique identifier is set. Please note that if the same value is configured on different instances of the same synapse, the webhook data will be sent to each of those instances.
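The examples in this section read the identifier from the URL parameters and from the payload. For completeness, here is a minimal sketch of reading it from a header. The header name X-Webhook-Id, the headers attribute, and the stand-in WebhookRequest class are assumptions for illustration. A real synapse would raise SyncariException rather than ValueError.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class WebhookRequest:
    """Hypothetical stand-in for the SDK's WebhookRequest (illustration only)."""
    body: str = ''
    headers: Dict[str, str] = field(default_factory=dict)
    params: Dict[str, str] = field(default_factory=dict)


# Hypothetical header name; use whatever header the source system actually sends.
IDENTIFIER_HEADER = 'X-Webhook-Id'


def identifier_from_headers(webhook_request: WebhookRequest) -> str:
    """Return the webhook identifier carried in a request header."""
    # HTTP header names are case-insensitive, so normalise them before the lookup.
    normalised = {key.lower(): value for key, value in webhook_request.headers.items()}
    identifier = normalised.get(IDENTIFIER_HEADER.lower(), '')
    if not identifier:
        # ValueError keeps this sketch self-contained.
        raise ValueError(f'Header "{IDENTIFIER_HEADER}" is missing.')
    return identifier


request = WebhookRequest(headers={'x-webhook-id': 'wh-001'})
identifier = identifier_from_headers(request)
```

Whichever source is used, the returned string has to equal the webhookIdentifier value stored in the metaConfig of the target instance.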
If the system doesn’t provide a unique identifier on its own, you can add one manually as a URL parameter when registering the webhook. For this to work, add the 'webhookIdentifier' field to the SynapseInfo object: AuthField(name='webhookIdentifier', label='Webhook Identifier', dataType=DataType.STRING, required=False). Then set the webhook endpoint in the end system to https://app.syncari.com/arcade/api/v1/webhooks/{synapse_name}?webhookIdentifier={your_unique_id} and use the boilerplate below to handle the extraction:
def extract_webhook_identifier(self, webhook_request: WebhookRequest) -> str:
    # The identifier is expected as a URL parameter: ?webhookIdentifier={your_unique_id}
    if 'webhookIdentifier' not in webhook_request.params or not webhook_request.params['webhookIdentifier']:
        raise SyncariException(ErrorResponse(status_code=400, message='Parameter "webhookIdentifier" is missing.'))
    identifier = webhook_request.params['webhookIdentifier']
    # The parameter may arrive as a list of values; use the first one in that case.
    if isinstance(identifier, list):
        return identifier[0]
    return identifier
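The list check in the boilerplate is easier to follow with a concrete query string. The sketch below uses only the Python standard library to build the endpoint URL and parse it back. parse_qs returns every parameter as a list, which is one way a list-valued webhookIdentifier can arise. Whether the framework delivers params in this form is an assumption. The synapse name my_synapse, the identifier tenant-42, and both helper functions are hypothetical.

```python
from typing import Any, Dict
from urllib.parse import parse_qs, urlencode, urlsplit

BASE_URL = 'https://app.syncari.com/arcade/api/v1/webhooks'


def build_webhook_url(synapse_name: str, webhook_identifier: str) -> str:
    """Build the endpoint URL to register in the end system."""
    query = urlencode({'webhookIdentifier': webhook_identifier})
    return f'{BASE_URL}/{synapse_name}?{query}'


def first_param(params: Dict[str, Any], name: str) -> str:
    """Return a parameter as a string, whether it is a list or a single value."""
    value = params.get(name)
    if isinstance(value, list):
        return value[0] if value else ''
    return value or ''


url = build_webhook_url('my_synapse', 'tenant-42')  # hypothetical names
params = parse_qs(urlsplit(url).query)              # values are lists here
identifier = first_param(params, 'webhookIdentifier')
```

Handling both the list and the single-value form keeps the extraction independent of how the parameters were parsed upstream.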
Below is an example implementation of the method in our Pipedrive synapse where the webhook identifier is extracted from the payload itself.
Example:
import json

def extract_webhook_identifier(self, webhook_request: WebhookRequest) -> str:
    self.print(self.extract_webhook_identifier.__name__, webhook_request)
    request_data = json.loads(webhook_request.body)
    # Return an empty identifier when the payload is empty.
    if not request_data:
        return ''
    # The webhook ID is carried in the payload's meta section.
    return request_data['meta']['webhook_id']
Process_webhook Method
The Process_webhook method processes the webhook payload data. Once the Extract_webhook_identifier method has routed the payload, Process_webhook returns a list of Syncari Records, which show up in the platform from the Source node on the next Sync Cycle. As with the Read and Get_By_ID methods, it is expected to process the incoming records (date conversions, handling of delete flags, etc.).
When implementing the method, it’s important to understand whether the payload can include multiple records, as opposed to a single record per request. TODO: can you have different record types in a single batch?
Below is an example implementation of the method in our Pipedrive synapse. It uses the same self.__process_single_row() helper method that the Read and Get_By_ID methods use to create Syncari Records from the incoming payload.
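One way to handle both cases is to normalise the payload into a list of rows before building records. The sketch below is illustrative only. The payload key data, the Record stand-in with its id and values fields, and both helper functions are assumptions rather than part of the Syncari SDK or of any specific source system.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class Record:
    """Hypothetical stand-in for a Syncari Record (illustration only)."""
    name: str
    id: str
    values: Dict[str, Any]
    deleted: bool = False


def rows_from_payload(payload: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the payload's rows as a list, whether one or many were sent."""
    data = payload.get('data')  # hypothetical key holding the record(s)
    if data is None:
        return []
    if isinstance(data, list):
        return data
    return [data]


def records_from_body(body: str, entity_name: str) -> List[Record]:
    """Build one Record per row found in the stringified webhook body."""
    payload = json.loads(body) if body else {}
    return [
        Record(name=entity_name, id=str(row['id']), values=row)
        for row in rows_from_payload(payload)
    ]


single = records_from_body('{"data": {"id": 1, "title": "A"}}', 'deal')
batch = records_from_body('{"data": [{"id": 1}, {"id": 2}]}', 'deal')
```

With this shape, the record-building logic stays the same for single-record and batched requests, and an empty body yields an empty list.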
Example:
import json
from typing import List

def process_webhook(self, webhook_request: WebhookRequest) -> List[Record]:
    self.print(self.process_webhook.__name__, webhook_request)
    request_data = json.loads(webhook_request.body)
    entity_name = request_data['meta']['object']
    # The first part of the event name is the action, e.g. 'deleted'.
    action = request_data['event'].split('.')[0]
    is_deleted = action == 'deleted'
    # Deleted events carry the record under 'previous'; all others under 'current'.
    row = request_data['previous'] if is_deleted else request_data['current']
    # User events arrive as a list, so take the first element.
    if isinstance(row, list) and entity_name == 'user':
        row = row[0]
    entity_data = self.__process_single_row(entity_name, None, row)
    entity_data.name = entity_name
    if is_deleted:
        entity_data.deleted = True
    return [entity_data]
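The row selection in the example above can be exercised locally without the framework. The sketch below isolates that logic in a hypothetical select_row helper and runs it against sample payloads. The payloads only mirror the fields the example reads (event, meta.object, current, previous), and their values are invented for illustration.

```python
from typing import Any, Dict, Tuple


def select_row(request_data: Dict[str, Any]) -> Tuple[Any, bool]:
    """Return the row to process and whether the event is a deletion."""
    action = request_data['event'].split('.')[0]
    is_deleted = action == 'deleted'
    # Deleted events are read from 'previous'; all others from 'current'.
    row = request_data['previous'] if is_deleted else request_data['current']
    # User events arrive as a list, so take the first element.
    if isinstance(row, list) and request_data['meta']['object'] == 'user':
        row = row[0]
    return row, is_deleted


# Invented sample payloads, shaped after the fields used in the example.
updated_event = {
    'event': 'updated.deal',
    'meta': {'object': 'deal'},
    'current': {'id': 7, 'title': 'New title'},
    'previous': {'id': 7, 'title': 'Old title'},
}
deleted_event = {
    'event': 'deleted.deal',
    'meta': {'object': 'deal'},
    'current': None,
    'previous': {'id': 7, 'title': 'New title'},
}
user_event = {
    'event': 'updated.user',
    'meta': {'object': 'user'},
    'current': [{'id': 3, 'name': 'Sample User'}],
    'previous': [{'id': 3, 'name': 'Sample User'}],
}

updated_row, updated_flag = select_row(updated_event)
deleted_row, deleted_flag = select_row(deleted_event)
user_row, user_flag = select_row(user_event)
```

Keeping this selection in a small pure function makes it straightforward to cover each event type with a sample payload before wiring it into the synapse.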