instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / UPnP / common / asynchHTTPclient.py
1 # Written by Ingar Arntzen
2 # see LICENSE.txt for license information
3
4 """
This module implements the client side of a non-blocking
HTTP request-response exchange, supported by a TaskRunner.
A blocking interface is also provided on top of the non-blocking one.

The implementation also sets up the connection in a non-blocking
manner. This essentially makes it a
connect-request-response protocol.
12 """
13
14 import socket
15 import errno
16 import exceptions
17 import os
18 import threadhotel
19
20 ##############################################
21 # BLOCKING HTTP CLIENT
22 ##############################################
23
class SynchHTTPClient:

    """
    Blocking facade on top of an AsynchHTTPClient.

    Each call to request() issues one asynchronous request and then
    waits on the thread hotel until one of the two private handlers
    below delivers the outcome for that request id.
    """
    FAIL, OK = "FAIL", "OK"

    def __init__(self, asynchhttpclient):
        """
        asynchhttpclient -- the AsynchHTTPClient instance (or any object
        offering get_request_id() and request()) used to run requests.
        """
        # NOTE(review): ThreadHotel is defined outside this file; it is
        # presumably what parks the calling thread -- confirm there.
        self._threadhotel = threadhotel.ThreadHotel()
        self._asynchclient = asynchhttpclient

    def request(self, host, port, request_data, timeout=10):
        """
        Issue a request and return the tuple (status, reply).

        host, port -- web server.
        request_data -- string data including both header and body.
        timeout -- timeout forwarded to the asynchronous client
                   (defaults to 10, the value previously hard-coded).

        - Status indicates whether the request failed or succeeded.
        - If Status is OK, the reply holds the http response.
          Reply is tuple of: (header, body)
        - If Status is FAIL, the reply explains what went wrong.
          Reply is tuple of: (error, comment)
        """
        rid = self._asynchclient.get_request_id()
        # Reserve before dispatching, so that the reply slot exists
        # by the time a handler calls wakeup() for this rid.
        self._threadhotel.reserve(rid)
        self._asynchclient.request(rid, host, port, request_data,
                                   self._abort_handler,
                                   self._response_handler,
                                   timeout=timeout)
        return self._threadhotel.wait_reply(rid)

    def _abort_handler(self, rid, error, comment):
        """Abort handler: deliver (FAIL, (error, comment)) for rid."""
        reply = (error, comment)
        self._threadhotel.wakeup(rid, SynchHTTPClient.FAIL, reply)

    def _response_handler(self, rid, header, body):
        """Response handler: deliver (OK, (header, body)) for rid."""
        reply = (header, body)
        self._threadhotel.wakeup(rid, SynchHTTPClient.OK, reply)
60
61
62
63 ##############################################
64 #  NON-BLOCKING  HTTP CLIENT
65 ##############################################
66
# Tag handed to the logger with every message from AsynchHTTPClient.
_LOG_TAG = "HTTPClient"

