root/tracslimtimerplugin/0.10/tracslimtimer/slimtimer.py

Revision 2203, 11.0 kB (checked in by tst, 2 years ago)

TracSlimTimerPlugin:

Publishing version 0.1.0 minus docs to trac hacks

Line 
1 import string
2 import StringIO
3 import httplib
4 import time
5 import re
6 import datetime
7 import elementtree.ElementTree as ET
8
9 # SlimTimerAPI
10
11 #
12 # A task
13 #
14 # We sync this with SlimTimer using an explicit call to update(). This is
15 # a little less elegant than doing it through overloads of __setattr__ but it
16 # better suited to batch changes and our use of the API
17 #
18 class SlimTimerTask:
19
20     def __init__(self, session, name, id=0):
21         self.__session  = session
22         self.name       = name
23         self.id         = id
24         self.tags       = []
25         self.coworkers  = []
26         self.reporters  = []
27         self.complete   = False
28
29         # These fields are read-only.
30         # TODO: Overload __setattr__ and throw an Exception if these are ever
31         # set
32         self.hours      = 0
33         self.owner      = ''
34         self.updated_at = 0
35         self.created_at = 0
36         self.completed_on = 0
37
38     def update(self):
39         result = self.__session.update_task(self)
40         # I have no idea about Python but this seems to provide the semantics
41         # we want here, rather than self = result
42         self.__dict__ = result.__dict__
43
44     def delete(self):
45         # If we haven't been created then there's nothing to delete
46         if (self.id):
47             self.__session.delete_task(self)
48
49 #
50 # A time entry
51 #
52 # At the moment this is just a glorified dictionary. In the future it should
53 # probably have a link back to the session so it can be live.
54 #
55 class SlimTimerEntry:
56
57     def __init__(self):
58         self.id         = 0
59         self.start_time = None
60         self.end_time   = None
61         self.duration   = 0
62         self.tags       = ''
63         self.comments   = ''
64         self.task        = None
65
66 #
67 # The session
68 #
69 class SlimTimerSession:
70
71     def __init__(self, username, password, apikey):
72         self.__username = username
73         self.__password = password
74         self.__apikey   = apikey
75
76         self.__token    = ''
77         self.__userid   = ''
78
79         self.__conn     = httplib.HTTPConnection("www.slimtimer.com")
80
81         self._logon()
82
83     def __del__(self):
84         self.__conn.close()
85
86     def get_task_by_id(self, id):
87
88         url = "%s/tasks/%s?%s" % \
89               (self._get_base_url(), id, self._get_url_params())
90
91         self.__conn.request("GET", url, "", { "Accept": "application/xml" })
92         response = self.__conn.getresponse()
93
94         data = response.read()
95
96         if not response.status == 200:
97             return None
98
99         return self._parse_task(ET.fromstring(data))
100
101     def get_task_by_name(self, name, completed='both'):
102
103         completed = string.lower(completed)
104         completed = {'both': 'yes',
105                      'yes': 'only',
106                      'no': 'no',
107                      'true': 'only',
108                      'false': 'no'}[completed]
109
110         url = "%s/tasks?%s&show_completed=%s" % \
111               (self._get_base_url(), self._get_url_params(), completed)
112
113         self.__conn.request("GET", url, "", { "Accept": "application/xml" })
114         response = self.__conn.getresponse()
115
116         data = response.read()
117
118         if not response.status == 200:
119             return None
120
121         for task in ET.fromstring(data).findall("task"):
122             if task.findtext("name") == name:
123                 return self._parse_task(task)
124
125         return None
126
127     def update_task(self, task):
128         """
129         Updates the given task or creates it if the task ID is 0
130         """
131         create = task.id == 0
132
133         xml = self._serialise_task(task)
134
135         method = ['PUT','POST'][create]
136
137         if create:
138             url = "%s/tasks?%s" % \
139                   (self._get_base_url(), self._get_url_params())
140         else:
141             url = "%s/tasks/%s?%s" % \
142                   (self._get_base_url(), task.id, self._get_url_params())
143
144         headers = { "Accept":"application/xml",
145                     "Content-Type":"application/xml" }
146         self.__conn.request(method, url, xml, headers)
147         response = self.__conn.getresponse()
148
149         data = response.read()
150
151         if not response.status == 200:
152             raise Exception("Could not update/create task."\
153                     " Response was [%s]: %s" % (response.status, data))
154
155         return self._parse_task(ET.fromstring(data))
156
157     def delete_task(self, task):
158
159         url = "%s/tasks/%s?%s" % \
160               (self._get_base_url(), task.id, self._get_url_params())
161
162         self.__conn.request("DELETE", url, "",
163                             { "Accept": "application/xml" })
164         response = self.__conn.getresponse()
165
166         if not response.status == 200:
167             raise Exception("Task not found for deletion")
168
169         # We seem to need to reset the connection after a delete
170         self._reset_connection()
171
172     def get_time_entries(self, range_start = None, range_end = None):
173
174         result = []
175
176         # Prepare range filter
177         filters = []
178         if range_start:
179             filters.append("range_start=%s" % self._format_date(range_start))
180         if range_end:
181             filters.append("range_end=%s" % self._format_date(range_end))
182         filter_str = '&'.join(filters)
183         if filter_str:
184             filter_str = '&' + filter_str
185
186         url = "%s/time_entries?%s%s" % \
187               (self._get_base_url(), self._get_url_params(), filter_str)
188
189         self.__conn.request("GET", url, "", { "Accept": "application/xml" })
190         response = self.__conn.getresponse()
191
192         data = response.read()
193
194         if not response.status == 200:
195             return None
196
197         for entry in ET.fromstring(data).findall("time-entry"):
198             result.append(self._parse_time_entry(entry))
199
200         return result
201
202     def get_username(self):
203         return self.__username
204
205     # Internal methods
206
207     def _logon(self):
208         """ Get an access token and user id """
209
210         # Lazy operation
211         if self.__token and self.__userid:
212             return (self.__token, self.__userid)
213
214         # Parameter checking
215         if not self.__username or not self.__apikey:
216             raise Exception("Invalid username or API key")
217
218         # Build request
219         request = '<request><user><email>%s</email>\
220             <password>%s</password></user><api-key>%s</api-key>\
221             </request>' % (self.__username, self.__password, self.__apikey)
222
223         headers = { "Accept":"application/xml",
224                     "Content-Type":"application/xml" }
225         self.__conn.request("POST", "/users/token", request, headers)
226         response = self.__conn.getresponse()
227
228         data = response.read()
229
230         if response.status != 200:
231             raise Exception("Server returned error: %s)" % data)
232
233         result = ET.fromstring(data)
234         self.__token = result.findtext("access-token")
235         self.__userid = result.findtext("user-id")
236
237         return (self.__token, self.__userid)
238
239     def _reset_connection(self):
240         """ Establish a new connection """
241
242         self.__userid = 0
243         self.__token  = 0
244         self.__conn.close()
245
246         self.__conn   = httplib.HTTPConnection("www.slimtimer.com")
247         self._logon()
248
249     def _get_base_url(self):
250         """ Get the start of the URL """
251
252         # This should have been established by _logon
253         assert self.__userid
254
255         return "/users/%s" % self.__userid
256
257     def _get_url_params(self):
258         """ Get common URL parameters """
259
260         # These should have been established by _logon
261         assert self.__apikey
262         assert self.__token
263
264         return "api_key=%s&access_token=%s" % (self.__apikey, self.__token)
265
266     def _parse_task(self, task_element):
267         id   = int(task_element.findtext("id"))
268         name = task_element.findtext("name")
269
270         task = SlimTimerTask(self, name, id)
271
272         tags_text = task_element.findtext("tags")
273         if (tags_text): task.tags = self._parse_tags(tags_text)
274
275         task.coworkers = self._parse_people(task_element.find("coworkers"))
276         task.reporters = self._parse_people(task_element.find("reporters"))
277
278         task.complete = not task_element.findtext("completed-on") == ""
279         task.hours = float(task_element.findtext("hours"))
280
281         owners = self._parse_people(task_element.find("owners"))
282         if len(owners):
283             task.owner = owners[0]
284
285         task.created_at = self._parse_date(task_element.findtext("created-at"))
286         task.updated_at = self._parse_date(task_element.findtext("updated-at"))
287
288         if task.complete:
289             task.completed_on = \
290                 self._parse_date(task_element.findtext("completed-on"))
291
292         return task
293
294     def _parse_tags(self, tags_text):
295         pat = r'"[^"]*"|[^," \t][^,"]+[^," \t]'
296         return re.findall(pat, tags_text)
297
298     def _parse_people(self, list_element):
299         emails = []
300         for person in list_element.findall("person"):
301             emails.append(self._parse_person(person)['email'])
302         return emails
303
304     def _parse_person(self, person_element):
305         person = {}
306         person['name'] = person_element.findtext("name")
307         person['userid'] = person_element.findtext("user-id")
308         person['email'] = person_element.findtext("email")
309         return person
310
311     def _parse_date(self, date_text):
312         try:
313             return datetime.datetime(*(time.strptime(date_text,
314                                         "%Y-%m-%dT%H:%M:%SZ")[0:6]))
315         except:
316             return None
317
318     def _format_date(self, date):
319         return date.strftime("%Y-%m-%dT%H:%M:%SZ")
320
321     def _parse_time_entry(self, entry_element):
322         entry = SlimTimerEntry()
323         entry.id = int(entry_element.findtext("id"))
324         entry.start_time = \
325             self._parse_date(entry_element.findtext("start-time"))
326         entry.end_time = self._parse_date(entry_element.findtext("end-time"))
327         entry.duration = int(entry_element.findtext("duration-in-seconds"))
328         entry.tags     = entry_element.findtext("tags")
329         entry.comments = entry_element.findtext("comments")
330         entry.task = self._parse_task(entry_element.find("task"))
331
332         return entry
333
334     def _serialise_task(self, task):
335
336         xml_task = ET.Element("task")
337
338         if task.id != 0:
339             id = ET.SubElement(xml_task, "id")
340             id.set("type", "integer")
341             id.text = str(task.id)
342
343         name = ET.SubElement(xml_task, "name")
344         name.text = task.name
345
346         if len(task.tags):
347             tags = ET.SubElement(xml_task, "tags")
348             tags.text = string.join(task.tags, ",")
349
350         if len(task.coworkers):
351             coworkers = ET.SubElement(xml_task, "coworker_emails")
352             coworkers.text = string.join(task.coworkers, ",")
353
354         if len(task.reporters):
355             reporters = ET.SubElement(xml_task, "reporter_emails")
356             reporters.text = string.join(task.reporters, ",")
357
358         completed = ET.SubElement(xml_task, "completed_on")
359         if task.complete:
360             completed.text = time.strftime("%Y-%m-%d %H:%M:%S",
361                                            time.gmtime())
362         else:
363             completed.text = ""
364
365         xml = StringIO.StringIO()
366         ET.ElementTree(xml_task).write(xml)
367         result = xml.getvalue()
368         xml.close()
369
370         return result
Note: See TracBrowser for help on using the browser.