1 # Written by Ingar Arntzen
2 # see LICENSE.txt for license information
5 This module implements the client side of a non-blocking
6 http request-response exchange, supported by a TaskRunner.
7 A blocking interface is also provided on top of the non-blocking one.
9 The implementation also sets up the connection in a non-blocking
10 manner. This essentially makes it a
11 connect-request-response protocol.
20 ##############################################
21 # BLOCKING HTTP CLIENT
22 ##############################################
class SynchHTTPClient:
    """
    Blocking facade over an AsynchHTTPClient.

    request() issues the request through the wrapped non-blocking
    client and then waits in a ThreadHotel until one of the two
    handlers below wakes it up with the outcome for that request id.
    """

    FAIL, OK = "FAIL", "OK"

    def __init__(self, asynchhttpclient):
        """Wrap the given asynchronous client; create a private ThreadHotel."""
        self._threadhotel = threadhotel.ThreadHotel()
        self._asynchclient = asynchhttpclient

    def request(self, host, port, request_data, timeout=10):
        """
        Issue a request and block until it has succeeded or aborted.

        host, port -- forwarded unchanged to the asynchronous client.
        request_data -- forwarded unchanged to the asynchronous client.
        timeout -- forwarded as the `timeout` keyword of the asynchronous
                   client's request(). Defaults to 10, the value that
                   used to be hard-coded here.

        Returns tuple (status, reply).
        - Status indicates whether the request failed or succeeded.
        - If status is OK, the reply is the http response.
            - Reply is tuple of: (header, body)
        - If status is FAIL, the reply explains what went wrong.
            - Reply is tuple of: (error, comment)
        """
        rid = self._asynchclient.get_request_id()
        # The slot is reserved before the request is dispatched.
        # NOTE(review): presumably so that a handler firing early still
        # finds the reservation -- confirm against ThreadHotel.
        self._threadhotel.reserve(rid)
        self._asynchclient.request(rid, host, port, request_data,
                                   self._abort_handler,
                                   self._response_handler,
                                   timeout=timeout)
        return self._threadhotel.wait_reply(rid)

    def _abort_handler(self, rid, error, comment):
        """Abort handler: wake the waiting caller with status FAIL."""
        reply = (error, comment)
        self._threadhotel.wakeup(rid, SynchHTTPClient.FAIL, reply)

    def _response_handler(self, rid, header, body):
        """Response handler: wake the waiting caller with status OK."""
        reply = (header, body)
        self._threadhotel.wakeup(rid, SynchHTTPClient.OK, reply)
63 ##############################################
64 # NON-BLOCKING HTTP CLIENT
65 ##############################################
67 _LOG_TAG = "HTTPClient"
69 class AsynchHTTPClient:
72 This class runs non-blocking asynchronous http requests
73 to multiple HTTP servers at once. Specify a_handler or r_handler
74 for asynchronous upcalls. If not, the httpClient supports
75 fire-and-forget semantics (from an external point of view).
76 Internally, the httpClient will not forget a request until it has
77 either timed out, aborted due to failure or succeeded.
80 def __init__(self, task_runner, logger=None):
81 self._task_runner = task_runner
83 self._requests = {} # requestID: (request, aHandler, rHandler)
85 self._log_tag = _LOG_TAG
88 ##############################################
90 ##############################################
92 def get_request_id(self):
93 """Generate new request id."""
95 return self._request_id
def request(self, rid, host, port, request_data,
            a_handler=None, r_handler=None, timeout=10):
    """
    Issue a new http request.

    rid -- request id; key under which the request is tracked.
    host, port -- web server.
    request_data -- string data including both header and body.
    a_handler(rid, error, why) -- invoked if the request aborts.
    r_handler(rid, header, body) -- invoked with the response.
    timeout -- handed to HTTPRequest as its recv_timeout.
    """
    http_request = HTTPRequest(self._task_runner, rid,
                               recv_timeout=timeout)
    # Both outcomes are routed through this client first, so it can
    # look up the caller's handlers by request id.
    http_request.set_response_handler(self._handle_response)
    http_request.set_abort_handler(self._handle_abort)
    # Track the request before it is dispatched.
    self._requests[rid] = (http_request, a_handler, r_handler)
    http_request.dispatch(host, port, request_data)
    self._log("Request Dispatched [%d]" % rid)
