By the end of my last chunk of work on WideFinder, I'd found a set of things to investigate to improve performance. All were helpful, one way or another.
- Use fixed string searches either instead of (for referrer checking) or as well as (for page detection) regular expressions [Fredrik Lundh]
- Pre-fetch the regular expression method outside the loop [Fredrik Lundh]
- Swap my Python wrapper of dict for a built-in [me]
- Use temporary files instead of pipes to communicate between workers and master [Alex Morega]
- Out of order processing [me]
- Reducing concurrent memory usage to fit within core [me]
- Use the CoolStack SPARC-optimised build of Python 2.5 [Tim Bray]
With smaller data sets, the first three together gave around a 10% improvement running on my dev machine; the last gave about 1.5% on the WF-2 machine over the 2.5 build I'd made myself; however I can't use it for the full run because it's 32 bit (the problem here is discussed below).
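As a rough illustration of the first two items, here is a minimal sketch that compares a plain regex search against the fixed-string pre-check combined with a pre-fetched search method. It is written for Python 3 rather than the 2.5 used for the runs here. The sample URIs and the helper names (regex_only, prefiltered, compare) are invented for the example, and whatever timings it reports will depend on the machine and the data.

```python
import re
import timeit

hit_re = re.compile(r'^/ongoing/When/\d\d\dx/\d\d\d\d/\d\d/\d\d/[^ .]+$')
hit_re_search = hit_re.search   # look the bound method up once, outside the loop
hit_str = "/ongoing/When/"

# Hypothetical sample URIs, not taken from the real logs.
SAMPLE_URIS = [
    "/ongoing/When/200x/2007/06/17/Some-Entry",
    "/ongoing/When/200x/2007/06/17/picture.png",
    "/ongoing/ongoing.atom",
    "/favicon.ico",
] * 1000


def regex_only(uris):
    """Count page hits using the regular expression alone."""
    return sum(1 for u in uris if hit_re.search(u))


def prefiltered(uris):
    """Count page hits, skipping the regex when the fixed string is absent."""
    return sum(1 for u in uris if hit_str in u and hit_re_search(u))


def compare(repeat=20):
    """Return (regex_only_seconds, prefiltered_seconds) for the sample data."""
    plain = timeit.timeit(lambda: regex_only(SAMPLE_URIS), number=repeat)
    checked = timeit.timeit(lambda: prefiltered(SAMPLE_URIS), number=repeat)
    return plain, checked
```

Both functions must return the same count; only the time taken should differ, and by how much is something to measure rather than assume.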
Initially I was running tests on my dev machine, using small amounts of data. There, out of order processing lost out slightly. It's bound to be a little slower in pure terms, because I had to use files instead of pipes for communication (I'm guessing the output was big enough to fill up the pipe; with some thought this could be fixed), but it is more efficient with lots of workers across a large amount of data, because the variance in worker run times increases. Additionally, the naive way of decreasing concurrent memory usage meant increasing the number of reduces; the alternative would increase the complexity of the user code, which I wanted to avoid. So there are probably some further improvements that can be made.
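The memory trade-off can be sketched roughly as follows, in Python 3. The function plan_chunks and its parameters are names made up for this example. It follows the same arithmetic as the driver code later in this post (memory budget divided by worker count, rounded down to whole pages), with one difference that is my assumption: a worker's leftover bytes become an extra chunk instead of stretching its last one.

```python
def plan_chunks(size, maxmem, workers, page=16 * 1024):
    """Return, per worker, the list of (start, end) byte ranges it will process.

    If every worker handles one chunk at a time, at most
    workers * chunksize <= maxmem bytes are in play at once.
    """
    maxmem = min(maxmem, size)
    chunksize = maxmem // workers
    if chunksize > page:
        # round down to a whole number of pages
        chunksize = page * (chunksize // page)
    chunksize = max(chunksize, 1)

    total_chunks = size // chunksize
    per_worker = total_chunks / workers

    plan = []
    for i in range(workers):
        w_start = int(i * per_worker) * chunksize
        if i == workers - 1:
            w_end = size                      # last worker takes the remainder
        else:
            w_end = int((i + 1) * per_worker) * chunksize
        chunks = []
        pos = w_start
        while pos < w_end:
            nxt = min(pos + chunksize, w_end)
            chunks.append((pos, nxt))
            pos = nxt
        plan.append(chunks)
    return plan
```

Shrinking maxmem shrinks the chunks, so each worker has more of them to get through, and each chunk ends in a reduce. That is the cost of the naive approach.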
Anyway: my final result. I re-tested my two final versions, one with out-of-order processing and one not, on the 10m data sets before letting them both rip on the full data set. On the 10m line set, I was seeing up to a 25% improvement over my previous best run; for the whole lot, I saw only an 18% improvement: 34m15.76s. At this kind of scale, out of order processing, even with the file serialisation overhead, gives around a 10% speed improvement. (This is somewhat less than Mauricio Fernandez found doing pretty much the same thing in OCaml.)
This isn't even as good as the other Python implementation; for some reason I'm still getting nothing like efficient use out of the processor. Possibly having readers feeding the data to the processors over pipes might improve things here. I also still had to use the 64 bit version of Python, as Python's mmap doesn't support the offset parameter (there's a patch to allow it, which is private on the Python SourceForge site, sigh). In fact, it feels like every architectural optimisation I've tried to make has made things slower because of Python; the only things that have really speeded it up (beyond the initial parallelism) are the lower level optimisations that make the code less idiomatically Pythonic anyway.
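For reference, later Python releases (2.6 onwards, as far as I'm aware) do accept an offset argument to mmap, which is what a 32 bit build would need in order to map a window of a large file rather than the whole thing. The following is a minimal Python 3 sketch, not something that could have been run on 2.5. The helper name map_window is hypothetical. The offset has to be a multiple of mmap.ALLOCATIONGRANULARITY, so the helper aligns it downwards and reports how far into the mapping the requested start lies.

```python
import mmap


def map_window(fileobj, start, length):
    """Map `length` bytes of fileobj beginning at byte `start`, read-only.

    Returns (mm, delta): the requested bytes are mm[delta:delta + length].
    """
    gran = mmap.ALLOCATIONGRANULARITY
    aligned = (start // gran) * gran      # offset must be a multiple of gran
    delta = start - aligned
    mm = mmap.mmap(fileobj.fileno(), length + delta,
                   access=mmap.ACCESS_READ, offset=aligned)
    return mm, delta
```

The caller is responsible for keeping start + length within the file and for closing the mapping afterwards.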
With some more thought and work, it might be possible to reduce this further; but I'd bet that Python will never get a run time below 15 minutes on WF-2 (although that would still put it in the same ballpark as the JVM-based implementations). At this point I'm mostly into optimising without being able to do the optimisations I want to, which seems fairly pointless; I've also sunk more time into this than I originally intended, so I'm going to stop there.
The code
User code (86 LOC)
I suspect that more code can be trimmed from here, particularly from the top() function, but it seems neater in many ways to leave it largely as the direct translation of Tim's original, with a few optimisations. Python is somewhat more verbose than Ruby; for this kind of job I find the readability about the same.
The only optimisation I'm unhappy with is the line that tests for a fixed string and then tests for a regular expression that starts with that fixed string; this does seem to work faster, but is crazily unreadable, and really needs a comment. Of course, regular expressions in general tend to cause readability problems on their own; it's notable that my first thought on writing this was "hey, we could just get rid of the regular expression entirely and see what happens". What happens is that we start treating URIs with '.' in them, such as /ongoing/When/200x/2007/06/17/IMGP5702.png, as pages, when they aren't, so all the results are wrong.
You can't do this easily by avoiding regular expressions entirely; Tim's URI layout means that /ongoing/When/200x/2007/06/17/ is an archive list page, rather than a genuine entry page, so you can't just check for a '.' somewhere in the URI (although this is actually slightly slower than using a regular expression anyway). However, looking into this in detail brought up errors in the regular expression as well: /ongoing/When/200x/2005/07/14/Atom-1.0 is a valid page URI, but the regex thinks it's an auxiliary file. There's also the unexpected /ongoing/When/200x/2005/04/18/Adobe-Macromedia?IWasEnlightenedBy=MossyBlog.com, which, while not strictly within Tim's URI layout, is allowed to exist and be a page resource by his server configuration. Regular expressions, although powerful, are very difficult to craft correctly; this is made much harder by the problem being (essentially) an ad hoc one: I doubt Tim ever sat down and designed his URI layout with a thought to doing this kind of processing on it. (Even if he had, it would probably get caught out by something similar.)
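To make the misclassifications concrete, here is a small Python 3 sketch that runs the regular expression from the code below against the URIs mentioned above. The helper name is_page and the "Some-Entry" URI are made up for the illustration; the other URIs are the ones quoted in the text.

```python
import re

hit_re = re.compile(r'^/ongoing/When/\d\d\dx/\d\d\d\d/\d\d/\d\d/[^ .]+$')


def is_page(uri):
    """True if the regular expression classifies the URI as an entry page."""
    return hit_re.search(uri) is not None


# A hypothetical, well-behaved entry URI: matched, as intended.
ordinary = "/ongoing/When/200x/2007/06/17/Some-Entry"
# An image: rejected because of the '.', as intended.
image = "/ongoing/When/200x/2007/06/17/IMGP5702.png"
# An archive list page: rejected because nothing follows the final '/'.
archive = "/ongoing/When/200x/2007/06/17/"
# A real entry whose name contains a '.': wrongly rejected.
atom = "/ongoing/When/200x/2005/07/14/Atom-1.0"
# A page with a query string containing a '.': also rejected.
query = ("/ongoing/When/200x/2005/04/18/"
         "Adobe-Macromedia?IWasEnlightenedBy=MossyBlog.com")

results = {u: is_page(u) for u in (ordinary, image, archive, atom, query)}
```

Only the first of the five comes back True, which is right for the image and the archive list but wrong for the last two.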
import re, sys, parallel

def top(dct, num=10):
    keys = []
    last = None
    def sorter(k1,k2):
        if k2==None:
            return 1
        diff = cmp(dct[k1], dct[k2])
        if diff==0:
            return cmp(k2,k1)
        else:
            return diff
    for key in dct.keys():
        if sorter(key, last)>0:
            keys.append(key)
            keys.sort(sorter)
            if len(keys)>num:
                keys = keys[1:]
                last = keys[0]
    keys.reverse()
    return keys

hit_re = re.compile(r'^/ongoing/When/\d\d\dx/\d\d\d\d/\d\d/\d\d/[^ .]+$')
hit_re_search = hit_re.search
hit_str = "/ongoing/When/"
ref_str = '"http://www.tbray.org/ongoing/'

def report(label, hash, shrink = False):
    print "Top %s:" % label
    if shrink:
        fmt = " %9.1fM: %s"
    else:
        fmt = " %10d: %s"
    for key in top(hash):
        if len(key) > 60:
            pkey = key[0:60] + "..."
        else:
            pkey = key
        if shrink:
            print fmt % (hash[key] / 1024.0 / 1024.0, pkey)
        else:
            print fmt % (hash[key], pkey)
    print

def processor(lines, driver):
    u_hits = driver.get_accumulator()
    u_bytes = driver.get_accumulator()
    s404s = driver.get_accumulator()
    clients = driver.get_accumulator()
    refs = driver.get_accumulator()

    def record(client, u, bytes, ref):
        u_bytes[u] += bytes
        if hit_str in u and hit_re_search(u):
            u_hits[u] += 1
            clients[client] += 1
            if ref !='"-"' and not ref.startswith(ref_str):
                refs[ref[1:-1]] += 1 # lose the quotes

    for line in lines:
        f = line.split()
        if len(f)<11 or f[5]!='"GET':
            continue
        client, u, status, bytes, ref = f[0], f[6], f[8], f[9], f[10]
        if status == '200':
            try:
                b = int(bytes)
            except:
                b = 0
            record(client, u, b, ref)
        elif status == '304':
            record(client, u, 0, ref)
        elif status == '404':
            s404s[u] += 1
    return [u_hits, u_bytes, s404s, clients, refs]

(u_hits, u_bytes, s404s, clients, refs) = parallel.process(sys.argv[1], processor)

print "%i resources, %i 404s, %i clients\n" % (len(u_hits), len(s404s), len(clients))
report('URIs by hit', u_hits)
report('URIs by bytes', u_bytes, True)
report('404s', s404s)
report('client addresses', clients)
report('referrers', refs)
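On the point about trimming top(): one possible shorter form, offered only as a sketch and not as what was run here, leans on the standard library's heapq module. It assumes the intended ordering is highest count first, with ties broken by ascending key, which is how I read the sorter above. It is written to run on Python 3.

```python
import heapq


def top(dct, num=10):
    """Return up to num keys, highest count first, ties in ascending key order."""
    # nsmallest on (-count, key) is the same as "largest count, then smallest key"
    return heapq.nsmallest(num, dct, key=lambda k: (-dct[k], k))
```

This loses the resemblance to Tim's original, which is the reason given above for leaving the longer version alone.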
Supporting library code (134 LOC)
Perhaps 30 LOC here is logging and debugging, or could be removed by getting rid of a layer or two of abstraction (I had lots of different driver types while working on this). This is actually my final running code: note that various things aren't needed any more (such as the chunksize parameter to ParallelDriver.process_chunk). This impedes readability a little, but hopefully it's still fairly obvious what's going on: we run J children, each of which subdivides its part of the data into a number of equal chunks (calculated based on the total memory we want to use, but calculated confusingly because I tried various different ways of doing things and evolved the code rather than, you know, thinking), and processes and reduces each one separately. The result per child gets pushed back through a temporary file, and we run a final reduce per child in the parent process.
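Stripped of the chunking, the worker-to-master communication looks roughly like the following Python 3 sketch. It is Unix-only because it relies on os.fork. The function name run_workers and the word-counting job are stand-ins invented for the example, and tempfile.TemporaryFile takes the place of os.tmpfile, which Python 3 no longer provides. Because the parent uses os.wait, it reduces results in whatever order the children happen to finish.

```python
import os
import pickle
import tempfile
from collections import Counter


def run_workers(parts):
    """Fork one child per part; each child pickles its Counter to a temp file."""
    children = {}
    for part in parts:
        comm = tempfile.TemporaryFile()
        pid = os.fork()
        if pid:
            children[pid] = comm          # parent: remember which file is whose
        else:
            try:
                pickle.dump(Counter(part.split()), comm)
                comm.flush()
            finally:
                os._exit(0)               # child: never fall back into parent code

    total = Counter()
    while children:
        pid, _status = os.wait()          # whichever child finishes first
        comm = children.pop(pid, None)
        if comm is None:
            continue                      # not one of ours
        comm.seek(0)
        total.update(pickle.load(comm))   # final reduce in the parent
        comm.close()
    return total
```

Since the child writes to a file rather than a pipe, it can exit without waiting for the parent to read, however large its result is.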
import os, mmap, string, cPickle, sys, collections, logging

logging.basicConfig(format="%(message)s")

class ParallelDriver:
    # chunksize only used to detect overflow back to start of previous chunk
    def process_chunk(self, processor, mm, worker_id, chunk_id, chunk_start, chunk_end, chunksize):
        start = chunk_start
        end = chunk_end
        if start>0:
            if mm[start]=='\n':
                start -= 1
            while mm[start]!='\n':
                start -= 1
            if mm[start]=='\n':
                start += 1
        # [start, end) ie don't include end, just like python slices
        mm.seek(start)

        class LineIter:
            def __init__(self, mm):
                self.mm = mm
            def __iter__(self):
                return self
            def next(self):
                c1 = self.mm.tell()
                l = self.mm.readline()
                c2 = self.mm.tell()
                if c2 > end or c1 == c2:
                    raise StopIteration
                return l

        it = LineIter(mm)
        result = processor(it, self)
        return result

    def get_accumulator(self):
        return collections.defaultdict(int)

class ParallelMmapFilesMaxMemDriver(ParallelDriver):
    def __init__(self, file):
        self.file = file

    def process(self, processor):
        # based on <http://www.cs.ucsd.edu/~sorourke/wf.pl>
        s = os.stat(self.file)
        f = open(self.file)
        j = os.environ.get('J', '8')
        j = int(j)
        maxmem = os.environ.get('MAXMEM', 24*1024*1024*1024)
        maxmem = int(maxmem)
        size = s.st_size
        if maxmem > size:
            maxmem = size
        chunksize = maxmem / j
        PAGE=16*1024
        if chunksize > PAGE:
            # round down to a whole number of pages
            chunksize = PAGE * (chunksize / PAGE)
        if chunksize < 1:
            chunksize = 1
        total_chunks = size / chunksize
        chunks_per_worker = float(total_chunks) / j
        commfiles = {}
        for i in range(0, j):
            commfile = os.tmpfile()
            pid = os.fork()
            if pid:
                commfiles[pid] = commfile
            else:
                pickle = cPickle.Pickler(commfile)
                result = None
                worker_start = int(i*chunks_per_worker) * chunksize
                worker_end = int((i+1)*chunks_per_worker) * chunksize
                if i==j-1:
                    worker_end = size
                chunks_for_this_worker = (worker_end - worker_start) / chunksize
                for chunk in range(0, chunks_for_this_worker):
                    chunk_start = worker_start + chunk*chunksize
                    if chunk_start >= size:
                        break
                    chunk_end = worker_start + (chunk + 1)*chunksize
                    if chunk==chunks_for_this_worker-1:
                        chunk_end = worker_end
                    mm = mmap.mmap(f.fileno(), size, mmap.MAP_SHARED, mmap.PROT_READ)
                    interim_result = self.process_chunk(processor, mm, i, chunk, chunk_start, chunk_end, chunksize)
                    mm.close()
                    if interim_result==None:
                        continue
                    if result==None:
                        result = interim_result
                    else:
                        for idx in range(0, len(interim_result)):
                            for key in interim_result[idx].keys():
                                result[idx][key] += interim_result[idx][key]
                pickle.dump(result)
                commfile.close()
                sys.exit(0)

        final = None
        for i in range(0, j):
            (pid, status) = os.wait()
            readf = commfiles[pid]
            readf.seek(0)
            unpickle = cPickle.Unpickler(readf)
            result = unpickle.load()
            if result==None:
                readf.close()
                continue
            if final==None:
                # first time through, set up final accumulators
                final = []
                for i in range(0, len(result)):
                    final.append(self.get_accumulator())
            for i in range(0, len(result)):
                for key in result[i].keys():
                    final[i][key] += result[i][key]
            readf.close()
        f.close()
        return final

def process(file, processor):
    d = ParallelMmapFilesMaxMemDriver(file)
    return d.process(processor)
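The line splitting in process_chunk is the fiddly part, so here is the rule it implements, restated as a small Python 3 sketch over an in-memory bytes buffer rather than an mmap. The function names align_start and lines_in_chunk are made up for the example. A chunk backs up to the start of the line containing its first byte, and it only yields lines that end at or before its own end, so a line straddling a boundary is left to the following chunk.

```python
def align_start(buf, start):
    """Move start back to the beginning of the line it falls in."""
    if start == 0:
        return 0
    # rfind returns -1 when there is no earlier newline, giving 0
    return buf.rfind(b"\n", 0, start) + 1


def lines_in_chunk(buf, start, end):
    """Yield the lines belonging to the half-open byte range [start, end)."""
    pos = align_start(buf, start)
    while pos < len(buf):
        nl = buf.find(b"\n", pos)
        nxt = len(buf) if nl == -1 else nl + 1
        if nxt > end:
            break                 # line runs past our end: next chunk's job
        yield buf[pos:nxt]
        pos = nxt


def all_lines(buf, chunksize):
    """Process buf in fixed-size chunks and collect every line in order."""
    out = []
    for start in range(0, len(buf), chunksize):
        end = min(start + chunksize, len(buf))
        out.extend(lines_in_chunk(buf, start, end))
    return out
```

Whatever the chunk size, every line should come out exactly once, which is a handy property to test before pointing the real thing at a large file.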
Final thoughts
First off: functional languages. Let's use them more. Mauricio Fernandez's OCaml implementation is still the leader, and despite being a little more verbose than the Ruby original is still pretty damn readable (he's actually just announced an even faster one: 25% faster by using block rather than line-oriented I/O; the LOC count is creeping up with each optimisation he does, though). Functional languages require you to think in a different way, but when you're dealing with streams of data, why on earth would you not want to think like this? Better, OCaml gives you imperative and object-oriented language features as well, and seems to pack a solid standard library. I haven't learned a new language for a while, and I'm a little rusty on the functional languages I've been exposed to anyway; I guess OCaml is going to be my next.
Second off: compilers. Let's use them more as well. It's all very well to have an interpreted language where you have no build phase, and can just dive straight into fiddling with code, but I'm really not convinced this is a valid optimisation of the programming process. For one thing, if you're working with tests (and please, work with tests), my experience is that running them will vastly outweigh any compile time; in particular, if you use an interpreted language, it is likely to completely mask the compilation time. There are enough good, high-level, compiled languages around that compilation doesn't put you on the wrong end of an abstraction trade-off (the way using assembler over C used to before everyone gave up writing assembler by hand).
I have an anecdote that I think further backs up my assertion that a compilation step isn't an impediment to programming. The only time I can remember writing a non-trivial program and having it work first time was with a compiled language. Not because of type safety, not because the compiler caught lots of foolish mistakes. What happened was I was using voice recognition software to control my computer, due to a very serious bout of RSI, so I spent a fair amount of time thinking about the problem before going near the computer. It took about the same amount of time overall as if I'd been able to type, and had started programming straight away—a couple of days total. But this way, I wasn't exploring the problem at the computer, just trying things out in the way I might with an interpreted language, particularly one with an interpreter shell. Programming is primarily about thinking: in many cases doing the thinking up front will take at most as long as sitting down and starting typing. Plus, you can do it at a whiteboard, which means you're standing up and so probably being more healthy, and you can also pretend you're on House, doing differential diagnosis of your problem space.
There are situations where you want to explore, of course; not where you have no idea how to shape your solution, but rather where you aren't sure about the behaviour of the systems you're building on. WF-2 has been mostly like that: much of my time was actually spent writing subtle variants of the parallel driver to test against each other, because I don't understand the effects of the T1 processor, ZFS, and so on, well enough. A lot of the remaining time was spent in debugging line splitting code, due to lack of up-front thought. Neither of which, as far as I'm concerned, is really getting to the point of WF-2. I suspect this is true of most people working on WF-2; you choose your approach, and it comes down to how fast your language goes, how smart you are, and how much time you spend profiling and optimising.
Anyway, you need to have a rapid prototyping environment to explore the systems you're working with, perhaps an interpreted language or one with a shell; and you want a final implementation environment, which may want to use a compiled language. For the best of both worlds, use a language with an interpreter and a native code compiler. (Perhaps unsurprisingly, OCaml has both.)
Where does this leave us with WF-2? None of my original three questions has been answered, although I certainly have learned other things by actually getting my hands dirty. It may be premature to call this, because people are still working on the problem, but I'm not aware of anyone trying a different approach, architecturally speaking, probably because straightforward data decomposition has got so close to the theoretical best case. The optimisations that we've seen work at this scale are helpful—but may not be trivial for the programmer-on-the-street to use, either because language support is sometimes flimsy, or because they'll be building on top of other people's libraries and frameworks. So it's good news that, besides automatic parallelisation of user code, it's possible to parallelise various common algorithms. GCC has parallel versions of various algorithms, there are various parallel libraries for Java, and we can expect other systems to follow. For obvious reasons there's lots of research in this space; check out a list of books on parallel algorithms. In particular check out the dates: this stuff has been around for a long time, waiting for us normal programmers to need it.