class AsynchHTTPClient:

    """
    This class runs non-blocking asynchronous http requests
    to multiple HTTP servers at once. Specify a_handler or r_handler
    for asynchronous upcalls. If not, the httpClient supports
    fire-and-forget semantics (from an external point of view).
    Internally, the httpClient will not forget a request until it has
    either timed out, aborted due to failure or succeeded.
    """

    def __init__(self, task_runner, logger=None):
        """
        task_runner -- task runner handed to every HTTPRequest.
        logger -- optional object with a log(tag, msg) method.
        """
        self._task_runner = task_runner
        self._request_id = 0
        self._requests = {} # requestID: (request, aHandler, rHandler)
        # Logging
        self._log_tag = _LOG_TAG
        self._logger = logger

    ##############################################
    # PUBLIC API
    ##############################################

    def get_request_id(self):
        """Generate new request id."""
        self._request_id += 1
        return self._request_id

    def request(self, rid, host, port, request_data,
                a_handler=None, r_handler=None, timeout=10):
        """
        Issue a new http request. Returns rid.

        rid -- request id, as obtained from get_request_id().
        host, port -- web server.
        request_data -- string data including both header and body.
        a_handler(rid, error, why) -- handler to be invoked if request
                                      aborts.
        r_handler(rid, header, body) -- handler to be invoked with
                                        response.
        timeout -- passed to HTTPRequest as its recv_timeout.
        """
        request = HTTPRequest(self._task_runner, rid, recv_timeout=timeout)
        request.set_abort_handler(self._handle_abort)
        request.set_response_handler(self._handle_response)
        # Register before dispatching, so the private handlers can
        # always find the entry for this rid.
        self._requests[rid] = (request, a_handler, r_handler)
        request.dispatch(host, port, request_data)
        self._log("Request Dispatched [%d]" % rid)
        return rid

    def close(self):
        """Stop all requests and close their sockets."""
        # Forget the requests first: they are finished once closed, and
        # a late upcall for one of them must find nothing to dispatch.
        pending = list(self._requests.values())
        self._requests.clear()
        for tup in pending:
            tup[0].close()

    ##############################################
    # PRIVATE HANDLERS
    ##############################################

    def _handle_response(self, rid, header, body):
        """Dispatches responses by invoking given r_handler."""
        self._log("Response Received [%d]" % rid)
        entry = self._requests.pop(rid, None)
        if entry is None:
            # Already completed, aborted or closed: nothing to do.
            return
        request, r_handler = entry[0], entry[2]
        request.close()
        if r_handler:
            r_handler(rid, header, body)

    def _handle_abort(self, rid, error, why):
        """Dispatches aborts by invoking given a_handler."""
        self._log("HTTP Request Aborted [%d]" % rid)
        entry = self._requests.pop(rid, None)
        if entry is None:
            # Already completed, aborted or closed: nothing to do.
            return
        request, a_handler = entry[0], entry[1]
        request.close()
        if a_handler:
            a_handler(rid, error, why)

    ##############################################
    # PRIVATE UTILITY
    ##############################################

    def _log(self, msg):
        """Forward msg to the logger, if one was given."""
        if self._logger:
            self._logger.log(self._log_tag, msg)
153
154
155 ##############################################
156 # HTTP REQUEST
157 ##############################################
158
class HTTPRequestError(Exception):
    """Error associated with the request response protocol."""