116 """Stop all requests and close their sockets."""
117 for tup in self._requests.values():
121 ##############################################
123 ##############################################
125 def _handle_response(self, rid, header, body):
126 """Dispatches responses by invoking given r_handler."""
127 self._log("Response Received [%d]" % rid)
128 request = self._requests[rid][0]
129 r_handler = self._requests[rid][2]
130 del self._requests[rid]
133 r_handler(rid, header, body)
135 def _handle_abort(self, rid, error, why):
136 """Dispatches aborts by invoking given a_handler."""
137 self._log("HTTP Request Aborted [%d]" % rid)
138 request = self._requests[rid][0]
139 a_handler = self._requests[rid][1]
140 del self._requests[rid]
143 a_handler(rid, error, why)
145 ##############################################
147 ##############################################
152 self._logger.log(self._log_tag, msg)
155 ##############################################
157 ##############################################
class HTTPRequestError(Exception):
    """
    Error associated with the request response protocol.

    Raised by the protocol methods with the message
    "Illegal Operation given protocol State" when a step is attempted
    in a state that does not allow it.
    """
166 This implements a single non-blocking connect-request-response
167 protocol from an HTTPClient to a HTTPServer.
168 For now, this class does not support sequential requests-responses on the
169 same connection. Neither does it support instance reuse.
172 STATE_CONNECT_STARTED = 1
174 STATE_SEND_STARTED = 3
176 STATE_RECV_STARTED = 5
180 def __init__(self, task_runner, request_id,
181 recv_timeout=10, conn_timeout=1,
182 conn_attempts=3, logger=None):
183 self._task_runner = task_runner
184 self._request_id = request_id
186 self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
187 self._sock.setblocking(False)
189 self._state = HTTPRequest.STATE_INIT
191 self._request_data = None
194 self._response_data = ""
197 self._conn_task = None
198 self._conn_to_task = None
199 self._send_task = None
200 self._recv_task = None
201 self._recv_to_task = None
203 self._connect_attempts = 0
204 self._max_connect_attempts = conn_attempts
212 self._delimiter = '\r\n\r\n'
214 self._recv_to = recv_timeout
215 self._conn_to = conn_timeout
217 self._recv_handler = lambda requestID, hdr, body:None
218 self._abort_handler = lambda requestID, error, comment : None
220 self._logger = logger
221 self._log_tag = "Request [%d]" % self._request_id
223 ##############################################
225 ##############################################
def dispatch(self, host, port, request_data):
    """
    Dispatch a new request.

    host, port -- address handed to the non-blocking connect.
    request_data -- stored on the instance as the data to send.

    Raises HTTPRequestError unless the request is still in STATE_INIT.
    """
    if self._state != HTTPRequest.STATE_INIT:
        # Call form of raise: valid on Python 2 and Python 3 alike.
        raise HTTPRequestError("Illegal Operation given protocol State")
    self._request_data = request_data
    self._connect_start(host, port)
def set_response_handler(self, handler):
    """
    Install the callable that is to receive the response.

    handler -- stored as the receive handler of this request,
               replacing whatever was registered before.
    """
    self._recv_handler = handler
def set_abort_handler(self, handler):
    """
    Install the callable that is to be told about an abort.

    handler -- stored as the abort handler of this request,
               replacing whatever was registered before.
    """
    self._abort_handler = handler
