X-Git-Url: http://p2p-next.cs.pub.ro/gitweb/?p=living-lab-site.git;a=blobdiff_plain;f=cis%2Fcis_lb%2Fload_balancer%2Frandomized_suboptimal_lb.py;fp=cis%2Fcis_lb%2Fload_balancer%2Frandomized_suboptimal_lb.py;h=db697b6c211010f897e322b80ba4f9e944bd5206;hp=0000000000000000000000000000000000000000;hb=fc2f2e5044ce1ab0a0d94387a2771d46adb2b10f;hpb=93443dbebf465e060f37cc37c357b845d8153f7b diff --git a/cis/cis_lb/load_balancer/randomized_suboptimal_lb.py b/cis/cis_lb/load_balancer/randomized_suboptimal_lb.py new file mode 100644 index 0000000..db697b6 --- /dev/null +++ b/cis/cis_lb/load_balancer/randomized_suboptimal_lb.py @@ -0,0 +1,129 @@ +import sys +import random +import urllib +import threading +import Queue +import json + +from base import LoadBalancer +import config +import logger + +class HTTPReqWorker(threading.Thread): + """ + Worker thread which requests load of a CIS. + """ + + def __init__(self, id, queue_in, queue_out): + + threading.Thread.__init__(self, \ + name = '%s%02d' % (self.__class__.__name__, id)) + + self.queue_in = queue_in + self.queue_out = queue_out + + def run(self): + + while True: + url = self.queue_in.get() + + try: + f = urllib.urlopen(url + 'get_load') + r = f.read() + parsed_r = json.loads(r) + except IOError: + self.queue_out.put( (url, None) ) + logger.log_msg('%s: Failed to request load to %s' \ + % (self.name, url), \ + logger.LOG_LEVEL_ERROR) + continue + + # Put response load to the output queue. + self.queue_out.put( (url, parsed_r['load']) ) + logger.log_msg('%s: Received load %s from %s' \ + % (self.name, parsed_r['load'], url), \ + logger.LOG_LEVEL_INFO) + +class RandomizedSuboptimalLoadBalancer(LoadBalancer): + + def __init__(self, id, queue): + + LoadBalancer.__init__(self, id, queue) + + # Number of CIS machines that are going to be asked about their load. + self.k = config.RANDOMIZED_SUBOPTIMAL_LB_K + # Queue of load request tasks for HTTPReqWorker. + self.tasks_queue = Queue.Queue() + # Queue of CIS loads populated by HTTPReqWorker-s. + self.loads_queue = Queue.Queue() + + # Start HTTPReqWorker-s. + self.http_req_workers = [] + for i in range(0, config.HTTP_THREADS_COUNT): + http_req_worker = HTTPReqWorker(i, self.tasks_queue, \ + self.loads_queue) + http_req_worker.daemon = True + http_req_worker.start() + self.http_req_workers.append(http_req_worker) + + + def choose(self, urls): + + self.tasks_queue.queue.clear() + self.loads_queue.queue.clear() + + while len(urls) != 0: + # Choose k CIS machines. + k_urls = self.subset(urls) + + # Find out their load by giving tasks to HTTPReqWorker-s. + for url in k_urls: + self.tasks_queue.put(url) + + # Wait for load answers from HTTPReqWorker-s and choose the least + # loaded CIS machine. + best_url = None + best_load = sys.maxint + for i in range(0, self.k): + (url, load) = self.loads_queue.get() + + if load == None: + continue + else: + load = int(load) + + if load < best_load: + logger.log_msg('Got %s %s' % (url, load), \ + logger.LOG_LEVEL_DEBUG) + best_load = load + best_url = url + + if best_url != None: + break + + #del( urls[ urls.index(best_url) ] ) + + logger.log_msg('Returning best_url = "%s"' % best_url, \ + logger.LOG_LEVEL_DEBUG) + return best_url + + + def subset(self, _set): + """ + Returns a subset of _set with at most self.k items and deletes those + items from _set. + """ + + _subset = [] + + for i in range(0, self.k): + if len(_set) == 0: + break + + index = random.randint(0, len(_set) - 1) + item = _set[index] + _subset.append(item) + del(_set[index]) + + return _subset + \ No newline at end of file