Personal tools
You are here: Home Open Source Misc threads in zope AsyncURLFetcher
Document Actions

AsyncURLFetcher

by Christian Heimes last modified 2004-05-25 23:29

Fetch URLs asynchronously using workerthread.py

#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
"""Async URL Fetcher library
(c) 2004 Christian Heimes


"""
# Depends on Python 2.3+

################################
# configuration
WAIT_TIME = 60.0 # passed as `wait` to the main worker thread: re-loop every 60 seconds
MAX_THREADS = 1  # size of the fetch semaphore: at most this many fetches run in parallel
TIMEOUT = 300    # default validity of fetched data in seconds (refetch after 5 minutes)
ERROR_THRESHOLD = 5  # cap of the per-URL error counter; URL.outdated() compares against it
ERROR_DELAY = 300    # seconds URL.outdated() adds to the validity once the threshold is hit

# shorter values, e.g. for debugging:
#WAIT_TIME = 10
#TIMEOUT = 60
################################

import threading
import thread
import time
import urllib2

from workerthread import WorkerThread, ReadWriteLock

try:
    from zLOG import LOG, INFO, PROBLEM
except ImportError:
    # zLOG (Zope's logging module) is not importable: fall back to a minimal
    # stand-in that prints to stdout so the module still works standalone.
    INFO = 'INFO'
    PROBLEM = 'PROBLEM'

    def LOG(prd, level, msg):
        """Print a log line formatted as "<prd>: <level> - <msg>"."""
        # A single parenthesised argument prints exactly the same text as
        # the Python 2 print statement and is also valid Python 3 syntax.
        print("%s: %s - %s" % (prd, level, msg))

def log(msg, level=INFO):
    """Log `msg` under the 'AsyncURLFetcher' product name.

    `level` is handed to LOG unchanged and defaults to INFO.
    """
    LOG('AsyncURLFetcher', level, msg)

class URL(object):
    """A single URL together with its fetched data and error state.

    An instance is created with the address and a validity period in
    seconds.  `outdated()` reports whether the stored data should be
    fetched again.
    """
    # '__implements__' must NOT be listed here: it is assigned as a class
    # attribute below, and a name that is both a slot and a class variable
    # makes class creation fail with ValueError.  The double-underscore slot
    # names are name-mangled like every other private attribute.
    __slots__ = ('__url', '__valid', '__last_updated', '__errors',
                 '__errormsg', '__data', )
    __implements__ = None #XXX IURL

    def __init__(self, url, valid):
        """Store the address `url` and the validity period `valid` (seconds)."""
        self.__url = url
        self.__valid = valid
        self.__last_updated = None   # time.time() of the last data update
        self.__errors = 0            # error counter, capped at ERROR_THRESHOLD
        self.__errormsg = ''
        self.__data = ''

    def _setData(self, data):
        """Setter of the `data` property: store data and stamp the time."""
        self.__last_updated = time.time()
        self.__data = data

    def _setErrorMsg(self, msg):
        """Setter of the `error` property: store msg and count the error.

        The counter stops growing once it has reached ERROR_THRESHOLD.
        """
        if self.__errors < ERROR_THRESHOLD:
            self.__errors += 1
        self.__errormsg = msg

    def __repr__(self):
        return '<URL object for %s at %s>' % (self.url, hex(id(self)))

    def __str__(self):
        return self.url

    url   = property(lambda self: self.__url, None, None, 'Url to fetch')
    data  = property(lambda self: self.__data, _setData, None, 'Data fetched from the url')
    error  = property(lambda self: self.__errormsg, _setErrorMsg, None, 'Last error message')

    def outdated(self):
        """Is data outdated?

        Returns True if no data has been stored yet, or if the last data
        update is older than the validity period.  When the error counter
        has reached ERROR_THRESHOLD, ERROR_DELAY seconds are added to the
        validity period.

        Side effect: every call lowers a non-zero error counter by one.
        """
        # Decide about the penalty BEFORE decaying the counter.  The counter
        # is capped at ERROR_THRESHOLD, so decrementing first would make the
        # comparison impossible to satisfy and the delay would never apply.
        over_threshold = self.__errors >= ERROR_THRESHOLD

        if self.__errors > 0:
            # decrease error amount
            self.__errors -= 1

        timeout = self.__valid
        if over_threshold:
            # add a delay if we reached the max error threshold
            timeout += ERROR_DELAY

        if self.__last_updated is None:
            # never fetched successfully
            return True
        return self.__last_updated + timeout < time.time()

