summaryrefslogtreecommitdiff
path: root/prediction_storage.py
blob: 95688c9a17cd694e2eac68b4b77c17822b89f346 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# prediction_storage.py
import json
import uuid
from datetime import datetime

class Prediction:
    """One prediction record that can be converted to and from a JSON-friendly dict.

    Attributes:
        id: Identifier string; a random UUID4 string when none is supplied.
        launch_date: ``datetime`` (ISO strings are parsed on construction).
        duration_years: Stored exactly as given.
        predicted_date: ``datetime`` (ISO strings are parsed on construction).
        keyword_args: Dict mapping a keyword to a list of events, where each
            event is indexable as ``(start_date, end_date, id)``.
        timestamp: ``datetime``; defaults to ``datetime.now()``.
        description: Optional free-form description.
    """

    def __init__(self, launch_date, duration_years, predicted_date, keyword_args=None, prediction_id=None, timestamp=None, description=None):
        """Build a prediction, parsing any date given as an ISO-format string.

        Args:
            launch_date: ``datetime`` or ISO-format string.
            duration_years: Stored as-is.
            predicted_date: ``datetime`` or ISO-format string.
            keyword_args: Optional dict of keyword -> list of events; a falsy
                value becomes an empty dict.
            prediction_id: Optional identifier; a falsy value yields a new UUID4.
            timestamp: Optional ``datetime`` or ISO-format string; a falsy
                value yields ``datetime.now()``.
            description: Optional description.

        Raises:
            ValueError: If a date string is not valid ISO format.
        """
        self.id = prediction_id if prediction_id else str(uuid.uuid4())
        self.launch_date = self._coerce_datetime(launch_date)
        self.duration_years = duration_years
        self.predicted_date = self._coerce_datetime(predicted_date)
        self.keyword_args = keyword_args or {}
        # Parse string timestamps like the other date fields; otherwise
        # to_dict() would fail when calling .isoformat() on a plain str.
        self.timestamp = self._coerce_datetime(timestamp) if timestamp else datetime.now()
        self.description = description

    @staticmethod
    def _coerce_datetime(value):
        """Return *value* unchanged if it is a ``datetime``, else parse it as ISO format."""
        return value if isinstance(value, datetime) else datetime.fromisoformat(value)

    def to_dict(self):
        """Return a dict representation with every ``datetime`` rendered as an ISO string.

        Event entries in ``keyword_args`` are emitted as
        ``(start_date, end_date, id)`` tuples; start/end values that are not
        ``datetime`` instances are passed through unchanged.
        """
        serialized_keyword_args = {}
        for keyword, events in self.keyword_args.items():
            serialized_events = []
            for event in events:
                # Each event is expected to be indexable as (start_date, end_date, id)
                start_date = event[0].isoformat() if isinstance(event[0], datetime) else event[0]
                end_date = event[1].isoformat() if isinstance(event[1], datetime) else event[1]
                serialized_events.append((start_date, end_date, event[2]))
            serialized_keyword_args[keyword] = serialized_events

        return {
            'id': self.id,
            'launch_date': self.launch_date.isoformat(),
            'duration_years': self.duration_years,
            'predicted_date': self.predicted_date.isoformat(),
            'keyword_args': serialized_keyword_args,
            'timestamp': self.timestamp.isoformat(),
            'description': self.description
        }

    @classmethod
    def from_dict(cls, data):
        """Rebuild a prediction from a dict shaped like the output of ``to_dict``.

        Required keys: ``id``, ``launch_date``, ``duration_years``,
        ``predicted_date`` and ``timestamp``. ``keyword_args`` and
        ``description`` are optional.

        Raises:
            KeyError: If a required key is missing.
            ValueError: If a date string is not valid ISO format.
        """
        keyword_args = {}
        for keyword, events in data.get('keyword_args', {}).items():
            deserialized_events = []
            for event in events:
                # Only strings are parsed; anything else is kept as given
                start_date = datetime.fromisoformat(event[0]) if isinstance(event[0], str) else event[0]
                end_date = datetime.fromisoformat(event[1]) if isinstance(event[1], str) else event[1]
                deserialized_events.append((start_date, end_date, event[2]))
            keyword_args[keyword] = deserialized_events

        return cls(
            launch_date=data['launch_date'],
            duration_years=data['duration_years'],
            predicted_date=data['predicted_date'],
            keyword_args=keyword_args,
            prediction_id=data['id'],
            # Accepts an ISO string or an existing datetime, like the other date fields
            timestamp=cls._coerce_datetime(data['timestamp']),
            description=data.get('description')
        )

    def __repr__(self):
        return (f"Prediction(id={self.id}, launch_date={self.launch_date}, "
                f"duration_years={self.duration_years}, predicted_date={self.predicted_date}, "
                f"timestamp={self.timestamp}, description={repr(self.description)})")

