CIS-LB: Randomized suboptimal load balancing implemented without low timeout features.
[living-lab-site.git] / cis / cis_lb / load_balancer / randomized_suboptimal_lb.py
1 import sys
2 import random
3 import urllib
4 import threading
5 import Queue
6 import json
7
8 from base import LoadBalancer
9 import config
10 import logger
11
class HTTPReqWorker(threading.Thread):
    """
    Worker thread which requests load of a CIS.

    The worker loops forever: it takes a CIS URL prefix from queue_in,
    fetches `<url>get_load`, decodes the JSON answer and puts a
    (url, load) tuple on queue_out. When the request or the decoding fails
    it puts (url, None) instead, so every task taken from queue_in yields
    exactly one item on queue_out.
    """

    def __init__(self, id, queue_in, queue_out):
        """
        id        -- number appended (zero padded to 2 digits) to the class
                     name to build the thread name.
        queue_in  -- queue of CIS URL prefixes to be asked about their load.
        queue_out -- queue receiving (url, load) tuples; load is None when
                     the load could not be obtained.
        """

        threading.Thread.__init__(self,
                name='%s%02d' % (self.__class__.__name__, id))

        self.queue_in = queue_in
        self.queue_out = queue_out

    def run(self):
        """
        Serve load requests from queue_in until the process exits.
        """

        while True:
            url = self.queue_in.get()

            try:
                f = urllib.urlopen(url + 'get_load')
                try:
                    r = f.read()
                finally:
                    # Release the connection even when read() fails.
                    f.close()
                load = json.loads(r)['load']
            except (IOError, ValueError, KeyError, TypeError):
                # ValueError: the body is not valid JSON; KeyError/TypeError:
                # the JSON document has no 'load' member. These must be
                # reported like network errors, otherwise the thread dies
                # and the consumer waits forever for the missing answer.
                self.queue_out.put((url, None))
                logger.log_msg('%s: Failed to request load to %s'
                        % (self.name, url),
                        logger.LOG_LEVEL_ERROR)
                continue

            # Put response load to the output queue.
            self.queue_out.put((url, load))
            logger.log_msg('%s: Received load %s from %s'
                    % (self.name, load, url),
                    logger.LOG_LEVEL_INFO)
46
class RandomizedSuboptimalLoadBalancer(LoadBalancer):
    """
    Load balancer which asks a random subset of at most k CIS machines
    about their load and picks the least loaded one among those that
    answered.
    """

    def __init__(self, id, queue):
        """
        Initializes the base LoadBalancer with id and queue, then starts
        config.HTTP_THREADS_COUNT daemon HTTPReqWorker threads.
        """

        LoadBalancer.__init__(self, id, queue)

        # Number of CIS machines that are going to be asked about their load.
        self.k = config.RANDOMIZED_SUBOPTIMAL_LB_K
        # Queue of load request tasks for HTTPReqWorker.
        self.tasks_queue = Queue.Queue()
        # Queue of CIS loads populated by HTTPReqWorker-s.
        self.loads_queue = Queue.Queue()

        # Start HTTPReqWorker-s.
        self.http_req_workers = []
        for i in range(0, config.HTTP_THREADS_COUNT):
            http_req_worker = HTTPReqWorker(i, self.tasks_queue,
                    self.loads_queue)
            # Daemon threads do not keep the process alive on exit.
            http_req_worker.daemon = True
            http_req_worker.start()
            self.http_req_workers.append(http_req_worker)

    def choose(self, urls):
        """
        Returns the URL of the least loaded CIS machine out of a random
        subset of urls, or None if no machine reported its load.

        urls is modified in place: every URL that has been asked is removed
        from it. If none of the asked machines answers, another subset of
        the remaining URLs is tried until urls is exhausted.
        """

        self.tasks_queue.queue.clear()
        self.loads_queue.queue.clear()

        # Defined before the loop so that an empty urls list returns None
        # instead of failing on an unbound name.
        best_url = None

        while urls:
            # Choose k CIS machines.
            k_urls = self.subset(urls)
            if not k_urls:
                # Nothing was taken from urls (k <= 0); looping again would
                # never terminate.
                break

            # Find out their load by giving tasks to HTTPReqWorker-s.
            for url in k_urls:
                self.tasks_queue.put(url)

            # Wait for load answers from HTTPReqWorker-s and choose the least
            # loaded CIS machine. Exactly one answer arrives per submitted
            # task, so wait for len(k_urls) answers; waiting for self.k
            # would block forever when fewer than k URLs were left.
            best_load = None
            for _ in range(len(k_urls)):
                (url, load) = self.loads_queue.get()

                if load is None:
                    continue

                try:
                    load = int(load)
                except (TypeError, ValueError):
                    # A non-numeric load is treated like a failed request.
                    continue

                if best_load is None or load < best_load:
                    logger.log_msg('Got %s %s' % (url, load),
                            logger.LOG_LEVEL_DEBUG)
                    best_load = load
                    best_url = url

            if best_url is not None:
                break

        logger.log_msg('Returning best_url = "%s"' % best_url,
                logger.LOG_LEVEL_DEBUG)
        return best_url

    def subset(self, _set):
        """
        Returns a subset of _set with at most self.k items and deletes those
        items from _set.

        Items are drawn uniformly at random without replacement; _set must
        be a mutable sequence (it is indexed and shrunk in place).
        """

        _subset = []

        for _ in range(0, self.k):
            if not _set:
                break

            index = random.randint(0, len(_set) - 1)
            _subset.append(_set.pop(index))

        return _subset
129