class URLContainer(object):
    """Storage object for urls

    Keeps URL objects in a dict keyed by address plus a list of addresses
    that defines the order in which `next()` hands them out.  All access to
    that state is guarded by a ReadWriteLock.
    """
    # '__implements__' is a class attribute and therefore must not also be
    # listed as a slot (that combination raises ValueError at class creation).
    __slots__ = ('__lock', '__data', '__urls', '__position', )
    __implements__ = None #XXX IURLContainer

    def __init__(self):
        self.__lock = ReadWriteLock()
        # dict of url objects: url -> object
        self.__data = {}
        # list of url names, in registration order
        self.__urls = []
        # index into __urls of the entry next() returns next
        self.__position = 0

    def __len__(self):
        self.__lock.acquire_read()
        try:
            return len(self.__data)
        finally:
            self.__lock.release_read()

    def __contains__(self, url):
        self.__lock.acquire_read()
        try:
            return url in self.__data
        finally:
            self.__lock.release_read()

    def __iter__(self):
        """Iterate over a snapshot of the registered URL objects."""
        self.__lock.acquire_read()
        try:
            # copy, so later registrations cannot disturb the iteration
            return iter(list(self.__data.values()))
        finally:
            self.__lock.release_read()

    def __getitem__(self, url):
        """Return the URL object registered under the name `url`.

        Raises KeyError if the url is not registered.
        """
        self.__lock.acquire_read()
        try:
            return self.__data[url]
        finally:
            self.__lock.release_read()

    def __delitem__(self, url):
        """Remove the url registered under the name `url`.

        Raises KeyError if the url is not registered.
        """
        self.__lock.acquire_write()
        try:
            # dict first: an unknown name raises KeyError before anything
            # has been modified
            del self.__data[url]
            idx = self.__urls.index(url)
            del self.__urls[idx]
            if idx < self.__position:
                # keep the cursor on the same entry, otherwise the next
                # url would be skipped in the current pass
                self.__position -= 1
            assert (len(self.__data) == len(self.__urls))
        finally:
            self.__lock.release_write()

    def registerURL(self, url, valid=TIMEOUT):
        """Adds a new url

        `url` is either an URL object or an address; for an address a new
        URL object with the validity period `valid` is created.  The url is
        appended to the end of the fetch order.  Registering an already
        known address is a no-op.
        """
        if not isinstance(url, URL):
            url = URL(url, valid)

        self.__lock.acquire_write()
        try:
            # membership test and insert happen under the same write lock,
            # so two concurrent registrations cannot both append
            if url.url in self.__data:
                return
            self.__data[url.url] = url
            self.__urls.append(url.url)
            assert (len(self.__data) == len(self.__urls))
        finally:
            self.__lock.release_write()

    def unregisterURL(self, url):
        """Unregisters an url

        Accepts an URL object or an address.  Raises KeyError if the url is
        not registered.
        """
        if hasattr(url, 'url'):
            url = url.url
        del self[url]

    def listURLs(self):
        """Lists all url names

        Returns a new list, so callers cannot modify the internal order.
        """
        self.__lock.acquire_read()
        try:
            return list(self.__urls)
        finally:
            self.__lock.release_read()

    def updateURL(self, url, data):
        """Updates the data of an registered url

        Accepts an URL object or an address.  Unknown urls are ignored.
        """
        # XXX interface
        if hasattr(url, 'url'):
            url = url.url
        # XXX
        log('Updating %s with %s kb of data' % (url, len(data) // 1024))
        self.__lock.acquire_write()
        try:
            if url not in self.__data:
                return
            self.__data[url].data = data
        finally:
            self.__lock.release_write()

    def updateError(self, url, error):
        """Sets the last error of an registered url

        Accepts an URL object or an address.  Unknown urls are ignored.
        """
        if hasattr(url, 'url'):
            url = url.url
        log('Error updating %s: %s' % (url, error))
        self.__lock.acquire_write()
        try:
            if url not in self.__data:
                return
            self.__data[url].error = error
        finally:
            self.__lock.release_write()

    def next(self):
        """Get the next url object and advance the cursor

        May raise an IndexError at the end
        """
        # XXX paranoid write lock?
        self.__lock.acquire_write()
        try:
            url = self.__urls[self.__position]
            self.__position += 1
            return self.__data[url]
        finally:
            self.__lock.release_write()

    def rewind(self):
        """Reset the cursor so that next() starts with the first url again."""
        # same lock as next(), which reads and writes the cursor
        self.__lock.acquire_write()
        try:
            self.__position = 0
        finally:
            self.__lock.release_write()


class URLFetcher(threading.Thread):
    """Fetchs data from an url and stores it into storage
    """

    def __init__(self, url, storage, semaphore):
        threading.Thread.__init__(self, name=url)
        self.setDaemon(True)
        self.__url = url
        self.__storage = storage
        self.__semaphore = semaphore

    def run(self):
        self.__semaphore.acquire()
        try:
            time.sleep(5)
            try:
                conn = urllib2.urlopen(url=self.__url.url)
                data = conn.read()
            except urllib2.URLError, msg:
                self.__storage.updateError(self.__url.url, msg)
            else:
                self.__storage.updateURL(self.__url.url, data)
        finally:
            self.__semaphore.release()


class URLFetcherMainThread(WorkerThread):
    """Main thread that is looping and threading child threads to update urls
    """

    def __init__(self, name, storage, wait, max_threads):
        """
        name        -- thread name, passed on to WorkerThread
        storage     -- container providing next() / rewind()
        wait        -- passed on to WorkerThread as `wait`
        max_threads -- maximum number of fetches running at the same time
        """
        WorkerThread.__init__(self, name=name, wait=wait)
        self.__storage = storage
        self.__semaphore = threading.BoundedSemaphore(max_threads)
        self.__threads = []
        self.__force_update = False
        log('Starting main thread')

    # run() is used by WorkerThread!

    def forceUpdate(self):
        """Request a pass that refetches every url, outdated or not."""
        self.__force_update = True
        self.__storage.rewind()
        # NOTE(review): called without arguments, like the module-level
        # asyncFetcher.kick() -- confirm against WorkerThread.kick.
        self.kick()
        # The flag is cleared by main() at the end of a pass.  Clearing it
        # here would race with the worker and could cancel the request
        # before main() has looked at it.

    def main(self):
        """Walk once over all urls and start a fetcher for each due one.

        A url is due if it reports itself as outdated or if an update was
        forced.  The pass ends when the storage raises IndexError.
        """
        storage = self.__storage
        semaphore = self.__semaphore
        # forget fetchers that have finished, otherwise the list grows
        # with every fetch for the lifetime of the process
        self.__threads = [t for t in self.__threads if t.isAlive()]
        #XXX log('Running main thread')
        while True:
            try:
                url = storage.next()
            except IndexError:
                # stops main loop until next time event
                storage.rewind()
                self.__force_update = False
                break

            # outdated() is always evaluated first (it also decays the
            # error counter of the url)
            if url.outdated() or self.__force_update:
                fetcher = URLFetcher(url, storage, semaphore)
                self.__threads.append(fetcher)
                fetcher.start()
            else:
                # XXX
                log('Not outdated: %s' % url.url)


####
# instances

# Module-level singletons, created at import time.  The fetcher thread is
# only constructed here; initialize() starts it.
urlStorage = URLContainer()
asyncFetcher = URLFetcherMainThread('AsyncURLFetcher',
               storage=urlStorage, wait=WAIT_TIME, max_threads=MAX_THREADS)

def registerURL(uri, timeout=TIMEOUT):
    """Register `uri` with the module-level storage and kick the fetcher.

    uri     -- address (or URL object) to register
    timeout -- validity period of the fetched data in seconds
    """
    # the parameter is named `uri`; the former body used the undefined
    # name `url` and failed with a NameError
    urlStorage.registerURL(uri, timeout)
    asyncFetcher.kick()

# Look up a registered URL object by its address: getURL(url) is urlStorage[url]
getURL = urlStorage.__getitem__

def initialize():
    """Start the module-level fetcher thread `asyncFetcher`."""
    asyncFetcher.start()

def test():
    """Register http://example.org/ with the module-level storage.

    Manual helper: it neither starts nor kicks the fetcher thread.
    """
    urlStorage.registerURL('http://example.org/')

Powered by Plone CMS, the Open Source Content Management System