Custom Synapse SDK Documentation

Rok Kovač

Synapse structure

This article provides an overview of how to structure the custom Synapse main.py file.

Before creating a Synapse, it is first necessary to import the core Syncari SDK objects into the Python project. Here is an example:

from syncari.rest.client import *
from syncari.synapse.abstract_synapse import *
from syncari.models import *

Additionally, we recommend importing Python’s built-in json module:

import json

After importing the modules, we need to define the execute method, which serves as the entry point to the custom Synapse.

Example:

def execute(request):
    return YourSynapseName(request.data).execute()

Before adding the Synapse class, it is possible to include a hardcoded version of the 3rd party system’s schema if it is not going to be fetched live in the describe method. More on this in the Describe section below.

After the execute method we need to add the Synapse class (for example, class YourSynapseName(Synapse)), under which all the Synapse methods will be added. Currently the SDK supports the following methods: synapse_info, test, describe, read, get_by_id, create, update, delete, extract_webhook_identifier and process_webhook.

While it is not necessary to use all of the methods, all of them must be present in the Synapse class. A method that is not used should be provided in the following format:

def method_name(self):
    return super().method_name()
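
Most of the methods receive an argument (read, for example, receives a SyncRequest), so a pass-through stub has to forward that argument to the parent class. A minimal sketch of the idea follows. The Synapse class here is a stand-in defined only so that the snippet runs on its own, and its return values are assumptions; in a real main.py the base class comes from the SDK imports shown earlier.

```python
# Stand-in for the SDK base class, defined only so this sketch is runnable.
# In a real main.py, Synapse comes from syncari.synapse.abstract_synapse.
class Synapse:
    def read(self, sync_request):
        return "default read"

    def delete(self, sync_request):
        return "default delete"


class DummySynapse(Synapse):
    # Unused methods keep the parent's signature and forward their argument.
    def read(self, sync_request):
        return super().read(sync_request)

    def delete(self, sync_request):
        return super().delete(sync_request)


stub_result = DummySynapse().read(sync_request=None)
```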

Methods

synapse_info

The synapse_info method is used to provide the description and authentication settings for the custom Synapse.

It doesn’t receive any arguments and it returns a SynapseInfo object. The object includes information about the custom Synapse:

  • Metadata (name, icon) -> Required
  • Authentication options -> Required
  • Configurable fields in the UI -> Optional

The SDK currently supports three different authentication methods: Token, Username/Password and API key. While the majority of use cases should only need one, it is possible to set multiple authentication methods in the SynapseInfo object.

Additional fields are used for configuring the endpoint and the webhook identifier values, when a custom synapse supports webhooks.

Example:

def synapse_info(self):
    return SynapseInfo(
        name='dummySynapse',  # synapse name (REQUIRED)
        category='crm',  # synapse category
        # synapse metadata, can include: displayName, iconPath, backgroundColor (REQUIRED)
        metadata=UIMetadata(displayName='Dummy Synapse'),
        # list of AuthMetadata objects (REQUIRED)
        supportedAuthTypes=[AuthMetadata(authType=AuthType.API_KEY, label='API Key', fields=[
            AuthField(name='AccessToken', label='API KEY', dataType=DataType.STRING)])],
        configuredFields=[
            # the 3rd party system's endpoint (REQUIRED)
            AuthField(name='endpoint', label='Endpoint URL', dataType=DataType.STRING),
            # the webhook identifier (OPTIONAL)
            AuthField(name='webhookIdentifier', label='Webhook Identifier', dataType=DataType.STRING)])

 

Test

The test method is used to test the connection and verify permissions. The Syncari application uses it once the custom Synapse is approved and a user installs it: the installation process includes an authentication test step, in which the test method is called.

The method receives a Connection object and returns it. The object should contain the object name and an AuthConfig object where the details of the authentication are specified (endpoint, username, password, token, etc.). For more complex connections it is possible to use the metaConfig field to pass additional information to the framework (for example, when connecting to a database, to specify which database or table the framework should connect to).

The Connection object persists in the framework and can be used in other methods. For example, if the authentication method requires an authentication token, it is possible to generate the token inside the test method and then use it in other methods.

 

Example:

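def test(self, connection: Connection):
    # A simple authenticated request verifies the connection and permissions.
    # __auth_headers is a helper that is assumed to be defined on the class.
    self.client.get("/users", headers=self.__auth_headers())
    if not connection.metaConfig:
        connection.metaConfig = {}
    return connection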
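
The token reuse mentioned above could look roughly like the following. This is a sketch built on stand-ins, not the SDK's own classes: Connection is a simplified dataclass, fetch_token is a hypothetical helper standing in for a real token request, and the accessToken key inside metaConfig is an assumed name.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Connection:
    """Simplified stand-in for the SDK Connection object (assumption)."""
    name: str
    metaConfig: Optional[dict] = None


def fetch_token() -> str:
    """Hypothetical helper; a real synapse would call the 3rd party auth endpoint."""
    return "token-123"


class DummySynapse:
    def __init__(self, connection: Connection):
        self.connection = connection

    def test(self, connection: Connection) -> Connection:
        if not connection.metaConfig:
            connection.metaConfig = {}
        # Store the generated token so that other methods can reuse it.
        connection.metaConfig["accessToken"] = fetch_token()
        return connection

    def auth_headers(self) -> dict:
        # Later calls read the token back from the persisted connection.
        token = self.connection.metaConfig["accessToken"]
        return {"Authorization": f"Bearer {token}"}


conn = Connection(name="dummySynapse")
synapse = DummySynapse(conn)
synapse.test(conn)
headers = synapse.auth_headers()
```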

Describe (required)

The describe method is used to configure the schema of the 3rd party system inside Syncari. With this method it is possible either to fetch the schema from the 3rd party system and transform it into the Syncari Schema class, or to hardcode it into the main.py file and pass it along to Syncari.

The method takes a DescribeRequest object as an argument, which contains the list of entities to describe. The system expects an array containing a Schema object for each entity as the output of the method.

When possible, we recommend fetching the schema live, so that schema changes in the 3rd party system are reflected on the Syncari side without any updates to the code base of the custom Synapse.

 

Example of hardcoding the schema (please note that this variable should be declared before the Synapse class):

entity_schemas = {
       "customer" : {
           "apiName" : "customer",
           "displayName" : "Customer",
           "pluralName" : "Customers",
           "description" : "Represents customer schema.",
           "attributes" : [
               {
                   "apiName":"id",
                   "displayName":"ID",
                   "dataType":"integer",
                   "isIdField":True 
               },
               {
                   "apiName":"created_at",
                   "displayName":"Created At",
                   "dataType":"integer"
               },
               {
                   "apiName":"first_name",
                   "displayName":"First Name",
                   "dataType":"string"
               },
               {
                   "apiName":"last_name",
                   "displayName":"Last Name",
                   "dataType":"string"
               },
               {
                   "apiName":"email",
                   "displayName":"Email",
                   "dataType":"string"
               },
               {
                   "apiName":"company_id",
                   "displayName":"Company ID",
                   "dataType":"integer"
               },
               {
                   "apiName":"last_updated",
                   "displayName":"Last Updated",
                   "dataType":"datetime"
               }
           ]
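       }
}

# Assumed definition: ALL_ENTITIES (used by the describe method) lists every hardcoded entity.
ALL_ENTITIES = list(entity_schemas.keys())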

 

Example Method: 

def describe(self, desc_request: DescribeRequest):
    entities = desc_request.entities
    # Fall back to all known entities when none are requested explicitly.
    if not entities:
        entities = ALL_ENTITIES
    return [Schema.parse_obj(entity_schemas[entity]) for entity in entities]
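
When the schema is fetched live instead, the 3rd party field listing has to be converted into the same dictionary structure before it is passed to Schema.parse_obj. A rough sketch of that conversion follows. The remote field format (name, label, type, primary) and the type mapping are hypothetical; the real format depends on the 3rd party API.

```python
# Hypothetical mapping from 3rd party type names to the dataType values
# used in the hardcoded schema example.
TYPE_MAP = {"int": "integer", "text": "string", "timestamp": "datetime"}


def to_schema_dict(entity_name, remote_fields):
    """Build a schema dictionary shaped like the entries in entity_schemas."""
    attributes = []
    for remote in remote_fields:
        attribute = {
            "apiName": remote["name"],
            "displayName": remote["label"],
            # Fall back to string when the remote type is unknown.
            "dataType": TYPE_MAP.get(remote["type"], "string"),
        }
        if remote.get("primary"):
            attribute["isIdField"] = True
        attributes.append(attribute)
    return {
        "apiName": entity_name,
        "displayName": entity_name.capitalize(),
        "pluralName": entity_name.capitalize() + "s",
        "description": f"Represents {entity_name} schema.",
        "attributes": attributes,
    }


remote_fields = [
    {"name": "id", "label": "ID", "type": "int", "primary": True},
    {"name": "email", "label": "Email", "type": "text"},
]
customer_schema = to_schema_dict("customer", remote_fields)
```

The resulting dictionary could then be passed to Schema.parse_obj in the same way as the hardcoded entries.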



CRUD methods

All CRUD methods and the get_by_id method will take in the SyncRequest as an argument. 

The object includes the following:

  • entity - includes the schema that is returned from the describe method. This object is always present.
  • data - includes a list of Syncari records. This is present for the create, update, delete and get_by_id methods.
  • Watermark - object that is used by the read method for syncing purposes. It includes the following fields:
      • start: int -> Epoch time in milliseconds that marks the start of the interval in which the system checks for updated records.
      • end: int -> Epoch time in milliseconds that marks the end of the interval in which the system checks for updated records.
      • offset: Optional[int] -> API page offset.
      • limit: Optional[int] -> Number of API pages.
      • cursor: Optional[str] -> API cursor.
      • isResync: bool = False -> Marks if the triggered sync was incremental or a full sync of all records.
      • isTest: bool = False -> Used when testing a pipeline for records changed in a given time frame.
      • initial: bool = False -> Marks if the triggered sync was the first one since the synapse was connected.
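
Because start and end are epoch times in milliseconds, they usually need converting before they can be used in a query against the 3rd party system. A small sketch follows, using a simplified stand-in for the Watermark object and hypothetical updated_after / updated_before parameter names:

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class Watermark:
    """Simplified stand-in with only the two interval fields (assumption)."""
    start: int
    end: int


def to_iso(epoch_millis: int) -> str:
    """Convert epoch milliseconds to an ISO 8601 UTC timestamp."""
    return datetime.fromtimestamp(epoch_millis / 1000, tz=timezone.utc).isoformat()


def build_query_params(watermark: Watermark) -> dict:
    # Parameter names are hypothetical; use whatever the 3rd party API expects.
    return {
        "updated_after": to_iso(watermark.start),
        "updated_before": to_iso(watermark.end),
    }


params = build_query_params(Watermark(start=0, end=60_000))
```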

 

Read

The read method is used to retrieve data from the 3rd party system for Syncari pipelines. It takes in a SyncRequest object, which includes the watermark and schema objects, and returns a ReadResponse object.

The Watermark object represents one of Syncari’s core sync principles, as it allows us to fetch records in a particular time range. Its start and end fields hold the epoch timestamps of the interval for which Syncari is trying to pull updated records. Those intervals will be one minute long, matching the default frequency of the pipelines.

The read method is also used for first-time syncs and re-syncs, which pull data from the epoch to the current date. In those cases the offset field in the Watermark object can be used to paginate the data, and we recommend importing batches of 100 records (this is mostly important when syncing directly from databases, to cap the number of records retrieved for each pull inside the logic of the method). Additionally, if the system doesn’t support a watermark field (like a last updated date), it is possible to always fetch all records and paginate over them.
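
The offset-based paging described above could be sketched as follows. fetch_page is a hypothetical stand-in for the 3rd party API call or database query, and the list of 250 fake rows exists only so that the snippet runs on its own.

```python
BATCH_SIZE = 100  # the batch size recommended above

# Fake data source standing in for the 3rd party system.
ALL_ROWS = [{"id": i} for i in range(250)]


def fetch_page(offset: int, limit: int) -> list:
    """Hypothetical stand-in for an API call or database query."""
    return ALL_ROWS[offset:offset + limit]


def read_batch(offset: int):
    """Fetch one batch and return it with the offset for the next call."""
    rows = fetch_page(offset, BATCH_SIZE)
    # A short batch means there is nothing left to fetch.
    next_offset = offset + len(rows) if len(rows) == BATCH_SIZE else None
    return rows, next_offset


batch_sizes = []
offset = 0
while offset is not None:
    rows, offset = read_batch(offset)
    batch_sizes.append(len(rows))
```

In a real read method the next offset would presumably be written into the watermark returned with the ReadResponse, rather than looped over locally as it is here.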

The Watermark object also supports a cursor field, which acts as a memory object. It is meant for systems like Zendesk or MS Dynamics, where each response provides an arbitrary value that can be sent in the next call to fetch the next set of data.
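
As an illustration of the cursor approach, the sketch below walks through a fake two-page API. The PAGES dictionary, fetch_with_cursor and the next_cursor key are all hypothetical; real systems name and shape their cursors differently.

```python
# Fake cursor-based API: each page points to the next one, or to None at the end.
PAGES = {
    None: {"records": [{"id": 1}, {"id": 2}], "next_cursor": "abc"},
    "abc": {"records": [{"id": 3}], "next_cursor": None},
}


def fetch_with_cursor(cursor):
    """Hypothetical stand-in for a cursor-based API call."""
    return PAGES[cursor]


def read_page(cursor):
    """Return the records of one page and the cursor to store for the next call."""
    page = fetch_with_cursor(cursor)
    return page["records"], page["next_cursor"]


first_records, next_cursor = read_page(None)
second_records, final_cursor = read_page(next_cursor)
```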

The method should return a ReadResponse object containing a new watermark object, the array of data that was fetched and an offsetType value.

 

Example:

def read(self, sync_request: SyncRequest):
    records = []
    # The attribute name 'watermark' on SyncRequest is an assumption.
    watermark = sync_request.watermark
    if sync_request.entity.apiName == 'user':
        response = self.client.get("/users", headers=self.__auth_headers()).json()
        for user in response:
            # 'updated_at' and 'created_at' are assumed field names in the API response.
            records.append(Record(id=user['id'], name='user', values=user,
                                  lastModified=user['updated_at'], createdAt=user['created_at']))
    return ReadResponse(watermark=watermark, data=records, offsetType=OffsetType.RECORD_COUNT)

 

Get_by_id

The get_by_id method accepts a list of Record objects inside the SyncRequest object under data. The method should contain logic that fetches each record in the list from the third party system and returns a list of the records that were successfully fetched.

Even when a read method is implemented, it is still recommended to implement a get_by_id method, as it allows for a better user experience when testing pipelines. With this method it is possible to fetch a single record or a set of records and run the pipeline as if those records had come in through the read or webhook methods.

 

Example:

def get_by_id(self, sync_request: SyncRequest):
    records = []
    if sync_request.entity.apiName == 'user':
        user_ids = [r.id for r in sync_request.data]
        for user_id in user_ids:
            user = self.client.get("/user/" + str(user_id), headers=self.__auth_headers()).json()
            # 'updated_at' and 'created_at' are assumed field names in the API response.
            records.append(
                Record(id=user['id'], name='user', values=user, lastModified=user['updated_at'],
                       createdAt=user['created_at']))
    return records

 

Create

The create method accepts a list of Record objects inside the SyncRequest object under data. The method should contain logic that creates each record in the list in the third party system and returns a list of Result objects that report, for each record, whether it was created successfully.

 

Example:

def create(self, sync_request: SyncRequest):
    results = []
    if sync_request.entity.apiName == 'user':
        for user in sync_request.data:
            try:
                self.client.post("/users", json={'attributes': user.values.get('attributes', {})}, headers=self.__auth_headers()).json()
                results.append(Result(success=True, id=user.id, syncariId=user.syncariEntityId))
            except SyncariException as e:
                # Report the failure for this record and continue with the rest.
                results.append(Result(success=False, id=user.id, syncariId=user.syncariEntityId, errors=[e.error_response.json()]))
    return results

 

Update

The update method accepts a list of Record objects inside the SyncRequest object under data. The method should contain logic that updates each record in the list in the third party system and returns a list of Result objects that report, for each record, whether it was updated successfully.

 

Example:

def update(self, sync_request: SyncRequest):
    results = []
    if sync_request.entity.apiName == 'user':
        for user in sync_request.data:
            try:
                self.client.post("/users", json={'id': user.id, 'attributes': user.values.get('attributes', {})}, headers=self.__auth_headers()).json()
                results.append(Result(success=True, id=user.id, syncariId=user.syncariEntityId))
            except SyncariException as e:
                # Report the failure for this record and continue with the rest.
                results.append(Result(success=False, id=user.id, syncariId=user.syncariEntityId, errors=[e.error_response.json()]))
    return results

 

Delete

The delete method accepts a list of Record objects inside the SyncRequest object under data. The method should contain logic that deletes each record in the list from the third party system and returns a list of results that report, for each record, whether it was deleted successfully.

 

Example:

def delete(self, sync_request: SyncRequest):
    results = []
    if sync_request.entity.apiName == 'user':
        for user in sync_request.data:
            try:
                self.client.delete("/user/" + str(user.id), headers=self.__auth_headers())
                # Results follow the same pattern as the create and update examples (assumption).
                results.append(Result(success=True, id=user.id, syncariId=user.syncariEntityId))
            except SyncariException as e:
                results.append(Result(success=False, id=user.id, syncariId=user.syncariEntityId, errors=[e.error_response.json()]))
    return results

 

Webhook methods

With the Syncari SDK it is also possible to capture webhooks from third party systems. If possible, we recommend implementing both the read and the webhook methods, as this way updates arrive more quickly and provide a better user experience.

 

Extract_webhook_identifier

The extract_webhook_identifier method takes a WebhookRequest as its argument and should return a webhook_identifier string. This method is used so that Syncari only processes the events that should be processed. Any event request sent to the Syncari endpoint is passed through this method, and only requests whose webhook_identifier matches the one the user entered when configuring the synapse are processed by the process_webhook method. To have this functionality available, it is necessary to add a configuredFields object in the synapse_info method.

 

Example:

def extract_webhook_identifier(self, webhook_request: WebhookRequest) -> str:
      request_data = json.loads(webhook_request.body)
      webhook_identifier = ''
      if not request_data:
          webhook_identifier = ''
      else:
          webhook_identifier = request_data['webhook_id']
      return webhook_identifier
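
The matching step itself is performed by Syncari, but the idea can be illustrated in isolation. In this sketch WebhookRequest is a simplified stand-in, should_process is a hypothetical helper that mimics the comparison, and the webhook_id key follows the example above.

```python
import json
from dataclasses import dataclass


@dataclass
class WebhookRequest:
    """Simplified stand-in exposing only the raw request body (assumption)."""
    body: str


def extract_webhook_identifier(webhook_request: WebhookRequest) -> str:
    request_data = json.loads(webhook_request.body) if webhook_request.body else {}
    # Return an empty string when the payload carries no identifier.
    return request_data.get("webhook_id", "")


def should_process(webhook_request: WebhookRequest, configured_identifier: str) -> bool:
    """Hypothetical helper mimicking the framework's identifier comparison."""
    return extract_webhook_identifier(webhook_request) == configured_identifier


matching = WebhookRequest(body='{"webhook_id": "acme-1", "id": 7}')
other = WebhookRequest(body='{"webhook_id": "someone-else"}')
results = [should_process(matching, "acme-1"), should_process(other, "acme-1")]
```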

 

Process_webhook

The process_webhook method is used to process the webhook payload data. It takes in a WebhookRequest object and should return a list of Syncari records. Please note that only requests whose webhook_identifier matches the one provided by the user will be processed by this method.

 

Example:

# List in the return annotation comes from the typing module (from typing import List).
def process_webhook(self, webhook_request: WebhookRequest) -> List[Record]:
    request_data = json.loads(webhook_request.body)
    return [Record(id=request_data['id'],
                   values=request_data,
                   name='user',
                   lastModified=request_data['last_modified'],
                   createdAt=request_data['created_at'])]