The Create, Update, Get By ID and Delete methods require similar setup and share functionality. Instead of the watermark object, each method receives a list of Syncari records under the data key of the sync_request object, and the platform expects the method to perform the corresponding CRUD operation on those records.
Get By ID Method
Get By ID is expected to fetch the current field values of each record passed in the data list. Each Syncari Record in the list holds the id, which is the record ID in the end system. The expected output of the method is a list of the same records with their values updated. Whenever the API supports it, we recommend fetching records in bulk to minimize the number of API requests a Synapse needs to make.
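The bulk-fetch recommendation can be sketched as follows. This is a minimal illustration, not Syncari SDK code: FakeBulkClient, get_many and fetch_in_bulk are hypothetical names, and the sketch assumes the end system offers some endpoint that accepts several IDs in one request.

```python
from typing import Dict, Iterator, List


def chunked(ids: List[str], size: int) -> Iterator[List[str]]:
    """Yield consecutive slices of `ids`, each holding at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(ids), size):
        yield ids[start:start + size]


class FakeBulkClient:
    """Hypothetical stand-in for an API client with a bulk lookup endpoint."""

    def __init__(self, store: Dict[str, dict]):
        self.store = store
        self.request_count = 0

    def get_many(self, entity_name: str, ids: List[str]) -> List[dict]:
        # One call stands for one HTTP request covering several IDs.
        self.request_count += 1
        return [self.store[i] for i in ids if i in self.store]


def fetch_in_bulk(client: FakeBulkClient, entity_name: str,
                  ids: List[str], batch_size: int = 100) -> List[dict]:
    """Fetch the requested rows with one request per batch instead of one per ID."""
    rows: List[dict] = []
    for batch in chunked(ids, batch_size):
        rows.extend(client.get_many(entity_name, batch))
    return rows


store = {str(n): {"id": str(n), "name": f"Contact {n}"} for n in range(1, 6)}
client = FakeBulkClient(store)
rows = fetch_in_bulk(client, "contact", ["1", "2", "3", "4", "5"], batch_size=2)
print(len(rows), client.request_count)  # 5 rows fetched with 3 requests
```

The batch size is a placeholder; the real limit depends on what the end system's API allows per request.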
The Get By ID method is used by the platform in the following scenarios:
- Testing - lets you test the pipeline by providing an external record’s ID
- Retry Record Action - enables the wait step node in pipelines. This queues the record and retries fetching it until the time setting expires.
If the API does not support the method, either omit it from the Synapse or raise a not-supported exception, so that the platform can handle the case gracefully.
Example Get by ID method:
    def get_by_id(self, sync_request: SyncRequest) -> List[Record]:
        # set variables
        entity_name = sync_request.entity.apiName
        data = sync_request.data
        records = []

        # loop over provided records
        for record in data:
            # get the record values from the api
            resp = self.client.get(f'{entity_name}/{record.id}')
            # keep the response in its own variable so the loop variable is not overwritten
            values = resp.json()

            # set the system dates and convert them to epoch milliseconds if applicable
            lastModified = values.get('lastModified')
            createdAt = values.get('createdAt')

            # append the record to records list
            records.append(Record(id=values.get('id'), values=values, name=entity_name,
                                  lastModified=lastModified, createdAt=createdAt))

        # return list of syncari records
        return records
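The example above notes that system dates should be converted to epoch milliseconds where applicable. A minimal sketch of such a conversion is shown here, assuming the end system returns ISO 8601 strings and that timestamps without an offset are in UTC. Both are assumptions about the source API, and to_epoch_millis is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(value: Optional[str]) -> Optional[int]:
    """Convert an ISO 8601 timestamp string to epoch milliseconds.

    Returns None for missing values. Timestamps without an offset are
    treated as UTC (an assumption about the source API).
    """
    if not value:
        return None
    # Older Python versions reject a trailing "Z" in fromisoformat(),
    # so normalise it to an explicit UTC offset first.
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    # Integer division of timedeltas avoids floating-point rounding.
    return (parsed - EPOCH) // timedelta(milliseconds=1)


print(to_epoch_millis("2024-01-01T00:00:00Z"))  # 1704067200000
```

In get_by_id this would wrap the two date lookups, for example to_epoch_millis(values.get('lastModified')).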
Create Method
The Create method is used on the destination side to create records in a third-party system. It is triggered whenever a Syncari Record passes the destination node and the ID mapping has not yet been created (that is, the record has not yet been unified). The method is expected to return a list of Result records, which create the ID mapping by tying each Syncari record to its counterpart in the end system. On the Result record we can also specify whether the API request was successful, so the platform knows whether it needs to retry (for example on rate limits / 429 errors) and/or signal to the user that the action was not successful.
Since the Syncari client (link to syncari client page) already handles the common error codes, you can simply wrap the API request in a try/except block and pass the error_response in the errors list of the Result record.
Example Create method:
    def create(self, sync_request: SyncRequest) -> List[Result]:
        # set variables
        entity_name = sync_request.entity.apiName
        data = sync_request.data
        results = []

        # loop over provided records
        for record in data:
            try:
                # try to create a record
                resp = self.client.post(f'{entity_name}', json=record.values)
                # keep the response separate from the Syncari record, whose syncariEntityId is still needed
                created = resp.json()
                # if successful, append to the result list
                results.append(Result(id=created.get('id'), syncariId=record.syncariEntityId))
            except SyncariException as e:
                # if not successful, append a result with the success flag set to False and the errors provided
                results.append(Result(syncariId=record.syncariEntityId, success=False,
                                      errors=[e.error_response.json()]))

        # return list of syncari results
        return results
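To make the shape of the returned list concrete, here is a self-contained simulation of the same pattern. Every class in it is a hypothetical stand-in defined only for this sketch (Record and Result mirror just the fields used in the example above, while ApiError and FakeClient are invented for illustration); none of it is the SDK's actual implementation. It shows that each input record yields exactly one Result, in order, and that a failed record does not abort the rest of the batch.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Record:  # stand-in: only the fields this sketch needs
    syncariEntityId: str
    values: Dict[str, object]
    id: Optional[str] = None


@dataclass
class Result:  # stand-in mirroring the fields used in the example above
    syncariId: str
    id: Optional[str] = None
    success: bool = True
    errors: List[object] = field(default_factory=list)


class ApiError(Exception):
    """Hypothetical error raised by the fake client; carries an error payload."""

    def __init__(self, payload: dict):
        super().__init__(payload.get("message", "API error"))
        self.payload = payload


class FakeClient:
    """Hypothetical client that rejects records without an email."""

    def __init__(self) -> None:
        self.next_id = 1

    def post(self, path: str, json: dict) -> dict:
        if "email" not in json:
            raise ApiError({"status": 422, "message": "email is required"})
        created = {"id": str(self.next_id), **json}
        self.next_id += 1
        return created


def create_all(client: FakeClient, entity_name: str, data: List[Record]) -> List[Result]:
    """Create each record and report one Result per input record."""
    results: List[Result] = []
    for record in data:
        try:
            created = client.post(entity_name, json=record.values)
            results.append(Result(id=created["id"], syncariId=record.syncariEntityId))
        except ApiError as e:
            # A failure is recorded on the Result; the loop continues.
            results.append(Result(syncariId=record.syncariEntityId,
                                  success=False, errors=[e.payload]))
    return results


batch = [
    Record("s1", {"email": "a@example.com"}),
    Record("s2", {"name": "No Email"}),
    Record("s3", {"email": "c@example.com"}),
]
for result in create_all(FakeClient(), "contact", batch):
    print(result.syncariId, result.id, result.success)
```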
Update Method
Similar to the Create method, the Update method is used when a Syncari record passes through the destination node and the ID mapping / record unification already exists. The main difference is usually that the Update request includes the record’s unique identifier, so that the correct corresponding record is updated in the end system.
Example Update method:
    def update(self, sync_request: SyncRequest) -> List[Result]:
        # set variables
        entity_name = sync_request.entity.apiName
        data = sync_request.data
        results = []

        # loop over provided records
        for record in data:
            try:
                # try to update a record (some APIs expect PUT or PATCH here instead of POST)
                resp = self.client.post(f'{entity_name}/{record.id}', json=record.values)
                updated = resp.json()
                # if successful, append to the result list
                results.append(Result(id=updated.get('id'), syncariId=record.syncariEntityId))
            except SyncariException as e:
                # if not successful, append a result with the success flag set to False and the errors provided
                results.append(Result(id=record.id, syncariId=record.syncariEntityId, success=False,
                                      errors=[e.error_response.json()]))

        # return list of syncari results
        return results
Delete Method
The Delete method is used when the Synapse has the option enabled to delete records when another system deletes them. The use case is similar to the Update method: it is expected to make the API request to delete the record and return a Result object for each record.
Example Delete method:
    def delete(self, sync_request: SyncRequest) -> List[Result]:
        # set variables
        entity_name = sync_request.entity.apiName
        data = sync_request.data
        results = []

        # loop over provided records
        for record in data:
            try:
                # try to delete a record
                resp = self.client.delete(f'{entity_name}/{record.id}')
                deleted = resp.json()
                # if successful, append to the result list
                results.append(Result(id=deleted.get('id'), syncariId=record.syncariEntityId))
            except SyncariException as e:
                # if not successful, append a result with the success flag set to False and the errors provided
                results.append(Result(id=record.id, syncariId=record.syncariEntityId, success=False,
                                      errors=[e.error_response.json()]))

        # return list of syncari results
        return results
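Many REST APIs answer a successful DELETE with 204 No Content and an empty body, in which case parsing the response as JSON raises an error. The sketch below shows one way to pick the ID for the Result in that situation by falling back to the ID that was requested. FakeResponse is a hypothetical stand-in exposing requests-style attributes (status_code, content, json()); whether the client used in a given Synapse returns such an object is an assumption, and deleted_id is a hypothetical helper name.

```python
import json
from typing import Any


class FakeResponse:
    """Hypothetical response object with requests-style attributes."""

    def __init__(self, status_code: int, content: bytes = b""):
        self.status_code = status_code
        self.content = content

    def json(self) -> Any:
        # Raises json.JSONDecodeError when the body is empty.
        return json.loads(self.content)


def deleted_id(resp: FakeResponse, requested_id: str) -> str:
    """Return the ID reported by the API, or the requested ID if none is reported."""
    if resp.status_code == 204 or not resp.content:
        return requested_id
    body = resp.json()
    if isinstance(body, dict) and body.get("id") is not None:
        return str(body["id"])
    return requested_id


print(deleted_id(FakeResponse(204), "42"))               # 42
print(deleted_id(FakeResponse(200, b'{"id": 7}'), "42"))  # 7
```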
Below is an example of our Pipedrive implementation of the CRUD methods. Since Pipedrive’s API follows REST conventions, the CRUD functionality can be generalized into two helper methods to streamline the codebase of the Synapse.
In the example, the CRUD functionality is handled by the two helper methods self.__crud() and self.__post_process_response(). The first makes the API requests to Pipedrive, and the second processes the responses, creating the lists of either Record instances or Result instances.
Example:
    def get_by_id(self, sync_request: SyncRequest) -> List[Record]:
        if (self.request.entity.apiName == 'leadLabel'):
            raise SyncariException("GET_BY_ID for leadLabel not supported")
        # self.request (SyncRequest)
        return self.__crud(sync_request, CrudOperation.GET)  # [Record()]

    def create(self, sync_request: SyncRequest) -> List[Result]:
        # self.request (SyncRequest)
        return self.__crud(sync_request, CrudOperation.CREATE)  # [Result()]

    def update(self, sync_request: SyncRequest) -> List[Result]:
        # self.request (SyncRequest)
        return self.__crud(sync_request, CrudOperation.UPDATE)  # [Result()]

    def delete(self, sync_request: SyncRequest) -> List[Result]:
        # self.request (SyncRequest)
        return self.__crud(sync_request, CrudOperation.DELETE)

    def __crud(self, sync_request: SyncRequest, operation):
        entity_name = sync_request.entity.apiName
        plural_name = sync_request.entity.pluralName
        if not plural_name:
            plural_name = entity_name

        resps = []
        for data in sync_request.data:
            data = Record.parse_obj(data)
            resp = None
            if operation is CrudOperation.GET:
                resp = self.client.get(plural_name + "/" + data.id, params=self.client.auth_config, json=data.values)
            elif operation is CrudOperation.CREATE:
                resp = self.client.post(plural_name, params=self.client.auth_config, json=data.values)
            elif operation is CrudOperation.UPDATE:
                resp = self.client.put(plural_name + "/" + data.id, params=self.client.auth_config, json=data.values)
            elif operation is CrudOperation.DELETE:
                resp = self.client.delete(plural_name + "/" + data.id, params=self.client.auth_config)
            resps.append(resp)

        return self.__post_process_response(sync_request, operation, resps)

    def __post_process_response(self, sync_request: SyncRequest, operation: CrudOperation, crud_response):
        eds = []
        indx = 0
        for resp in crud_response:
            if type(resp) is ErrorResponse:
                eds.append(Result(id=sync_request.data[indx].id, syncariId=sync_request.data[indx].syncariEntityId, success=False, errors=[str(resp)]))
            elif 'data' in resp.json():
                row = resp.json()['data']
                if operation is CrudOperation.GET:
                    eds.append(self.__process_single_row(sync_request.entity.apiName, self.request.entity, row))
                else:
                    id_val = sync_request.data[indx].id
                    if row is not None and isinstance(row, dict) and 'id' in row:
                        id_val = row['id']
                    eds.append(Result(id=id_val, syncariId=sync_request.data[indx].syncariEntityId, success=True))
            indx += 1
        return eds
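The if/elif chain in a helper like __crud() can also be written as a lookup table that maps each operation to its HTTP method and path shape. The sketch below is an alternative formulation for illustration, not the Pipedrive Synapse's code: CrudOperation is redefined here as a stand-in enum, and ROUTES and build_request are hypothetical names. One deliberate difference is that this table sends no JSON body on GET.

```python
from enum import Enum
from typing import Dict, Optional, Tuple


class CrudOperation(Enum):  # stand-in for the enum used in the example above
    GET = "get"
    CREATE = "create"
    UPDATE = "update"
    DELETE = "delete"


# operation -> (HTTP method, record ID is part of the path, values are sent as body)
ROUTES: Dict[CrudOperation, Tuple[str, bool, bool]] = {
    CrudOperation.GET: ("GET", True, False),
    CrudOperation.CREATE: ("POST", False, True),
    CrudOperation.UPDATE: ("PUT", True, True),
    CrudOperation.DELETE: ("DELETE", True, False),
}


def build_request(operation: CrudOperation, plural_name: str,
                  record_id: Optional[str],
                  values: Optional[dict]) -> Tuple[str, str, Optional[dict]]:
    """Return (method, path, json_body) for one record and one operation."""
    method, needs_id, sends_body = ROUTES[operation]
    if needs_id and not record_id:
        raise ValueError(f"{operation.name} requires a record id")
    path = f"{plural_name}/{record_id}" if needs_id else plural_name
    return method, path, (values if sends_body else None)


print(build_request(CrudOperation.UPDATE, "persons", "7", {"name": "Ada"}))
# ('PUT', 'persons/7', {'name': 'Ada'})
```

With a table like this, supporting an API that uses PATCH for updates means changing a single entry rather than a branch.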