1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
# prediction_storage.py
import json
import uuid
from datetime import datetime
class Prediction:
def __init__(self, launch_date, duration_years, predicted_date, keyword_args=None, prediction_id=None, timestamp=None, description=None):
self.id = prediction_id if prediction_id else str(uuid.uuid4())
self.launch_date = launch_date if isinstance(launch_date, datetime) else datetime.fromisoformat(launch_date)
self.duration_years = duration_years
self.predicted_date = predicted_date if isinstance(predicted_date, datetime) else datetime.fromisoformat(predicted_date)
self.keyword_args = keyword_args or {}
self.timestamp = timestamp if timestamp else datetime.now()
self.description = description
def to_dict(self):
# Convert keyword_args dates to ISO format strings
serialized_keyword_args = {}
for keyword, events in self.keyword_args.items():
serialized_events = []
for event in events:
# Assuming each event is a tuple of (start_date, end_date, id)
start_date = event[0].isoformat() if isinstance(event[0], datetime) else event[0]
end_date = event[1].isoformat() if isinstance(event[1], datetime) else event[1]
serialized_events.append((start_date, end_date, event[2]))
serialized_keyword_args[keyword] = serialized_events
return {
'id': self.id,
'launch_date': self.launch_date.isoformat(),
'duration_years': self.duration_years,
'predicted_date': self.predicted_date.isoformat(),
'keyword_args': serialized_keyword_args,
'timestamp': self.timestamp.isoformat(),
'description': self.description
}
@classmethod
def from_dict(cls, data):
# Deserialize keyword_args dates from ISO format strings
keyword_args = {}
for keyword, events in data.get('keyword_args', {}).items():
deserialized_events = []
for event in events:
# Convert string dates back to datetime objects
start_date = datetime.fromisoformat(event[0]) if isinstance(event[0], str) else event[0]
end_date = datetime.fromisoformat(event[1]) if isinstance(event[1], str) else event[1]
deserialized_events.append((start_date, end_date, event[2]))
keyword_args[keyword] = deserialized_events
return cls(
launch_date=data['launch_date'],
duration_years=data['duration_years'],
predicted_date=data['predicted_date'],
keyword_args=keyword_args,
prediction_id=data['id'],
timestamp=datetime.fromisoformat(data['timestamp']),
description=data.get('description')
)
def __repr__(self):
return (f"Prediction(id={self.id}, launch_date={self.launch_date}, "
f"duration_years={self.duration_years}, predicted_date={self.predicted_date}, "
f"timestamp={self.timestamp}, description={repr(self.description)})")
class PredictionStorage:
def __init__(self, filename=None):
self.filename = filename
self.predictions = []
if filename:
self.load_predictions()
def load_predictions(self):
"""Load predictions from file."""
if not self.filename:
return
try:
with open(self.filename, 'r') as f:
data = json.load(f)
self.predictions = [Prediction.from_dict(pred) for pred in data]
except (FileNotFoundError, json.JSONDecodeError) as e:
print(f"Error reading predictions file {self.filename}: {e}")
self.predictions = []
def save_predictions(self):
"""Save predictions to file."""
if not self.filename:
return
try:
with open(self.filename, 'w') as f:
json.dump([pred.to_dict() for pred in self.predictions], f, indent=4)
except Exception as e:
print(f"Error writing to predictions file {self.filename}: {e}")
def add_prediction(self, launch_date, duration_years, predicted_date, keyword_args=None, description=None):
"""Add a new prediction."""
prediction = Prediction(
launch_date=launch_date,
duration_years=duration_years,
predicted_date=predicted_date,
keyword_args=keyword_args,
description=description
)
self.predictions.append(prediction)
self.save_predictions()
return prediction
def get_prediction_by_id(self, prediction_id):
"""Get a prediction by its ID."""
return next((pred for pred in self.predictions if pred.id == prediction_id), None)
def list_predictions(self):
"""List all predictions."""
return self.predictions
def search_predictions(self, start_date=None, end_date=None, keyword=None):
"""Search predictions by date range or keyword."""
results = self.predictions
if start_date:
start_dt = start_date if isinstance(start_date, datetime) else datetime.fromisoformat(start_date)
results = [p for p in results if p.predicted_date >= start_dt]
if end_date:
end_dt = end_date if isinstance(end_date, datetime) else datetime.fromisoformat(end_date)
results = [p for p in results if p.predicted_date <= end_dt]
if keyword:
results = [p for p in results if keyword in p.keyword_args]
return results
def delete_prediction(self, prediction_id):
"""Delete a prediction by ID."""
prediction = self.get_prediction_by_id(prediction_id)
if prediction:
self.predictions.remove(prediction)
self.save_predictions()
return prediction
return None
def update_prediction_description(self, prediction_id, description):
"""Update a prediction's description."""
prediction = self.get_prediction_by_id(prediction_id)
if prediction:
prediction.description = description
self.save_predictions()
return prediction
return None
|