AsyncURLFetcher
Fetch URLs asynchronously in background threads, using the WorkerThread and ReadWriteLock helpers from workerthread.py.
# This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA """Async URL Fetcher library (c) 2004 Christian Heimes """ # Depends on Python 2.3+ ################################ # configuration WAIT_TIME = 60.0 # reloop ever 60 seconds MAX_THREADS = 1 # run up to 5 parallel fetcher threads TIMEOUT = 300 # timeout an url after 5 minutes ERROR_THRESHOLD = 5 ERROR_DELAY = 300 #WAIT_TIME = 10 #TIMEOUT = 60 ################################ import threading import thread import time import urllib2 from workerthread import WorkerThread, ReadWriteLock try: from zLOG import LOG, INFO, PROBLEM except ImportError: INFO = 'INFO' PROBLEM = 'PROBLEM' def LOG(prd, level, msg): print "%s: %s - %s" % (prd, level, msg) def log(msg, level=INFO): LOG('AsyncURLFetcher', level, msg) class URL(object): """An url object """ __slots__ = ('__url', '__valid', '__last_updated', '__errors', '__errormsg', '__data', '__implements__', ) __implements__ = None #XXX IURL def __init__(self, url, valid): self.__url = url self.__valid = valid self.__last_updated = None self.__errors = 0 self.__errormsg = '' self.__data = '' def _setData(self, data): self.__last_updated = time.time() self.__data = data def _setErrorMsg(self, msg): if self.__errors < ERROR_THRESHOLD: self.__errors+=1 self.__errormsg = msg def __repr__(self): return '<URL object for %s at %s' % (self.url, 
hex(id(self))) def __str__(self): return self.url url = property(lambda self: self.__url, None, None, 'Url to fetch') data = property(lambda self: self.__data, _setData, None, 'Data fetched from the url') error = property(lambda self: self.__errormsg, _setErrorMsg, None, 'Last error message') def outdated(self): """Is data outdated? Based on the last update time and amount of errors """ if self.__errors > 0: # decrease error amount self.__errors-=1 if self.__errors >= ERROR_THRESHOLD: # add a delay if we are over the max error threshold timeout = self.__valid + ERROR_DELAY else: timeout = self.__valid if self.__last_updated: return self.__last_updated + timeout < time.time() else: return True class URLContainer(object): """Storage object for urls """ __slots__ = ('__lock', '__data', '__urls', '__position', '__implements__', ) __implements__ = None #XXX IURLContainer def __init__(self): self.__lock = ReadWriteLock() # dict of url objects: url -> object self.__data = {} # list of objects self.__urls = [] self.__position = 0 def __len__(self): self.__lock.acquire_read() try: return len(self.__data) finally: self.__lock.release_read() def __contains__(self, url): self.__lock.acquire_read() try: return url in self.__data finally: self.__lock.release_read() def __iter__(self): return iter(self.__data.values()) def __getitem__(self, url): self.__lock.acquire_read() try: return self.__data[key] finally: self.__lock.release_read() def __delitem__(self, url): idx = self.__url.index(key) self.__lock.acquire_write() try: del self.__data[key] del self.__url[idx] assert (len(self.__data) == len(self.__urls)) finally: self.__lock.release_write() def registerURL(self, url, valid=TIMEOUT): """Adds a new url and forces it to be fetched next """ assert (len(self.__data) == len(self.__urls)) if not isinstance(url, URL): url = URL(url, valid) if url.url in self: return self.__lock.acquire_write() try: self.__data[url.url] = url self.__urls.append(url.url) finally: 
self.__lock.release_write() def unregisterURL(self, url): """Unregisters an url """ if not isinstance(url, URL): url = url.url del self[url] def listURLs(self): """Lists all url names """ return self.__urls def updateURL(self, url, data): """Updates the data of an registered url """ # XXX interface if hasattr(url, 'url'): url = url.url # XXX log('Updating %s with %s kb of data' % (url, len(data) / 1024)) self.__lock.acquire_write() try: if url not in self.__data: return self.__data[url].data = data finally: self.__lock.release_write() def updateError(self, url, error): """Sets the last error of an registered url """ log('Error updating %s: %s' % (url, error)) self.__lock.acquire_write() try: if url not in self.__data: return self.__data[url].error = error finally: self.__lock.release_write() def next(self): """Get the next url or next forced url May raise an IndexError at the end """ # XXX paranoid write lock? self.__lock.acquire_write() try: url = self.__urls[self.__position] self.__position+=1 return self.__data[url] finally: self.__lock.release_write() def rewind(self): self.__position=0 class URLFetcher(threading.Thread): """Fetchs data from an url and stores it into storage """ def __init__(self, url, storage, semaphore): threading.Thread.__init__(self, name=url) self.setDaemon(True) self.__url = url self.__storage = storage self.__semaphore = semaphore def run(self): self.__semaphore.acquire() try: time.sleep(5) try: conn = urllib2.urlopen(url=self.__url.url) data = conn.read() except urllib2.URLError, msg: self.__storage.updateError(self.__url.url, msg) else: self.__storage.updateURL(self.__url.url, data) finally: self.__semaphore.release() class URLFetcherMainThread(WorkerThread): """Main thread that is looping and threading child threads to update urls """ def __init__(self, name, storage, wait, max_threads): WorkerThread.__init__(self, name=name, wait=wait) self.__storage = storage self.__semaphore = threading.BoundedSemaphore(max_threads) self.__threads 
= [] self.__force_update = False log('Starting main thread') # run() is used by WorkerThread! def forceUpdate(self): self.__force_update = True self.__storage.rewind() self.kick(self) self.__force_update = False def main(self): storage = self.__storage semaphore = self.__semaphore #XXX log('Running main thread') while True: try: url = storage.next() except IndexError: # stops main loop until next time event storage.rewind() # XXX self.__force_update = False break else: if url.outdated() or self.__force_update: thread = URLFetcher(url, storage, semaphore) self.__threads.append(thread) thread.start() else: pass # XXX log('Not outdated: %s' % url.url) #### # instances urlStorage = URLContainer() asyncFetcher = URLFetcherMainThread('AsyncURLFetcher', storage=urlStorage, wait=WAIT_TIME, max_threads=MAX_THREADS) def registerURL(uri, timeout=TIMEOUT): urlStorage.registerURL(url, timeout) asyncFetcher.kick() getURL = urlStorage.__getitem__ def initialize(): asyncFetcher.start() def test(): urlStorage.registerURL('http://example.org/')