Wednesday, September 27, 2006

Unix shell pipeline: Part 2 - Using temporary file for IPC

This is a continuation of the prior post; I'll describe some of the findings. Recall that the sample problem was doing a recursive grep in a directory tree.

The solution under study:

find . \( -path '*obj-ppc' -o -path '*lib-ppc' -o -path '*bin-ppc' -o \
       -path '*.CC' -o -path '*/.T*' \) -prune -o -type f -print |
    xargs file -- | grep text | cut -d: -f1 |
    xargs grep "$pattern"

The solution consisted of using find to generate the full list of files, then using the 'file' command to weed out all non-text files, and finally passing the file list through grep to do the actual search. We also used 'xargs' so that as few processes as possible are forked (for both file and grep).

We already did some optimization. The best ones were using find's -prune to cut off whole branches of uninteresting directories (binaries, object directories), and using the 'file' command to remove non-text files. When I did the measurement, the 'file' stage was surprisingly slow, sometimes taking even more time than the final grep! I had expected file to run fast, since I thought it only reads the magic number at the beginning of each file and does not scan the whole content.

As we saw in the last post, the real killer of speed is the IPC between the processes in the pipeline. I removed the IPC and used temporary files to capture the intermediate results. The performance boost was immense: the same run on 100k files could complete in literally under 40 seconds, while the IPC-based one took forever (I think at least 15 minutes). The file cache's influence was also substantial. The first run took about 2 minutes or so, but the immediate next run would complete under 40 seconds. The first run had a CPU utilization of about 1%, while the second run hit almost 40% at times. This percentage is the CPU time (user+sys) versus wall-clock time; I think anything above 25% is impressive.
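
To make the idea concrete, here is a rough sketch of the staged version, each stage reading and writing ordinary files instead of a pipe (the /tmp file names are made up for illustration; the actual script generates its own per-stage and per-thread names):

# stage 1: let find run flat out, writing its list straight to a file
find . \( -path '*obj-ppc' -o -path '*lib-ppc' -o -path '*bin-ppc' -o \
       -path '*.CC' -o -path '*/.T*' \) -prune -o -type f -fprint /tmp/stage1-files.txt

# stage 2: classify every file
xargs file -- < /tmp/stage1-files.txt > /tmp/stage2-types.txt

# stage 3: keep only the text files
grep text /tmp/stage2-types.txt | cut -d: -f1 > /tmp/stage3-textfiles.txt

# stage 4: the actual search
xargs grep "$pattern" < /tmp/stage3-textfiles.txt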

The next step was to exploit the dual CPUs and the second-level cache. Performance is still largely I/O bound, but let's try to use all the intermediate caches and RAM. The idea is to take each intermediate temporary file and break it into n pieces, then feed each of these pieces to one of n threads running the next stage. Thus we combine multi-threading with temporary-file IPC. The gain was good, considering we are still bound by I/O. BTW the splitting was done using the unix 'split' command (see the code below).
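
As a rough shell-level sketch of the fan-out (the real implementation, listed below, drives this from Python and handles the bookkeeping; the /tmp names are the made-up ones from the sketch above):

# split into ~4 equal pieces; split names them <prefix>aa, <prefix>ab, ...
fsize=$(wc -c < /tmp/stage3-textfiles.txt)
split -C $((fsize / 4)) /tmp/stage3-textfiles.txt /tmp/stage3-textfiles.txt-thread-

# one grep per piece, run in the background, each with its own output file
for piece in /tmp/stage3-textfiles.txt-thread-a?; do
    xargs grep "$pattern" < "$piece" > "$piece.out" &
done
wait

# merge the per-thread outputs (-C may leave an extra small piece;
# the Python code below folds it back into its neighbour)
cat /tmp/stage3-textfiles.txt-thread-a?.out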

An added bonus of the temporary-file approach is the re-usability of results from a prior run. For our problem, if only the search string changes, say from needle to pin, we can reuse the results of all the prior stages: no need to fire up find or file again, just jump directly to the last stage of the pipeline.
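
For example, with the sketch above, switching the search from needle to pin only needs the last stage to be rerun against the saved file list:

xargs grep pin < /tmp/stage3-textfiles.txt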

Thus the bottom-line observation: file systems with large RAM caches have gone through decades of optimization, so exploit them. Instead of using IPC through pipes between a producer and a consumer, use temporary files. The loss of parallelism between the two processes is well compensated by allowing each to run at its full speed, with each process's output and input connected to temporary disk files.

Here are some of the code listings, showing the essential parts: using temporary files for IPC and splitting them for parallel processing. Please excuse the formatting.. I had to resort to the HTML pre tag.

# (imports implied by these excerpts)
import os, glob, popen2
from threading import Thread

# command processor using file IO redirection
# A thread which will run a unix command. Objective is to get the
# command complete ASAP. Thus stdin, stdout, stderr are all redirected
# from regular disk files. The guess is that with no other process involved,
# the command should be able to run at top speed.
#
class command_processor_fileio (Thread):
    def __init__ (self, cmd, fname_stdin, fname_out):
        Thread.__init__(self)
        self.cmd = cmd                   # Unix cmd.. must be a string.. sent to Popen
        self.fname_stdin = fname_stdin   # stdin redirect fname
        self.fname_out = fname_out       # common prefix to stdout/stderr
        self.sh_output = []              # in case the intermediate sh complains
        self.fname_stdout = ''           # set up later in run
        self.fname_stderr = ''           # set up later in run
        self.status = 0                  # status of cmd execution
        self.status_is_signaled = 0      # status of cmd execution.. was it a signal?

    def run(self):
        cmd_run = self.cmd
        thread_name = self.getName()
        self.fname_stdout = "%s-%s-stdout.txt" % (self.fname_out, thread_name)
        self.fname_stderr = "%s-%s-stderr.txt" % (self.fname_out, thread_name)
        redirection = ' >|%s 2>|%s < %s' % (self.fname_stdout, self.fname_stderr,
                                            self.fname_stdin)
        cmd_run += redirection
        if options.debug > 0:            # 'options' is the script's command-line options (parsing not shown)
            print "Starting thread %s cmd %s" % (thread_name, cmd_run)
        run = popen2.Popen3(cmd_run)
        run.tochild.close()              # everything is redirected, so we don't feed any input
        # the following may be skipped if it aids performance
        self.sh_output = run.fromchild.readlines()   # just in case sh complains

        result = run.wait()              # block until the cmd is finished

        self.status = os.WEXITSTATUS(result)
        self.status_is_signaled = os.WIFSIGNALED(result)

#
# Merge all the output of the various threads into one file
#
def thread_output_files_merge (stdout_files, stderr_files, target):

    # redirection_filename() is a small helper elsewhere in the script (not shown here)
    fname_stdout = redirection_filename(target)
    fname_stderr = redirection_filename(target, is_stderr=True)

    # just merge all files.. so do a cat f1 f2 f3 ... fn > f_out
    merge_cmd = "cat %s > %s" % (' '.join(stdout_files), fname_stdout)
    os.system(merge_cmd)
    merge_cmd = "cat %s > %s" % (' '.join(stderr_files), fname_stderr)
    os.system(merge_cmd)

#
# Split one file into many smaller files for each thread to work on.
# The routine must return num_threads files... even empty ones are okay
#
def thread_input_file_split (fname, num_threads):

    if num_threads == 1:
        # easiest to just get out early
        return [fname]

    fsize = os.path.getsize(fname)
    bytes_per_file = fsize / num_threads

    split_prefix = "%s-thread-" % (fname,)

    if options.use_shortcut > 1:
        # User gave -s -s .. that means don't do the split of files..
        # just re-use the split files from the prior run.
        # NOTE: the -t value must remain the same as in the prior run
        split_files = glob.glob(split_prefix + 'a?')   # split uses suffixes aa, ab, ac...
        # must have at least num_threads elements.
        if options.debug > 0:
            print "Reusing thread split files (%d files) from prior run " \
                  "due to -s -s: %s" % (num_threads, split_files)
        return split_files[:num_threads]

    # before running the split command, remove any old split output files
    # from a prior run
    old_files = glob.glob(split_prefix + 'a?')          # man split.. uses aa, ab...
    for old_file in old_files:
        if options.debug > 0:
            print "Previous run leftovers. Removing ..", old_file
        os.remove(old_file)

    split_cmd = "split -C%d %s %s" % (bytes_per_file, fname, split_prefix)
    os.system(split_cmd)

    # ASSERT: we will have at least num_threads files now .. not less

    split_files = glob.glob(split_prefix + 'a?')        # man split.. uses aa, ab, ac...

    split_files.sort()   # exploit the aa, ab.. naming: the last file is the smallest
    while len(split_files) > num_threads:
        # Take the last file (smallest one) and merge it with its prior file
        smallest_file = split_files.pop()
        merge_cmd = "cat %s >> %s" % (smallest_file, split_files[-1])
        if options.debug > 0:
            print "Merging smallest file: cmd %s" % (merge_cmd, )
        os.system(merge_cmd)
    if options.debug > 0:
        print "Multiple thread input split_files: ", split_files
    # ASSERT: len(split_files) == num_threads
    return split_files


#
# Run cmd using given number of threads
#
def execute_child_threads_fileio (cmd, stdin_fname, num_threads, target):

stdin = file_get_all_lines(stdin_fname)
stdin_size = len(stdin)

min_per_thread = 4096 # just some minimum size for a thread
fsize = os.path.getsize(stdin_fname)

if fsize < min_per_thread: # must be * num_threads.. but okay
num_threads = 1

#split the file into num_thread pieces
stdin_fnames = thread_input_file_split(stdin_fname, num_threads)

fname_out = redirection_filename(target, is_prefix=True)

#work_slice = stdin_size / num_threads; # each thread's work size
workers = []
# We are going to chop stdin into equal sized chunks for each thread.
for i in range(num_threads):
#start_index = i * work_slice;
#end_index = start_index + work_slice;
#if i == num_threads - 1: # if last worker...take all remaining
# end_index = stdin_size;
#this_stdin = stdin[start_index:end_index] # this thread's in data

cp = command_processor_fileio(cmd, stdin_fnames[i], fname_out)
workers.append(cp);
cp.start() # fire up the thread to do the data munching

stdout_files = [] # collect each thread's output file
stderr_files = []
for worker in workers:
worker.join() # wait for each worker to finish
#TODO. Handle each thread's return status...
stdout_files.append(worker.fname_stdout)
stderr_files.append(worker.fname_stderr)


# merge all its output files into one.. so that next stage process
# in the pipeline sees one file.
thread_output_files_merge(stdout_files, stderr_files, target)
return ([] , 0, 0)

Monday, September 11, 2006

Unix shell pipeline: Making it multi-lane, Avoiding stop-n-go




This is a CS technical post related to using the unix command line. Just an early warning so that you can walk away, saving your precious browsing time! :) BTW I am aware of some poor formatting issues.. please bear with it..

A shell pipeline attaches the stdout of one process to the stdin of another (see wikipedia for more). This is one of those beautiful unix concepts that have stood the test of time. It's simple and extremely effective. It's a shame that modern GUI designers never came up with a way of supporting the pipeline concept.. anyway I guess that's a different topic altogether.

Take the simple unix pipeline
$ ls | wc -l

It prints the number of files in the current directory (for the purist, the number of directory entries).

Recently I was looking for a way to search for a string in a directory tree. The earliest grep didn't do a recursive grep. I am sure the newer ones sport the -r flag to do that; they even take patterns to include/exclude files and directories.
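
For instance, with GNU grep, something along these lines does a recursive search restricted to C source files (the exact flags available depend on your grep version):

grep -rl --include='*.c' needle .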

I started off with a very basic, home-grown rgrep.


karthikg@xyz :>cat ~/bin/rgrep
find . -type f -a -exec grep -l "$1" {} \;


You run it as 'rgrep needle' to search for 'needle' in the whole directory tree.

I needed to do a search on a source-code tree containing about 100k files (that's just the text files, not counting binary files). When I issued the command from the command line, it took forever to complete. And thus began the optimization process.

The very first step is the well-known trick of using xargs to avoid the -exec in find. With -exec, find forks a new process for every element as it descends the tree, and this is very expensive. With xargs, as many files as possible are fed to each grep process that gets created.

find . -type f -print | xargs grep $pattern

Even this took forever, so I moved away from solving the generic case and started exploiting known special properties:
  1. Almost always I'm interested only in text files. Using the 'file' command, I can quickly discard binary files before they reach grep.
  2. The directory tree has lots of object/executable directories, which can be skipped entirely. This is a big gain since 'find' need not descend those subtrees.

Thus the next solution became

find . \( -path '*obj-ppc' -o -path '*lib-ppc' -o -path '*bin-ppc' -o -path '*.CC' -o \
       -path '*/.T*' \) -prune -o -type f -print |
    xargs file -- | grep text | cut -d: -f1 |
    xargs grep "$pattern"


Note the shell quoting of the special chars (the parentheses and *): they have to be escaped or quoted so the shell doesn't expand or mis-parse them before find ever sees them (that's one other irritating thing.. can't we somehow tell the shell not to do that??)

Here's what is happening:
  1. In find, we try as much as possible to skip whole sub-directories; -prune helps us there. I had -regex at first, but felt it was too big a gun to use; -path should be lighter weight, and for my needs it does the job.
  2. Then -type f cuts the list down to regular files.
  3. The second process in the pipeline ('file') reports each file's type; the following grep keeps only the text files, and cut extracts the file name alone.
  4. The last grep does the heavyweight string searching.

I thought the above would run pretty fast. But to my surprise, it was crawling and looked like it would never complete.

I was running on a fairly powerful dual-CPU linux machine, and top showed lots of free CPU. So why wasn't it completing? Where were the bottlenecks?

My first guess was that grep (the last process) was taking too much time. I wondered if I could parallelize the search: why not split the input file list (of 100k entries) into, say, 4 chunks and feed them to four different grep processes? These 4 could run in parallel and should finish sooner. This is when I thought of a multi-lane shell pipeline.

Let's take this syntax

p1 | p2 |{number} p3


e.g. find .. | ... |4 xargs grep $pattern

Here I'm telling the shell to use 4 processes for the last grep stage. It must split the input into 4 parts and send one part to each of the four xargs. Of course, none of today's shells can do this, so I ended up writing a python version of the idea (see the end of this post). I'll discuss more of the findings in a later post.
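
As an aside, for the final grep stage alone, GNU (and BSD) xargs can get part of the way there with its -P flag, which keeps up to that many grep processes running in parallel (it doesn't help the earlier stages, and you should check your xargs man page):

find . -type f -print | xargs -P 4 -n 500 grep -l "$pattern"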

Going back to the original slowness, it looks like the good old shell pipeline causes lots of stop-n-go among the processes. find can't run at full speed: it fills its output pipe buffer and gets blocked until the next process in the pipeline reads from its stdin. Thus even with so much CPU sitting free, chaining lots of processes didn't help the throughput. (Later, using -fprint of find to write the list directly to a file, I realized find completed in a fraction, less than 10%, of the time it took with -print feeding the pipeline.)
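
The -fprint measurement amounted to timing find on its own while it writes the list straight to a file instead of into a pipe (pruning arguments omitted, and the output file name made up for illustration):

time find . -type f -fprint /tmp/file-list.txt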

My multi-lane version performed only marginally better. I made the "file .. cut" part use 4 threads and the final grep part another 4 threads, and still the response was sluggish. The bottleneck seemed to be the stop-n-go introduced by connecting the processes. I then experimented with using temporary files for the output.. and the speed gain was astounding. The whole thing got done in a matter of seconds (about 40s), whereas the pipeline version took a whole 14 minutes to complete.

It would be good to have a shell feature where intermediate files connect the processes, rather than stdout feeding stdin directly. In the early days, avoiding large intermediate data would have seemed the obvious win; but today, with vast amounts of memory and fast file-systems with large file caches, it makes more sense to let each process run at its full speed rather than suffer this stop-n-go caused by the pipe's buffer limits.

Thus, in conclusion, I see that the real bottleneck is not the load on the system; it's the IPC buffering introduced by the pipeline. So when high performance is needed, try using temporary files for stdout and stdin, which enables each process to run at its top speed. Even though you are going through the disk filesystem, the speed gains are significant.



--- multilane_pipeline.py ----
import os, popen2            # (imports implied by the excerpt)
from threading import Thread

# a small worker thread to read out the stdout of a child process.
# This is used for processes which generate lots of output as well as
# take in a large stdin.
# It solves the classic pipeline deadlock -- i.e. the child can't proceed
# unless someone removes its output; but that someone can't wait to
# push all the stdin into the child.
# So once a child is forked, before shoving all the data into its stdin,
# make a worker thread using this class. This worker thread will
# drain (periodically.. as and when the child outputs) the child's stdout, so
# that the child can continue to make progress.
#
# Use only on targets where we push in a large stdin *and* the target generates
# a large stdout. Else not needed... set the flag 'is_thread_collect' if you
# anticipate this scenario.
#
class stdout_reader (Thread):
    def __init__ (self, fd):
        Thread.__init__(self)
        self.fd = fd           # the file object to collect from
        self.data_read = []    # the data collected
    def run(self):             # when given the 'go', pull out all the data
        self.data_read = self.fd.readlines()

# command processor
# A thread which will run a unix command and collect its output.
# It takes in a list containing the whole stdin (thus different from
# the standard module 'commands')
#
class command_processor (Thread):
    def __init__ (self, cmd, stdin_list):
        Thread.__init__(self)
        self.cmd = cmd                    # Unix cmd.. can be a sequence.. sent to Popen
        self.stdin_sent = stdin_list      # What stdin to feed to cmd
        self.stdout_rcvd = []             # What output was collected from cmd
        self.status = 0                   # status of cmd execution
        self.status_is_signaled = 0       # status of cmd execution.. was it a signal?
    def run(self):
        if options.debug > 0:
            print "Starting thread for cmd: %s" % self.cmd
        run = popen2.Popen3(self.cmd)
        # set up another thread to start reading the stdout ...
        reader_thread = stdout_reader(run.fromchild)
        reader_thread.start()
        # Feed cmd's stdin
        run.tochild.writelines(self.stdin_sent)   # push the input into the child
        run.tochild.close()

        reader_thread.join()              # wait for the reader thread to finish
        self.stdout_rcvd = reader_thread.data_read

        result = run.wait()               # block until cmd is finished

        self.status = os.WEXITSTATUS(result)
        self.status_is_signaled = os.WIFSIGNALED(result)

#
# Run cmd using the given number of threads
# return output and status
#
def execute_child_threads (cmd, stdin, target):
    # target_db / substitute_target_args / options are defined elsewhere in the script
    thread_str = target_db[target]['threads']
    # Map any $thread to the actual value the user gave with '-a -t'
    num_threads = int(substitute_target_args(target, thread_str))

    stdin_size = len(stdin)
    if stdin_size < num_threads:
        num_threads = 1

    work_slice = stdin_size / num_threads   # each thread's work size
    workers = []
    # We are going to chop stdin into equal sized chunks, one for each thread.
    for i in range(num_threads):
        start_index = i * work_slice
        end_index = start_index + work_slice
        if i == num_threads - 1:            # if last worker... take all remaining
            end_index = stdin_size
        this_stdin = stdin[start_index:end_index]   # this thread's input data

        cp = command_processor(cmd, this_stdin)
        workers.append(cp)
        cp.start()   # fire up the thread to do the data munching

    output = []
    for worker in workers:
        worker.join()                 # wait for each worker to finish
        output += worker.stdout_rcvd  # collect the output it generated
        #TODO: handle each thread's return status...

    return (output, 0, 0)