class PredictionStorage:
    """In-memory list of predictions, optionally persisted to a JSON file.

    When ``filename`` is falsy the storage is purely in-memory and the
    load/save methods do nothing.
    """

    def __init__(self, filename=None):
        """Create the storage and, if *filename* is given, load it immediately."""
        self.filename = filename
        self.predictions = []
        if filename:
            self.load_predictions()

    def load_predictions(self):
        """Load predictions from file.

        A missing file or invalid JSON is reported with ``print`` and leaves
        the storage empty. Errors raised while rebuilding an individual
        record are not caught here.
        """
        if not self.filename:
            return

        try:
            with open(self.filename, 'r') as f:
                data = json.load(f)
                self.predictions = [Prediction.from_dict(pred) for pred in data]
        except (FileNotFoundError, json.JSONDecodeError) as e:
            print(f"Error reading predictions file {self.filename}: {e}")
            self.predictions = []

    def save_predictions(self):
        """Save predictions to file.

        Failures are reported with ``print`` rather than raised.
        """
        if not self.filename:
            return

        try:
            # Serialize completely BEFORE opening the file: opening with 'w'
            # truncates it, so a serialization error afterwards would wipe
            # out the previously saved predictions.
            payload = json.dumps([pred.to_dict() for pred in self.predictions], indent=4)
            with open(self.filename, 'w') as f:
                f.write(payload)
        except Exception as e:
            print(f"Error writing to predictions file {self.filename}: {e}")

    def add_prediction(self, launch_date, duration_years, predicted_date, keyword_args=None, description=None):
        """Add a new prediction, save the storage and return the new object."""
        prediction = Prediction(
            launch_date=launch_date,
            duration_years=duration_years,
            predicted_date=predicted_date,
            keyword_args=keyword_args,
            description=description
        )
        self.predictions.append(prediction)
        self.save_predictions()
        return prediction

    def get_prediction_by_id(self, prediction_id):
        """Get a prediction by its ID, or ``None`` if no prediction matches."""
        return next((pred for pred in self.predictions if pred.id == prediction_id), None)

    def list_predictions(self):
        """List all predictions (returns the internal list itself, not a copy)."""
        return self.predictions

    def search_predictions(self, start_date=None, end_date=None, keyword=None):
        """Search predictions by date range or keyword.

        Args:
            start_date: Optional ``datetime`` or ISO string; keeps predictions
                whose ``predicted_date`` is on or after it.
            end_date: Optional ``datetime`` or ISO string; keeps predictions
                whose ``predicted_date`` is on or before it.
            keyword: Optional key that must be present in ``keyword_args``.

        Returns:
            The predictions satisfying every supplied filter. With no filters
            this is the internal list itself.
        """
        results = self.predictions

        if start_date:
            start_dt = start_date if isinstance(start_date, datetime) else datetime.fromisoformat(start_date)
            results = [p for p in results if p.predicted_date >= start_dt]

        if end_date:
            end_dt = end_date if isinstance(end_date, datetime) else datetime.fromisoformat(end_date)
            results = [p for p in results if p.predicted_date <= end_dt]

        if keyword:
            results = [p for p in results if keyword in p.keyword_args]

        return results

    def delete_prediction(self, prediction_id):
        """Delete a prediction by ID; return it, or ``None`` if it was not found."""
        prediction = self.get_prediction_by_id(prediction_id)
        if prediction:
            self.predictions.remove(prediction)
            self.save_predictions()
            return prediction
        return None

    def update_prediction_description(self, prediction_id, description):
        """Update a prediction's description; return it, or ``None`` if not found."""
        prediction = self.get_prediction_by_id(prediction_id)
        if prediction:
            prediction.description = description
            self.save_predictions()
            return prediction
        return None