instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / Core / CacheDB / sqlitecachedb.py
1 # Written by Jie Yang
2 # see LICENSE.txt for license information
3
4 import sys
5 import os
6 from time import sleep
7 from base64 import encodestring, decodestring
8 import threading
9 from traceback import print_exc, print_stack
10
11 from BaseLib.Core.simpledefs import INFOHASH_LENGTH
12 from BaseLib.__init__ import LIBRARYNAME
13 from BaseLib.Core.Utilities.unicode import dunno2unicode
14
15 # ONLY USE APSW >= 3.5.9-r1
16 import apsw
17 #support_version = (3,5,9)
18 #support_version = (3,3,13)
19 #apsw_version = tuple([int(r) for r in apsw.apswversion().split('-')[0].split('.')])
20 ##print apsw_version
21 #assert apsw_version >= support_version, "Required APSW Version >= %d.%d.%d."%support_version + " But your version is %d.%d.%d.\n"%apsw_version + \
22 #                        "Please download and install it from http://code.google.com/p/apsw/"
23
24 ##Changed from 4 to 5 by andrea for subtitles support
25 CURRENT_MAIN_DB_VERSION = 5
26
27 TEST_SQLITECACHEDB_UPGRADE = False
28 CREATE_SQL_FILE = None
29 CREATE_SQL_FILE_POSTFIX = os.path.join(LIBRARYNAME, 'schema_sdb_v'+str(CURRENT_MAIN_DB_VERSION)+'.sql')
30 DB_FILE_NAME = 'tribler.sdb'
31 DB_DIR_NAME = 'sqlite'    # db file path = DB_DIR_NAME/DB_FILE_NAME
32 DEFAULT_BUSY_TIMEOUT = 10000
33 MAX_SQL_BATCHED_TO_TRANSACTION = 1000   # don't change it unless carefully tested. A transaction with 1000 batched updates took 1.5 seconds
34 NULL = None
35 icon_dir = None
36 SHOW_ALL_EXECUTE = False
37 costs = []
38 cost_reads = []
39 torrent_dir = None
40 config_dir = None
41 TEST_OVERRIDE = False
42
43
44 DEBUG = False
45
46 class Warning(Exception):
47     pass
48
49 def init(config, db_exception_handler = None):
50     """ create sqlite database """
51     global CREATE_SQL_FILE
52     global icon_dir
53     global torrent_dir
54     global config_dir
55     torrent_dir = os.path.abspath(config['torrent_collecting_dir'])
56     config_dir = config['state_dir']
57     install_dir = config['install_dir']
58     CREATE_SQL_FILE = os.path.join(install_dir,CREATE_SQL_FILE_POSTFIX)
59     sqlitedb = SQLiteCacheDB.getInstance(db_exception_handler)
60     
61     if config['superpeer']:
62         sqlite_db_path = ':memory:'
63     else:   
64         sqlite_db_path = os.path.join(config_dir, DB_DIR_NAME, DB_FILE_NAME)
65     print >>sys.stderr,"cachedb: init: SQL FILE",sqlite_db_path        
66
67     icon_dir = os.path.abspath(config['peer_icon_path'])
68
69     sqlitedb.initDB(sqlite_db_path, CREATE_SQL_FILE)  # the first place to create db in Tribler
70     return sqlitedb
71         
72 def done(config_dir):
73     SQLiteCacheDB.getInstance().close()
74
75 def make_filename(config_dir,filename):
76     if config_dir is None:
77         return filename
78     else:
79         return os.path.join(config_dir,filename)    
80     
81 def bin2str(bin):
82     # Full BASE64-encoded 
83     return encodestring(bin).replace("\n","")
84     
85 def str2bin(str):
86     return decodestring(str)
87
88 def print_exc_plus():
89     """
90     Print the usual traceback information, followed by a listing of all the
91     local variables in each frame.
92     http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/52215
93     http://initd.org/pub/software/pysqlite/apsw/3.3.13-r1/apsw.html#augmentedstacktraces
94     """
95
96     tb = sys.exc_info()[2]
97     stack = []
98     
99     while tb:
100         stack.append(tb.tb_frame)
101         tb = tb.tb_next
102
103     print_exc()
104     print >> sys.stderr, "Locals by frame, innermost last"
105
106     for frame in stack:
107         print >> sys.stderr
108         print >> sys.stderr, "Frame %s in %s at line %s" % (frame.f_code.co_name,
109                                              frame.f_code.co_filename,
110                                              frame.f_lineno)
111         for key, value in frame.f_locals.items():
112             print >> sys.stderr, "\t%20s = " % key,
113             #We have to be careful not to cause a new error in our error
114             #printer! Calling str() on an unknown object could cause an
115             #error we don't want.
116             try:                   
117                 print >> sys.stderr, value
118             except:
119                 print >> sys.stderr, "<ERROR WHILE PRINTING VALUE>"
120
121 class safe_dict(dict): 
122     def __init__(self, *args, **kw): 
123         self.lock = threading.RLock() 
124         dict.__init__(self, *args, **kw) 
125         
126     def __getitem__(self, key): 
127         self.lock.acquire()
128         try:
129             return dict.__getitem__(self, key) 
130         finally:
131             self.lock.release()
132             
133     def __setitem__(self, key, value): 
134         self.lock.acquire()
135         try:
136             dict.__setitem__(self, key, value) 
137         finally:
138             self.lock.release()
139             
140     def __delitem__(self, key): 
141         self.lock.acquire()
142         try:
143             dict.__delitem__(self, key) 
144         finally:
145             self.lock.release()
146
147     def __contains__(self, key):
148         self.lock.acquire()
149         try:
150             return dict.__contains__(self, key) 
151         finally:
152             self.lock.release()
153             
154     def values(self):
155         self.lock.acquire()
156         try:
157             return dict.values(self) 
158         finally:
159             self.lock.release()
160
161 class SQLiteCacheDBBase:
162     lock = threading.RLock()
163
164     def __init__(self,db_exception_handler=None):
165         self.exception_handler = db_exception_handler
166         self.cursor_table = safe_dict()    # {thread_name:cur}
167         self.cache_transaction_table = safe_dict()   # {thread_name:[sql]
168         self.class_variables = safe_dict({'db_path':None,'busytimeout':None})  # busytimeout is in milliseconds
169         
170         self.permid_id = safe_dict()    
171         self.infohash_id = safe_dict()
172         self.show_execute = False
173         
174         #TODO: All global variables must be protected to be thread safe?
175         self.status_table = None
176         self.category_table = None
177         self.src_table = None
178         self.applied_pragma_sync_norm = False
179         
180     def __del__(self):
181         self.close()
182     
183     def close(self, clean=False):
184         # only close the connection object in this thread, don't close other thread's connection object
185         thread_name = threading.currentThread().getName()
186         cur = self.getCursor(create=False)
187         
188         if cur:
189             con = cur.getconnection()
190             cur.close()
191             con.close()
192             con = None
193             del self.cursor_table[thread_name]
194             # Arno, 2010-01-25: Remove entry in cache_transaction_table for this thread
195             try:
196                 if thread_name in self.cache_transaction_table.keys(): 
197                     del self.cache_transaction_table[thread_name]
198             except:
199                 print_exc()
200         if clean:    # used for test suite
201             self.permid_id = safe_dict()
202             self.infohash_id = safe_dict()
203             self.exception_handler = None
204             self.class_variables = safe_dict({'db_path':None,'busytimeout':None})
205             self.cursor_table = safe_dict()
206             self.cache_transaction_table = safe_dict()
207             
208             
209     # --------- static functions --------
210     def getCursor(self, create=True):
211         thread_name = threading.currentThread().getName()
212         curs = self.cursor_table
213         cur = curs.get(thread_name, None)    # return [cur, cur, lib] or None
214         #print >> sys.stderr, '-------------- getCursor::', len(curs), time(), curs.keys()
215         if cur is None and create:
216             self.openDB(self.class_variables['db_path'], self.class_variables['busytimeout'])    # create a new db obj for this thread
217             cur = curs.get(thread_name)
218         
219         return cur
220        
221     def openDB(self, dbfile_path=None, busytimeout=DEFAULT_BUSY_TIMEOUT):
222         """ 
223         Open a SQLite database. Only one and the same database can be opened.
224         @dbfile_path       The path to store the database file. 
225                            Set dbfile_path=':memory:' to create a db in memory.
226         @busytimeout       Set the maximum time, in milliseconds, that SQLite will wait if the database is locked. 
227         """
228
229         # already opened a db in this thread, reuse it
230         thread_name = threading.currentThread().getName()
231         #print >>sys.stderr,"sqlcachedb: openDB",dbfile_path,thread_name
232         if thread_name in self.cursor_table:
233             #assert dbfile_path == None or self.class_variables['db_path'] == dbfile_path
234             return self.cursor_table[thread_name]
235
236         assert dbfile_path, "You must specify the path of database file"
237         
238         if dbfile_path.lower() != ':memory:':
239             db_dir,db_filename = os.path.split(dbfile_path)
240             if db_dir and not os.path.isdir(db_dir):
241                 os.makedirs(db_dir)            
242         
243         con = apsw.Connection(dbfile_path)
244         con.setbusytimeout(busytimeout)
245
246         cur = con.cursor()
247         self.cursor_table[thread_name] = cur
248         
249         if not self.applied_pragma_sync_norm:
250             # http://www.sqlite.org/pragma.html
251             # When synchronous is NORMAL, the SQLite database engine will still
252             # pause at the most critical moments, but less often than in FULL 
253             # mode. There is a very small (though non-zero) chance that a power
254             # failure at just the wrong time could corrupt the database in 
255             # NORMAL mode. But in practice, you are more likely to suffer a 
256             # catastrophic disk failure or some other unrecoverable hardware 
257             # fault.
258             #
259             self.applied_pragma_sync_norm = True 
260             cur.execute("PRAGMA synchronous = NORMAL;")
261             
262         return cur
263     
264     def createDBTable(self, sql_create_table, dbfile_path, busytimeout=DEFAULT_BUSY_TIMEOUT):
265         """ 
266         Create a SQLite database.
267         @sql_create_table  The sql statements to create tables in the database. 
268                            Every statement must end with a ';'.
269         @dbfile_path       The path to store the database file. Set dbfile_path=':memory:' to creates a db in memory.
270         @busytimeout       Set the maximum time, in milliseconds, that SQLite will wait if the database is locked.
271                            Default = 10000 milliseconds   
272         """
273         cur = self.openDB(dbfile_path, busytimeout)
274         print dbfile_path
275         cur.execute(sql_create_table)  # it is suggested to include begin & commit in the script
276
277     def initDB(self, sqlite_filepath,
278                create_sql_filename = None, 
279                busytimeout = DEFAULT_BUSY_TIMEOUT,
280                check_version = True,
281                current_db_version = CURRENT_MAIN_DB_VERSION):
282         """ 
283         Create and initialize a SQLite database given a sql script. 
284         Only one db can be opened. If the given dbfile_path is different with the opened DB file, warn and exit
285         @configure_dir     The directory containing 'bsddb' directory 
286         @sql_filename      The path of sql script to create the tables in the database
287                            Every statement must end with a ';'. 
288         @busytimeout       Set the maximum time, in milliseconds, to wait and retry 
289                            if failed to acquire a lock. Default = 5000 milliseconds  
290         """
291         if create_sql_filename is None:
292             create_sql_filename=CREATE_SQL_FILE
293         try:
294             self.lock.acquire()
295
296             # verify db path identity
297             class_db_path = self.class_variables['db_path']
298             if sqlite_filepath is None:     # reuse the opened db file?
299                 if class_db_path is not None:   # yes, reuse it
300                     # reuse the busytimeout
301                     return self.openDB(class_db_path, self.class_variables['busytimeout'])
302                 else:   # no db file opened
303                     raise Exception, "You must specify the path of database file when open it at the first time"
304             else:
305                 if class_db_path is None:   # the first time to open db path, store it
306
307                     #print 'quit now'
308                     #sys.exit(0)
309                     # open the db if it exists (by converting from bsd) and is not broken, otherwise create a new one
310                     # it will update the db if necessary by checking the version number
311                     self.safelyOpenTriblerDB(sqlite_filepath, create_sql_filename, busytimeout, check_version=check_version, current_db_version=current_db_version)
312                     
313                     self.class_variables = {'db_path': sqlite_filepath, 'busytimeout': int(busytimeout)}
314                     
315                     return self.openDB()    # return the cursor, won't reopen the db
316                     
317                 elif sqlite_filepath != class_db_path:  # not the first time to open db path, check if it is the same
318                     raise Exception, "Only one database file can be opened. You have opened %s and are trying to open %s." % (class_db_path, sqlite_filepath) 
319                         
320         finally:
321             self.lock.release()
322
323     def safelyOpenTriblerDB(self, dbfile_path, sql_create, busytimeout=DEFAULT_BUSY_TIMEOUT, check_version=False, current_db_version=None):
324         """
325         open the db if possible, otherwise create a new one
326         update the db if necessary by checking the version number
327         
328         safeOpenDB():    
329             try:
330                 if sqlite db doesn't exist:
331                     raise Error
332                 open sqlite db
333                 read sqlite_db_version
334                 if sqlite_db_version dosen't exist:
335                     raise Error
336             except:
337                 close and delete sqlite db if possible
338                 create new sqlite db file without sqlite_db_version
339                 write sqlite_db_version at last
340                 commit
341                 open sqlite db
342                 read sqlite_db_version
343                 # must ensure these steps after except will not fail, otherwise force to exit
344             
345             if sqlite_db_version < current_db_version:
346                 updateDB(sqlite_db_version, current_db_version)
347                 commit
348                 update sqlite_db_version at last
349                 commit
350         """
351         try:
352             if not os.path.isfile(dbfile_path):
353                 raise Warning("No existing database found. Attempting to creating a new database %s" % repr(dbfile_path))
354             
355             cur = self.openDB(dbfile_path, busytimeout)
356             if check_version:
357                 sqlite_db_version = self.readDBVersion()
358                 if sqlite_db_version == NULL or int(sqlite_db_version)<1:
359                     raise NotImplementedError
360         except Exception, exception:
361             if isinstance(exception, Warning):
362                 # user friendly warning to log the creation of a new database
363                 print >>sys.stderr, exception
364
365             else:
366                 # user unfriendly exception message because something went wrong
367                 print_exc()
368             
369             if os.path.isfile(dbfile_path):
370                 self.close(clean=True)
371                 os.remove(dbfile_path)
372             
373             if os.path.isfile(sql_create):
374                 f = open(sql_create)
375                 sql_create_tables = f.read()
376                 f.close()
377             else:
378                 raise Exception, "Cannot open sql script at %s" % os.path.realpath(sql_create)
379             
380             self.createDBTable(sql_create_tables, dbfile_path, busytimeout)  
381             if check_version:
382                 sqlite_db_version = self.readDBVersion()
383             
384         if check_version:
385             self.checkDB(sqlite_db_version, current_db_version)
386
387     def checkDB(self, db_ver, curr_ver):
388         # read MyDB and check the version number.
389         if not db_ver or not curr_ver:
390             self.updateDB(db_ver,curr_ver)
391             return
392         db_ver = int(db_ver)
393         curr_ver = int(curr_ver)
394         #print "check db", db_ver, curr_ver
395         if db_ver != curr_ver or \
396                (not config_dir is None and os.path.exists(os.path.join(config_dir, "upgradingdb.txt"))): 
397             self.updateDB(db_ver,curr_ver)
398             
399     def updateDB(self,db_ver,curr_ver):
400         pass    #TODO
401
402     def readDBVersion(self):
403         cur = self.getCursor()
404         sql = u"select value from MyInfo where entry='version'"
405         res = self.fetchone(sql)
406         if res:
407             find = list(res)
408             return find[0]    # throw error if something wrong
409         else:
410             return None
411     
412     def writeDBVersion(self, version, commit=True):
413         sql = u"UPDATE MyInfo SET value=? WHERE entry='version'"
414         self.execute_write(sql, [version], commit=commit)
415     
416     def show_sql(self, switch):
417         # temporary show the sql executed
418         self.show_execute = switch 
419     
420     # --------- generic functions -------------
421         
422     def commit(self):
423         self.transaction()
424
425     def _execute(self, sql, args=None):
426         cur = self.getCursor()
427
428         if SHOW_ALL_EXECUTE or self.show_execute:
429             thread_name = threading.currentThread().getName()
430             print >> sys.stderr, '===', thread_name, '===\n', sql, '\n-----\n', args, '\n======\n'
431         try:
432             if args is None:
433                 return cur.execute(sql)
434             else:
435                 return cur.execute(sql, args)
436         except Exception, msg:
437             if True:
438                 print_exc()
439                 print_stack()
440                 print >> sys.stderr, "cachedb: execute error:", Exception, msg 
441                 thread_name = threading.currentThread().getName()
442                 print >> sys.stderr, '===', thread_name, '===\nSQL Type:', type(sql), '\n-----\n', sql, '\n-----\n', args, '\n======\n'
443                 #return None
444                 # ARNODB: this is incorrect, it should reraise the exception
445                 # such that _transaction can rollback or recommit. 
446                 # This bug already reported by Johan
447             raise msg
448         
449
450     def execute_read(self, sql, args=None):
451         # this is only called for reading. If you want to write the db, always use execute_write or executemany
452         return self._execute(sql, args)
453     
454     def execute_write(self, sql, args=None, commit=True):
455         self.cache_transaction(sql, args)
456         if commit:
457             self.commit()
458             
459     def executemany(self, sql, args, commit=True):
460
461         thread_name = threading.currentThread().getName()
462         if thread_name not in self.cache_transaction_table:
463             self.cache_transaction_table[thread_name] = []
464         all = [(sql, arg) for arg in args]
465         self.cache_transaction_table[thread_name].extend(all)
466
467         if commit:
468             self.commit()
469             
470     def cache_transaction(self, sql, args=None):
471         thread_name = threading.currentThread().getName()
472         if thread_name not in self.cache_transaction_table:
473             self.cache_transaction_table[thread_name] = []
474         self.cache_transaction_table[thread_name].append((sql, args))
475                     
476     def transaction(self, sql=None, args=None):
477         if sql:
478             self.cache_transaction(sql, args)
479         
480         thread_name = threading.currentThread().getName()
481         
482         n = 0
483         sql_full = ''
484         arg_list = []
485         sql_queue = self.cache_transaction_table.get(thread_name,None)
486         if sql_queue:
487             while True:
488                 try:
489                     _sql,_args = sql_queue.pop(0)
490                 except IndexError:
491                     break
492                 
493                 _sql = _sql.strip()
494                 if not _sql:
495                     continue
496                 if not _sql.endswith(';'):
497                     _sql += ';'
498                 sql_full += _sql + '\n'
499                 if _args != None:
500                     arg_list += list(_args)
501                 n += 1
502                 
503                 # if too many sql in cache, split them into batches to prevent processing and locking DB for a long time
504                 # TODO: optimize the value of MAX_SQL_BATCHED_TO_TRANSACTION
505                 if n % MAX_SQL_BATCHED_TO_TRANSACTION == 0:
506                     self._transaction(sql_full, arg_list)
507                     sql_full = ''
508                     arg_list = []
509                     
510             self._transaction(sql_full, arg_list)
511             
512     def _transaction(self, sql, args=None):
513         if sql:
514             sql = 'BEGIN TRANSACTION; \n' + sql + 'COMMIT TRANSACTION;'
515             try:
516                 self._execute(sql, args)
517             except Exception,e:
518                 self.commit_retry_if_busy_or_rollback(e,0,sql=sql)
519             
520     def commit_retry_if_busy_or_rollback(self,e,tries,sql=None):
521         """ 
522         Arno:
523         SQL_BUSY errors happen at the beginning of the experiment,
524         very quickly after startup (e.g. 0.001 s), so the busy timeout
525         is not honoured for some reason. After the initial errors,
526         they no longer occur.
527         """
528         print >>sys.stderr,"sqlcachedb: commit_retry: after",str(e),repr(sql)
529         
530         if str(e).startswith("BusyError"):
531             try:
532                 self._execute("COMMIT")
533             except Exception,e2: 
534                 if tries < 5:   #self.max_commit_retries
535                     # Spec is unclear whether next commit will also has 
536                     # 'busytimeout' seconds to try to get a write lock.
537                     sleep(pow(2.0,tries+2)/100.0)
538                     self.commit_retry_if_busy_or_rollback(e2,tries+1)
539                 else:
540                     self.rollback(tries)
541                     raise Exception,e2
542         else:
543             self.rollback(tries)
544             m = "cachedb: TRANSACTION ERROR "+threading.currentThread().getName()+' '+str(e)
545             raise Exception, m
546             
547             
548     def rollback(self, tries):
549         print_exc()
550         try:
551             self._execute("ROLLBACK")
552         except Exception, e:
553             # May be harmless, see above. Unfortunately they don't specify
554             # what the error is when an attempt is made to roll back
555             # an automatically rolled back transaction.
556             m = "cachedb: ROLLBACK ERROR "+threading.currentThread().getName()+' '+str(e)
557             #print >> sys.stderr, 'SQLite Database', m
558             raise Exception, m
559    
560         
561     # -------- Write Operations --------
562     def insert(self, table_name, commit=True, **argv):
563         if len(argv) == 1:
564             sql = 'INSERT INTO %s (%s) VALUES (?);'%(table_name, argv.keys()[0])
565         else:
566             questions = '?,'*len(argv)
567             sql = 'INSERT INTO %s %s VALUES (%s);'%(table_name, tuple(argv.keys()), questions[:-1])
568         self.execute_write(sql, argv.values(), commit)
569     
570     def insertMany(self, table_name, values, keys=None, commit=True):
571         """ values must be a list of tuples """
572
573         questions = u'?,'*len(values[0])
574         if keys is None:
575             sql = u'INSERT INTO %s VALUES (%s);'%(table_name, questions[:-1])
576         else:
577             sql = u'INSERT INTO %s %s VALUES (%s);'%(table_name, tuple(keys), questions[:-1])
578         self.executemany(sql, values, commit=commit)
579     
580     def update(self, table_name, where=None, commit=True, **argv):
581         sql = u'UPDATE %s SET '%table_name
582         arg = []
583         for k,v in argv.iteritems():
584             if type(v) is tuple:
585                 sql += u'%s %s ?,' % (k, v[0])
586                 arg.append(v[1])
587             else:
588                 sql += u'%s=?,' % k
589                 arg.append(v)
590         sql = sql[:-1]
591         if where != None:
592             sql += u' where %s'%where
593         self.execute_write(sql, arg, commit)
594         
595     def delete(self, table_name, commit=True, **argv):
596         sql = u'DELETE FROM %s WHERE '%table_name
597         arg = []
598         for k,v in argv.iteritems():
599             if type(v) is tuple:
600                 sql += u'%s %s ? AND ' % (k, v[0])
601                 arg.append(v[1])
602             else:
603                 sql += u'%s=? AND ' % k
604                 arg.append(v)
605         sql = sql[:-5]
606         self.execute_write(sql, argv.values(), commit)
607     
608     # -------- Read Operations --------
609     def size(self, table_name):
610         num_rec_sql = u"SELECT count(*) FROM %s;"%table_name
611         result = self.fetchone(num_rec_sql)
612         return result
613
614     def fetchone(self, sql, args=None):
615         # returns NULL: if the result is null 
616         # return None: if it doesn't found any match results
617         find = self.execute_read(sql, args)
618         if not find:
619             return NULL
620         else:
621             find = list(find)
622             if len(find) > 0:
623                 find = find[0]
624             else:
625                 return NULL
626         if len(find)>1:
627             return find
628         else:
629             return find[0]
630            
631     def fetchall(self, sql, args=None, retry=0):
632         res = self.execute_read(sql, args)
633         if res != None:
634             find = list(res)
635             return find
636         else:
637             return []   # should it return None?
638     
639     def getOne(self, table_name, value_name, where=None, conj='and', **kw):
640         """ value_name could be a string, a tuple of strings, or '*' 
641         """
642
643         if isinstance(value_name, tuple):
644             value_names = u",".join(value_name)
645         elif isinstance(value_name, list):
646             value_names = u",".join(value_name)
647         else:
648             value_names = value_name
649             
650         if isinstance(table_name, tuple):
651             table_names = u",".join(table_name)
652         elif isinstance(table_name, list):
653             table_names = u",".join(table_name)
654         else:
655             table_names = table_name
656             
657         sql = u'select %s from %s'%(value_names, table_names)
658
659         if where or kw:
660             sql += u' where '
661         if where:
662             sql += where
663             if kw:
664                 sql += u' %s '%conj
665         if kw:
666             arg = []
667             for k,v in kw.iteritems():
668                 if type(v) is tuple:
669                     operator = v[0]
670                     arg.append(v[1])
671                 else:
672                     operator = "="
673                     arg.append(v)
674                 sql += u' %s %s ? ' % (k, operator)
675                 sql += conj
676             sql = sql[:-len(conj)]
677         else:
678             arg = None
679
680         # print >> sys.stderr, 'SQL: %s %s' % (sql, arg)
681         return self.fetchone(sql,arg)
682     
683     def getAll(self, table_name, value_name, where=None, group_by=None, having=None, order_by=None, limit=None, offset=None, conj='and', **kw):
684         """ value_name could be a string, or a tuple of strings 
685             order by is represented as order_by
686             group by is represented as group_by
687         """
688         if isinstance(value_name, tuple):
689             value_names = u",".join(value_name)
690         elif isinstance(value_name, list):
691             value_names = u",".join(value_name)
692         else:
693             value_names = value_name
694         
695         if isinstance(table_name, tuple):
696             table_names = u",".join(table_name)
697         elif isinstance(table_name, list):
698             table_names = u",".join(table_name)
699         else:
700             table_names = table_name
701             
702         sql = u'select %s from %s'%(value_names, table_names)
703         
704         if where or kw:
705             sql += u' where '
706         if where:
707             sql += where
708             if kw:
709                 sql += u' %s '%conj
710         if kw:
711             arg = []
712             for k,v in kw.iteritems():
713                 if type(v) is tuple:
714                     operator = v[0]
715                     arg.append(v[1])
716                 else:
717                     operator = "="
718                     arg.append(v)
719
720                 sql += u' %s %s ?' % (k, operator)
721                 sql += conj
722             sql = sql[:-len(conj)]
723         else:
724             arg = None
725         
726         if group_by != None:
727             sql += u' group by ' + group_by
728         if having != None:
729             sql += u' having ' + having
730         if order_by != None:
731             sql += u' order by ' + order_by    # you should add desc after order_by to reversely sort, i.e, 'last_seen desc' as order_by
732         if limit != None:
733             sql += u' limit %d'%limit
734         if offset != None:
735             sql += u' offset %d'%offset
736
737         try:
738             return self.fetchall(sql, arg) or []
739         except Exception, msg:
740             print >> sys.stderr, "sqldb: Wrong getAll sql statement:", sql
741             raise Exception, msg
742     
743     # ----- Tribler DB operations ----
744
745     #------------- useful functions for multiple handlers ----------
746     def insertPeer(self, permid, update=True, commit=True, **argv):
747         """ Insert a peer. permid is the binary permid.
748         If the peer is already in db and update is True, update the peer.
749         """
750         peer_id = self.getPeerID(permid)
751         peer_existed = False
752         if 'name' in argv:
753             argv['name'] = dunno2unicode(argv['name'])
754         if peer_id != None:
755             peer_existed = True
756             if update:
757                 where=u'peer_id=%d'%peer_id
758                 self.update('Peer', where, commit=commit, **argv)
759         else:
760             self.insert('Peer', permid=bin2str(permid), commit=commit, **argv)
761         return peer_existed
762                 
763     def deletePeer(self, permid=None, peer_id=None, force=True, commit=True):
764         if peer_id is None:
765             peer_id = self.getPeerID(permid)
766             
767         deleted = False
768         if peer_id != None:
769             if force:
770                 self.delete('Peer', peer_id=peer_id, commit=commit)
771             else:
772                 self.delete('Peer', peer_id=peer_id, friend=0, superpeer=0, commit=commit)
773             deleted = not self.hasPeer(permid, check_db=True)
774             if deleted and permid in self.permid_id:
775                 self.permid_id.pop(permid)
776
777         return deleted
778                 
779     def getPeerID(self, permid):
780         assert isinstance(permid, str), permid
781         # permid must be binary
782         if permid in self.permid_id:
783             return self.permid_id[permid]
784         
785         sql_get_peer_id = "SELECT peer_id FROM Peer WHERE permid==?"
786         peer_id = self.fetchone(sql_get_peer_id, (bin2str(permid),))
787         if peer_id != None:
788             self.permid_id[permid] = peer_id
789         
790         return peer_id
791     
792     def hasPeer(self, permid, check_db=False):
793         if not check_db:
794             return bool(self.getPeerID(permid))
795         else:
796             permid_str = bin2str(permid)
797             sql_get_peer_id = "SELECT peer_id FROM Peer WHERE permid==?"
798             peer_id = self.fetchone(sql_get_peer_id, (permid_str,))
799             if peer_id is None:
800                 return False
801             else:
802                 return True
803     
804     def insertInfohash(self, infohash, check_dup=False, commit=True):
805         """ Insert an infohash. infohash is binary """
806         assert isinstance(infohash, str), "INFOHASH has invalid type: %s" % type(infohash)
807         assert len(infohash) == INFOHASH_LENGTH, "INFOHASH has invalid length: %d" % len(infohash)
808         if infohash in self.infohash_id:
809             if check_dup:
810                 print >> sys.stderr, 'sqldb: infohash to insert already exists', `infohash`
811             return
812         
813         infohash_str = bin2str(infohash)
814         sql_insert_torrent = "INSERT INTO Torrent (infohash) VALUES (?)"
815         self.execute_write(sql_insert_torrent, (infohash_str,), commit)
816     
817     def deleteInfohash(self, infohash=None, torrent_id=None, commit=True):
818         assert infohash is None or isinstance(infohash, str), "INFOHASH has invalid type: %s" % type(infohash)
819         assert infohash is None or len(infohash) == INFOHASH_LENGTH, "INFOHASH has invalid length: %d" % len(infohash)
820         if torrent_id is None:
821             torrent_id = self.getTorrentID(infohash)
822             
823         if torrent_id != None:
824             self.delete('Torrent', torrent_id=torrent_id, commit=commit)
825             if infohash in self.infohash_id:
826                 self.infohash_id.pop(infohash)
827     
828     def getTorrentID(self, infohash):
829         assert isinstance(infohash, str), "INFOHASH has invalid type: %s" % type(infohash)
830         assert len(infohash) == INFOHASH_LENGTH, "INFOHASH has invalid length: %d" % len(infohash)
831         if infohash in self.infohash_id:
832             return self.infohash_id[infohash]
833         
834         sql_get_torrent_id = "SELECT torrent_id FROM Torrent WHERE infohash==?"
835         tid = self.fetchone(sql_get_torrent_id, (bin2str(infohash),))
836         if tid != None:
837             self.infohash_id[infohash] = tid
838         return tid
839         
840     def getInfohash(self, torrent_id):
841         sql_get_infohash = "SELECT infohash FROM Torrent WHERE torrent_id==?"
842         arg = (torrent_id,)
843         ret = self.fetchone(sql_get_infohash, arg)
844         ret = str2bin(ret)
845         return ret
846     
847     def getTorrentStatusTable(self):
848         if self.status_table is None:
849             st = self.getAll('TorrentStatus', ('lower(name)', 'status_id'))
850             self.status_table = dict(st)
851         return self.status_table
852     
853     def getTorrentCategoryTable(self):
854         # The key is in lower case
855         if self.category_table is None:
856             ct = self.getAll('Category', ('lower(name)', 'category_id'))
857             self.category_table = dict(ct)
858         return self.category_table
859     
860     def getTorrentSourceTable(self):
861         # Don't use lower case because some URLs are case sensitive
862         if self.src_table is None:
863             st = self.getAll('TorrentSource', ('name', 'source_id'))
864             self.src_table = dict(st)
865         return self.src_table
866
867     def test(self):
868         res1 = self.getAll('Category', '*')
869         res2 = len(self.getAll('Peer', 'name', 'name is not NULL'))
870         return (res1, res2)
871
872
873 class SQLiteCacheDBV5(SQLiteCacheDBBase):
874     def updateDB(self, fromver, tover):
875
876         # bring database up to version 2, if necessary        
877         if fromver < 2:
878             sql = """
879
880 -- Patch for BuddyCast 4
881
882 ALTER TABLE MyPreference ADD COLUMN click_position INTEGER DEFAULT -1;
883 ALTER TABLE MyPreference ADD COLUMN reranking_strategy INTEGER DEFAULT -1;
884 ALTER TABLE Preference ADD COLUMN click_position INTEGER DEFAULT -1;
885 ALTER TABLE Preference ADD COLUMN reranking_strategy INTEGER DEFAULT -1;
886 CREATE TABLE ClicklogSearch (
887                      peer_id INTEGER DEFAULT 0,
888                      torrent_id INTEGER DEFAULT 0,
889                      term_id INTEGER DEFAULT 0,
890                      term_order INTEGER DEFAULT 0
891                      );
892 CREATE INDEX idx_search_term ON ClicklogSearch (term_id);
893 CREATE INDEX idx_search_torrent ON ClicklogSearch (torrent_id);
894
895
896 CREATE TABLE ClicklogTerm (
897                     term_id INTEGER PRIMARY KEY AUTOINCREMENT DEFAULT 0,
898                     term VARCHAR(255) NOT NULL,
899                     times_seen INTEGER DEFAULT 0 NOT NULL
900                     );
901 CREATE INDEX idx_terms_term ON ClicklogTerm(term);  
902     
903 """       
904             
905             self.execute_write(sql, commit=False)
906
907         
908         if fromver < 3:
909             sql = """
910 -- Patch for Local Peer Discovery
911             
912 ALTER TABLE Peer ADD COLUMN is_local integer DEFAULT 0;
913 """       
914             self.execute_write(sql, commit=False)
915
916         if fromver < 4:
917             sql="""
918 -- V2: Patch for VoteCast
919
920 DROP TABLE IF EXISTS ModerationCast;
921 DROP INDEX IF EXISTS moderationcast_idx;
922
923 DROP TABLE IF EXISTS Moderators;
924 DROP INDEX IF EXISTS moderators_idx;
925
926 DROP TABLE IF EXISTS VoteCast;
927 DROP INDEX IF EXISTS votecast_idx;
928
929 CREATE TABLE VoteCast (
930 mod_id text,
931 voter_id text,
932 vote integer,
933 time_stamp integer
934 );
935
936 CREATE INDEX mod_id_idx
937 on VoteCast 
938 (mod_id);
939
940 CREATE INDEX voter_id_idx
941 on VoteCast 
942 (voter_id);
943
944 CREATE UNIQUE INDEX votecast_idx
945 ON VoteCast
946 (mod_id, voter_id);
947             
948 --- patch for BuddyCast 5 : Creation of Popularity table and relevant stuff
949
950 CREATE TABLE Popularity (
951                          torrent_id INTEGER,
952                          peer_id INTEGER,
953                          msg_receive_time NUMERIC,
954                          size_calc_age NUMERIC,
955                          num_seeders INTEGER DEFAULT 0,
956                          num_leechers INTEGER DEFAULT 0,
957                          num_of_sources INTEGER DEFAULT 0
958                      );
959
960 CREATE INDEX Message_receive_time_idx 
961   ON Popularity 
962    (msg_receive_time);
963
964 CREATE INDEX Size_calc_age_idx 
965   ON Popularity 
966    (size_calc_age);
967
968 CREATE INDEX Number_of_seeders_idx 
969   ON Popularity 
970    (num_seeders);
971
972 CREATE INDEX Number_of_leechers_idx 
973   ON Popularity 
974    (num_leechers);
975
976 CREATE UNIQUE INDEX Popularity_idx
977   ON Popularity
978    (torrent_id, peer_id, msg_receive_time);
979
980 -- v4: Patch for ChannelCast, Search
981
982 CREATE TABLE ChannelCast (
983 publisher_id text,
984 publisher_name text,
985 infohash text,
986 torrenthash text,
987 torrentname text,
988 time_stamp integer,
989 signature text
990 );
991
992 CREATE INDEX pub_id_idx
993 on ChannelCast
994 (publisher_id);
995
996 CREATE INDEX pub_name_idx
997 on ChannelCast
998 (publisher_name);
999
1000 CREATE INDEX infohash_ch_idx
1001 on ChannelCast
1002 (infohash);
1003
1004 ----------------------------------------
1005
1006 CREATE TABLE InvertedIndex (
1007 word               text NOT NULL,
1008 torrent_id         integer
1009 );
1010
1011 CREATE INDEX word_idx
1012 on InvertedIndex
1013 (word);
1014
1015 CREATE UNIQUE INDEX invertedindex_idx
1016 on InvertedIndex
1017 (word,torrent_id);
1018
1019 ----------------------------------------
1020
1021 -- Set all similarity to zero because we are using a new similarity
1022 -- function and the old values no longer correspond to the new ones
1023 UPDATE Peer SET similarity = 0;
1024 UPDATE Torrent SET relevance = 0;
1025
1026 """
1027             self.execute_write(sql, commit=False)
1028         if fromver < 5:
1029             sql=\
1030 """
1031 --------------------------------------
1032 -- Creating Subtitles (future RichMetadata) DB
1033 ----------------------------------
1034 CREATE TABLE Metadata (
1035   metadata_id integer PRIMARY KEY ASC AUTOINCREMENT NOT NULL,
1036   publisher_id text NOT NULL,
1037   infohash text NOT NULL,
1038   description text,
1039   timestamp integer NOT NULL,
1040   signature text NOT NULL,
1041   UNIQUE (publisher_id, infohash),
1042   FOREIGN KEY (publisher_id, infohash) 
1043     REFERENCES ChannelCast(publisher_id, infohash) 
1044     ON DELETE CASCADE -- the fk constraint is not enforced by sqlite
1045 );
1046
1047 CREATE INDEX infohash_md_idx
1048 on Metadata(infohash);
1049
1050 CREATE INDEX pub_md_idx
1051 on Metadata(publisher_id);
1052
1053
1054 CREATE TABLE Subtitles (
1055   metadata_id_fk integer,
1056   subtitle_lang text NOT NULL,
1057   subtitle_location text,
1058   checksum text NOT NULL,
1059   UNIQUE (metadata_id_fk,subtitle_lang),
1060   FOREIGN KEY (metadata_id_fk) 
1061     REFERENCES Metadata(metadata_id) 
1062     ON DELETE CASCADE, -- the fk constraint is not enforced by sqlite
1063   
1064   -- ISO639-2 uses 3 characters for lang codes
1065   CONSTRAINT lang_code_length 
1066     CHECK ( length(subtitle_lang) == 3 ) 
1067 );
1068
1069
1070 CREATE INDEX metadata_sub_idx
1071 on Subtitles(metadata_id_fk);
1072
1073 -- Stores the subtitles that peers have as an integer bitmask
1074  CREATE TABLE SubtitlesHave (
1075     metadata_id_fk integer,
1076     peer_id text NOT NULL,
1077     have_mask integer NOT NULL,
1078     received_ts integer NOT NULL, --timestamp indicating when the mask was received
1079     UNIQUE (metadata_id_fk, peer_id),
1080     FOREIGN KEY (metadata_id_fk)
1081       REFERENCES Metadata(metadata_id)
1082       ON DELETE CASCADE, -- the fk constraint is not enforced by sqlite
1083
1084     -- 32 bit unsigned integer
1085     CONSTRAINT have_mask_length
1086       CHECK (have_mask >= 0 AND have_mask < 4294967296)
1087 );
1088
1089 CREATE INDEX subtitles_have_idx
1090 on SubtitlesHave(metadata_id_fk);
1091
1092 -- this index can boost queries
1093 -- ordered by timestamp on the SubtitlesHave DB
1094 CREATE INDEX subtitles_have_ts
1095 on SubtitlesHave(received_ts);
1096
1097 """
1098             self.execute_write(sql, commit=False)
1099             
1100         # updating version stepwise so if this works, we store it
1101         # regardless of later, potentially failing updates
1102         self.writeDBVersion(CURRENT_MAIN_DB_VERSION, commit=False)
1103         self.commit()
1104         
1105         # now the start the process of parsing the torrents to insert into 
1106         # InvertedIndex table. 
1107         if TEST_SQLITECACHEDB_UPGRADE:
1108             state_dir = "."
1109         else:
1110             from BaseLib.Core.Session import Session
1111             session = Session.get_instance()
1112             state_dir = session.get_state_dir()
1113         tmpfilename = os.path.join(state_dir,"upgradingdb.txt")
1114         if fromver < 4 or os.path.exists(tmpfilename):
1115             def upgradeTorrents():
1116                 # fetch some un-inserted torrents to put into the InvertedIndex
1117                 sql = """
1118                 SELECT torrent_id, name, torrent_file_name
1119                 FROM Torrent
1120                 WHERE torrent_id NOT IN (SELECT DISTINCT torrent_id FROM InvertedIndex)
1121                 AND torrent_file_name IS NOT NULL
1122                 LIMIT 20"""
1123                 records = self.fetchall(sql)
1124                 
1125                 if len(records) == 0:
1126                     # upgradation is complete and hence delete the temp file
1127                     os.remove(tmpfilename) 
1128                     if DEBUG: print >> sys.stderr, "DB Upgradation: temp-file deleted", tmpfilename
1129                     return 
1130                     
1131                 for torrent_id, name, torrent_file_name in records:
1132                     try:
1133                         abs_filename = os.path.join(session.get_torrent_collecting_dir(), torrent_file_name)
1134                         if not os.path.exists(abs_filename):
1135                             raise RuntimeError(".torrent file not found. Use fallback.")
1136                         torrentdef = TorrentDef.load(abs_filename)
1137                         torrent_name = torrentdef.get_name_as_unicode()
1138                         keywords = Set(split_into_keywords(torrent_name))
1139                         for filename in torrentdef.get_files_as_unicode():
1140                             keywords.update(split_into_keywords(filename))
1141
1142                     except:
1143                         # failure... most likely the .torrent file
1144                         # is invalid
1145
1146                         # use keywords from the torrent name
1147                         # stored in the database
1148                         torrent_name = dunno2unicode(name)
1149                         keywords = Set(split_into_keywords(torrent_name))
1150
1151                     # store the keywords in the InvertedIndex
1152                     # table in the database
1153                     if len(keywords) > 0:
1154                         values = [(keyword, torrent_id) for keyword in keywords]
1155                         self.executemany(u"INSERT OR REPLACE INTO InvertedIndex VALUES(?, ?)", values, commit=False)
1156                         if DEBUG:
1157                             print >> sys.stderr, "DB Upgradation: Extending the InvertedIndex table with", len(values), "new keywords for", torrent_name
1158
1159                 # now commit, after parsing the batch of torrents
1160                 self.commit()
1161                 
1162                 # upgradation not yet complete; comeback after 5 sec
1163                 tqueue.add_task(upgradeTorrents, 5) 
1164
1165
1166             # Create an empty file to mark the process of upgradation.
1167             # In case this process is terminated before completion of upgradation,
1168             # this file remains even though fromver >= 4 and hence indicating that 
1169             # rest of the torrents need to be inserted into the InvertedIndex!
1170             
1171             # ensure the temp-file is created, if it is not already
1172             try:
1173                 open(tmpfilename, "w")
1174                 if DEBUG: print >> sys.stderr, "DB Upgradation: temp-file successfully created"
1175             except:
1176                 if DEBUG: print >> sys.stderr, "DB Upgradation: failed to create temp-file"
1177             
1178             if DEBUG: print >> sys.stderr, "Upgrading DB .. inserting into InvertedIndex"
1179             from BaseLib.Utilities.TimedTaskQueue import TimedTaskQueue
1180             from sets import Set
1181             from BaseLib.Core.Search.SearchManager import split_into_keywords
1182             from BaseLib.Core.TorrentDef import TorrentDef
1183
1184             # start the upgradation after 10 seconds
1185             tqueue = TimedTaskQueue("UpgradeDB")
1186             tqueue.add_task(upgradeTorrents, 10)
1187
1188 class SQLiteCacheDB(SQLiteCacheDBV5):
1189     __single = None    # used for multithreaded singletons pattern
1190
1191     @classmethod
1192     def getInstance(cls, *args, **kw):
1193         # Singleton pattern with double-checking to ensure that it can only create one object
1194         if cls.__single is None:
1195             cls.lock.acquire()   
1196             try:
1197                 if cls.__single is None:
1198                     cls.__single = cls(*args, **kw)
1199                     #print >>sys.stderr,"SqliteCacheDB: getInstance: created is",cls,cls.__single
1200             finally:
1201                 cls.lock.release()
1202         return cls.__single
1203     
1204     def __init__(self, *args, **kargs):
1205         # always use getInstance() to create this object
1206         
1207         # ARNOCOMMENT: why isn't the lock used on this read?!
1208         
1209         if self.__single != None:
1210             raise RuntimeError, "SQLiteCacheDB is singleton"
1211         SQLiteCacheDBBase.__init__(self, *args, **kargs)
1212     
1213 if __name__ == '__main__':
1214     configure_dir = sys.argv[1]
1215     config = {}
1216     config['state_dir'] = configure_dir
1217     config['install_dir'] = u'.'
1218     config['peer_icon_path'] = u'.'
1219     sqlite_test = init(config)
1220     sqlite_test.test()
1221