Read Method

Rok Kovač

The read method implements three functionalities of Syncari Sync Studio: incremental live sync, testing by time frame, and historical resyncing of data.

At a high level, the method receives a sync_request (link to reference) object as an argument. This object includes the entity (link to reference) information along with a watermark (link to reference) object that holds the time frame within which records are expected to be returned. Once the custom synapse returns the records, they show up in the entity pipeline from the Source node (this applies to all use cases).

The watermark object in a sync_request contains three flags that indicate which type of read invocation the Syncari platform expects to run:

  • isResync -> when set to True, a historical resync was triggered
  • isTest -> when set to True, a time frame test was initiated
  • initial -> when set to True, the sync was triggered for the first time since the synapse was connected

When an incremental sync is running, all of the flags are set to False.

In a live sync, the start and end time values are one minute apart by default (this can be adjusted in the product to widen the time frame). In a time frame test, the timestamps are defined by the user in the product. In a historical data resync, the values span from the configured point to the current timestamp (when resyncing all data, the start value begins at 0).

The method can be used for a combination of use cases; below are the most common ones:

  • Incremental sync, time frame test, and resync are all handled by the read method.
def read(self, sync_request: SyncRequest) -> ReadResponse:
    watermark = sync_request.watermark
    # logic to handle incremental sync
    # logic to handle resync
    # logic to handle test
    records = []
    return ReadResponse(watermark=watermark, data=records, offsetType=OffsetType.RECORD_COUNT)
  • Time frame test and resync are handled by the read method; incremental sync is handled by webhooks.
def read(self, sync_request: SyncRequest) -> ReadResponse:
    watermark = sync_request.watermark
    if watermark.isResync or watermark.initial or watermark.isTest:
        # logic to handle resync
        # logic to handle initial sync
        # logic to handle test
        records = []
        return ReadResponse(watermark=watermark, data=records, offsetType=OffsetType.RECORD_COUNT)
    else:
        # incremental sync is handled by webhooks, so return an empty list
        return ReadResponse(watermark=watermark, data=[], offsetType=OffsetType.RECORD_COUNT)
  • Time frame test isn’t supported; resync is handled by the read method and incremental sync is handled by webhooks. (This is a common use case where querying by date is not possible via the API.)
def read(self, sync_request: SyncRequest) -> ReadResponse:
    watermark = sync_request.watermark
    if watermark.isResync or watermark.initial:
        # logic to handle resync
        # logic to handle initial sync
        records = []
        return ReadResponse(watermark=watermark, data=records, offsetType=OffsetType.RECORD_COUNT)
    elif watermark.isTest:
        raise SyncariException(ErrorResponse(status_code=400, message="Test by date range is not supported"))
    else:
        # incremental sync is handled by webhooks, so return an empty list
        return ReadResponse(watermark=watermark, data=[], offsetType=OffsetType.RECORD_COUNT)

In the second and third cases, note that the method still needs to return a ReadResponse object with an empty data list during the incremental live sync.

Syncari uses Epoch time in milliseconds as the platform time. This applies to the start and end timestamps as well as to the createdAt and modifiedAt dates on the Syncari record.
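
For example, converting a timestamp string from an end system into platform time could look like the following sketch (the format string and the UTC assumption depend on the end system):

from datetime import datetime, timezone

def to_epoch_millis(value: str, fmt: str = '%Y-%m-%d %H:%M:%S') -> int:
    # Parse the timestamp (assumed to be in UTC) and convert to epoch milliseconds.
    dt = datetime.strptime(value, fmt).replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)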

The logic during method execution isn’t strictly enforced, but in most use cases it will include:

  • Making an API request to an endpoint
  • Converting the response to a list of Python dicts
  • Creating instances of the Syncari Record class from the received records
  • Handling pagination

When creating a Syncari record, it’s recommended to map the deleted field of the end system to the deleted field on the Syncari Record class. If deletes can’t be mapped, errors may occur on the destination side when the platform tries to initiate an update. For more information please read: link to FAQ how does syncari handle deletes.
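
For instance, if the end system exposes a soft-delete flag, it could be mapped along these lines (a sketch; the 'is_deleted' field name is an assumption about the end system's response):

# 'is_deleted' is an assumed field name in the end system's response
records.append(Record(
    id=record.get('id'),
    values=record,
    name=entity_name,
    deleted=bool(record.get('is_deleted', False))))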

If you are using a timestamp in the source system as a watermark, pay attention to the expected format and time zones. Some systems also use a secondary index to drive such queries and may not have all the current data. Consider going back a little from the watermark value Syncari sends, as in the sketch below; this minimizes problems around missing records.
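
A minimal sketch of this, assuming a five-minute safety buffer is appropriate for the end system:

# go back five minutes (epoch milliseconds) from the watermark Syncari sends,
# to compensate for secondary-index lag in the end system
SKEW_MS = 5 * 60 * 1000
start = max(0, watermark.start - SKEW_MS)
end = watermark.end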

Example Code

Below is an example of a read method that uses an offset to paginate records:

LIMIT = 1000

def read(self, sync_request: SyncRequest) -> ReadResponse:
    # set variables
    entity_name = sync_request.entity.apiName
    watermark = sync_request.watermark
    start_date = watermark.start
    end_date = watermark.end
    records = []
    # set offset to the previous value or start from 0
    offset = watermark.offset if watermark.offset else 0
    # fetch records and convert the response to a list of dicts
    resp = self.client.get(f"{sync_request.entity.pluralName}?updated_after={start_date}&updated_before={end_date}&offset={offset}")
    resp_json = resp.json()
    # create Syncari records
    for record in resp_json:
        # handle conversion to epoch in milliseconds here if needed
        lastModified = record.get('lastModified')
        createdAt = record.get('createdAt')
        records.append(Record(id=record.get('id'), values=record, name=entity_name, lastModified=lastModified, createdAt=createdAt, deleted=record.get('deleted')))
    # handle pagination: a short page means we are done, otherwise advance the offset
    if len(records) < LIMIT:
        watermark.offset = None
    else:
        watermark.offset = offset + LIMIT
    # return the ReadResponse object
    return ReadResponse(watermark=watermark, data=records, offsetType=OffsetType.RECORD_COUNT)

Best Practice

It’s best practice to keep batches at a maximum of 1,000 records per batch to ensure the pipelines stay performant. If this is not possible due to the end system’s API limitations, please consult our support team regarding your specific use case. These limits usually aren’t hit during incremental live sync, but they are important for historical resyncs and for cases where bulk data updates happen in the end system we are integrating with. One way to enforce the cap is shown in the sketch below.
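
One hedged way to enforce the cap with an offset-style watermark (a sketch, not the only option) is to return at most LIMIT records per invocation and advance the offset so the platform calls read again for the remainder:

# inside read(): all_matching_records is illustrative, standing in for whatever
# the end system returned for the current time frame
offset = watermark.offset if watermark.offset else 0
batch = all_matching_records[offset:offset + LIMIT]
# advance the offset if more records remain, otherwise clear it
watermark.offset = offset + LIMIT if offset + LIMIT < len(all_matching_records) else None
return ReadResponse(watermark=watermark, data=batch, offsetType=OffsetType.RECORD_COUNT)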

The Custom Synapse framework supports the majority of pagination flavors: offsets, pages, and cursors. The watermark object (link to reference) has two fields, offset and cursor, to support pagination handling. They can be modified while the method executes, and their values persist per entity. Once all pages have been processed, the field used for pagination should be set to None; this tells the platform that the query is complete and it can move on.

On a production synapse, it is best practice to implement logic that checks whether the pagination field has a value: if it doesn’t, make the initial query; otherwise, fetch the next available page. Once the records are processed, and before returning the ReadResponse object, check whether the pagination field needs to be modified or cleared, as in the cursor-based sketch below.
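
For cursor-based APIs the same pattern applies, using the watermark’s cursor field instead. The following is a sketch; the 'results' and 'next_cursor' response fields are assumptions about the end system’s API:

def read(self, sync_request: SyncRequest) -> ReadResponse:
    watermark = sync_request.watermark
    params = {'limit': LIMIT}
    if watermark.cursor:
        # continue from the previous page
        params['cursor'] = watermark.cursor
    resp_json = self.client.get(sync_request.entity.pluralName, params=params).json()
    # build Syncari records from the assumed 'results' field
    records = [Record(id=r.get('id'), values=r, name=sync_request.entity.apiName)
               for r in resp_json.get('results', [])]
    # persist the cursor for the next invocation, or clear it when there are no more pages
    watermark.cursor = resp_json.get('next_cursor')
    return ReadResponse(watermark=watermark, data=records, offsetType=OffsetType.RECORD_COUNT)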

Once pagination is handled, the method should return a ReadResponse object that contains the data (a list of Syncari records), the watermark object, and the pagination type. Possible pagination types are: NONE, PAGE_NUMBER, RECORD_COUNT, TIMESTAMP, CUSTOM.

Pipedrive Example Code

The example below shows an implementation of the read method where it is only used for historical resyncs and initial syncs. Testing by time frame is not supported by the API, so an exception is raised, and incremental sync is handled by webhooks instead. The records are processed with a helper method, self.__process_single_row(), which handles date conversion and reference field mappings.

Example:

def read(self, sync_request: SyncRequest) -> ReadResponse:
    entity_name = sync_request.entity.apiName
    watermark = sync_request.watermark
    self.print(self.read.__name__, sync_request)
    if watermark.isTest:
        raise SyncariException(ErrorResponse(status_code=400, message="Live test by date range is not supported for Pipedrive"))
    # Nothing to query for incremental sync. Everything is handled by webhooks.
    # (lead/leadLabel do not have webhook support: and entity_name not in ['lead','leadLabel'])
    if not watermark.isResync and not watermark.initial:
        return ReadResponse(watermark=watermark, data=[], offsetType=OffsetType.RECORD_COUNT)
    auth_data = {'api_token': self.connection.authConfig.accessToken}
    if entity_name != 'user':
        auth_data['sort'] = 'update_time'
    # start token for pagination
    auth_data['start'] = watermark.offset
    resp = self.client.get(sync_request.entity.pluralName, params=auth_data)
    resp_json = resp.json()
    eds = []
    if 'data' in resp_json and resp_json['data'] is not None:
        for row in resp_json['data']:
            eds.append(self.__process_single_row(entity_name, sync_request.entity, row))
    # Set/reset the watermark offset.
    watermark.offset = 0
    if 'additional_data' in resp_json and resp_json['additional_data'] is not None:
        if 'pagination' in resp_json['additional_data']:
            pagination = resp_json['additional_data']['pagination']
            if 'next_start' in pagination:
                watermark.offset = pagination['next_start']
    return ReadResponse(watermark=watermark, data=eds, offsetType=OffsetType.RECORD_COUNT)


def __process_single_row(self, entity_name, entity, row):
    created_field = 'add_time'
    updated_field = 'update_time'
    if entity_name == 'user':
        created_field = 'created'
        updated_field = 'modified'
    date_format = '%Y-%m-%d %H:%M:%S'
    if entity_name in ['lead', 'leadLabel']:
        date_format = '%Y-%m-%dT%H:%M:%S.%f%z'
    # Handle date fields (requires: import time, from calendar import timegm).
    if created_field in row and row[created_field] is not None:
        utc_created_at = timegm(time.strptime(row[created_field], date_format))
    else:
        utc_created_at = int(time.time())
    if updated_field in row and row[updated_field] is not None:
        utc_updated_at = timegm(time.strptime(row[updated_field], date_format))
    else:
        utc_updated_at = int(time.time())
    # Collect reference fields from the entity schema.
    reference_fields = []
    if entity:
        for attribute in entity.attributes:
            if attribute.dataType is DataType.REFERENCE:
                reference_fields.append(attribute.apiName)
    # Flatten reference fields that arrive as nested objects.
    for reference_field in reference_fields:
        if reference_field in row and row[reference_field] is not None and isinstance(row[reference_field], dict):
            if 'id' in row[reference_field]:
                row[reference_field] = row[reference_field]['id']
            elif 'value' in row[reference_field]:
                row[reference_field] = row[reference_field]['value']
    for primary_value_field in ['email', 'phone']:
        self.__process_primary_value_fields(primary_value_field, row)
    if entity_name == 'lead':
        self.__process_lead_record(row)
    return Record(name=entity_name, id=row['id'], syncariEntityId='', deleted=False, isNew=False, lastModified=utc_updated_at, createdAt=utc_created_at, values=row)

 