162
class HTTPRequest:

    """
    This implements a single non-blocking connect-request-response
    protocol from an HTTPClient to a HTTPServer.
    For now, this class does not support sequential requests-responses on the
    same connection. Neither does it support instance reuse.

    The task runner given to the constructor must offer
    add_task(method, args), add_write_task(fd, method),
    add_read_task(fd, method) and add_delay_task(timeout, method),
    each returning an object with a cancel() method.
    """
    STATE_INIT = 0
    STATE_CONNECT_STARTED = 1
    STATE_CONNECT_OK = 2
    STATE_SEND_STARTED = 3
    STATE_SEND_OK = 4
    STATE_RECV_STARTED = 5
    STATE_RECV_OK = 6
    STATE_DONE = 7

    def __init__(self, task_runner, request_id,
                 recv_timeout=10, conn_timeout=1,
                 conn_attempts=3, logger=None):
        """
        task_runner -- runner used to schedule all protocol steps.
        request_id -- id passed back as first argument of both upcalls.
        recv_timeout -- delay given to the receive timeout task.
        conn_timeout -- delay given to each connect timeout task.
        conn_attempts -- number of connect timeouts before aborting.
        logger -- optional object with a log(tag, msg) method.
        """
        self._task_runner = task_runner
        self._request_id = request_id
        # Create Socket
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.setblocking(False)
        # Protocol State
        self._state = HTTPRequest.STATE_INIT
        # Request Data
        self._request_data = None
        self._bytes_sent = 0
        # Response Data
        self._response_data = ""
        self._recv_count = 0
        # Tasks
        self._conn_task = None
        self._conn_to_task = None
        self._send_task = None
        self._recv_task = None
        self._recv_to_task = None
        # Connect Attempts
        self._connect_attempts = 0
        self._max_connect_attempts = conn_attempts
        # Send
        self._bytes = 0 # number of request bytes sent so far
        self._send_count = 0
        # Recv
        self._header = ""
        self._body = ""
        self._length = 0 # expected total length of header+delimiter+body
        self._delimiter = '\r\n\r\n'
        # Timeouts
        self._recv_to = recv_timeout
        self._conn_to = conn_timeout
        # Handler Upcalls
        self._recv_handler = lambda requestID, hdr, body: None
        self._abort_handler = lambda requestID, error, comment: None
        # Logging
        self._logger = logger
        self._log_tag = "Request [%d]" % self._request_id

    ##############################################
    # PUBLIC API
    ##############################################

    def dispatch(self, host, port, request_data):
        """
        Dispatch a new request.

        Raises HTTPRequestError unless the instance is still unused.
        """
        if self._state != HTTPRequest.STATE_INIT:
            raise HTTPRequestError("Illegal Operation given protocol State")
        self._request_data = request_data
        self._connect_start(host, port)

    def set_response_handler(self, handler):
        """Register a response handler: handler(request_id, header, body)."""
        self._recv_handler = handler

    def set_abort_handler(self, handler):
        """Register an abort handler: handler(request_id, error, comment)."""
        self._abort_handler = handler

    def close(self):
        """Cleanup the request-response protocol and close the socket."""
        self._cancel_tasks()
        self._state = HTTPRequest.STATE_DONE
        if self._sock:
            try:
                self._sock.close()
            except socket.error:
                # Best effort: the socket is discarded either way.
                pass
            self._sock = None

    ##############################################
    # PRIVATE UTILITY
    ##############################################

    def _log(self, msg):
        """Logging."""
        if self._logger:
            self._logger.log(self._log_tag, msg)

    def _cancel_tasks(self):
        """Cancel every task registered so far with the task runner."""
        for task in (self._conn_task, self._conn_to_task, self._send_task,
                     self._recv_task, self._recv_to_task):
            if task:
                task.cancel()

    def _get_content_length(self):
        """
        Extract body length from HTTP header.

        Returns 0 if there is no Content-Length element, or if its
        value is not a valid non-negative integer.
        """
        # The first line is the status line, not a header element.
        for line in self._header.split('\r\n')[1:]:
            elem_name, colon, elem_value = line.partition(":")
            if not colon:
                # Blank or malformed line: nothing to extract.
                continue
            if elem_name.strip().lower() == 'content-length':
                try:
                    return max(0, int(elem_value.strip()))
                except ValueError:
                    return 0
        return 0

    def _http_header_ok(self):
        """Check that received data is a valid HTTP header."""
        return len(self._header) > 4 and self._header[:4] == "HTTP"

    def _do(self, method, args=()):
        """Shorthand for add_task."""
        return self._task_runner.add_task(method, args)

    def _do_write(self, file_descriptor, method):
        """Shorthand for add_write."""
        return self._task_runner.add_write_task(file_descriptor, method)

    def _do_read(self, file_descriptor, method):
        """Shorthand for add_read."""
        return self._task_runner.add_read_task(file_descriptor, method)

    def _do_to(self, timeout, method):
        """Shorthand for add_delay."""
        return self._task_runner.add_delay_task(timeout, method)

    ##############################################
    # PRIVATE PROTOCOL METHODS
    ##############################################

    def _connect_start(self, host, port):
        """Start non-blocking connect."""
        self._log("Connect Start")
        error = self._sock.connect_ex((host, port))
        # 0 means the connect completed at once; the socket is then
        # writeable right away and the normal path below applies.
        if error not in (0, errno.EINPROGRESS, errno.EWOULDBLOCK):
            self._abort(error, "Non-Blocking Connect Failed")
            return
        self._state = HTTPRequest.STATE_CONNECT_STARTED
        self._conn_task = self._do_write(self._sock.fileno(),
                                         self._handle_connect_ok)
        self._conn_to_task = self._do_to(self._conn_to,
                                         self._handle_connect_to)

    def _handle_connect_ok(self):
        """
        Handle the socket becoming writeable after connect was started.

        Writeability alone does not prove that the connect succeeded,
        so the pending socket error is checked before sending.
        """
        self._log("Connect OK")
        if self._state != HTTPRequest.STATE_CONNECT_STARTED:
            raise HTTPRequestError("Illegal Operation given protocol State")
        self._conn_task.cancel()
        self._conn_to_task.cancel()
        error = self._sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if error:
            self._abort(error, "Non-Blocking Connect Failed")
            return
        self._state = HTTPRequest.STATE_CONNECT_OK
        # Start sending the Request
        self._do(self._send)

    def _handle_connect_to(self):
        """Handle connect timeout."""
        self._log("Connect Timeout")
        if self._state != HTTPRequest.STATE_CONNECT_STARTED:
            raise HTTPRequestError("Illegal Operation given protocol State")
        self._connect_attempts += 1
        if self._connect_attempts >= self._max_connect_attempts:
            # Abort
            self._conn_task.cancel()
            self._abort(errno.ETIME, "Connect Timeout")
        else:
            # Keep waiting on the same connect for another period
            self._conn_to_task = self._do_to(self._conn_to,
                                             self._handle_connect_to)

    def _send(self):
        """
        Start sending a request.
        Or continue sending a partially sent request.
        """
        self._send_count += 1
        first_attempt = self._send_count == 1
        if first_attempt:
            expected_state = HTTPRequest.STATE_CONNECT_OK
        else:
            expected_state = HTTPRequest.STATE_SEND_STARTED
        if self._state != expected_state:
            raise HTTPRequestError("Illegal Operation given protocol State")
        if first_attempt:
            self._state = HTTPRequest.STATE_SEND_STARTED
            self._log("Send Started")
        else:
            self._log("Send Continue")

        # (Continue) Send
        try:
            bytes_sent = self._sock.send(self._request_data[self._bytes:])
        except socket.error as why:
            if why.args[0] in (errno.EAGAIN, errno.EWOULDBLOCK):
                # Send on full buffer
                # Continue sending again once the socket becomes writeable
                self._send_continue()
            else:
                # Typically EPIPE: Broken Pipe or ECONNREFUSED
                self._abort(why.args[0], "Exception on Send")
            return

        # Send Operation returned naturally
        if bytes_sent > 0:
            # Something was sent
            self._bytes += bytes_sent
            if self._bytes >= len(self._request_data):
                # The complete message was sent. Cancel the write task
                # now, so it cannot fire again before _handle_send_ok.
                if self._send_task:
                    self._send_task.cancel()
                self._state = HTTPRequest.STATE_SEND_OK
                self._do(self._handle_send_ok)
            else:
                # Message only partially sent
                self._send_continue()
        else:
            # 0 bytes sent => error
            msg = "Sent 0 bytes, yet fd was writeable and no exception occurred"
            self._abort(errno.EPIPE, msg)

    def _send_continue(self):
        """Register new write task after request was only partially sent."""
        # Register a Write Task, unless one is registered already
        if not self._send_task:
            self._send_task = self._do_write(self._sock.fileno(),
                                             self._send)

    def _handle_send_ok(self):
        """Handle completely sent request."""
        self._log("Send OK")
        if self._state != HTTPRequest.STATE_SEND_OK:
            raise HTTPRequestError("Illegal Operation given protocol State")
        # Cancel Send Task
        if self._send_task:
            self._send_task.cancel()
        # Start waiting for the response
        self._recv_task = self._do_read(self._sock.fileno(), self._recv)
        # Define new Receive Timeout
        self._recv_to_task = self._do_to(self._recv_to,
                                         self._handle_recv_to)

    def _recv(self):
        """
        Start receiving the response.
        Or continue to receive a partially received response.
        """
        self._recv_count += 1
        first_attempt = self._recv_count == 1
        if first_attempt:
            expected_state = HTTPRequest.STATE_SEND_OK
        else:
            expected_state = HTTPRequest.STATE_RECV_STARTED
        if self._state != expected_state:
            raise HTTPRequestError("Illegal Operation given protocol State")
        if first_attempt:
            self._state = HTTPRequest.STATE_RECV_STARTED
            self._log("Recv Started")
        else:
            self._log("Recv Continue")

        # Recv a chunk
        try:
            data = self._sock.recv(1024)
        except socket.error as why:
            if why.args[0] in (errno.EAGAIN, errno.EWOULDBLOCK):
                # Nothing to read yet: enter/stay in readset
                self._recv_continue()
            else:
                # EPIPE: Broken Pipe.
                self._abort(why.args[0], "Exception on Recv")
            return

        if not data:
            # Did not block, but received no data => error
            msg = "Recv 0 bytes, yet fd was readable and no exception occurred"
            self._abort(errno.EPIPE, msg)
            return

        # Recv completed naturally
        self._response_data += data
        # Parse HTTP response
        if not self._header:
            tokens = self._response_data.split(self._delimiter, 1)
            if len(tokens) == 1:
                # Header Not Complete
                self._recv_continue()
                return
            # Header Complete
            self._header = tokens[0]
            if not self._http_header_ok():
                self._abort(errno.EBADMSG, "Not HTTP Header")
                return
            # Header complete and OK
            self._length = len(self._header) + \
                len(self._delimiter) + self._get_content_length()

        # Header is received, entire body may not be received
        if len(self._response_data) < self._length:
            # Entire body not received
            self._recv_continue()
            return
        # Entire response received (may be too long)
        if len(self._response_data) > self._length:
            # Truncate
            self._response_data = self._response_data[:self._length]
        # Entire response received (correct length)
        # Cancel Tasks
        self._recv_task.cancel()
        self._recv_to_task.cancel()
        self._state = HTTPRequest.STATE_RECV_OK
        self._do(self._handle_recv_ok)

    def _recv_continue(self):
        """Register read task to continue to receive a
        partially received response."""
        # Make sure a ReadTask is registered.
        if not self._recv_task:
            self._recv_task = self._do_read(self._sock.fileno(),
                                            self._recv)

    def _handle_recv_to(self):
        """Handle receive timeout."""
        self._log("Receive Timeout")
        # The timeout may fire before any data arrived (SEND_OK) or
        # while a response is only partially received (RECV_STARTED).
        if self._state not in (HTTPRequest.STATE_SEND_OK,
                               HTTPRequest.STATE_RECV_STARTED):
            raise HTTPRequestError("Illegal Operation given protocol State")
        self._abort(errno.ETIME, "Receive Timeout")

    def _handle_recv_ok(self):
        """Handle completely received response."""
        self._log("Receive OK")
        if self._state != HTTPRequest.STATE_RECV_OK:
            raise HTTPRequestError("Illegal Operation given protocol State")
        # Upcall
        body = self._response_data[len(self._header) + len(self._delimiter):]
        self._state = HTTPRequest.STATE_DONE
        args = (self._request_id, self._header, body)
        self._do(self._recv_handler, args)

    def _abort(self, error, comment):
        """
        Abort this request-response protocol.

        Cancels all registered tasks and schedules the abort handler
        with (request_id, error, comment).
        """
        fmt = "Abort [%d] %s '%s' (%s)"
        # Not every error number has a symbolic name (e.g. 0).
        self._log(fmt % (error, errno.errorcode.get(error, "UNKNOWN"),
                         os.strerror(error), comment))
        # No read/write/timeout task may fire on a finished request.
        self._cancel_tasks()
        self._state = HTTPRequest.STATE_DONE
        args = (self._request_id, error, comment)
        self._do(self._abort_handler, args)
