diff options
Diffstat (limited to 'prediction_storage.py')
| -rw-r--r-- | prediction_storage.py | 151 |
1 files changed, 151 insertions, 0 deletions
diff --git a/prediction_storage.py b/prediction_storage.py new file mode 100644 index 0000000..95688c9 --- /dev/null +++ b/prediction_storage.py @@ -0,0 +1,151 @@ +# prediction_storage.py
+import json
+import uuid
+from datetime import datetime
+
+class Prediction:
+ def __init__(self, launch_date, duration_years, predicted_date, keyword_args=None, prediction_id=None, timestamp=None, description=None):
+ self.id = prediction_id if prediction_id else str(uuid.uuid4())
+ self.launch_date = launch_date if isinstance(launch_date, datetime) else datetime.fromisoformat(launch_date)
+ self.duration_years = duration_years
+ self.predicted_date = predicted_date if isinstance(predicted_date, datetime) else datetime.fromisoformat(predicted_date)
+ self.keyword_args = keyword_args or {}
+ self.timestamp = timestamp if timestamp else datetime.now()
+ self.description = description
+
+ def to_dict(self):
+ # Convert keyword_args dates to ISO format strings
+ serialized_keyword_args = {}
+ for keyword, events in self.keyword_args.items():
+ serialized_events = []
+ for event in events:
+ # Assuming each event is a tuple of (start_date, end_date, id)
+ start_date = event[0].isoformat() if isinstance(event[0], datetime) else event[0]
+ end_date = event[1].isoformat() if isinstance(event[1], datetime) else event[1]
+ serialized_events.append((start_date, end_date, event[2]))
+ serialized_keyword_args[keyword] = serialized_events
+
+ return {
+ 'id': self.id,
+ 'launch_date': self.launch_date.isoformat(),
+ 'duration_years': self.duration_years,
+ 'predicted_date': self.predicted_date.isoformat(),
+ 'keyword_args': serialized_keyword_args,
+ 'timestamp': self.timestamp.isoformat(),
+ 'description': self.description
+ }
+
+ @classmethod
+ def from_dict(cls, data):
+ # Deserialize keyword_args dates from ISO format strings
+ keyword_args = {}
+ for keyword, events in data.get('keyword_args', {}).items():
+ deserialized_events = []
+ for event in events:
+ # Convert string dates back to datetime objects
+ start_date = datetime.fromisoformat(event[0]) if isinstance(event[0], str) else event[0]
+ end_date = datetime.fromisoformat(event[1]) if isinstance(event[1], str) else event[1]
+ deserialized_events.append((start_date, end_date, event[2]))
+ keyword_args[keyword] = deserialized_events
+
+ return cls(
+ launch_date=data['launch_date'],
+ duration_years=data['duration_years'],
+ predicted_date=data['predicted_date'],
+ keyword_args=keyword_args,
+ prediction_id=data['id'],
+ timestamp=datetime.fromisoformat(data['timestamp']),
+ description=data.get('description')
+ )
+
+ def __repr__(self):
+ return (f"Prediction(id={self.id}, launch_date={self.launch_date}, "
+ f"duration_years={self.duration_years}, predicted_date={self.predicted_date}, "
+ f"timestamp={self.timestamp}, description={repr(self.description)})")
+
+class PredictionStorage:
+ def __init__(self, filename=None):
+ self.filename = filename
+ self.predictions = []
+ if filename:
+ self.load_predictions()
+
+ def load_predictions(self):
+ """Load predictions from file."""
+ if not self.filename:
+ return
+
+ try:
+ with open(self.filename, 'r') as f:
+ data = json.load(f)
+ self.predictions = [Prediction.from_dict(pred) for pred in data]
+ except (FileNotFoundError, json.JSONDecodeError) as e:
+ print(f"Error reading predictions file {self.filename}: {e}")
+ self.predictions = []
+
+ def save_predictions(self):
+ """Save predictions to file."""
+ if not self.filename:
+ return
+
+ try:
+ with open(self.filename, 'w') as f:
+ json.dump([pred.to_dict() for pred in self.predictions], f, indent=4)
+ except Exception as e:
+ print(f"Error writing to predictions file {self.filename}: {e}")
+
+ def add_prediction(self, launch_date, duration_years, predicted_date, keyword_args=None, description=None):
+ """Add a new prediction."""
+ prediction = Prediction(
+ launch_date=launch_date,
+ duration_years=duration_years,
+ predicted_date=predicted_date,
+ keyword_args=keyword_args,
+ description=description
+ )
+ self.predictions.append(prediction)
+ self.save_predictions()
+ return prediction
+
+ def get_prediction_by_id(self, prediction_id):
+ """Get a prediction by its ID."""
+ return next((pred for pred in self.predictions if pred.id == prediction_id), None)
+
+ def list_predictions(self):
+ """List all predictions."""
+ return self.predictions
+
+ def search_predictions(self, start_date=None, end_date=None, keyword=None):
+ """Search predictions by date range or keyword."""
+ results = self.predictions
+
+ if start_date:
+ start_dt = start_date if isinstance(start_date, datetime) else datetime.fromisoformat(start_date)
+ results = [p for p in results if p.predicted_date >= start_dt]
+
+ if end_date:
+ end_dt = end_date if isinstance(end_date, datetime) else datetime.fromisoformat(end_date)
+ results = [p for p in results if p.predicted_date <= end_dt]
+
+ if keyword:
+ results = [p for p in results if keyword in p.keyword_args]
+
+ return results
+
+ def delete_prediction(self, prediction_id):
+ """Delete a prediction by ID."""
+ prediction = self.get_prediction_by_id(prediction_id)
+ if prediction:
+ self.predictions.remove(prediction)
+ self.save_predictions()
+ return prediction
+ return None
+
+ def update_prediction_description(self, prediction_id, description):
+ """Update a prediction's description."""
+ prediction = self.get_prediction_by_id(prediction_id)
+ if prediction:
+ prediction.description = description
+ self.save_predictions()
+ return prediction
+ return None
\ No newline at end of file |
