CRUD Methods (Create, Update, Get By ID, Delete)

Rok Kovač

The Create, Update, Get By ID and Delete methods require similar setup and share functionality. Instead of a watermark object, each method receives a list of Syncari records under the data key of the sync_request object, and the platform expects the method to perform the corresponding CRUD operation on those records.

Get By ID Method

Get By ID is expected to fetch the current field values of each record passed in the data list. Each Syncari Record in the list holds the Id, which is the record ID in the end system. The expected output of the method is a list of the same records with their values updated. Whenever the API supports it, we recommend fetching records in bulk to minimize the number of API requests a Synapse needs to make.
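The bulk-fetch recommendation can be sketched as follows. This is only an illustration under stated assumptions: fetch_many is a hypothetical callable standing in for whatever bulk endpoint the end system offers, and the batch size of 100 is an arbitrary placeholder, not a Syncari or API limit.

```python
from typing import Callable, Dict, Iterator, List

Row = Dict[str, object]


def chunked(ids: List[str], size: int) -> Iterator[List[str]]:
    """Yield consecutive slices of `ids` holding at most `size` items each."""
    for start in range(0, len(ids), size):
        yield ids[start:start + size]


def fetch_in_bulk(ids: List[str],
                  fetch_many: Callable[[List[str]], List[Row]],
                  batch_size: int = 100) -> Dict[str, Row]:
    """Fetch rows for `ids` with one `fetch_many` call per batch.

    `fetch_many` is a hypothetical stand-in for a bulk API request: it takes
    a list of ids and returns a list of dicts that each carry an 'id' key.
    """
    rows_by_id: Dict[str, Row] = {}
    for batch in chunked(ids, batch_size):
        for row in fetch_many(batch):
            rows_by_id[str(row['id'])] = row
    return rows_by_id


# Demo with a fake bulk endpoint that records every batch it receives.
calls: List[List[str]] = []


def fake_fetch_many(batch: List[str]) -> List[Row]:
    calls.append(batch)
    return [{'id': record_id, 'name': f'record {record_id}'} for record_id in batch]


rows = fetch_in_bulk([str(n) for n in range(250)], fake_fetch_many, batch_size=100)
```

In this demo 250 ids are fetched with three requests instead of 250. Keying the rows by id makes it easy to match each fetched row back to the record in the data list that asked for it.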

The Get By ID method is used by the platform in the following scenarios:

  • Testing - allows you to test a pipeline by providing an external record’s ID
  • Wait step - enables the wait step node in pipelines. The node queues the record and retries fetching it until the configured time expires.

If the API does not support the method, either omit it from the Synapse or raise a not-supported exception; the platform will then handle the exception gracefully.

Example Get By ID method:

def get_by_id(self, sync_request: SyncRequest) -> List[Record]:
    # set variables
    entity_name = sync_request.entity.apiName
    data = sync_request.data
    records = []
    # loop over provided records
    for record in data:
        # get the record values from the api
        resp = self.client.get(f'{entity_name}/{record.id}')
        # keep the response in its own variable so the loop variable is not shadowed
        values = resp.json()
        # set the system dates and convert them to epoch milliseconds if applicable
        lastModified = values.get('lastModified')
        createdAt = values.get('createdAt')
        # append the record to the records list
        records.append(Record(id=values.get('id'), values=values, name=entity_name, lastModified=lastModified, createdAt=createdAt))
    # return list of syncari records
    return records
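The comment about converting system dates to epoch milliseconds can be illustrated with a small helper. This is a sketch, assuming the end system returns ISO 8601 timestamps and that timestamps without an offset are in UTC; to_epoch_millis is a hypothetical name and not part of the Syncari SDK.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(value: Optional[str]) -> Optional[int]:
    """Convert an ISO 8601 timestamp string to epoch milliseconds.

    Returns None for missing values. Timestamps without an offset are
    treated as UTC, which is an assumption of this sketch.
    """
    if not value:
        return None
    # Older Python versions do not accept a trailing 'Z' in fromisoformat().
    parsed = datetime.fromisoformat(value.replace('Z', '+00:00'))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    # Integer division of timedeltas avoids float rounding errors.
    return (parsed - EPOCH) // timedelta(milliseconds=1)


one_second = to_epoch_millis('1970-01-01T00:00:01Z')
```

With such a helper, the lastModified and createdAt values could be passed through to_epoch_millis() before building the Record. APIs that already return epoch values do not need it.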

Create Method

The Create method is used on the destination side to create records in a third-party system. It is triggered whenever a Syncari Record passes the destination node and the ID mapping has not yet been created (that is, the record has not yet been unified). The method is expected to return a list of Result records, which create the ID mapping that ties each Syncari record to its counterpart in the end system. The Result record also specifies whether the API request succeeded, so the platform knows whether it needs to retry (for example on rate limits / 429 errors) and/or signal to the user that the action failed.

Since the Syncari client (see the Syncari client page) already handles the common error codes, you can simply wrap the API request in a try/except block and pass the error_response in the errors list of the Result record.

Example Create method:

def create(self, sync_request: SyncRequest) -> List[Result]:
    # set variables
    entity_name = sync_request.entity.apiName
    data = sync_request.data
    results = []
    # loop over provided records
    for record in data:
        try:
            # try to create a record
            resp = self.client.post(f'{entity_name}', json=record.values)
            # keep the response separate from the record so syncariEntityId stays accessible
            created = resp.json()
            # if successful append to the result list
            results.append(Result(id=created.get('id'), syncariId=record.syncariEntityId))
        except SyncariException as e:
            # if not successful append it to the list with the success flag set to False and provide the errors
            results.append(Result(syncariId=record.syncariEntityId, success=False, errors=[e.error_response.json()]))
    # return list of syncari results
    return results
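To try this pattern without the SDK or a live API, the following self-contained sketch uses stand-ins. Record, Result, ApiError and FakeClient are all hypothetical simplifications written for this illustration; they are not the Syncari SDK classes and only mimic the fields used in the example.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class Record:  # hypothetical stand-in for the SDK's Record
    syncariEntityId: str
    values: Dict[str, Any]
    id: Optional[str] = None


@dataclass
class Result:  # hypothetical stand-in for the SDK's Result
    syncariId: str
    id: Optional[str] = None
    success: bool = True
    errors: List[Any] = field(default_factory=list)


class ApiError(Exception):
    """Hypothetical error raised by the fake client below."""


class FakeClient:
    """Pretends to be an API that rejects records without a name."""

    def __init__(self) -> None:
        self._next_id = 1

    def post(self, path: str, json: Dict[str, Any]) -> Dict[str, Any]:
        if not json.get('name'):
            raise ApiError('name is required')
        created = dict(json, id=str(self._next_id))
        self._next_id += 1
        return created


def create_all(client: FakeClient, entity_name: str, records: List[Record]) -> List[Result]:
    results = []
    for record in records:
        try:
            created = client.post(entity_name, json=record.values)
            results.append(Result(id=created['id'], syncariId=record.syncariEntityId))
        except ApiError as e:
            # A failed record still gets a Result, flagged as unsuccessful.
            results.append(Result(syncariId=record.syncariEntityId, success=False, errors=[str(e)]))
    return results


records = [Record('syn-1', {'name': 'Ada'}), Record('syn-2', {})]
results = create_all(FakeClient(), 'person', records)
```

The point of the sketch is that every incoming record yields exactly one Result, whether the request succeeded or not, so one failing record does not prevent the others from being mapped.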

Update Method

Similar to the Create method, the Update method is used when a Syncari record passes through the destination node and the ID mapping / record unification already exists. The main difference is usually that the Update request includes the unique identifier, so that the correct corresponding record is updated in the end system.

Example Update method:

def update(self, sync_request: SyncRequest) -> List[Result]:
    # set variables
    entity_name = sync_request.entity.apiName
    data = sync_request.data
    results = []
    # loop over provided records
    for record in data:
        try:
            # try to update a record (PUT here; some APIs expect PATCH or POST instead)
            resp = self.client.put(f'{entity_name}/{record.id}', json=record.values)
            # keep the response separate from the record so syncariEntityId stays accessible
            updated = resp.json()
            # if successful append to the result list
            results.append(Result(id=updated.get('id'), syncariId=record.syncariEntityId))
        except SyncariException as e:
            # if not successful append it to the list with the success flag set to False and provide the errors
            results.append(Result(id=record.id, syncariId=record.syncariEntityId, success=False, errors=[e.error_response.json()]))
    # return list of syncari results
    return results

Delete Method

The Delete method is used when the Synapse has the option enabled to delete records when another system deletes them. The use case is similar to the Update method: it is expected to make the API request that deletes the record and to return one Result object per record.

Example Delete method:

def delete(self, sync_request: SyncRequest) -> List[Result]:
    # set variables
    entity_name = sync_request.entity.apiName
    data = sync_request.data
    results = []
    # loop over provided records
    for record in data:
        try:
            # try to delete a record
            self.client.delete(f'{entity_name}/{record.id}')
            # if successful append to the result list, reusing the id from the request
            # (a delete response may not contain a body with the id)
            results.append(Result(id=record.id, syncariId=record.syncariEntityId))
        except SyncariException as e:
            # if not successful append it to the list with the success flag set to False and provide the errors
            results.append(Result(id=record.id, syncariId=record.syncariEntityId, success=False, errors=[e.error_response.json()]))
    # return list of syncari results
    return results

Below is an example of our Pipedrive implementation of the CRUD methods. Since Pipedrive’s API follows REST standards, the CRUD functionality can be generalized into two helper methods to streamline the Synapse’s codebase.

In the example, the CRUD functionality is handled by the two helper methods self.__crud() and self.__post_process_response(). The first makes the API requests to Pipedrive; the second processes the responses, building a list of either Record instances or Result instances.

Example:

def get_by_id(self, sync_request: SyncRequest) -> List[Record]:
    if sync_request.entity.apiName == 'leadLabel':
        raise SyncariException("GET_BY_ID for leadLabel not supported")
    # self.request (SyncRequest)
    return self.__crud(sync_request, CrudOperation.GET)
    # [Record()]

def create(self, sync_request: SyncRequest) -> List[Result]:
    # self.request (SyncRequest)
    return self.__crud(sync_request, CrudOperation.CREATE)
    # [Result()]

def update(self, sync_request: SyncRequest) -> List[Result]:
    # self.request (SyncRequest)
    return self.__crud(sync_request, CrudOperation.UPDATE)
    # [Result()]

def delete(self, sync_request: SyncRequest) -> List[Result]:
    # self.request (SyncRequest)
    return self.__crud(sync_request, CrudOperation.DELETE)

def __crud(self, sync_request: SyncRequest, operation):
    entity_name = sync_request.entity.apiName
    plural_name = sync_request.entity.pluralName
    if not plural_name:
        plural_name = entity_name
    resps = []
    for data in sync_request.data:
        data = Record.parse_obj(data)
        resp = None
        if operation is CrudOperation.GET:
            resp = self.client.get(plural_name + "/" + data.id, params=self.client.auth_config, json=data.values)
        elif operation is CrudOperation.CREATE:
            resp = self.client.post(plural_name, params=self.client.auth_config, json=data.values)
        elif operation is CrudOperation.UPDATE:
            resp = self.client.put(plural_name + "/" + data.id, params=self.client.auth_config, json=data.values)
        elif operation is CrudOperation.DELETE:
            resp = self.client.delete(plural_name + "/" + data.id, params=self.client.auth_config)
        resps.append(resp)
    return self.__post_process_response(sync_request, operation, resps)

def __post_process_response(self, sync_request: SyncRequest, operation: CrudOperation, crud_response):
    eds = []
    indx = 0
    for resp in crud_response:
        if type(resp) is ErrorResponse:
            eds.append(Result(id=sync_request.data[indx].id,syncariId=sync_request.data[indx].syncariEntityId,success=False,errors=[str(resp)]))
        elif 'data' in resp.json():
            row = resp.json()['data']
            if operation is CrudOperation.GET:
                eds.append(self.__process_single_row(sync_request.entity.apiName, self.request.entity, row))
            else:
                id_val = sync_request.data[indx].id
                if row is not None and isinstance(row, dict) and 'id' in row:
                    id_val = row['id']
                eds.append(Result(id=id_val,syncariId=sync_request.data[indx].syncariEntityId,success=True))
        indx += 1
    return eds
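The response-processing helper pairs each response with the request record at the same position by keeping a manual index. As a sketch of the same pairing with zip(), the following simplified function works on plain ids and dicts. pair_ids is a hypothetical name, a payload of None stands in for a failed request, and (id, success) tuples stand in for Result instances; unlike the helper above, a payload without a 'data' key is treated as a success with the request id.

```python
from typing import Any, Dict, List, Optional, Tuple


def pair_ids(request_ids: List[str],
             payloads: List[Optional[Dict[str, Any]]]) -> List[Tuple[str, bool]]:
    """Return one (id, success) tuple per request, in request order.

    `payloads[i]` is the decoded response for `request_ids[i]`, or None
    if that request failed (a convention made up for this sketch).
    """
    paired = []
    for request_id, payload in zip(request_ids, payloads):
        if payload is None:
            paired.append((request_id, False))
            continue
        row = payload.get('data')
        # Prefer the id returned by the API; otherwise fall back to the
        # id sent in the request (useful for delete responses).
        if isinstance(row, dict) and 'id' in row:
            paired.append((row['id'], True))
        else:
            paired.append((request_id, True))
    return paired


outcome = pair_ids(['1', '2', '3'], [{'data': {'id': '10'}}, None, {'data': None}])
```

Iterating over both lists together removes the separate counter, so a skipped increment can no longer shift results onto the wrong record.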