539
540
541  
542 ##############################################
543 # MAIN
544 ##############################################
545
if __name__ == '__main__':

    # Manual test: issues one asynchronous and one blocking request
    # against HOST:PORT and prints the outcome of each.
    # Usage: asynchHTTPclient.py [port] [home]

    class _MockLogger:
        """Mock-up Logger."""
        def log(self, tag, msg):
            """Log to stdout."""
            print("%s %s" % (tag, msg))

    LOGGER = _MockLogger()

    import BaseLib.UPnP.common.taskrunner as taskrunner
    import sys

    TR = taskrunner.TaskRunner()

    HOME = "192.168.1.235"
    WORK = "193.156.106.130"

    HOST = WORK

    if len(sys.argv) > 1:
        PORT = int(sys.argv[1])
    else:
        PORT = 44444

    if len(sys.argv) > 2 and sys.argv[2] == 'home':
        HOST = HOME

    HTTP_REQUEST = "NOTIFY /path HTTP/1.1\r\nContent-length:0\r\n\r\n"

    def response_handler(rid, header, body):
        """Response handler."""
        print(rid)
        print(header)
        print("----------")
        print(body)

    def abort_handler(rid, error, comment):
        """Abort handler."""
        fmt = "Abort [%d] %s '%s' (%s) [%d]"
        print(fmt % (error, errno.errorcode[error],
                     os.strerror(error), comment, rid))


    class _TestSynchHTTPClient:
        """Runnable test class for Synchronous HTTPClient."""

        def __init__(self, asynch_httpclient):
            self._synch_httpclient = SynchHTTPClient(asynch_httpclient)

        def run(self):
            """Run the blocking connect-request-response protocol."""
            status, reply = self._synch_httpclient.request(HOST,
                                                           PORT, HTTP_REQUEST)
            if status == SynchHTTPClient.OK:
                header, body = reply
                print(header)
                print(body)
            elif status == SynchHTTPClient.FAIL:
                error, msg = reply
                print("%s %s" % (error, msg))

    # Test Asynch HTTP Client
    ASYNCH = AsynchHTTPClient(TR, logger=LOGGER)
    RID = ASYNCH.get_request_id()
    ASYNCH.request(RID, HOST, PORT,
                   HTTP_REQUEST, abort_handler, response_handler)

    # Test Synch HTTP Client
    import threading
    SYNCH = _TestSynchHTTPClient(ASYNCH)
    THREAD = threading.Thread(target=SYNCH.run)
    # Daemon thread: a request still blocked waiting for its reply
    # must not keep the process alive after Ctrl-C.
    THREAD.daemon = True
    THREAD.start()

    try:
        TR.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Release the sockets of any request still in flight.
        ASYNCH.close()
624