From fc2f2e5044ce1ab0a0d94387a2771d46adb2b10f Mon Sep 17 00:00:00 2001 From: =?utf8?q?C=C4=83lin-Andrei=20Burloiu?= Date: Mon, 20 Feb 2012 18:28:17 +0200 Subject: [PATCH] CIS-LB: Randomized suboptimal load balancing implemented without low timeout features. --- cis/cis_lb/config.py | 7 +- cis/cis_lb/load_balancer/base.py | 4 +- .../load_balancer/randomized_suboptimal_lb.py | 129 ++++++++++++++++++ cis/dummy_cis.py | 3 +- 4 files changed, 139 insertions(+), 4 deletions(-) create mode 100644 cis/cis_lb/load_balancer/randomized_suboptimal_lb.py diff --git a/cis/cis_lb/config.py b/cis/cis_lb/config.py index 208c957..c152e1c 100644 --- a/cis/cis_lb/config.py +++ b/cis/cis_lb/config.py @@ -16,8 +16,11 @@ CIS_URLS = [ \ # http:///video/cis_error . WS_ERROR = 'http://p2p-next.cs.pub.ro/devel/video/cis_error' -import load_balancer.random_lb -LOAD_BALANCER = load_balancer.random_lb.RandomLoadBalancer +#import load_balancer.random_lb +import load_balancer.randomized_suboptimal_lb +#LOAD_BALANCER = load_balancer.random_lb.RandomLoadBalancer +LOAD_BALANCER = load_balancer.randomized_suboptimal_lb.RandomizedSuboptimalLoadBalancer +RANDOMIZED_SUBOPTIMAL_LB_K = 3 import logger diff --git a/cis/cis_lb/load_balancer/base.py b/cis/cis_lb/load_balancer/base.py index e81baee..f44ab1b 100644 --- a/cis/cis_lb/load_balancer/base.py +++ b/cis/cis_lb/load_balancer/base.py @@ -24,6 +24,7 @@ class LoadBalancer(threading.Thread): (request, data) = self.queue.get() urls = config.CIS_URLS[:] code = json.loads(data)['code'] + success = False while len(urls) != 0: cis = self.choose(urls) @@ -37,12 +38,13 @@ class LoadBalancer(threading.Thread): logger.LOG_LEVEL_ERROR) continue + success = True logger.log_msg('#%s: Request forwarded to %s' \ % (code, cis), \ logger.LOG_LEVEL_INFO) break - if len(urls) == 0: + if len(urls) == 0 and not success: logger.log_msg('#%s: Failed to forward request to any CIS' \ % code, \ logger.LOG_LEVEL_FATAL) diff --git a/cis/cis_lb/load_balancer/randomized_suboptimal_lb.py b/cis/cis_lb/load_balancer/randomized_suboptimal_lb.py new file mode 100644 index 0000000..db697b6 --- /dev/null +++ b/cis/cis_lb/load_balancer/randomized_suboptimal_lb.py @@ -0,0 +1,129 @@ +import sys +import random +import urllib +import threading +import Queue +import json + +from base import LoadBalancer +import config +import logger + +class HTTPReqWorker(threading.Thread): + """ + Worker thread which requests load of a CIS. + """ + + def __init__(self, id, queue_in, queue_out): + + threading.Thread.__init__(self, \ + name = '%s%02d' % (self.__class__.__name__, id)) + + self.queue_in = queue_in + self.queue_out = queue_out + + def run(self): + + while True: + url = self.queue_in.get() + + try: + f = urllib.urlopen(url + 'get_load') + r = f.read() + parsed_r = json.loads(r) + except IOError: + self.queue_out.put( (url, None) ) + logger.log_msg('%s: Failed to request load to %s' \ + % (self.name, url), \ + logger.LOG_LEVEL_ERROR) + continue + + # Put response load to the output queue. + self.queue_out.put( (url, parsed_r['load']) ) + logger.log_msg('%s: Received load %s from %s' \ + % (self.name, parsed_r['load'], url), \ + logger.LOG_LEVEL_INFO) + +class RandomizedSuboptimalLoadBalancer(LoadBalancer): + + def __init__(self, id, queue): + + LoadBalancer.__init__(self, id, queue) + + # Number of CIS machines that are going to be asked about their load. + self.k = config.RANDOMIZED_SUBOPTIMAL_LB_K + # Queue of load request tasks for HTTPReqWorker. + self.tasks_queue = Queue.Queue() + # Queue of CIS loads populated by HTTPReqWorker-s. + self.loads_queue = Queue.Queue() + + # Start HTTPReqWorker-s. + self.http_req_workers = [] + for i in range(0, config.HTTP_THREADS_COUNT): + http_req_worker = HTTPReqWorker(i, self.tasks_queue, \ + self.loads_queue) + http_req_worker.daemon = True + http_req_worker.start() + self.http_req_workers.append(http_req_worker) + + + def choose(self, urls): + + self.tasks_queue.queue.clear() + self.loads_queue.queue.clear() + + while len(urls) != 0: + # Choose k CIS machines. + k_urls = self.subset(urls) + + # Find out their load by giving tasks to HTTPReqWorker-s. + for url in k_urls: + self.tasks_queue.put(url) + + # Wait for load answers from HTTPReqWorker-s and choose the least + # loaded CIS machine. + best_url = None + best_load = sys.maxint + for i in range(0, self.k): + (url, load) = self.loads_queue.get() + + if load == None: + continue + else: + load = int(load) + + if load < best_load: + logger.log_msg('Got %s %s' % (url, load), \ + logger.LOG_LEVEL_DEBUG) + best_load = load + best_url = url + + if best_url != None: + break + + #del( urls[ urls.index(best_url) ] ) + + logger.log_msg('Returning best_url = "%s"' % best_url, \ + logger.LOG_LEVEL_DEBUG) + return best_url + + + def subset(self, _set): + """ + Returns a subset of _set with at most self.k items and deletes those + items from _set. + """ + + _subset = [] + + for i in range(0, self.k): + if len(_set) == 0: + break + + index = random.randint(0, len(_set) - 1) + item = _set[index] + _subset.append(item) + del(_set[index]) + + return _subset + \ No newline at end of file diff --git a/cis/dummy_cis.py b/cis/dummy_cis.py index cb94a1d..5ccf928 100755 --- a/cis/dummy_cis.py +++ b/cis/dummy_cis.py @@ -2,6 +2,7 @@ import web import sys +import json urls = ( '/(.*)', 'Hello' @@ -13,7 +14,7 @@ print 'load is %s' % LOAD app = web.application(urls, globals()) class Hello: - def GET(self, name): + def GET(self, request): if request == 'get_load': resp = {"load": LOAD} web.header('Content-Type', 'application/json') -- 2.20.1