1. import json
  2. import datetime
  3. def aggregate_user_input(input_file, schedule_file):
  4. """
  5. Aggregates user input from an input file based on a schedule defined in a schedule file.
  6. Handles edge cases like missing data, invalid dates, and non-numeric input.
  7. Args:
  8. input_file (str): Path to the file containing user input data (JSON format).
  9. schedule_file (str): Path to the file containing the schedule (JSON format).
  10. Returns:
  11. dict: A dictionary containing aggregated data, where keys are schedule entries
  12. and values are dictionaries of aggregated user inputs.
  13. Returns an empty dictionary if errors occur.
  14. """
  15. try:
  16. with open(input_file, 'r') as f:
  17. user_data = json.load(f)
  18. with open(schedule_file, 'r') as f:
  19. schedule = json.load(f)
  20. except FileNotFoundError as e:
  21. print(f"Error: File not found: {e}")
  22. return {}
  23. except json.JSONDecodeError as e:
  24. print(f"Error: Invalid JSON format: {e}")
  25. return {}
  26. aggregated_data = {}
  27. for schedule_entry in schedule:
  28. schedule_time = datetime.datetime.strptime(schedule_entry['time'], '%Y-%m-%d %H:%M:%S') #parse schedule time
  29. aggregated_data[schedule_entry['id']] = {} #initialize dictionary for each schedule entry
  30. for user_id, input_value in user_data.items():
  31. try:
  32. #check if user_id exists in the scheduled entries
  33. if user_id in schedule_entry['users']:
  34. #check if the date matches the schedule time
  35. if datetime.datetime.strptime(user_id['date'], '%Y-%m-%d') == schedule_time.date():
  36. aggregated_data[schedule_entry['id']][user_id['key']] = float(input_value) #convert input to float
  37. except (ValueError, TypeError) as e:
  38. print(f"Warning: Invalid data for user {user_id}: {e}")
  39. continue # Skip to the next user
  40. return aggregated_data

Add your comment