8 from base import LoadBalancer
class HTTPReqWorker(threading.Thread):
    """
    Worker thread which requests load of a CIS.

    The worker takes CIS base URLs from `queue_in`, requests
    `<url>get_load` over HTTP and puts a `(url, load)` tuple into
    `queue_out`. When the request or the decoding of the answer fails,
    `(url, None)` is put instead, so exactly one result is produced for
    every URL taken from `queue_in`.
    """

    def __init__(self, id, queue_in, queue_out):
        """
        id        -- number used to build the thread name
                     (class name followed by the two-digit id).
        queue_in  -- queue from which URLs to be asked are taken.
        queue_out -- queue into which (url, load) tuples are put.
        """
        threading.Thread.__init__(self,
            name='%s%02d' % (self.__class__.__name__, id))

        self.queue_in = queue_in
        self.queue_out = queue_out

    def run(self):
        """
        Serve load requests forever: one URL in, one (url, load) tuple out.
        """
        while True:
            url = self.queue_in.get()

            try:
                load = self._fetch_load(url)
            except Exception:
                # Network error, malformed JSON or an answer without a
                # 'load' entry. The failure is reported through the output
                # queue so that the consumer is never left waiting for a
                # result that would not come.
                self.queue_out.put((url, None))
                logger.log_msg('%s: Failed to request load to %s'
                    % (self.name, url),
                    logger.LOG_LEVEL_ERROR)
                continue

            # Put response load to the output queue.
            self.queue_out.put((url, load))
            logger.log_msg('%s: Received load %s from %s'
                % (self.name, load, url),
                logger.LOG_LEVEL_INFO)

    def _fetch_load(self, url):
        """
        Request `url + 'get_load'` and return the value stored under the
        'load' key of the JSON answer. Any error is propagated to the
        caller.
        """
        f = urllib.urlopen(url + 'get_load')
        try:
            r = f.read()
        finally:
            # Release the connection even if reading fails.
            f.close()

        return json.loads(r)['load']
47 class RandomizedSuboptimalLoadBalancer(LoadBalancer):
def __init__(self, id, queue):
    """
    Initialize the balancer and start its pool of HTTPReqWorker threads.

    id, queue -- passed unchanged to LoadBalancer.__init__.
    """
    LoadBalancer.__init__(self, id, queue)

    # How many CIS machines are asked about their load.
    self.k = config.RANDOMIZED_SUBOPTIMAL_LB_K

    # tasks_queue carries load request tasks to the HTTPReqWorker-s,
    # loads_queue carries the CIS loads they report back.
    self.tasks_queue = Queue.Queue()
    self.loads_queue = Queue.Queue()

    # Create and start config.HTTP_THREADS_COUNT daemon workers, keeping
    # a reference to each of them.
    self.http_req_workers = []
    for worker_id in range(config.HTTP_THREADS_COUNT):
        worker = HTTPReqWorker(worker_id, self.tasks_queue,
            self.loads_queue)
        worker.daemon = True
        worker.start()
        self.http_req_workers.append(worker)
def choose(self, urls):
    """
    Ask a random sample of the given CIS machines about their load and
    return the URL of the one which reported the least load.

    urls -- collection of CIS URLs; it is handed to self.subset(), which
            picks the machines to be asked.

    Returns the URL with the least reported load, or None when no asked
    machine answered successfully (including the case of no URLs).
    """
    # Drop leftovers of previous calls. The internal deques are shared
    # with the worker threads, so they are cleared under the queue mutex.
    with self.tasks_queue.mutex:
        self.tasks_queue.queue.clear()
    with self.loads_queue.mutex:
        self.loads_queue.queue.clear()

    # Choose k CIS machines.
    # NOTE(review): subset() is documented as deleting the chosen items
    # from its argument; nothing here puts them back -- confirm that the
    # callers expect `urls` to be modified.
    k_urls = self.subset(urls)

    # Find out their load by giving tasks to HTTPReqWorker-s. The tasks
    # are counted because subset() may return fewer than self.k URLs.
    pending = 0
    for url in k_urls:
        self.tasks_queue.put(url)
        pending += 1

    # Wait for load answers from HTTPReqWorker-s and choose the least
    # loaded machine. Exactly one answer is awaited per submitted task;
    # waiting for self.k answers would block forever on a short list.
    best_url = None
    best_load = None
    for _ in range(pending):
        (url, load) = self.loads_queue.get()
        logger.log_msg('Got %s %s' % (url, load),
            logger.LOG_LEVEL_DEBUG)

        # A load of None marks a failed request; such a machine is
        # never chosen.
        if load is None:
            continue
        if best_url is None or load < best_load:
            best_url = url
            best_load = load

    logger.log_msg('Returning best_url = "%s"' % best_url,
        logger.LOG_LEVEL_DEBUG)

    return best_url
111 def subset(self, _set):
113 Returns a subset of _set with at most self.k items and deletes those
119 for i in range(0, self.k):
123 index = random.randint(0, len(_set) - 1)