diff --git a/.github/dependabot.yaml b/.github/dependabot.yaml index fdc108b..64dea8d 100644 --- a/.github/dependabot.yaml +++ b/.github/dependabot.yaml @@ -9,7 +9,6 @@ updates: time: "18:00" # UTC commit-message: prefix: "Upgrade: [dependabot] - " - include: "scope" - package-ecosystem: "pip" directory: "/" @@ -21,4 +20,3 @@ updates: open-pull-requests-limit: 20 commit-message: prefix: "Upgrade: [dependabot] - " - include: "scope" diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index d8a934e..6c288c2 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -26,4 +26,5 @@ jobs: with: asdfVersion: ${{ needs.get_asdf_version.outputs.version }} reinstall_poetry: true - run_sonar: false + secrets: + SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }} diff --git a/.github/workflows/pull_request.yaml b/.github/workflows/pull_request.yaml index 67af967..70776dd 100644 --- a/.github/workflows/pull_request.yaml +++ b/.github/workflows/pull_request.yaml @@ -30,7 +30,8 @@ jobs: with: asdfVersion: ${{ needs.get_asdf_version.outputs.version }} reinstall_poetry: true - run_sonar: false + secrets: + SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }} tag_release: name: Tag Release (Dry Run) diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index b80d5b0..72307a4 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -24,7 +24,8 @@ jobs: with: asdfVersion: ${{ needs.get_asdf_version.outputs.version }} reinstall_poetry: true - run_sonar: false + secrets: + SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }} get_next_version: name: Get Next Version Number for Poetry diff --git a/.trivyignore b/.trivyignore new file mode 100644 index 0000000..d58336d --- /dev/null +++ b/.trivyignore @@ -0,0 +1,5 @@ +# urllib3 - can't upgrade due to spine version restraints +CVE-2025-66418 +CVE-2025-66471 +CVE-2026-21441 +CVE-2025-50181 diff --git a/src/eps_spine_shared/common/dynamodb_datastore.py b/src/eps_spine_shared/common/dynamodb_datastore.py index f9b8574..1113f33 100644 --- a/src/eps_spine_shared/common/dynamodb_datastore.py +++ b/src/eps_spine_shared/common/dynamodb_datastore.py @@ -27,8 +27,8 @@ from eps_spine_shared.logger import EpsLogger from eps_spine_shared.nhsfundamentals.timeutilities import ( TimeFormats, - convertSpineDate, - timeNowAsString, + convert_spine_date, + time_now_as_string, ) @@ -78,6 +78,7 @@ class EpsDynamoDbDataStore: NOTIFICATION_PREFIX = "Notification_" STORE_TIME_DOC_REF_TITLE_PREFIX = "NominatedReleaseRequestMsgRef" DEFAULT_EXPIRY_DAYS = 56 + MAX_NEXT_ACTIVITY_DATE = "99991231" def __init__( self, @@ -134,6 +135,41 @@ def get_expire_at(self, delta, from_datetime=None): return int((from_datetime + delta).timestamp()) + def calculate_record_expire_at( + self, next_activity, next_activity_date_str, creation_datetime_string + ): + """ + If the record next activity is delete or purge, use the nextActivity and nextActivityDate + to calculate its expireAt (ttl) value, otherwise fall-back to the default of 18 months. + """ + creation_datetime = convert_spine_date( + creation_datetime_string, TimeFormats.STANDARD_DATE_TIME_FORMAT + ) + creation_datetime_utc = datetime.combine( + creation_datetime.date(), creation_datetime.time(), timezone.utc + ) + default_expire_at = self.get_expire_at(relativedelta(months=18), creation_datetime_utc) + + if ( + next_activity.lower() not in ["delete", "purge"] + or not next_activity_date_str + or next_activity_date_str == self.MAX_NEXT_ACTIVITY_DATE + ): + return default_expire_at + + delta = relativedelta() if next_activity.lower() == "purge" else relativedelta(months=12) + + next_activity_datetime = convert_spine_date( + next_activity_date_str, TimeFormats.STANDARD_DATE_FORMAT + ) + next_activity_datetime_utc = datetime.combine( + next_activity_datetime.date(), next_activity_datetime.time(), timezone.utc + ) + + next_activity_expire_at = self.get_expire_at(delta, next_activity_datetime_utc) + + return min(next_activity_expire_at, default_expire_at) + def build_document(self, internal_id, document, index): """ Build EPS Document object to be inserted into DynamoDB. @@ -182,6 +218,22 @@ def convert_index_keys_to_lower_case(self, index): return index return {key.lower(): index[key] for key in index} + def parse_next_activity_nad(self, indexes): + """ + Split nextActivityNAD string into sharded next activity and its date. + """ + next_activity_nad = indexes["nextActivityNAD_bin"][0] + next_activity_nad_split = next_activity_nad.split("_") + + next_activity = next_activity_nad_split[0] + next_activity_date_str = ( + next_activity_nad_split[1] if len(next_activity_nad_split) == 2 else None + ) + + shard = randint(1, NEXT_ACTIVITY_DATE_PARTITIONS) + + return next_activity, shard, next_activity_date_str + def build_record(self, prescription_id, record, record_type, indexes): """ Build EPS Record object to be inserted into DynamoDB. @@ -192,36 +244,34 @@ def build_record(self, prescription_id, record, record_type, indexes): indexes = record["indexes"] instances = record["instances"].values() - next_activity_nad = indexes["nextActivityNAD_bin"][0] - next_activity_nad_split = next_activity_nad.split("_") - next_activity = next_activity_nad_split[0] - next_activity_is_purge = next_activity.lower() == "purge" - - next_activity_shard = randint(1, NEXT_ACTIVITY_DATE_PARTITIONS) - sharded_next_activity = f"{next_activity}.{next_activity_shard}" + next_activity, shard, next_activity_date_str = self.parse_next_activity_nad(indexes) scn = record["SCN"] compressed_record = zlib.compress(simplejson.dumps(record).encode("utf-8")) + creation_datetime_string = record["prescription"]["prescriptionTime"] + + expire_at = self.calculate_record_expire_at( + next_activity, next_activity_date_str, creation_datetime_string + ) + item = { Key.PK.name: record_key, Key.SK.name: SortKey.RECORD.value, ProjectedAttribute.BODY.name: compressed_record, - Attribute.NEXT_ACTIVITY.name: sharded_next_activity, + Attribute.NEXT_ACTIVITY.name: f"{next_activity}.{shard}", ProjectedAttribute.SCN.name: scn, ProjectedAttribute.INDEXES.name: self.convert_index_keys_to_lower_case(indexes), + ProjectedAttribute.EXPIRE_AT.name: expire_at, } - if len(next_activity_nad_split) == 2: - item[Attribute.NEXT_ACTIVITY_DATE.name] = next_activity_nad_split[1] + if next_activity_date_str: + item[Attribute.NEXT_ACTIVITY_DATE.name] = next_activity_date_str - if next_activity_is_purge: + if next_activity.lower() == "purge": return item - # POC - Leverage methods in PrescriptionRecord to get some/all of these. - creation_datetime_string = record["prescription"]["prescriptionTime"] nhs_number = record["patient"]["nhsNumber"] - prescriber_org = record["prescription"]["prescribingOrganization"] statuses = list(set([instance["prescriptionStatus"] for instance in instances])) @@ -240,21 +290,12 @@ def build_record(self, prescription_id, record, record_type, indexes): nominated_pharmacy = record.get("nomination", {}).get("nominatedPerformer") - creation_datetime = convertSpineDate( - creation_datetime_string, TimeFormats.STANDARD_DATE_TIME_FORMAT - ) - creation_datetime_utc = datetime.combine( - creation_datetime.date(), creation_datetime.time(), timezone.utc - ) - expire_at = self.get_expire_at(relativedelta(months=18), creation_datetime_utc) - item_update = { Attribute.CREATION_DATETIME.name: creation_datetime_string, Attribute.NHS_NUMBER.name: nhs_number, Attribute.PRESCRIBER_ORG.name: prescriber_org, ProjectedAttribute.STATUS.name: status, Attribute.IS_READY.name: int(is_ready), - ProjectedAttribute.EXPIRE_AT.name: expire_at, } if dispenser_org: item[Attribute.DISPENSER_ORG.name] = dispenser_org @@ -285,7 +326,7 @@ def insert_eps_work_list(self, internal_id, message_id, work_list, index=None): """ Insert EPS WorkList object into the configured table. """ - work_list_indexes = {self.INDEX_WORKLISTDATE: [timeNowAsString()]} + work_list_indexes = {self.INDEX_WORKLISTDATE: [time_now_as_string()]} if index: work_list_indexes = index @@ -599,7 +640,7 @@ def store_batch_claim(self, internal_id, batch_claim_original): claim_id_index_terms = batch_claim["Claim ID List"] handle_time_index_term = batch_claim["Handle Time"] sequence_number = batch_claim["Sequence Number"] - index_scn_value = f"{timeNowAsString()}|{sequence_number}" + index_scn_value = f"{time_now_as_string()}|{sequence_number}" nwssp = "Nwssp Sequence Number" in batch_claim nwssp_sequence_number = batch_claim.get("Nwssp Sequence Number") diff --git a/src/eps_spine_shared/common/dynamodb_index.py b/src/eps_spine_shared/common/dynamodb_index.py index 551ada2..0ccb070 100644 --- a/src/eps_spine_shared/common/dynamodb_index.py +++ b/src/eps_spine_shared/common/dynamodb_index.py @@ -154,7 +154,6 @@ def build_terms(self, items, index_name, term_regex): """ Build terms from items returned by the index query. """ - # POC - Project the body into the index and do away with 'terms' altogether. terms = [] for item in items: index_terms = item.get(ProjectedAttribute.INDEXES.name, {}).get(index_name.lower()) @@ -163,7 +162,6 @@ def build_terms(self, items, index_name, term_regex): [ terms.append((index_term, item[Key.PK.name])) for index_term in index_terms - # POC - term_regex can be replaced by filter expressions for status and releaseVersion. if ((not term_regex) or re.search(term_regex, index_term)) ] return terms diff --git a/src/eps_spine_shared/common/indexes.py b/src/eps_spine_shared/common/indexes.py index 49f53c4..ce39a81 100644 --- a/src/eps_spine_shared/common/indexes.py +++ b/src/eps_spine_shared/common/indexes.py @@ -1,6 +1,6 @@ from eps_spine_shared.errors import EpsSystemError from eps_spine_shared.logger import EpsLogger -from eps_spine_shared.nhsfundamentals.timeutilities import timeNowAsString +from eps_spine_shared.nhsfundamentals.timeutilities import time_now_as_string INDEX_NHSNUMBER_DATE = "nhsNumberDate_bin" INDEX_NHSNUMBER_PRDATE = "nhsNumberPrescriberDate_bin" @@ -243,4 +243,4 @@ def _add_delta_index(self, eps_record, index_dict): """ See build_indexes """ - index_dict[INDEX_DELTA] = [timeNowAsString() + SEPERATOR + str(eps_record.get_scn())] + index_dict[INDEX_DELTA] = [time_now_as_string() + SEPERATOR + str(eps_record.get_scn())] diff --git a/src/eps_spine_shared/common/prescription/line_item.py b/src/eps_spine_shared/common/prescription/line_item.py index 8515ac9..0fe052f 100644 --- a/src/eps_spine_shared/common/prescription/line_item.py +++ b/src/eps_spine_shared/common/prescription/line_item.py @@ -85,17 +85,17 @@ def expire(self, parent_prescription): :type parent_prescription: PrescriptionRecord """ - currentStatus = self.status - if currentStatus not in LineItemStatus.EXPIRY_IMMUTABLE_STATES: - newStatus = LineItemStatus.EXPIRY_LOOKUP[currentStatus] - self.update_status(newStatus) + current_status = self.status + if current_status not in LineItemStatus.EXPIRY_IMMUTABLE_STATES: + new_status = LineItemStatus.EXPIRY_LOOKUP[current_status] + self.update_status(new_status) parent_prescription.logObject.write_log( "EPS0072b", None, { - "internalID": parent_prescription.internalID, + "internalID": parent_prescription.internal_id, "lineItemChanged": self.id, - "previousStatus": currentStatus, - "newStatus": newStatus, + "previousStatus": current_status, + "newStatus": new_status, }, ) diff --git a/src/eps_spine_shared/common/prescription/record.py b/src/eps_spine_shared/common/prescription/record.py index 295b5bf..02637f4 100644 --- a/src/eps_spine_shared/common/prescription/record.py +++ b/src/eps_spine_shared/common/prescription/record.py @@ -16,7 +16,7 @@ ) from eps_spine_shared.logger import EpsLogger from eps_spine_shared.nhsfundamentals.timeutilities import TimeFormats -from eps_spine_shared.spinecore.baseutilities import handleEncodingOddities, quoted +from eps_spine_shared.spinecore.baseutilities import handle_encoding_oddities, quoted from eps_spine_shared.spinecore.changelog import PrescriptionsChangeLogProcessor @@ -158,7 +158,7 @@ def add_event_to_change_log(self, message_id, event_log): event_log[PrescriptionsChangeLogProcessor.SCN] = self.get_scn() length_before = len(self.prescription_record.get(fields.FIELD_CHANGE_LOG, [])) try: - PrescriptionsChangeLogProcessor.updateChangeLog( + PrescriptionsChangeLogProcessor.update_change_log( self.prescription_record, event_log, message_id, self.SCN_MAX ) except Exception as e: # noqa: BLE001 @@ -901,7 +901,7 @@ def _create_cancellation_summary_dict( _cancellation_reasons = str(cancellation_status) _cancellation_id = _cancellation.get(fields.FIELD_CANCELLATION_ID, []) - _scn = PrescriptionsChangeLogProcessor.getSCN( + _scn = PrescriptionsChangeLogProcessor.get_scn( self.prescription_record["changeLog"].get(_cancellation_id, {}) ) for _cancellation_reason in _cancellation.get(fields.FIELD_REASONS, []): @@ -909,7 +909,7 @@ def _create_cancellation_summary_dict( if _subsequent_reason: _cancellation_reasons += "; " _subsequent_reason = True - _cancellation_reasons += str(handleEncodingOddities(_cancellation_text)) + _cancellation_reasons += str(handle_encoding_oddities(_cancellation_text)) if ( _cancellation.get(fields.FIELD_CANCELLATION_TARGET) == "Prescription" diff --git a/src/eps_spine_shared/nhsfundamentals/timeutilities.py b/src/eps_spine_shared/nhsfundamentals/timeutilities.py index 829dafb..e9d1092 100644 --- a/src/eps_spine_shared/nhsfundamentals/timeutilities.py +++ b/src/eps_spine_shared/nhsfundamentals/timeutilities.py @@ -34,54 +34,54 @@ class TimeFormats: } -def _guessCommonDateTimeFormat(timeString, raiseErrorIfUnknown=False): +def _guess_common_datetime_format(time_string, raise_error_if_unknown=False): """ Guess the date time format from the commonly used list Args: - timeString (str): + time_string (str): The datetime string to try determine the format of. - raiseErrorIfUnknown (bool): + raise_error_if_unknown (bool): Determines the action when the format cannot be determined. False (default) will return None, True will raise an error. """ - _format = None - if len(timeString) == 19: + time_format = None + if len(time_string) == 19: try: - datetime.strptime(timeString, TimeFormats.EBXML_FORMAT) - _format = TimeFormats.EBXML_FORMAT + datetime.strptime(time_string, TimeFormats.EBXML_FORMAT) + time_format = TimeFormats.EBXML_FORMAT except ValueError: - _format = TimeFormats.STANDARD_DATE_TIME_UTC_ZONE_FORMAT + time_format = TimeFormats.STANDARD_DATE_TIME_UTC_ZONE_FORMAT else: - _format = _TIMEFORMAT_LENGTH_MAP.get(len(timeString), None) + time_format = _TIMEFORMAT_LENGTH_MAP.get(len(time_string), None) - if not _format and raiseErrorIfUnknown: - raise ValueError("Could not determine datetime format of '{}'".format(timeString)) + if not time_format and raise_error_if_unknown: + raise ValueError("Could not determine datetime format of '{}'".format(time_string)) - return _format + return time_format -def convertSpineDate(dateString, dateFormat=None): +def convert_spine_date(date_string, date_format=None): """ Try to convert a Spine date using the passed format - if it fails - try the most appropriate """ - if dateFormat: + if date_format: try: - dateObject = datetime.strptime(dateString, dateFormat) - return dateObject + date_object = datetime.strptime(date_string, date_format) + return date_object except ValueError: pass - dateFormat = _guessCommonDateTimeFormat(dateString, raiseErrorIfUnknown=True) - return datetime.strptime(dateString, dateFormat) + date_format = _guess_common_datetime_format(date_string, raise_error_if_unknown=True) + return datetime.strptime(date_string, date_format) -def timeNowAsString(_dateFormat=TimeFormats.STANDARD_DATE_TIME_FORMAT): +def time_now_as_string(date_format=TimeFormats.STANDARD_DATE_TIME_FORMAT): """ Return the current date and time as a string in standard format """ - return now().strftime(_dateFormat) + return now().strftime(date_format) def now(): diff --git a/src/eps_spine_shared/spinecore/baseutilities.py b/src/eps_spine_shared/spinecore/baseutilities.py index e841a8a..bd7edde 100644 --- a/src/eps_spine_shared/spinecore/baseutilities.py +++ b/src/eps_spine_shared/spinecore/baseutilities.py @@ -3,7 +3,7 @@ import six -def handleEncodingOddities(text, attemptEscapedReplacement=False): +def handle_encoding_oddities(text, attempt_escaped_replacement=False): """ Strip accents and non-ascii characters from unicode strings """ @@ -26,7 +26,7 @@ def handleEncodingOddities(text, attemptEscapedReplacement=False): # if replacement is not requested, use composed characters # and replace them with question marks when encoding to ascii. # This is only done if using the fallback latin1 encoding as a last resort - if not attemptEscapedReplacement: + if not attempt_escaped_replacement: form = "NFKC" mode = "replace" @@ -40,4 +40,4 @@ def quoted(value): try: return '"' + str(value) + '"' except (UnicodeEncodeError, UnicodeDecodeError): - return '"' + handleEncodingOddities(value) + '"' + return '"' + handle_encoding_oddities(value) + '"' diff --git a/src/eps_spine_shared/spinecore/changelog.py b/src/eps_spine_shared/spinecore/changelog.py index d0a3855..f0a4c06 100644 --- a/src/eps_spine_shared/spinecore/changelog.py +++ b/src/eps_spine_shared/spinecore/changelog.py @@ -36,156 +36,154 @@ class ChangeLogProcessor(object): INVALID_SCN = -1 @classmethod - def logForGeneralUpdate(cls, sCN, internalID=None, xslt=None, rspParameters=None): + def log_for_general_update(cls, scn, internal_id=None, xslt=None, rsp_parameters=None): """ Add a general change log update, nothing specific to a domain """ - if not rspParameters: - rspParameters = {} + if not rsp_parameters: + rsp_parameters = {} logOfChange = {} _timeOfChange = datetime.datetime.now().strftime(TimeFormats.STANDARD_DATE_TIME_FORMAT) logOfChange[cls.TIMESTAMP] = _timeOfChange - logOfChange[cls.SCN] = sCN - logOfChange[cls.INTERNAL_ID] = internalID + logOfChange[cls.SCN] = scn + logOfChange[cls.INTERNAL_ID] = internal_id logOfChange[cls.XSLT] = xslt - logOfChange[cls.RSP_PARAMS] = rspParameters + logOfChange[cls.RSP_PARAMS] = rsp_parameters return logOfChange @classmethod - def updateChangeLog(cls, record, newLog, messageID, prunePoint=None): + def update_change_log(cls, record, new_log, message_id, prune_point=None): """ Take a change log from the record, add the new log to it, and prune to the prune point """ - if not prunePoint: - prunePoint = cls.PRUNE_POINT + if not prune_point: + prune_point = cls.PRUNE_POINT - changeLog = record.get(cls.RECORD_CHANGELOG_REF, {}) - changeLog[messageID] = newLog + change_log = record.get(cls.RECORD_CHANGELOG_REF, {}) + change_log[message_id] = new_log + cls.prune_change_log(change_log, prune_point) - cls.pruneChangeLog(changeLog, prunePoint) - - record[cls.RECORD_CHANGELOG_REF] = changeLog + record[cls.RECORD_CHANGELOG_REF] = change_log return record @classmethod - def pruneChangeLog(cls, changeLog, prunePoint): + def prune_change_log(cls, change_log, prune_point): """ Prune to the prune point """ - if prunePoint != cls.DO_NOT_PRUNE: - _, _highestSCN = cls.getHighestSCN(changeLog) - if _highestSCN != cls.INVALID_SCN: - _scnToPrune = _highestSCN - prunePoint - pruneList = [] - for guid, changeLogEntry in changeLog.items(): - _entrySCN = int(changeLogEntry.get(cls.SCN, cls.INVALID_SCN)) - if _entrySCN < _scnToPrune: - pruneList.append(guid) - - for guid in pruneList: - del changeLog[guid] + if prune_point != cls.DO_NOT_PRUNE: + _, highest_scn = cls.get_highest_scn(change_log) + if highest_scn != cls.INVALID_SCN: + scn_to_prune = highest_scn - prune_point + prune_list = [] + for guid, change_log_entry in change_log.items(): + entry_scn = int(change_log_entry.get(cls.SCN, cls.INVALID_SCN)) + if entry_scn < scn_to_prune: + prune_list.append(guid) + for guid in prune_list: + del change_log[guid] @classmethod - def getHighestSCN(cls, changeLog): + def get_highest_scn(cls, change_log): """ - Return the (guid, scn) from the first changeLog found with the highest SCN + Return the (guid, scn) from the first change_log found with the highest SCN """ - (highestGUID, highestSCN) = (None, cls.INVALID_SCN) - for _guid in changeLog: - _scn = int(changeLog[_guid].get(cls.SCN, cls.INVALID_SCN)) - if _scn > highestSCN: - highestGUID = _guid - highestSCN = _scn - return (highestGUID, highestSCN) + (highest_guid, highest_scn) = (None, cls.INVALID_SCN) + for guid in change_log: + scn = int(change_log[guid].get(cls.SCN, cls.INVALID_SCN)) + if scn > highest_scn: + highest_guid = guid + highest_scn = scn + return (highest_guid, highest_scn) @classmethod - def getSCN(cls, changeLogEntry): + def get_scn(cls, change_log_entry): """ - Retrieve the SCN as an int from the provided changeLog entry + Retrieve the SCN as an int from the provided change_log entry """ - scnNumber = int(changeLogEntry.get(cls.SCN, cls.INVALID_SCN)) - return scnNumber + scn_number = int(change_log_entry.get(cls.SCN, cls.INVALID_SCN)) + return scn_number @classmethod - def listSCNs(cls, changeLog): + def list_scns(cls, change_log): """ - Performs list comprehension on the changeLog dictionary to retrieve all the SCNs from changeLog + Performs list comprehension on the change_log dictionary to retrieve all the SCNs from change_log - Duplicates will be present and changeLog entries with no SCN will be represented with the + Duplicates will be present and change_log entries with no SCN will be represented with the INVALID_SCN constant """ - scnNumberList = [cls.getSCN(changeLog[x]) for x in changeLog] - return scnNumberList + scn_number_list = [cls.get_scn(change_log[x]) for x in change_log] + return scn_number_list @classmethod - def getMaxSCN(cls, changeLog): + def get_max_scn(cls, change_log): """ - Return the highest SCN value from the provided changeLog + Return the highest SCN value from the provided change_log """ - scnNumberList = cls.listSCNs(changeLog) - if not scnNumberList: + scn_number_list = cls.list_scns(change_log) + if not scn_number_list: return cls.INVALID_SCN - highestSCN = max(scnNumberList) - return highestSCN + highest_scn = max(scn_number_list) + return highest_scn @classmethod - def getAllGuidsForSCN(cls, changeLog, searchScn): + def get_all_guids_for_scn(cls, change_log, search_scn): """ - For the provided SCN return the GUID Keys of all the changeLog entries that have that SCN + For the provided SCN return the GUID Keys of all the change_log entries that have that SCN Usually this will be a single GUID, but in the case of tickled records there can be multiple. """ - searchScn = int(searchScn) - guidList = [k for k in changeLog if cls.getSCN(changeLog[k]) == searchScn] - return guidList + search_scn = int(search_scn) + guid_list = [k for k in change_log if cls.get_scn(change_log[k]) == search_scn] + return guid_list @classmethod - def getMaxSCNGuids(cls, changeLog): + def get_max_scn_guids(cls, change_log): """ - Finds the highest SCN in the changeLog and returns all the GUIDs that have that SCN + Finds the highest SCN in the change_log and returns all the GUIDs that have that SCN """ - highestSCN = cls.getMaxSCN(changeLog) - guidList = cls.getAllGuidsForSCN(changeLog, highestSCN) - return guidList + highest_scn = cls.get_max_scn(change_log) + guid_list = cls.get_all_guids_for_scn(change_log, highest_scn) + return guid_list @classmethod - def getAllGuids(cls, changeLog): + def get_all_guids(cls, change_log): """ - Return a list of all the GUID keys from the provided changeLog + Return a list of all the GUID keys from the provided change_log """ - return list(changeLog.keys()) + return list(change_log.keys()) @classmethod - def getLastChangeTime(cls, changeLog): + def get_last_change_time(cls, change_log): """ Returns the last change time """ try: - guid = cls.getMaxSCNGuids(changeLog)[0] + guid = cls.get_max_scn_guids(change_log)[0] except IndexError: return None - return changeLog[guid].get(cls.TIMESTAMP) + return change_log[guid].get(cls.TIMESTAMP) @classmethod - def setInitialChangeLog(cls, record, internalID, reasonGUID=None): + def set_initial_change_log(cls, record, internal_id, reason_guid=None): """ If no change log is present set an initial change log on the record. It may use a GUID as a key or a string explaining the reason for initiating the change log. """ - changeLog = record.get(cls.RECORD_CHANGELOG_REF) - if changeLog: + change_log = record.get(cls.RECORD_CHANGELOG_REF) + if change_log: return scn = int(record.get(cls.RECORD_SCN_REF, cls.INITIAL_SCN)) - if not reasonGUID: - reasonGUID = str(uuid.uuid4()).upper() - changeLog = {} - changeLog[reasonGUID] = cls.logForGeneralUpdate(scn, internalID) + if not reason_guid: + reason_guid = str(uuid.uuid4()).upper() + change_log = {} + change_log[reason_guid] = cls.log_for_general_update(scn, internal_id) - record[cls.RECORD_CHANGELOG_REF] = changeLog + record[cls.RECORD_CHANGELOG_REF] = change_log class DemographicsChangeLogProcessor(ChangeLogProcessor): @@ -197,48 +195,48 @@ class DemographicsChangeLogProcessor(ChangeLogProcessor): RECORD_SCN_REF = "serialChangeNumber" @classmethod - def logForDomainUpdate(cls, updateContext, internalID): + def log_for_domain_update(cls, update_context, internal_id): """ Create a change log for this expected change - requires attributes to be set on context object """ - logOfChange = cls.logForGeneralUpdate( - updateContext.pdsRecord.get(cls.RECORD_SCN_REF, cls.INITIAL_SCN), - internalID, - updateContext.responseDetails.get(cls.XSLT), - updateContext.responseDetails.get(cls.RSP_PARAMS), + log_of_change = cls.log_for_general_update( + update_context.pdsRecord.get(cls.RECORD_SCN_REF, cls.INITIAL_SCN), + internal_id, + update_context.responseDetails.get(cls.XSLT), + update_context.responseDetails.get(cls.RSP_PARAMS), ) - logOfChange[cls.SYS_SDS] = updateContext.agentSystem - logOfChange[cls.PRS_SDS] = updateContext.agentPerson - logOfChange[cls.UPDATES] = updateContext.updatesApplied - logOfChange[cls.NOTIFICATIONS] = updateContext.notificationsToQueue - return logOfChange + log_of_change[cls.SYS_SDS] = update_context.agentSystem + log_of_change[cls.PRS_SDS] = update_context.agentPerson + log_of_change[cls.UPDATES] = update_context.updatesApplied + log_of_change[cls.NOTIFICATIONS] = update_context.notificationsToQueue + return log_of_change @staticmethod - def getHighestGpLinksTransactionNumber(changeLog, sender, recipient): + def get_highest_gp_links_transaction_number(change_log, sender, recipient): """ Return the highest GP Links transaction number which has been included in the change log, or None (if there aren't any). """ - maxNumber = -1 + max_number = -1 - gpLinksKeyPattern = re.compile( + gp_links_key_pattern = re.compile( "^{}_{}_[0-9]+_[0-9]+_(?P[0-9]+)$".format( sender.upper(), recipient.upper() ) ) - for key in changeLog.keys(): # noqa: SIM118 - match = gpLinksKeyPattern.match(key) + for key in change_log.keys(): # noqa: SIM118 + match = gp_links_key_pattern.match(key) # Ignore keys which aren't related to GP Links transactions if match is None: continue - transactionNumber = int(match.group("transactionNumber")) - if transactionNumber > maxNumber: - maxNumber = transactionNumber + transaction_number = int(match.group("transactionNumber")) + if transaction_number > max_number: + max_number = transaction_number - return maxNumber + return max_number class PrescriptionsChangeLogProcessor(ChangeLogProcessor): @@ -284,27 +282,27 @@ class PrescriptionsChangeLogProcessor(ChangeLogProcessor): REGEX_ALPHANUMERIC8 = re.compile(r"^[A-Za-z0-9\-]{1,8}$") @classmethod - def logForDomainUpdate(cls, updateContext, internalID): + def log_for_domain_update(cls, update_context, internal_id): """ Create a change log for this expected change - requires attribute to be set on context object """ - logOfChange = cls.logForGeneralUpdate( - updateContext.epsRecord.get_scn(), - internalID, - updateContext.responseDetails.get(cls.XSLT), - updateContext.responseDetails.get(cls.RSP_PARAMS), + log_of_change = cls.log_for_general_update( + update_context.epsRecord.get_scn(), + internal_id, + update_context.responseDetails.get(cls.XSLT), + update_context.responseDetails.get(cls.RSP_PARAMS), ) - logOfChange = updateContext.workDescriptionObject.createInitialEventLog(logOfChange) + log_of_change = update_context.workDescriptionObject.createInitialEventLog(log_of_change) - _instance = ( - str(updateContext.updateInstance) - if updateContext.updateInstance - else str(updateContext.instanceID) + instance = ( + str(update_context.updateInstance) + if update_context.updateInstance + else str(update_context.instanceID) ) - logOfChange[cls.TIME_PREPARED] = updateContext.handleTime.strftime( + log_of_change[cls.TIME_PREPARED] = update_context.handleTime.strftime( TimeFormats.STANDARD_DATE_TIME_FORMAT ) @@ -312,150 +310,99 @@ def logForDomainUpdate(cls, updateContext, internalID): # superceded by the INS_FROM_STATUS and INS_TO_STATUS fields set below. # The only reference to TO_STATUS seems to be in PrescriptionJsonQueryResponse.cfg # template used by the prescription detail view web service - logOfChange[cls.FROM_STATUS] = updateContext.epsRecord.return_previous_prescription_status( - updateContext.instanceID, False + log_of_change[cls.FROM_STATUS] = ( + update_context.epsRecord.return_previous_prescription_status( + update_context.instanceID, False + ) ) - logOfChange[cls.TO_STATUS] = updateContext.epsRecord.return_prescription_status( - updateContext.instanceID, False + log_of_change[cls.TO_STATUS] = update_context.epsRecord.return_prescription_status( + update_context.instanceID, False ) # Event history lines for UI # **** NOTE THAT THESE ARE WRONG, THEY REFER TO THE FINAL ISSUE, WHICH MAY NOT BE THE ISSUE THAT WAS UPDATED - logOfChange[cls.INSTANCE] = _instance - logOfChange[cls.INS_FROM_STATUS] = ( - updateContext.epsRecord.return_previous_prescription_status(_instance, False) + log_of_change[cls.INSTANCE] = instance + log_of_change[cls.INS_FROM_STATUS] = ( + update_context.epsRecord.return_previous_prescription_status(instance, False) + ) + log_of_change[cls.INS_TO_STATUS] = update_context.epsRecord.return_prescription_status( + instance, False ) - logOfChange[cls.INS_TO_STATUS] = updateContext.epsRecord.return_prescription_status( - _instance, False + log_of_change[cls.AGENT_ROLE_PROFILE_CODE_ID] = update_context.agentRoleProfileCodeId + log_of_change[cls.AGENT_PERSON_ROLE] = update_context.agentPersonRole + org_code = update_context.agentOrganization + has_dispenser_code = ( + hasattr(update_context, "dispenserCode") and update_context.dispenserCode ) - logOfChange[cls.AGENT_ROLE_PROFILE_CODE_ID] = updateContext.agentRoleProfileCodeId - logOfChange[cls.AGENT_PERSON_ROLE] = updateContext.agentPersonRole - orgCode = updateContext.agentOrganization - hasDispenserCode = hasattr(updateContext, "dispenserCode") and updateContext.dispenserCode if ( - not orgCode - and hasDispenserCode - and cls.REGEX_ALPHANUMERIC8.match(updateContext.dispenserCode) + not org_code + and has_dispenser_code + and cls.REGEX_ALPHANUMERIC8.match(update_context.dispenserCode) ): - orgCode = updateContext.dispenserCode - logOfChange[cls.AGENT_PERSON_ORG_CODE] = orgCode - - # To help with troubleshooting, the following change entris are added - _preChangeIssueStatuses = updateContext.epsRecord.return_prechange_issue_status_dict() - _postChangeIssueStatuses = updateContext.epsRecord.create_issue_current_status_dict() - logOfChange[cls.PRE_CHANGE_STATUS_DICT] = _preChangeIssueStatuses - logOfChange[cls.POST_CHANGE_STATUS_DICT] = _postChangeIssueStatuses - logOfChange[cls.CHANGED_ISSUES_LIST] = updateContext.epsRecord.return_changed_issue_list( - _preChangeIssueStatuses, _postChangeIssueStatuses, None, updateContext.changedIssuesList + org_code = update_context.dispenserCode + log_of_change[cls.AGENT_PERSON_ORG_CODE] = org_code + + # To help with troubleshooting, the following change entries are added + pre_change_issue_statuses = update_context.epsRecord.return_prechange_issue_status_dict() + post_change_issue_statuses = update_context.epsRecord.create_issue_current_status_dict() + log_of_change[cls.PRE_CHANGE_STATUS_DICT] = pre_change_issue_statuses + log_of_change[cls.POST_CHANGE_STATUS_DICT] = post_change_issue_statuses + log_of_change[cls.CHANGED_ISSUES_LIST] = update_context.epsRecord.return_changed_issue_list( + pre_change_issue_statuses, + post_change_issue_statuses, + None, + update_context.changedIssuesList, ) # To help with troubleshooting, the following currentIssue values are added - logOfChange[cls.PRE_CHANGE_CURRENT_ISSUE] = ( - updateContext.epsRecord.return_prechange_current_issue() + log_of_change[cls.PRE_CHANGE_CURRENT_ISSUE] = ( + update_context.epsRecord.return_prechange_current_issue() ) - logOfChange[cls.POST_CHANGE_CURRENT_ISSUE] = updateContext.epsRecord.current_issue_number - if hasattr(updateContext, cls.TOUCHED) and updateContext.touched: - logOfChange[cls.TOUCHED] = updateContext.touched + log_of_change[cls.POST_CHANGE_CURRENT_ISSUE] = update_context.epsRecord.current_issue_number + if hasattr(update_context, cls.TOUCHED) and update_context.touched: + log_of_change[cls.TOUCHED] = update_context.touched - return logOfChange + return log_of_change @classmethod - def pruneChangeLog(cls, changeLog, prunePoint): + def prune_change_log(cls, change_log, prune_point): """ - Prune if other the prune point Prune the change log where there is a series of change log entries for the same interactionID - and the change is neither recent nor part of the early history - The intention if we get a repeating interaction we don't continue to explode the + The intention is that if we get a repeating interaction we don't continue to explode the changeLog with all the history """ - invertedChangeLog = {} - maxSCN = 0 - for guid, changeLogEntry in changeLog.items(): - _SCN = int(changeLogEntry.get(cls.SCN, cls.INVALID_SCN)) - invertedChangeLog[_SCN] = (guid, changeLogEntry.get(cls.INTERACTION_ID)) - maxSCN = max(maxSCN, _SCN) - - if maxSCN <= prunePoint: + inverted_change_log = {} + max_scn = 0 + for guid, change_log_entry in change_log.items(): + scn = int(change_log_entry.get(cls.SCN, cls.INVALID_SCN)) + inverted_change_log[scn] = (guid, change_log_entry.get(cls.INTERACTION_ID)) + max_scn = max(max_scn, scn) + if max_scn <= prune_point: # Don't make any changes return - _iclSCNKeys = list(invertedChangeLog.keys()) - _iclSCNKeys.sort(reverse=True) - _guidsToPrune = [] - for _iclSCN in _iclSCNKeys: - if _iclSCN > (maxSCN - cls.MIN_RECENTHISTORY) or _iclSCN < cls.MIN_INITIALHISTORY: + icl_scn_keys = list(inverted_change_log.keys()) + icl_scn_keys.sort(reverse=True) + guids_to_prune = [] + for icl_scn in icl_scn_keys: + if icl_scn > (max_scn - cls.MIN_RECENTHISTORY) or icl_scn < cls.MIN_INITIALHISTORY: continue - _thisIntID = invertedChangeLog.get(_iclSCN, (None, None))[1] - (_previousGUID, _previousIntID) = invertedChangeLog.get(_iclSCN - 1, (None, None)) - _oneBeforeIntID = invertedChangeLog.get(_iclSCN - 2, (None, None))[1] + this_int_id = inverted_change_log.get(icl_scn, (None, None))[1] + (previous_guid, previous_int_id) = inverted_change_log.get(icl_scn - 1, (None, None)) + one_before_int_id = inverted_change_log.get(icl_scn - 2, (None, None))[1] if ( - _thisIntID - and _thisIntID in cls.REPEATING_ACTIONS - and _thisIntID == _previousIntID - and _previousIntID == _oneBeforeIntID + this_int_id + and this_int_id in cls.REPEATING_ACTIONS + and this_int_id == previous_int_id + and previous_int_id == one_before_int_id ): - _guidsToPrune.append(_previousGUID) + guids_to_prune.append(previous_guid) - for guid in _guidsToPrune: - del changeLog[guid] + for guid in guids_to_prune: + del change_log[guid] - if len(changeLog) > prunePoint: + if len(change_log) > prune_point: # If we have breached the prune point but can't safely prune - stop before # The un-pruned record becomes an issue raise EpsSystemError(EpsSystemError.SYSTEM_FAILURE) - - -class ClinicalsChangeLogProcessor(ChangeLogProcessor): - """ - Change Log Processor specifically for clinicals patient records - """ - - SYS_SDS = "agentSystemSDS" - PRS_SDS = "agentPerson" - PRUNE_POINT = 48 - - @classmethod - def logForDomainUpdate(cls, updateContext, internalID, interactionID=None): - """ - Create a change log for this expected change - requires attributes to be set on - context object - """ - logOfChange = cls.logForGeneralUpdate( - updateContext.patientRecord.get_scn(), - internalID, - updateContext.responseDetails.get(cls.XSLT), - updateContext.responseDetails.get(cls.RSP_PARAMS), - ) - - logOfChange[cls.TIME_PREPARED] = updateContext.handleTime.strftime( - TimeFormats.STANDARD_DATE_TIME_FORMAT - ) - logOfChange[cls.INTERACTION_ID] = interactionID - logOfChange[cls.SYS_SDS] = updateContext.agentSystem - logOfChange[cls.PRS_SDS] = updateContext.agentPerson - return logOfChange - - @classmethod - def logForNotificationUpdate(cls, interactionID, updateTime, scn, internalID): - """ - Create a change log for this expected change from a notification worker - doesn't use - context and sets a subset of the items used by logForDomainUpdate - """ - logOfChange = cls.logForGeneralUpdate(scn, internalID) - logOfChange[cls.TIME_PREPARED] = updateTime.strftime(TimeFormats.STANDARD_DATE_TIME_FORMAT) - logOfChange[cls.INTERACTION_ID] = interactionID - return logOfChange - - @classmethod - def logForTickleClinicalRecord(cls, updateContext, interactionID, internalID): - """ - Create a change log for this expected change from a notification worker - doesn't use - context and sets a subset of the items used by logForDomainUpdate - """ - logOfChange = cls.logForGeneralUpdate(updateContext.patientRecord.get_scn(), internalID) - logOfChange[cls.TIME_PREPARED] = updateContext.handleTime.strftime( - TimeFormats.STANDARD_DATE_TIME_FORMAT - ) - logOfChange[cls.INTERACTION_ID] = interactionID - logOfChange[cls.SYS_SDS] = "SYSTEM" - return logOfChange diff --git a/tests/common/dynamodb_datastore_test.py b/tests/common/dynamodb_datastore_test.py index b9aa249..d16de2c 100644 --- a/tests/common/dynamodb_datastore_test.py +++ b/tests/common/dynamodb_datastore_test.py @@ -579,6 +579,115 @@ def test_record_expire_at_date_format(self): expire_at = built_record["expireAt"] self.assertEqual(expire_at, expected_timestamp) + def test_record_expire_at_next_activity_delete(self): + """ + Test that the expireAt will be calculated based on the nextActivityDate + when a record has a next activity of delete. + """ + prescription_id, nhs_number = self.get_new_record_keys() + + record = self.get_record(nhs_number, "20260101101112") + record["indexes"]["nextActivityNAD_bin"] = ["delete_20260101"] + + expected_timestamp = int( + datetime(year=2027, month=1, day=1, tzinfo=timezone.utc).timestamp() + ) + + built_record = self.datastore.build_record(prescription_id, record, None, None) + + expire_at = built_record["expireAt"] + self.assertEqual(expire_at, expected_timestamp) + + def test_record_expire_at_next_activity_purge(self): + """ + Test that the expireAt will be calculated based on the nextActivityDate + when a record has a next activity of purge. + """ + prescription_id, nhs_number = self.get_new_record_keys() + + record = self.get_record(nhs_number, "20260101101112") + record["indexes"]["nextActivityNAD_bin"] = ["purge_20260101"] + + expected_timestamp = int( + datetime(year=2026, month=1, day=1, tzinfo=timezone.utc).timestamp() + ) + + built_record = self.datastore.build_record(prescription_id, record, None, None) + + expire_at = built_record["expireAt"] + self.assertEqual(expire_at, expected_timestamp) + + def test_record_expire_at_no_date_element(self): + """ + Test that the expireAt will be set to default when a record has a next activity of purge + but no date element in nextActivityNAD_bin. + """ + prescription_id, nhs_number = self.get_new_record_keys() + + date_time = datetime( + year=2025, + month=9, + day=11, + hour=10, + minute=11, + second=12, + microsecond=123456, + tzinfo=timezone.utc, + ) + date_string = datetime.strftime(date_time, TimeFormats.STANDARD_DATE_FORMAT) + record = self.get_record(nhs_number, date_string) + record["indexes"]["nextActivityNAD_bin"] = ["purge"] + + expected_timestamp = int( + datetime(year=2027, month=3, day=11, tzinfo=timezone.utc).timestamp() + ) + + built_record = self.datastore.build_record(prescription_id, record, None, None) + + expire_at = built_record["expireAt"] + self.assertEqual(expire_at, expected_timestamp) + + def test_record_expire_at_max_date(self): + """ + Test that the expireAt will be set to default when a record has a next activity of purge + but the date element in nextActivityNAD_bin is the max date of 99991231. + """ + prescription_id, nhs_number = self.get_new_record_keys() + + date_time = datetime( + year=2025, + month=9, + day=11, + hour=10, + minute=11, + second=12, + microsecond=123456, + tzinfo=timezone.utc, + ) + date_string = datetime.strftime(date_time, TimeFormats.STANDARD_DATE_FORMAT) + record = self.get_record(nhs_number, date_string) + record["indexes"]["nextActivityNAD_bin"] = ["purge_99991231"] + + expected_timestamp = int( + datetime(year=2027, month=3, day=11, tzinfo=timezone.utc).timestamp() + ) + + built_record = self.datastore.build_record(prescription_id, record, None, None) + + expire_at = built_record["expireAt"] + self.assertEqual(expire_at, expected_timestamp) + + @parameterized.expand([("delete_20260101", "delete", "20260101"), ("delete", "delete", None)]) + def test_parse_next_activity_nad(self, next_activity_nad, expected_activity, expected_date): + """ + Test parsing nextActivityNAD_bin to obtain nextActivity and nextActivityDate. + """ + indexes = {"nextActivityNAD_bin": [next_activity_nad]} + next_activity, _, next_activity_date = self.datastore.parse_next_activity_nad(indexes) + + self.assertEqual(next_activity, expected_activity) + self.assertEqual(next_activity_date, expected_date) + def test_document_expire_at(self): """ Test that the expireAt attribute added to a document diff --git a/tests/common/dynamodb_index_test.py b/tests/common/dynamodb_index_test.py index 757bc7c..a253ea1 100644 --- a/tests/common/dynamodb_index_test.py +++ b/tests/common/dynamodb_index_test.py @@ -138,6 +138,22 @@ def test_build_terms_with_regex(self): self.assertEqual(len(terms), 1) + def test_build_terms_with_cleared_indexes(self): + """ + Test that items with cleared indexes attributes (only nextActivityNAD_bin) aren't included by buildTerms. + """ + items = [ + { + Key.PK.name: self.generate_prescription_id(), + ProjectedAttribute.INDEXES.name: { + indexes.INDEX_NEXTACTIVITY.lower(): ["purge_20260101"] + }, + } + ] + terms = self.datastore.indexes.build_terms(items, indexes.INDEX_NHSNUMBER_DATE, None) + + self.assertEqual(len(terms), 0) + def test_return_terms_by_nhs_number_date(self): """ Test querying against the nhsNumberDate index and returning nhsNumberDate records. diff --git a/tests/nhsfundamentals/timeutilities_test.py b/tests/nhsfundamentals/timeutilities_test.py index 07c0380..016ec77 100644 --- a/tests/nhsfundamentals/timeutilities_test.py +++ b/tests/nhsfundamentals/timeutilities_test.py @@ -6,9 +6,9 @@ from eps_spine_shared.nhsfundamentals.timeutilities import ( TimeFormats, - _guessCommonDateTimeFormat, - convertSpineDate, - timeNowAsString, + _guess_common_datetime_format, + convert_spine_date, + time_now_as_string, ) @@ -25,13 +25,13 @@ class TimeUtilitiesTests(TestCase): ("gmt_start", "2021-10-31 02:00:00", "20211031020000"), ] ) - def testTimeNowAsString(self, _, utcNow, expected): + def test_time_now_as_string(self, _, utc_now, expected): """ - Check timeNowAsString returns standard spine format by default matching UTC time. + Check time_now_as_string returns standard spine format by default matching UTC time. """ - with mock.patch("eps_spine_shared.nhsfundamentals.timeutilities.now") as mockNow: - mockNow.return_value = datetime.strptime(utcNow, "%Y-%m-%d %H:%M:%S") - result = timeNowAsString() + with mock.patch("eps_spine_shared.nhsfundamentals.timeutilities.now") as mock_now: + mock_now.return_value = datetime.strptime(utc_now, "%Y-%m-%d %H:%M:%S") + result = time_now_as_string() self.assertEqual(expected, result) @parameterized.expand( @@ -55,64 +55,64 @@ def testTimeNowAsString(self, _, utcNow, expected): ("other", "202", None), ] ) - def testGuessCommonDateTimeFormat_Default(self, _, timeString, expected): + def test_guess_common_datetime_format_default(self, _, time_string, expected): """ Check time format determined from date time string using default settings """ - result = _guessCommonDateTimeFormat(timeString) + result = _guess_common_datetime_format(time_string) self.assertEqual(expected, result) - def testGuessCommonDateTimeFormat_NoneIfUnknown(self): + def test_guess_common_datetime_format_none_if_unknown(self): """ Check time format determined from date time string specifying to return none if could not be determined """ - result = _guessCommonDateTimeFormat("202", False) + result = _guess_common_datetime_format("202", False) self.assertIsNone(result) - def testGuessCommonDateTimeFormat_ErrorIfUnknown_FormatUnknown(self): + def test_guess_common_datetime_format_error_if_unknown_format_unknown(self): """ Check time format determined from date time string with an unknown format, with raise error true """ with self.assertRaises(ValueError): - _ = _guessCommonDateTimeFormat("202", True) + _ = _guess_common_datetime_format("202", True) - def testGuessCommonDateTimeFormat_ErrorIfUnknown_FormatKnown(self): + def test_guess_common_datetime_format_error_if_unknown_format_known(self): """ Check time format determined from date time string with a known format, with raise error true """ - result = _guessCommonDateTimeFormat("2020", True) + result = _guess_common_datetime_format("2020", True) self.assertEqual(TimeFormats.STANDARD_DATE_FORMAT_YEAR_ONLY, result) class DateFormatTest(TestCase): """ - There is a safety method called convertSpineDate which will convert a date string if + There is a safety method called convert_spine_date which will convert a date string if there is doubt over the actual format being used """ - def _formatTester(self, dateFormat, withFormat=False): + def _format_tester(self, date_format, with_format=False): """ Test the format of a date """ - _now = datetime.now() - _nowAsString = _now.strftime(dateFormat) - if withFormat: - _newNow = convertSpineDate(_nowAsString, dateFormat) + now = datetime.now() + now_as_string = now.strftime(date_format) + if with_format: + new_now = convert_spine_date(now_as_string, date_format) else: - _newNow = convertSpineDate(_nowAsString) + new_now = convert_spine_date(now_as_string) - if _newNow > _now: - return _newNow - _now - return _now - _newNow + if new_now > now: + return new_now - now + return now - new_now - def testEbxml(self): + def test_ebxml(self): """ TimeFormats.EBXML_FORMAT """ - delta = self._formatTester(TimeFormats.EBXML_FORMAT) + delta = self._format_tester(TimeFormats.EBXML_FORMAT) self.assertLessEqual(delta.seconds, 1) - def testStandardUTC(self): + def test_standard_utc(self): """ STANDARD_DATE_TIME_UTC_ZONE_FORMAT = '%Y%m%d%H%M%S+0000' STANDARD_DATE_TIME_FORMAT = '%Y%m%d%H%M%S' @@ -125,40 +125,40 @@ def testStandardUTC(self): DAY_MONTH_YEAR_WITH_SLASHES_FORMAT = '%d/%m/%Y' TWO_DIGIT_YEAR_AND_WEEK_FORMAT = '%y%W' """ - delta = self._formatTester(TimeFormats.STANDARD_DATE_TIME_UTC_ZONE_FORMAT) + delta = self._format_tester(TimeFormats.STANDARD_DATE_TIME_UTC_ZONE_FORMAT) self.assertLessEqual(delta.seconds, 1) - def testStandardDT(self): + def test_standard_dt(self): """ The value of STANDARD_DATE_TIME_FORMAT = '%Y%m%d%H%M%S' """ - delta = self._formatTester(TimeFormats.STANDARD_DATE_TIME_FORMAT) + delta = self._format_tester(TimeFormats.STANDARD_DATE_TIME_FORMAT) self.assertLessEqual(delta.seconds, 1) - def testStandardDTMS(self): + def test_standard_dt_ms(self): """ The value of SPINE_DATETIME_MS_FORMAT = '%Y%m%d%H%M%S.%f' """ - delta = self._formatTester(TimeFormats.SPINE_DATETIME_MS_FORMAT) + delta = self._format_tester(TimeFormats.SPINE_DATETIME_MS_FORMAT) self.assertLessEqual(delta.seconds, 1) - def testStandardHL7(self): + def test_standard_hl7(self): """ The value of HL7_DATETIME_FORMAT = '%Y%m%dT%H%M%S.%f' """ - delta = self._formatTester(TimeFormats.HL7_DATETIME_FORMAT) + delta = self._format_tester(TimeFormats.HL7_DATETIME_FORMAT) self.assertLessEqual(delta.seconds, 1) - def testStandardDate(self): + def test_standard_date(self): """ The value of SPINE_DATE_FORMAT = '%Y%m%d' """ - delta = self._formatTester(TimeFormats.SPINE_DATE_FORMAT) + delta = self._format_tester(TimeFormats.SPINE_DATE_FORMAT) self.assertLessEqual(delta.days, 1) - def testStandardDT_withFormat(self): + def test_standard_dt_with_format(self): """ The value of STANDARD_DATE_TIME_FORMAT = '%Y%m%d%H%M%S' """ - delta = self._formatTester(TimeFormats.STANDARD_DATE_TIME_FORMAT, True) + delta = self._format_tester(TimeFormats.STANDARD_DATE_TIME_FORMAT, True) self.assertLessEqual(delta.seconds, 1) diff --git a/tests/spinecore/changelog_test.py b/tests/spinecore/changelog_test.py index c93be6f..9dd3e3b 100644 --- a/tests/spinecore/changelog_test.py +++ b/tests/spinecore/changelog_test.py @@ -1,7 +1,3 @@ -""" -Created on 11 Feb 2014 -""" - import copy import sys import unittest @@ -26,208 +22,229 @@ class ChangeLogProcessorTest(unittest.TestCase): Tests for the ChangeLogProcessor """ - def testGeneralLogEntry_Empty(self): + def test_general_log_entry_empty(self): """ test producing a general log with empty inputs """ - logOfChange = ChangeLogProcessor.logForGeneralUpdate(1) - del logOfChange["Timestamp"] + log_of_change = ChangeLogProcessor.log_for_general_update(1) + del log_of_change["Timestamp"] - _expectedLog = {} - _expectedLog["SCN"] = 1 - _expectedLog["InternalID"] = None - _expectedLog["Source XSLT"] = None - _expectedLog["Response Parameters"] = {} - self.assertEqual(logOfChange, _expectedLog) + expected_log = {} + expected_log["SCN"] = 1 + expected_log["InternalID"] = None + expected_log["Source XSLT"] = None + expected_log["Response Parameters"] = {} + self.assertEqual(log_of_change, expected_log) - def testPruningOfChangeLog(self): + def test_pruning_of_change_log(self): """ Add a new entry into change log and show it is correctly pruned """ - _expectedChangeLog = copy.copy(CHANGE_LOG_TO_PRUNE) - + expected_change_log = copy.copy(CHANGE_LOG_TO_PRUNE) record = {} record["changeLog"] = copy.copy(CHANGE_LOG_TO_PRUNE) - newLog = {"SCN": 12, "InternalID": "INTERNALID"} - _newRecord = ChangeLogProcessor.updateChangeLog(record, newLog, "GUID9", 6) - _newChangeLog = _newRecord["changeLog"] + new_log = {"SCN": 12, "InternalID": "INTERNALID"} + new_record = ChangeLogProcessor.update_change_log(record, new_log, "GUID9", 6) + new_change_log = new_record["changeLog"] - del _expectedChangeLog["GUID1"] - del _expectedChangeLog["GUID2"] - del _expectedChangeLog["GUID3"] - del _expectedChangeLog["GUID5"] - _expectedChangeLog["GUID9"] = newLog + del expected_change_log["GUID1"] + del expected_change_log["GUID2"] + del expected_change_log["GUID3"] + del expected_change_log["GUID5"] + expected_change_log["GUID9"] = new_log - self.assertDictEqual(_newChangeLog, _expectedChangeLog) + self.assertDictEqual(new_change_log, expected_change_log) - def testNotPruningOfChangeLog(self): + def test_not_pruning_of_change_log(self): """ Add a new entry into change log and show that when DO_NOT_PRUNE is used it does not prune """ - _expectedChangeLog = copy.copy(CHANGE_LOG_TO_PRUNE) - + expected_change_log = copy.copy(CHANGE_LOG_TO_PRUNE) record = {} record["changeLog"] = copy.copy(CHANGE_LOG_TO_PRUNE) - newLog = {"SCN": 12, "InternalID": "INTERNALID"} - _newRecord = ChangeLogProcessor.updateChangeLog( - record, newLog, "GUID9", ChangeLogProcessor.DO_NOT_PRUNE + new_log = {"SCN": 12, "InternalID": "INTERNALID"} + new_record = ChangeLogProcessor.update_change_log( + record, new_log, "GUID9", ChangeLogProcessor.DO_NOT_PRUNE ) - _newChangeLog = _newRecord["changeLog"] + new_change_log = new_record["changeLog"] - _expectedChangeLog["GUID9"] = newLog + expected_change_log["GUID9"] = new_log - self.assertDictEqual(_newChangeLog, _expectedChangeLog) + self.assertDictEqual(new_change_log, expected_change_log) - def testHighestSCN(self): + def test_highest_scn(self): """ test highest guid and scn returned """ - (guid, scn) = ChangeLogProcessor.getHighestSCN(CHANGE_LOG_TO_PRUNE) + (guid, scn) = ChangeLogProcessor.get_highest_scn(CHANGE_LOG_TO_PRUNE) self.assertEqual(guid, "GUID8") self.assertEqual(scn, 10) record = {} record["changeLog"] = copy.copy(CHANGE_LOG_TO_PRUNE) - newLog = {"SCN": 12, "InternalID": "INTERNALID"} - _newRecord = ChangeLogProcessor.updateChangeLog(record, newLog, "GUID9", 6) - _newChangeLog = _newRecord["changeLog"] + new_log = {"SCN": 12, "InternalID": "INTERNALID"} + new_record = ChangeLogProcessor.update_change_log(record, new_log, "GUID9", 6) + new_change_log = new_record["changeLog"] - (guid, scn) = ChangeLogProcessor.getHighestSCN(_newChangeLog) + (guid, scn) = ChangeLogProcessor.get_highest_scn(new_change_log) self.assertEqual(guid, "GUID9") self.assertEqual(scn, 12) - def testGetSCN(self): + def test_get_scn(self): """ test return of SCN from changeLog entry """ - changeLogEntry = {"SCN": 1} - scn = ChangeLogProcessor.getSCN(changeLogEntry) + change_log_entry = {"SCN": 1} + scn = ChangeLogProcessor.get_scn(change_log_entry) self.assertEqual(scn, 1) - changeLogEntry = {"SCN": "1"} - scn = ChangeLogProcessor.getSCN(changeLogEntry) + + change_log_entry = {"SCN": "1"} + scn = ChangeLogProcessor.get_scn(change_log_entry) self.assertEqual(scn, 1) - changeLogEntry = {} - scn = ChangeLogProcessor.getSCN(changeLogEntry) + + change_log_entry = {} + scn = ChangeLogProcessor.get_scn(change_log_entry) self.assertEqual(scn, ChangeLogProcessor.INVALID_SCN) - changeLogEntry = {"SCN": sys.maxsize} - scn = ChangeLogProcessor.getSCN(changeLogEntry) + + change_log_entry = {"SCN": sys.maxsize} + scn = ChangeLogProcessor.get_scn(change_log_entry) self.assertEqual(scn, sys.maxsize) - def testListSCNs(self): + def test_list_scns(self): """ test the return of the list of SCNs present in a changeLog """ - changeLog = {"ABCD": {"SCN": 1}, "EFGH": {"SCN": 2}, "IJKL": {"SCN": 3}} - scnList = sorted(ChangeLogProcessor.listSCNs(changeLog)) - self.assertEqual(scnList, [1, 2, 3]) + change_log = {"ABCD": {"SCN": 1}, "EFGH": {"SCN": 2}, "IJKL": {"SCN": 3}} + scn_list = sorted(ChangeLogProcessor.list_scns(change_log)) + self.assertEqual(scn_list, [1, 2, 3]) - changeLog = {} - scnList = ChangeLogProcessor.listSCNs(changeLog) - scnList.sort() - self.assertEqual(scnList, []) + change_log = {} + scn_list = ChangeLogProcessor.list_scns(change_log) + scn_list.sort() + self.assertEqual(scn_list, []) - changeLog = {"ABCD": {}} - scnList = ChangeLogProcessor.listSCNs(changeLog) - scnList.sort() - self.assertEqual(scnList, [ChangeLogProcessor.INVALID_SCN]) + change_log = {"ABCD": {}} + scn_list = ChangeLogProcessor.list_scns(change_log) + scn_list.sort() + self.assertEqual(scn_list, [ChangeLogProcessor.INVALID_SCN]) - def testGetMaxSCN(self): + def test_get_max_scn(self): """ Test retrieval of the highest SCN from changeLog """ - changeLog = {"ABCD": {"SCN": 1}, "IJKL": {"SCN": 3}, "EFGH": {"SCN": 2}} - highestSCN = ChangeLogProcessor.getMaxSCN(changeLog) - self.assertEqual(highestSCN, 3) - - changeLog = {"ABCD": {"SCN": 1}, "EFGH": {"SCN": 2}, "IJKL": {"SCN": 3}, "ZZZZ": {"SCN": 3}} - highestSCN = ChangeLogProcessor.getMaxSCN(changeLog) - self.assertEqual(highestSCN, 3) - - changeLog = {"ABCD": {}} - highestSCN = ChangeLogProcessor.getMaxSCN(changeLog) - self.assertEqual(highestSCN, ChangeLogProcessor.INVALID_SCN) - - def testGetAllGuidsForSCN(self): + change_log = {"ABCD": {"SCN": 1}, "IJKL": {"SCN": 3}, "EFGH": {"SCN": 2}} + highest_scn = ChangeLogProcessor.get_max_scn(change_log) + self.assertEqual(highest_scn, 3) + + change_log = { + "ABCD": {"SCN": 1}, + "EFGH": {"SCN": 2}, + "IJKL": {"SCN": 3}, + "ZZZZ": {"SCN": 3}, + } + highest_scn = ChangeLogProcessor.get_max_scn(change_log) + self.assertEqual(highest_scn, 3) + + change_log = {"ABCD": {}} + highest_scn = ChangeLogProcessor.get_max_scn(change_log) + self.assertEqual(highest_scn, ChangeLogProcessor.INVALID_SCN) + + def test_get_all_guids_for_scn(self): """ test retrieval of list of GUIDS that are keys for changelog entries which have a particular SCN """ - changeLog = {"ABCD": {"SCN": 1}, "EFGH": {"SCN": 2}, "IJKL": {"SCN": 3}, "ZZZZ": {"SCN": 3}} - guidList = sorted(ChangeLogProcessor.getAllGuidsForSCN(changeLog, 1)) - self.assertEqual(guidList, ["ABCD"]) - - guidList = ChangeLogProcessor.getAllGuidsForSCN(changeLog, 3) - guidList.sort() - self.assertEqual(guidList, ["IJKL", "ZZZZ"]) - - guidList = ChangeLogProcessor.getAllGuidsForSCN(changeLog, "3") - guidList.sort() - self.assertEqual(guidList, ["IJKL", "ZZZZ"]) - - guidList = ChangeLogProcessor.getAllGuidsForSCN(changeLog, "7") - guidList.sort() - self.assertEqual(guidList, []) - - def testGetMaxSCNGuids(self): + change_log = { + "ABCD": {"SCN": 1}, + "EFGH": {"SCN": 2}, + "IJKL": {"SCN": 3}, + "ZZZZ": {"SCN": 3}, + } + guid_list = sorted(ChangeLogProcessor.get_all_guids_for_scn(change_log, 1)) + self.assertEqual(guid_list, ["ABCD"]) + + guid_list = ChangeLogProcessor.get_all_guids_for_scn(change_log, 3) + guid_list.sort() + self.assertEqual(guid_list, ["IJKL", "ZZZZ"]) + + guid_list = ChangeLogProcessor.get_all_guids_for_scn(change_log, "3") + guid_list.sort() + self.assertEqual(guid_list, ["IJKL", "ZZZZ"]) + + guid_list = ChangeLogProcessor.get_all_guids_for_scn(change_log, "7") + guid_list.sort() + self.assertEqual(guid_list, []) + + def test_get_max_scn_guids(self): """ test retrieval of all GUIDS that have the highest SCN in the changeLog entry """ - changeLog = {"ABCD": {"SCN": 1}, "IJKL": {"SCN": 3}, "EFGH": {"SCN": 2}} - guidList = sorted(ChangeLogProcessor.getMaxSCNGuids(changeLog)) - self.assertEqual(guidList, ["IJKL"]) - - changeLog = {"ABCD": {"SCN": 1}, "EFGH": {"SCN": 2}, "IJKL": {"SCN": 3}, "ZZZZ": {"SCN": 3}} - guidList = ChangeLogProcessor.getMaxSCNGuids(changeLog) - guidList.sort() - self.assertEqual(guidList, ["IJKL", "ZZZZ"]) - - changeLog = {"ABCD": {}, "EFGH": {}} - guidList = ChangeLogProcessor.getMaxSCNGuids(changeLog) - guidList.sort() - self.assertEqual(guidList, ["ABCD", "EFGH"]) - - changeLog = {"ABCD": {}, "EFGH": {}, "IJKL": {"SCN": 3}} - guidList = ChangeLogProcessor.getMaxSCNGuids(changeLog) - guidList.sort() - self.assertEqual(guidList, ["IJKL"]) - - changeLog = {} - guidList = ChangeLogProcessor.getMaxSCNGuids(changeLog) - self.assertEqual(guidList, []) - - def testGetAllGuids(self): + change_log = {"ABCD": {"SCN": 1}, "IJKL": {"SCN": 3}, "EFGH": {"SCN": 2}} + guid_list = sorted(ChangeLogProcessor.get_max_scn_guids(change_log)) + self.assertEqual(guid_list, ["IJKL"]) + + change_log = { + "ABCD": {"SCN": 1}, + "EFGH": {"SCN": 2}, + "IJKL": {"SCN": 3}, + "ZZZZ": {"SCN": 3}, + } + guid_list = ChangeLogProcessor.get_max_scn_guids(change_log) + guid_list.sort() + self.assertEqual(guid_list, ["IJKL", "ZZZZ"]) + + change_log = {"ABCD": {}, "EFGH": {}} + guid_list = ChangeLogProcessor.get_max_scn_guids(change_log) + guid_list.sort() + self.assertEqual(guid_list, ["ABCD", "EFGH"]) + + change_log = {"ABCD": {}, "EFGH": {}, "IJKL": {"SCN": 3}} + guid_list = ChangeLogProcessor.get_max_scn_guids(change_log) + guid_list.sort() + self.assertEqual(guid_list, ["IJKL"]) + + change_log = {} + guid_list = ChangeLogProcessor.get_max_scn_guids(change_log) + self.assertEqual(guid_list, []) + + def test_get_all_guids(self): """ test getting the list of all GUID keys for a changeLog """ - changeLog = {"ABCD": {"SCN": 1}, "EFGH": {"SCN": 2}, "IJKL": {"SCN": 3}, "ZZZZ": {"SCN": 3}} - guidList = sorted(ChangeLogProcessor.getAllGuids(changeLog)) - self.assertEqual(guidList, ["ABCD", "EFGH", "IJKL", "ZZZZ"]) - - changeLog = {"ABCD": {}, "EFGH": {}} - guidList = ChangeLogProcessor.getAllGuids(changeLog) - guidList.sort() - self.assertEqual(guidList, ["ABCD", "EFGH"]) - - changeLog = {} - guidList = ChangeLogProcessor.getAllGuids(changeLog) - self.assertEqual(guidList, []) - - def testSettingInitialChangeLogOnDataMigration(self): + change_log = { + "ABCD": {"SCN": 1}, + "EFGH": {"SCN": 2}, + "IJKL": {"SCN": 3}, + "ZZZZ": {"SCN": 3}, + } + guid_list = sorted(ChangeLogProcessor.get_all_guids(change_log)) + self.assertEqual(guid_list, ["ABCD", "EFGH", "IJKL", "ZZZZ"]) + + change_log = {"ABCD": {}, "EFGH": {}} + guid_list = ChangeLogProcessor.get_all_guids(change_log) + guid_list.sort() + self.assertEqual(guid_list, ["ABCD", "EFGH"]) + + change_log = {} + guid_list = ChangeLogProcessor.get_all_guids(change_log) + self.assertEqual(guid_list, []) + + def test_setting_initial_change_log_on_data_migration(self): """ Set an initial change log onto a record which does not have one """ record = {} - internalID = "INTERNALID" - reasonGUID = "DataMigration" - ChangeLogProcessor.setInitialChangeLog(record, internalID, reasonGUID) + internal_id = "INTERNALID" + reason_guid = "DataMigration" + ChangeLogProcessor.set_initial_change_log(record, internal_id, reason_guid) - _changeLog = record[ChangeLogProcessor.RECORD_CHANGELOG_REF] - del _changeLog["DataMigration"]["Timestamp"] + change_log = record[ChangeLogProcessor.RECORD_CHANGELOG_REF] + del change_log["DataMigration"]["Timestamp"] self.assertDictEqual( - _changeLog, + change_log, { "DataMigration": { "SCN": 1, @@ -270,15 +287,15 @@ class PrescriptionChangeLogProcessorTest(unittest.TestCase): Tests for the ChangeLogProcessor """ - def testPrunePrescriptionChangeLog(self): + def test_prune_prescription_change_log(self): """ Prune the record as expected """ - _changeLog = copy.copy(PR_CHANGE_LOG_TO_PRUNE) + change_log = copy.copy(PR_CHANGE_LOG_TO_PRUNE) - PrescriptionsChangeLogProcessor.pruneChangeLog(_changeLog, 80) + PrescriptionsChangeLogProcessor.prune_change_log(change_log, 80) - _presentGUIDs = [ + present_guids = [ "GUID1", "GUID2", "GUID3", @@ -294,27 +311,26 @@ def testPrunePrescriptionChangeLog(self): "GUIDZ", ] - for guid in _presentGUIDs: - self.assertIn(guid, list(_changeLog.keys())) + for guid in present_guids: + self.assertIn(guid, list(change_log.keys())) - self.assertEqual(len(_presentGUIDs), len(list(_changeLog.keys()))) + self.assertEqual(len(present_guids), len(list(change_log.keys()))) - def testPrunePrescriptionChangeLog_HghPrunePoint(self): + def test_prune_prescription_change_log_high_prune_point(self): """ Increase the prune point, and confirm now no pruning """ - _changeLog = copy.copy(PR_CHANGE_LOG_TO_PRUNE) - - PrescriptionsChangeLogProcessor.pruneChangeLog(_changeLog, 180) - self.assertDictEqual(_changeLog, PR_CHANGE_LOG_TO_PRUNE) + change_log = copy.copy(PR_CHANGE_LOG_TO_PRUNE) + PrescriptionsChangeLogProcessor.prune_change_log(change_log, 180) + self.assertDictEqual(change_log, PR_CHANGE_LOG_TO_PRUNE) - def testUnprunableChangeLog(self): + def test_unprunable_change_log(self): """ Make the change log unprunable below the prune point """ - _changeLog = copy.copy(PR_CHANGE_LOG_TO_PRUNE) + change_log = copy.copy(PR_CHANGE_LOG_TO_PRUNE) for scn in range(100, 200): - _changeLog["GUID" + str(scn)] = {"SCN": scn, "interactionID": "PORX_IN090101UK09"} + change_log["GUID" + str(scn)] = {"SCN": scn, "interactionID": "PORX_IN090101UK09"} with self.assertRaises(EpsSystemError): - PrescriptionsChangeLogProcessor.pruneChangeLog(_changeLog, 50) + PrescriptionsChangeLogProcessor.prune_change_log(change_log, 50)