Widefinder: Final Results

Published Tuesday 17th June, 2008

By the end of my last chunk of work on WideFinder, I'd found a set of things to investigate to improve performance. All were helpful, one way or another.

With smaller data sets, the first three together gave around a 10% improvement running on my dev machine; the last gave about 1.5% on the WF-2 machine over the 2.5 build I'd made myself, although I can't use it for the full run because it's 32-bit (the problem here is discussed below).

Initially I was running tests on my dev machine, using small amounts of data. There, out-of-order processing lost out slightly. It's bound to be a little slower in pure terms, because I had to use files instead of pipes for communication (I'm guessing the output was big enough to fill up the pipe; with some thought this could be fixed), but it's more efficient with lots of workers across a large amount of data, because run-time variance between workers increases at that scale. Additionally, the naive way of decreasing concurrent memory usage meant increasing the number of reduces; the alternative would have increased the complexity of the user code, which I wanted to avoid. So there are probably further improvements that could be made.

Anyway: my final result. I re-tested my two final versions, one with out-of-order processing and one without, on the 10m-line data set before letting them both rip on the full data set. On the 10m-line set, I was seeing up to a 25% improvement over my previous best run; for the whole lot, I saw only an 18% improvement: 34m15.76s. At this kind of scale, out-of-order processing, even with the file serialisation overhead, gives around a 10% speed improvement. (This is somewhat less than Mauricio Fernandez found doing pretty much the same thing in OCaml.)

This isn't even as good as the other Python implementation; for some reason I'm still getting nothing like efficient use out of the processor. Possibly having readers feed the data to the processors over pipes might improve things here. I also still had to use the 64-bit version of Python, as Python's mmap doesn't support the offset parameter (there's a patch to allow it, which is private on the Python SourceForge site, sigh). In fact, it feels like every architectural optimisation I've tried to make has ended up slower because of Python; the only things that have really speeded it up (beyond the initial parallelism) are the lower-level optimisations that make the code less idiomatically Pythonic anyway.
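
For reference, here's roughly what per-worker mapping could look like if mmap took an offset (as that patch, and later Python versions, allow); map_window is a hypothetical helper sketched for illustration, not something runnable on the 2.5 build:

import mmap

def map_window(path, chunk_start, chunk_end):
    # Hypothetical: map just this worker's window rather than the whole
    # file, which would let a 32-bit build handle files bigger than its
    # address space.  Offsets must be multiples of the allocation
    # granularity, so align down and return the fixup as well.
    f = open(path)
    gran = mmap.ALLOCATIONGRANULARITY
    aligned = (chunk_start / gran) * gran
    mm = mmap.mmap(f.fileno(), chunk_end - aligned, mmap.MAP_SHARED,
                   mmap.PROT_READ, offset=aligned)
    return mm, chunk_start - aligned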

It's possible that with some more thought and work, it would be possible to reduce this further; but I'd bet that Python will never get a run time below 15 minutes on WF-2 (although that would still put it in the same ballpark as the JVM-based implementations). At this point I'm mostly into optimising without being able to do the optimisations I want to, which seems fairly pointless; I've also sunk more time into this than I originally intended, so I'm going to stop there.

The code

User code (86 LOC)

I suspect that more code can be trimmed from here, particularly from the top() function, but it seems neater in many ways to leave it largely as the direct translation of Tim's original, with a few optimisations. Python is somewhat more verbose than Ruby; for this kind of job I find the readability about the same.

The only optimisation I'm unhappy with is the line that tests for a fixed string and then tests for a regular expression that starts with that fixed string; this does seem to run faster, but it's crazily unreadable, and really needs a comment. Of course, regular expressions in general tend to cause readability problems on their own; it's notable that my first thought on writing this was "hey, we could just get rid of the regular expression entirely and see what happens"—what happens is that we start treating URIs with '.' in them, such as /ongoing/When/200x/2007/06/17/IMGP5702.png, as pages, when they aren't, so all the results are wrong.

Nor can you fix this easily by avoiding regular expressions entirely: Tim's URI layout means that /ongoing/When/200x/2007/06/17/ is an archive list page, rather than a genuine entry page, so you can't just check for a '.' somewhere in the URI (although that is actually slightly slower than using a regular expression anyway). However, looking into this in detail turned up errors in the regular expression matching as well: /ongoing/When/200x/2005/07/14/Atom-1.0 is a valid page URI, but the regex thinks it's an auxiliary file. There's also the unexpected /ongoing/When/200x/2005/04/18/Adobe-Macromedia?IWasEnlightenedBy=MossyBlog.com, which, while not strictly within Tim's URI layout, is allowed to exist and be a page resource by his server configuration. Regular expressions, although powerful, are very difficult to craft correctly; this is made much harder by the problem being (essentially) an ad hoc one: I doubt Tim ever sat down and designed his URI layout with a thought to doing this kind of processing on it. (Even if he had, it would probably get caught out by something similar.)
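
Before the listing, a quick demonstration of those cases against the hit_re defined in the code below:

import re

hit_re = re.compile(r'^/ongoing/When/\d\d\dx/\d\d\d\d/\d\d/\d\d/[^ .]+$')

# A genuine entry page, wrongly rejected because of the '.' in "1.0":
print bool(hit_re.search('/ongoing/When/200x/2005/07/14/Atom-1.0'))     # False
# An auxiliary file, correctly rejected:
print bool(hit_re.search('/ongoing/When/200x/2007/06/17/IMGP5702.png')) # False
# A page resource outside the strict layout, also rejected:
print bool(hit_re.search('/ongoing/When/200x/2005/04/18/'
                         'Adobe-Macromedia?IWasEnlightenedBy=MossyBlog.com')) # False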

import re, sys, parallel

def top(dct, num=10):
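    # incrementally maintain the num keys with the highest counts;
    # a direct translation of Tim's original Ruby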
    keys = []
    last = None
    def sorter(k1,k2):
        if k2==None:
            return 1
        diff = cmp(dct[k1], dct[k2])
        if diff==0:
            return cmp(k2,k1)
        else:
            return diff
    for key in dct.keys():
        if sorter(key, last)>0:
            keys.append(key)
            keys.sort(sorter)
            if len(keys)>num:
                keys = keys[1:]
            last = keys[0]
    keys.reverse()
    return keys

hit_re = re.compile(r'^/ongoing/When/\d\d\dx/\d\d\d\d/\d\d/\d\d/[^ .]+$')
hit_re_search = hit_re.search
hit_str = "/ongoing/When/"
ref_str = '"http://www.tbray.org/ongoing/'

def report(label, hash, shrink = False):
    print "Top %s:" % label
    if shrink:
        fmt = " %9.1fM: %s"
    else:
        fmt = " %10d: %s"
    for key in top(hash):
        if len(key) > 60:
            pkey = key[0:60] + "..."
        else:
            pkey = key
        if shrink:
            print fmt % (hash[key] / 1024.0 / 1024.0, pkey)
        else:
            print fmt % (hash[key], pkey)
    print

def processor(lines, driver):
    u_hits = driver.get_accumulator()
    u_bytes = driver.get_accumulator()
    s404s = driver.get_accumulator()
    clients = driver.get_accumulator()
    refs = driver.get_accumulator()

    def record(client, u, bytes, ref):
        u_bytes[u] += bytes
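        # fast fixed-string check first; the (slower) regex only runs
        # when the URI contains "/ongoing/When/" at all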
        if hit_str in u and hit_re_search(u):
            u_hits[u] += 1
            clients[client] += 1
            if ref !='"-"' and not ref.startswith(ref_str):
                refs[ref[1:-1]] += 1 # lose the quotes

    for line in lines:
        f = line.split()
        if len(f)<11 or f[5]!='"GET':
            continue
        client, u, status, bytes, ref = f[0], f[6], f[8], f[9], f[10]
        if status == '200':
            try:
                b = int(bytes)
            except ValueError:
                b = 0
            record(client, u, b, ref)
        elif status == '304':
            record(client, u, 0, ref)
        elif status == '404':
            s404s[u] += 1
    return [u_hits, u_bytes, s404s, clients, refs]

(u_hits, u_bytes, s404s, clients, refs) = parallel.process(sys.argv[1], processor)

print "%i resources, %i 404s, %i clients\n" % (len(u_hits), len(s404s), len(clients))

report('URIs by hit', u_hits)
report('URIs by bytes', u_bytes, True)
report('404s', s404s)
report('client addresses', clients)
report('referrers', refs)

Supporting library code (134 LOC)

Perhaps 30 LOC here is logging and debugging, or could be removed by getting rid of a layer or two of abstraction (I had lots of different driver types while working on this). This is actually my final running code: note that various things aren't needed any more (such as the chunksize parameter to ParallelDriver.process_chunk). This impedes readability a little, but hopefully it's still fairly obvious what's going on: we run J children, each of which subdivides its part of the data into a number of equal chunks (calculated from the total memory we want to use, though calculated confusingly, because I tried various different ways of doing things and evolved the code rather than, you know, thinking), and processes and reduces each one separately. Each child's result gets pickled back through a temporary file, and we run a final reduce per child in the parent process.
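
Since the chunk arithmetic is the confusing part, here's the same calculation traced with made-up numbers (a 200,000-byte file, J=4, MAXMEM=64K); the values are invented purely for illustration:

# Toy re-run of the chunk arithmetic from the driver below.
size = 200000                # pretend file size
j = 4                        # J workers
maxmem = 64 * 1024           # MAXMEM budget, capped at the file size
if maxmem > size:
    maxmem = size
chunksize = maxmem / j       # 16384 bytes mapped per concurrent chunk
PAGE = 16 * 1024
if chunksize > PAGE:
    chunksize = (chunksize / PAGE) * PAGE  # round down to whole pages
total_chunks = size / chunksize            # 12
chunks_per_worker = float(total_chunks) / j  # 3.0
for i in range(j):
    start = int(i * chunks_per_worker) * chunksize
    end = int((i + 1) * chunks_per_worker) * chunksize
    if i == j - 1:
        end = size
    print "worker %d gets bytes [%d, %d)" % (i, start, end)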

import os, mmap, string, cPickle, sys, collections, logging

logging.basicConfig(format="%(message)s")

class ParallelDriver:
    # chunksize only used to detect overflow back to start of previous chunk
    def process_chunk(self, processor, mm, worker_id, chunk_id, chunk_start, chunk_end, chunksize):
        start = chunk_start
        end = chunk_end
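        # walk back to the start of the line containing chunk_start; a line
        # that straddles a chunk boundary gets handled by the later chunk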
        if start>0:
            if mm[start]=='\n':
                start -= 1
            while mm[start]!='\n':
                start -= 1
            if mm[start]=='\n':
                start += 1
        # [start, end) ie don't include end, just like python slices
        mm.seek(start)
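        # iterate whole lines from here, stopping before any line that
        # would end past chunk_end (that one belongs to the next chunk)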
        class LineIter:
            def __init__(self, mm):
                self.mm = mm

            def __iter__(self):
                return self

            def next(self):
                c1 = self.mm.tell()
                l = self.mm.readline()
                c2 = self.mm.tell()
                if c2 > end or c1 == c2:
                    raise StopIteration
                return l

        it = LineIter(mm)
        result = processor(it, self)
        return result

    def get_accumulator(self):
        return collections.defaultdict(int)

class ParallelMmapFilesMaxMemDriver(ParallelDriver):
    def __init__(self, file):
        self.file = file

    def process(self, processor):
        # based on <http://www.cs.ucsd.edu/~sorourke/wf.pl>
        s = os.stat(self.file)
        f = open(self.file)
        j = os.environ.get('J', '8')
        j = int(j)
        maxmem = os.environ.get('MAXMEM', 24*1024*1024*1024)
        maxmem = int(maxmem)
        size = s.st_size
        if maxmem > size:
            maxmem = size
        chunksize = maxmem / j
        PAGE=16*1024
        if chunksize > PAGE:
            # round to pages
            chunksize = (chunksize / PAGE) * PAGE
        if chunksize < 1:
            chunksize = 1
        total_chunks = size / chunksize
        chunks_per_worker = float(total_chunks) / j
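        # one tmpfile per child: fork shares the descriptor, so the parent
        # can rewind and unpickle each child's result after it exits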
        commfiles = {}
        for i in range(0, j):
            commfile = os.tmpfile()
            pid = os.fork()
            if pid:
                commfiles[pid] = commfile
            else:
                pickle = cPickle.Pickler(commfile)
                result = None
                worker_start = int(i*chunks_per_worker) * chunksize
                worker_end = int((i+1)*chunks_per_worker) * chunksize
                if i==j-1:
                    worker_end = size
                chunks_for_this_worker = (worker_end - worker_start) / chunksize
                for chunk in range(0, chunks_for_this_worker):
                    chunk_start = worker_start + chunk*chunksize
                    if chunk_start >= size:
                        break
                    chunk_end = worker_start + (chunk + 1)*chunksize
                    if chunk==chunks_for_this_worker-1:
                        chunk_end = worker_end
                    mm = mmap.mmap(f.fileno(), size, mmap.MAP_SHARED, mmap.PROT_READ)
                    interim_result = self.process_chunk(processor, mm, i, chunk, chunk_start, chunk_end, chunksize)
                    mm.close()
                    if interim_result==None:
                        continue
                    if result==None:
                        result = interim_result
                    else:
                        for idx in range(0, len(interim_result)):
                            for key in interim_result[idx].keys():
                                result[idx][key] += interim_result[idx][key]
                pickle.dump(result)
                commfile.close()
                sys.exit(0)

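        # parent side: as each child exits, merge its accumulators into
        # the final totals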
        final = None
        for i in range(0, j):
            (pid, status) = os.wait()
            readf = commfiles[pid]
            readf.seek(0)
            unpickle = cPickle.Unpickler(readf)
            result = unpickle.load()
            if result==None:
                readf.close()
                continue
            if final==None:
                # first time through, set up final accumulators
                final = []
                for idx in range(0, len(result)):
                    final.append(self.get_accumulator())
            for idx in range(0, len(result)):
                for key in result[idx].keys():
                    final[idx][key] += result[idx][key]
            readf.close()
        f.close()
        return final

def process(file, processor):
    d = ParallelMmapFilesMaxMemDriver(file)
    return d.process(processor)

Final thoughts

First off: functional languages. Let's use them more. Mauricio Fernandez's OCaml implementation is still the leader, and, despite being a little more verbose than the Ruby original, is still pretty damn readable (he's actually just announced an even faster one: 25% faster by using block- rather than line-oriented I/O; the LOC count is creeping up with each optimisation he does, though). Functional languages require you to think in a different way, but when you're dealing with streams of data, why on earth would you not want to think like this? Better, OCaml gives you imperative and object-oriented language features as well, and seems to pack a solid standard library. I haven't learned a new language for a while, and I'm a little rusty on the functional languages I've been exposed to anyway; I guess OCaml is going to be my next.

Second off: compilers. Let's use them more as well. It's all very well to have an interpreted language where you have no build phase and can just dive straight into fiddling with code, but I'm really not convinced this is a valid optimisation of the programming process. For one thing, if you're working with tests (and please, work with tests), my experience is that running them will vastly outweigh any compile time; indeed, the time you'd spend running tests in an interpreted language would likely mask a compiler's build time completely. There are enough good, high-level, compiled languages around that compilation doesn't put you on the wrong end of an abstraction trade-off (the way using assembler over C used to, before everyone gave up writing assembler by hand).

I have an anecdote that I think further backs up my assertion that a compilation step isn't an impediment to programming. The only time I can remember writing a non-trivial program and having it work first time was with a compiled language. Not because of type safety, not because the compiler caught lots of foolish mistakes. What happened was that I was using voice recognition software to control my computer, due to a very serious bout of RSI, so I spent a fair amount of time thinking about the problem before going near the computer. It took about the same amount of time overall as if I'd been able to type and had started programming straight away—a couple of days total. But this way I wasn't exploring the problem at the computer, trying things out as I might with an interpreted language, particularly one with an interpreter shell. Programming is primarily about thinking: in many cases doing the thinking up front will take at most as long as sitting down and starting to type. Plus, you can do it at a whiteboard, which means you're standing up and so probably being more healthy, and you can also pretend you're on House, doing differential diagnosis of your problem space.

There are situations where you want to explore, of course; not where you have no idea how to shape your solution, but rather where you aren't sure about the behaviour of the systems you're building on. WF-2 has been mostly like that: much of my time was actually spent writing subtle variants of the parallel driver to test against each other, because I don't understand the effects of the T1 processor, ZFS, and so on well enough. A lot of the remaining time was spent debugging line-splitting code, due to lack of up-front thought. Neither of which, as far as I'm concerned, really gets to the point of WF-2. I suspect this is true of most people working on WF-2: you choose your approach, and then it comes down to how fast your language goes, how smart you are, and how much time you spend profiling and optimising.

Anyway, you need to have a rapid prototyping environment to explore the systems you're working with, perhaps an interpreted language or one with a shell; and you want a final implementation environment, which may want to use a compiled language. For the best of both worlds, use a language with an interpreter and a native code compiler. (Perhaps unsurprisingly, OCaml has both.)

Where does this leave us with WF-2? None of my original three questions has been answered, although I've certainly learned other things by actually getting my hands dirty. It may be premature to call this, because people are still working on the problem, but I'm not aware of anyone trying a different approach, architecturally speaking, probably because straightforward data decomposition has got so close to the theoretical best case. The optimisations that we've seen work at this scale are helpful—but may not be trivial for the programmer-on-the-street to use, either because language support is sometimes flimsy, or because they'll be building on top of other people's libraries and frameworks. So it's good news that, besides automatic parallelisation of user code, it's possible to parallelise many common algorithms: GCC has parallel versions of standard algorithms, there are parallel libraries for Java, and we can expect other systems to follow. For obvious reasons there's lots of research in this space; check out a list of books on parallel algorithms. In particular, check out the dates: this stuff has been around for a long time, waiting for us normal programmers to need it.