243 """Cleanup the request-response protocol and close the socket."""
245 self._conn_task.cancel()
246 if self._conn_to_task :
247 self._conn_to_task.cancel()
249 self._send_task.cancel()
251 self._recv_task.cancel()
252 if self._recv_to_task:
253 self._recv_to_task.cancel()
254 self._state = HTTPRequest.STATE_DONE
262 ##############################################
264 ##############################################
269 self._logger.log(self._log_tag, msg)
272 def _get_content_length(self):
273 """Extract body length from HTTP header."""
274 lines = self._header.split('\r\n')
277 for line in lines[1:]:
278 if len(line.strip()) > 0:
279 elem_name, elem_value = line.split(":", 1)
280 if elem_name.lower() == 'content-length':
281 return int(elem_value.strip())
284 def _http_header_ok(self):
285 """Check that received data is a valid HTTP header."""
286 if len(self._header) > 4 and self._header[:4] == "HTTP":
291 def _do(self, method, args=()):
292 """Shorthand for add_task."""
293 return self._task_runner.add_task(method, args)
294 def _do_write(self, file_descriptor, method):
295 """Shorthand for add_write."""
296 return self._task_runner.add_write_task(file_descriptor, method)
297 def _do_read(self, file_descriptor, method):
298 """Shorthand for add_read."""
299 return self._task_runner.add_read_task(file_descriptor, method)
300 def _do_to(self, timeout, method):
301 """Shorthand for add_delay."""
302 return self._task_runner.add_delay_task(timeout, method)
304 ##############################################
305 # PRIVATE PROTOCOL METHODS
306 ##############################################
308 def _connect_start(self, host, port):
309 """Start non-blocking connect."""
310 self._log("Connect Start")
311 error = self._sock.connect_ex((host, port))
312 if error != errno.EINPROGRESS:
313 self._abort(error, "Non-Blocking Connect Failed")
315 self._state = HTTPRequest.STATE_CONNECT_STARTED
316 self._conn_task = self._do_write(self._sock.fileno(),
317 self._handle_connect_ok)
318 self._conn_to_task = self._do_to(self._conn_to,
319 self._handle_connect_to)
321 def _handle_connect_ok(self):
323 Handle successful connect.
325 In fact, certain unsuccessful connects may not be detected
326 before write is attempted on the socket.
328 self._log("Connect OK")
329 if self._state != HTTPRequest.STATE_CONNECT_STARTED:
330 raise HTTPRequestError, "Illegal Operation given protocol State"
331 self._state = HTTPRequest.STATE_CONNECT_OK
332 self._conn_task.cancel()
333 self._conn_to_task.cancel()
334 # Start sending the Request
337 def _handle_connect_to(self):
338 """Handle connect timeout."""
339 self._log("Connect Timeout")
340 if self._state != HTTPRequest.STATE_CONNECT_STARTED:
341 raise HTTPRequestError, "Illegal Operation given protocol State"
342 self._connect_attempts += 1
343 if self._connect_attempts >= self._max_connect_attempts:
345 self._conn_task.cancel()
346 self._abort(errno.ETIME, "Connect Timeout")
349 self._conn_to_task = self._do_to(self._conn_to,
350 self._handle_connect_to)
354 Start sending a request.
355 Or continue sending a partially sent request.
357 self._send_count += 1
358 first_attempt = True if self._send_count == 1 else False
359 if first_attempt and self._state != HTTPRequest.STATE_CONNECT_OK:
360 raise HTTPRequestError, "Illegal Operation given protocol State"
361 elif not first_attempt and \
362 self._state != HTTPRequest.STATE_SEND_STARTED:
363 raise HTTPRequestError, "Illegal Operation given protocol State"
365 self._state = HTTPRequest.STATE_SEND_STARTED
366 self._log("Send Started")
367 else: self._log("Send Continue")
371 bytes_sent = self._sock.send(self._request_data[self._bytes:])
372 except socket.error, why:
373 if why[0] == errno.EAGAIN:
374 # Send on full buffer
375 # Continue sending again once the socket becomes writeable
376 self._send_continue()
379 # Typically EPIPE: Broken Pipe or ECONNREFUSED
381 self._send_task.cancel()
382 self._abort(why[0], "Exception on Send")
385 # Send Operation returned naturally
388 self._bytes += bytes_sent
389 if self._bytes >= len(self._request_data):
390 # The complete message was sent
391 self._state = HTTPRequest.STATE_SEND_OK
392 self._task_runner.add_task(self._handle_send_ok)
395 # Message only partially sent
396 self._send_continue()
399 # 0 bytes sent => error
401 self._send_task.cancel()
402 msg = "Sent 0 bytes, yet fd was writeable and no exception occurred"
403 self._abort(errno.EPIPE, msg)
405 def _send_continue(self):
406 """Register new write task after request was only partially sent."""
407 # Register a new Write Task
408 if not self._send_task:
409 self._send_task = self._do_write(self._sock.fileno(),
412 def _handle_send_ok(self):
413 """Handle completely sent request."""
415 if self._state != HTTPRequest.STATE_SEND_OK:
416 raise HTTPRequestError, "Illegal Operation given protocol State"
419 self._send_task.cancel()
420 # Start waiting for the response
421 self._recv_task = self._do_read(self._sock.fileno(), self._recv)
422 # Define new Receive Timeout
423 self._recv_to_task = self._do_to(self._recv_to,
424 self._handle_recv_to)
428 Start receiving the response.
429 Or continue to receive a partially received response.
431 self._recv_count += 1
432 first_attempt = True if self._recv_count == 1 else False
433 if first_attempt and self._state != HTTPRequest.STATE_SEND_OK:
434 raise HTTPRequestError, "Illegal Operation given protocol State"
435 elif not first_attempt and \
436 self._state != HTTPRequest.STATE_RECV_STARTED:
437 raise HTTPRequestError, "Illegal Operation given protocol State"
439 self._state = HTTPRequest.STATE_RECV_STARTED
440 self._log("Recv Started")
442 self._log("Recv Continue")
446 data = self._sock.recv(1024)
447 except socket.error, why:
448 if why[0] == errno.EAGAIN:
449 # EAGAIN: Enter/stay in readset
450 self._recv_continue()
453 # EPIPE: Broken Pipe.
455 self._recv_task.cancel()
456 self._abort(why[0], "Exception on Recv")
459 # Recv completed naturally
461 self._response_data += data
462 # Parse HTTP response
464 tokens = self._response_data.split(self._delimiter, 1)
466 # Header Not Complete
467 self._recv_continue()
471 self._header = tokens[0]
472 if not self._http_header_ok():
473 self._abort(errno.EBADMSG, "Not HTTP Header")
476 # Header complete and OK
477 self._length = len(self._header) + \
478 len(self._delimiter) + self._get_content_length()
481 # Header is received, entire body may not be received
482 if len(self._response_data) < self._length:
483 # Entire body not received
484 self._recv_continue()
486 # Entire response received (may be too long)
487 if len(self._response_data) > self._length:
489 self._response_data = self._response_data[:self._length]
490 # Entire response received (correct length)
492 self._recv_task.cancel()
493 self._recv_to_task.cancel()
494 self._state = HTTPRequest.STATE_RECV_OK
495 self._do(self._handle_recv_ok)
497 # Did not block, but received no data => error
498 self._recv_task.cancel()
499 msg = "Recv 0 bytes, yet fd was readable and no exception occurred"
500 self._abort(errno.EPIPE, msg)
502 def _recv_continue(self):
503 """Register read task to continue to receive a
504 partially received response."""
505 # Make sure a ReadTask is registered.
506 if not self._recv_task:
507 self._recv_task = self._do_read(self._sock.fileno(),
510 def _handle_recv_to(self):
511 """Handle receive timeout."""
512 self._log("Receive Timeout")
513 if self._state != HTTPRequest.STATE_SEND_OK:
514 raise HTTPRequestError, "Illegal Operation given protocol State"
517 self._recv_task.cancel()
518 self._abort(errno.ETIME, "Receive Timeout")
def _handle_recv_ok(self):
    """
    Handle completely received response.

    Splits the body off the buffered response, marks the request as
    done and schedules the response handler with
    (request_id, header, body).

    Raises HTTPRequestError unless the request is in STATE_RECV_OK.
    """
    self._log("Receive OK")
    if self._state != HTTPRequest.STATE_RECV_OK:
        # Call form of raise: valid on Python 2 and Python 3 alike.
        raise HTTPRequestError("Illegal Operation given protocol State")
    # The body is everything after the header and its delimiter.
    body_start = len(self._header) + len(self._delimiter)
    body = self._response_data[body_start:]
    self._state = HTTPRequest.STATE_DONE
    args = (self._request_id, self._header, body)
    self._do(self._recv_handler, args)
def _abort(self, error, comment):
    """
    Abort this request-response protocol.

    error -- numeric error code; it is logged and passed on.
    comment -- free-text explanation; it is logged and passed on.

    Logs the abort, marks the request as done and schedules the abort
    handler with (request_id, error, comment).
    """
    fmt = "Abort [%d] %s '%s' (%s)"
    # Not every code has a symbolic name in errno.errorcode. One example
    # is 0, which the connect path can pass in when connect_ex returns
    # anything other than EINPROGRESS. A plain subscript would raise
    # KeyError here and the abort handler would never be scheduled.
    symbol = errno.errorcode.get(error, "UNKNOWN")
    self._log(fmt % (error, symbol, os.strerror(error), comment))
    self._state = HTTPRequest.STATE_DONE
    args = (self._request_id, error, comment)
    self._do(self._abort_handler, args)
542 ##############################################
544 ##############################################
546 if __name__ == '__main__':
549 """Mock-up Logger."""
550 def log(self, tag, msg):
554 LOGGER = _MockLogger()
556 import BaseLib.UPnP.common.taskrunner as taskrunner
559 TR = taskrunner.TaskRunner()
561 HOME = "192.168.1.235"
562 WORK = "193.156.106.130"
566 if len(sys.argv) > 1:
567 PORT = int(sys.argv[1])
571 if len(sys.argv) > 2 and sys.argv[2] == 'home':
574 HTTP_REQUEST = "NOTIFY /path HTTP/1.1\r\nContent-length:0\r\n\r\n"
576 def response_handler(rid, header, body):
577 """Response handler."""
583 def abort_handler(rid, error, comment):
585 fmt = "Abort [%d] %s '%s' (%s) [%d]"
586 print fmt % (error, errno.errorcode[error],
587 os.strerror(error), comment, rid)
590 class _TestSynchHTTPClient:
591 """Runnable test class for Synchronous HTTPClient."""
593 def __init__(self, asynch_httpclient):
594 self._synch_httpclient = SynchHTTPClient(asynch_httpclient)
597 """Run the blocking connect-request-response protocol."""
598 status, reply = self._synch_httpclient.request(HOST,
600 if status == SynchHTTPClient.OK:
604 elif status == SynchHTTPClient.FAIL:
608 # Test Asynch HTTP Client
609 ASYNCH = AsynchHTTPClient(TR, logger=LOGGER)
610 RID = ASYNCH.get_request_id()
611 ASYNCH.request(RID, HOST, PORT,
612 HTTP_REQUEST, abort_handler, response_handler)
614 # Test Synch HTTP Client
616 SYNCH = _TestSynchHTTPClient(ASYNCH)
617 THREAD = threading.Thread(target=SYNCH.run)
622 except KeyboardInterrupt: