instrumentation: add next-share/
[cs-p2p-next.git] / instrumentation / next-share / BaseLib / Core / Statistics / ChannelCrawler.py
1 # Written by Boudewijn Schoon\r
2 # see LICENSE.txt for license information\r
3 \r
4 import sys\r
5 import cPickle\r
6 from time import strftime\r
7 \r
8 from BaseLib.Core.Overlay.SecureOverlay import OLPROTO_VER_THIRTEENTH\r
9 # OLPROTO_VER_SEVENTH --> Sixth public release, >= 4.5.0, supports CRAWLER_REQUEST and CRAWLER_REPLY messages\r
10 # OLPROTO_VER_EIGHTH  --> Seventh public release, >= 5.0, supporting BuddyCast with clicklog info.\r
11 \r
12 from BaseLib.Core.BitTornado.BT1.MessageID import CRAWLER_CHANNEL_QUERY\r
13 from BaseLib.Core.CacheDB.sqlitecachedb import SQLiteCacheDB\r
14 from BaseLib.Core.Utilities.utilities import show_permid, show_permid_short\r
15 from BaseLib.Core.Statistics.Crawler import Crawler\r
16 \r
17 DEBUG = False\r
18 \r
19 class ChannelCrawler:\r
20     __single = None\r
21 \r
22     @classmethod\r
23     def get_instance(cls, *args, **kargs):\r
24         if not cls.__single:\r
25             cls.__single = cls(*args, **kargs)\r
26         return cls.__single\r
27 \r
28     def __init__(self):\r
29         self._sqlite_cache_db = SQLiteCacheDB.getInstance()\r
30 \r
31         crawler = Crawler.get_instance()\r
32         if crawler.am_crawler():\r
33             self._file = open("channelcrawler.txt", "a")\r
34             self._file.write("".join(("# ", "*" * 80, "\n# ", strftime("%Y/%m/%d %H:%M:%S"), " Crawler started\n")))\r
35             self._file.flush()\r
36         else:\r
37             self._file = None\r
38 \r
39     def query_initiator(self, permid, selversion, request_callback):\r
40         """\r
41         Established a new connection. Send a CRAWLER_CHANNEL_QUERY request.\r
42         @param permid The Tribler peer permid\r
43         @param selversion The oberlay protocol version\r
44         @param request_callback Call this function one or more times to send the requests: request_callback(message_id, payload)\r
45         """\r
46         if DEBUG: print >>sys.stderr, "channelcrawler: query_initiator", show_permid_short(permid)\r
47         sql = []\r
48         if selversion >= OLPROTO_VER_THIRTEENTH:\r
49             sql.extend(("SELECT 'channel_files', publisher_id, count(*) FROM ChannelCast group by publisher_id",\r
50                         "SELECT 'my_votes', mod_id, voter_id, vote, time_stamp FROM VoteCast where voter_id='" + show_permid(permid) + "' order by time_stamp desc limit 100",\r
51                         "SELECT 'other_votes', mod_id, voter_id, vote, time_stamp FROM VoteCast where voter_id<>'" + show_permid(permid) + "' order by time_stamp desc limit 100"))\r
52 \r
53         request_callback(CRAWLER_CHANNEL_QUERY, ";".join(sql), callback=self._after_request_callback)\r
54 \r
55     def _after_request_callback(self, exc, permid):\r
56         """\r
57         Called by the Crawler with the result of the request_callback\r
58         call in the query_initiator method.\r
59         """\r
60         if not exc:\r
61             if DEBUG: print >>sys.stderr, "channelcrawler: request send to", show_permid_short(permid)\r
62             self._file.write("; ".join((strftime("%Y/%m/%d %H:%M:%S"), "REQUEST", show_permid(permid), "\n")))\r
63             self._file.flush()\r
64 \r
65     def handle_crawler_request(self, permid, selversion, channel_id, message, reply_callback):\r
66         """\r
67         Received a CRAWLER_CHANNEL_QUERY request.\r
68         @param permid The Crawler permid\r
69         @param selversion The overlay protocol version\r
70         @param channel_id Identifies a CRAWLER_REQUEST/CRAWLER_REPLY pair\r
71         @param message The message payload\r
72         @param reply_callback Call this function once to send the reply: reply_callback(payload [, error=123])\r
73         """\r
74         if DEBUG:\r
75             print >> sys.stderr, "channelcrawler: handle_crawler_request", show_permid_short(permid), message\r
76 \r
77         # execute the sql\r
78         try:\r
79             cursor = self._sqlite_cache_db.execute_read(message)\r
80 \r
81         except Exception, e:\r
82             reply_callback(str(e), error=1)\r
83         else:\r
84             if cursor:\r
85                 reply_callback(cPickle.dumps(list(cursor), 2))\r
86             else:\r
87                 reply_callback("error", error=2)\r
88 \r
89     def handle_crawler_reply(self, permid, selversion, channel_id, channel_data, error, message, request_callback):\r
90         """\r
91         Received a CRAWLER_CHANNEL_QUERY reply.\r
92         @param permid The Crawler permid\r
93         @param selversion The overlay protocol version\r
94         @param channel_id Identifies a CRAWLER_REQUEST/CRAWLER_REPLY pair\r
95         @param error The error value. 0 indicates success.\r
96         @param message The message payload\r
97         @param request_callback Call this function one or more times to send the requests: request_callback(message_id, payload)\r
98         """\r
99         if error:\r
100             if DEBUG:\r
101                 print >> sys.stderr, "channelcrawler: handle_crawler_reply", error, message\r
102 \r
103             self._file.write("; ".join((strftime("%Y/%m/%d %H:%M:%S"), "  REPLY", show_permid(permid), str(error), message, "\n")))\r
104             self._file.flush()\r
105 \r
106         else:\r
107             if DEBUG:\r
108                 print >> sys.stderr, "channelcrawler: handle_crawler_reply", show_permid_short(permid), cPickle.loads(message)\r
109 \r
110             self._file.write("; ".join((strftime("%Y/%m/%d %H:%M:%S"), "  REPLY", show_permid(permid), str(error), str(cPickle.loads(message)), "\n")))\r
111             self._file.flush()\r
112 \r