1. import json
  2. import datetime
  3. def attach_metadata(entry, metadata_source, override_values=None):
  4. """
  5. Attaches metadata to an entry, allowing for manual overrides.
  6. Args:
  7. entry (dict): The entry dictionary to which metadata will be attached.
  8. metadata_source (dict): A dictionary containing the metadata to be attached.
  9. override_values (dict, optional): A dictionary of metadata keys and their
  10. manual override values. Defaults to None.
  11. Returns:
  12. dict: The entry dictionary with attached metadata. Returns the original entry if metadata_source is empty.
  13. """
  14. if not metadata_source:
  15. return entry
  16. # Merge metadata, prioritizing overrides
  17. merged_metadata = metadata_source.copy()
  18. if override_values:
  19. merged_metadata.update(override_values) # Overwrite with provided values
  20. # Add timestamp
  21. merged_metadata["timestamp"] = datetime.datetime.now().isoformat()
  22. # Attach metadata to the entry
  23. entry.update(merged_metadata)
  24. return entry
  25. if __name__ == '__main__':
  26. # Example Usage
  27. entry = {"id": 123, "data": "some data"}
  28. metadata_source = {
  29. "source": "system",
  30. "user": "john.doe",
  31. "environment": "production"
  32. }
  33. override_values = {
  34. "user": "jane.doe", # Override user
  35. "environment": "staging" # Override environment
  36. }
  37. updated_entry = attach_metadata(entry, metadata_source, override_values)
  38. print(json.dumps(updated_entry, indent=4))
  39. # Example with no overrides
  40. entry2 = {"id": 456, "data": "another data"}
  41. metadata_source2 = {
  42. "source": "system",
  43. "user": "john.doe",
  44. "environment": "production"
  45. }
  46. updated_entry2 = attach_metadata(entry2, metadata_source2)
  47. print(json.dumps(updated_entry2, indent=4))
  48. #Example with empty metadata source
  49. entry3 = {"id": 789, "data": "yet another data"}
  50. metadata_source3 = {}
  51. updated_entry3 = attach_metadata(entry3, metadata_source3)
  52. print(json.dumps(updated_entry3, indent=4))

Add your comment