Question about the Pipelines with built-ins challenge (#NY9): Is it possible to implement a pipeline with built-in commands without forking the built-ins?

I am working on the pipelines with built-ins challenge in C++ and was trying to think of ways to implement it. I have come across the fact that in a pipeline the different tasks should execute concurrently; however, the “Notes” in the challenge instructions mention:

  • “Built-in commands don’t typically involve creating a new process via fork/exec. You’ll need to handle their execution within the shell process while still managing the input/output redirection required by the pipeline.”

Based on this, it appears that for built-in tasks the instructions say we need not, or should not, use the fork system call. But in that case the redirection of file descriptors will not allow the tasks to execute concurrently: while one built-in runs in the shell process, the shell’s file descriptors can only be redirected one way at a time, so another task that needs a different redirection would have to wait until the first one finishes.
So to summarize, I want to know whether it is true that the tasks in a pipeline must execute concurrently, and if so, whether it is actually possible to implement the built-ins in the pipeline without forking a child process.

Pipelines are expected to work with sequential programs that typically read input, write output, and use blocking I/O. If you don’t run the tasks in parallel you will have issues with:

  • No progress/interactivity. Since only the entry and exit of the pipeline interact with the shell’s I/O, you will not have any interactivity: only output from the last task goes to the console. So if the first program is waiting for you to “press continue”, you will never know about it.
  • You will have to buffer ALL of each task’s output, since the next task is not running yet.
  • Your pipeline is broken if the input-generating task does not terminate on its own. This can happen either because it is by design a long-running or forever-running process, or because later stages of the pipeline actually feed input back into it.
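To see the buffering problem concretely, here is a small sketch of my own (not part of the challenge): it fills a pipe that has no reader attached. The write end is put in non-blocking mode so we can observe the kernel buffer limit without hanging; this is POSIX-specific, and the exact buffer size varies by platform.

```python
import os

# A pipe has a finite kernel buffer, so a writer whose reader is not
# running eventually blocks. Non-blocking mode lets us see the limit
# without actually stalling.
r, w = os.pipe()
os.set_blocking(w, False)

written = 0
try:
    while True:
        written += os.write(w, b"x" * 4096)
except BlockingIOError:
    pass  # the kernel buffer is full; a blocking writer would stall here

print("pipe filled with no reader after", written, "bytes")
os.close(r)
os.close(w)
```

This is exactly the stall a sequential pipeline runs into: once the buffer fills, the producer cannot make progress until a consumer drains the pipe.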

As to how to implement a “parallel task” without forking: can’t you use threads? There is of course the more complex select/epoll event-driven I/O, but threads seem the simpler option. Just pass each thread the file descriptors it should use for reading and writing.
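To make that concrete, here is a minimal sketch under my own naming (a hypothetical `upper_builtin`, not from the challenge) of a built-in running on a thread that touches only the descriptors it was handed:

```python
import os
import threading

# Hypothetical built-in: reads lines from one fd, writes the uppercased
# text to another. It never touches the process-wide stdin/stdout.
def upper_builtin(r_fd, w_fd):
    with os.fdopen(r_fd, "r") as src, os.fdopen(w_fd, "w") as dst:
        for line in src:
            dst.write(line.upper())
    # both fds are closed on exit, which lets the next stage see EOF

in_r, in_w = os.pipe()
out_r, out_w = os.pipe()
t = threading.Thread(target=upper_builtin, args=(in_r, out_w))
t.start()

with os.fdopen(in_w, "w") as f:
    f.write("hello\npipeline\n")   # closing in_w signals EOF to the thread
t.join()
with os.fdopen(out_r, "r") as f:
    result = f.read()
print(result, end="")  # prints "HELLO\nPIPELINE\n"
```

No fork, no redirection of the shell’s own stdin/stdout, and no locking: the thread simply owns two private descriptors.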

Thanks for your reply. After your suggestion I checked and saw that if I create threads, for example using the clone system call, they share the file descriptor table with the parent thread, so I think concurrently running threads will not be able to properly perform input and output redirection. Managing input and output redirection between threads that share file descriptors will require synchronization, but then I don’t think they will really be running in parallel. Since you have mentioned that parallel execution is in fact a requirement, the only remaining question is how to implement the built-ins without forking a child process, as mentioned in the challenge instructions, unless I am wrong about threads sharing the file descriptors.

Your question actually prompted me to do a proof-of-concept earlier. I am not sure what you mean by synchronizing file descriptors: each spawned task, whether a process or a thread, gets its own file descriptors for both stdin and stdout. They never share (in the uses-the-same-FD sense) file descriptors; there is nothing to synchronize. The main issue I discovered here is that while a process’s file descriptors are automatically closed on process exit, the thread-local ones need more care.

The following Python code is a Windows example, but the concept remains the same, I think.

For each “link” between tasks we create a pipe and give the “left” task the write end of the pipe and the “right” task the read end. I don’t manually “pump” the input and output through the entire pipeline, as some people do by creating an extra pipe for the shell to write to and read from: I just pass the shell’s stdin as input at the beginning of the pipeline and receive the final output into the shell’s stdout. To prevent anything from accidentally killing the shell I duplicate those FDs first (note you need to ensure the pipe and initial FDs are inheritable, which Popen takes care of here).

This doesn’t handle redirection but that is easy to add if you have parsed the redirections.

IMPORTANT: it is SUPER important that the non-process tasks close their stdout! Otherwise the next task in the pipeline never knows we are done.

from subprocess import Popen
import os
from os.path import basename
import threading
import sys

IS_EXECUTABLE = 1
IS_BUILT_IN = 0

# "built-in"
def line_to_basename(stdin, stdout, _unused_args):
    for line in stdin:
        line = line.strip()
        try:
            print(basename(line), file=stdout)
        except OSError:
            pass  # downstream may have closed the pipe early
    stdout.close()


pipeline = [
    (fr'where /r {os.environ["USERPROFILE"]} *.txt', IS_EXECUTABLE),
    (line_to_basename, IS_BUILT_IN),
    (r'findstr /i "test.*\.txt$"', IS_EXECUTABLE),
]

def pipeline_test(pipeline, pl_stdin=sys.stdin, pl_stdout=sys.stdout):
    processes = []
    threads = []
    
    # Create pipes
    read_fd = []
    write_fd = []
    for _i in range(len(pipeline)-1):
        r, w = os.pipe()
        read_fd.append(r)
        write_fd.append(w)
    # Add the ends. We dup to ensure we don't lose our Shell's hold
    read_fd = [os.dup(pl_stdin.fileno())] + read_fd
    write_fd = write_fd + [os.dup(pl_stdout.fileno())]
    
    # We wrap in thread in normal files to avoid double free later
    # in case thread forgot to close properly.
    thread_files = []
    for (cmd, kind), w_fd, r_fd in zip(pipeline, write_fd, read_fd):
        if kind == IS_EXECUTABLE:
            process = Popen(cmd, stdin=r_fd, stdout=w_fd)
            # Subprocess closes the handles now.
            # Close our hold on the handles
            os.close(r_fd)
            os.close(w_fd)
            processes.append(process)
        elif kind == IS_BUILT_IN:
            # We wrap in File to prevent double os.close later.
            r_file = os.fdopen(r_fd, 'r')
            w_file = os.fdopen(w_fd, 'w') 
            thread_files.extend((r_file, w_file,))
            thread = threading.Thread(target=cmd, args=(r_file, w_file, "unused built-in arg"))
            threads.append(thread)
            thread.start()
    
    for p in processes:
        p.wait()
    for t in threads:
        t.join()
    # Ensure we closed our thread files
    for f in thread_files:
        f.close()


if __name__ == '__main__':
    pipeline_test(pipeline)
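The importance of the closing rule above can be seen in a small standalone sketch of my own: a reader on a pipe only sees EOF once every write end is closed, so a stage that forgets to close its output leaves the next stage blocked forever.

```python
import os
import threading

def reader(r_fd, results):
    with os.fdopen(r_fd, "r") as f:
        results.append(f.read())   # returns only once all write ends close

r, w = os.pipe()
results = []
t = threading.Thread(target=reader, args=(r, results))
t.start()

wf = os.fdopen(w, "w")
wf.write("done\n")
wf.flush()
t.join(timeout=0.5)
still_blocked = t.is_alive()       # True: the write end is still open

wf.close()                         # the last write end closes: EOF delivered
t.join()
print("blocked before close:", still_blocked)   # prints True
print("received:", results[0].strip())          # prints "done"
```

The data has already been written and flushed, yet the reader stays blocked until the writer closes its end. That is precisely what happens to the next pipeline stage when a built-in forgets to close its stdout.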

In your code you are passing the input and output files as parameters and using the output file within the print function, so the built-in does not use the stdout file descriptor inherited from the main (shell) process. This indeed fulfills the requirement of the challenge.
My question did not take into account this option of specifying the input source and output destination for the thread’s function; I was simply assuming that whatever built-in command I used would take input from stdin and write to its stdout. Since stdin and stdout would be the same for all threads, I felt synchronization was required to manage this parallel use of stdin and stdout among multiple threads.
In my case I should probably use the open(), write(), and close() system calls within the threads (while passing the file descriptors as arguments during thread creation) instead of using stdin and stdout.
Thanks for your help.
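For reference, here is how I sketched that raw-descriptor approach in the same Python style (a hypothetical `cat_builtin` of my own naming; in C++ the analogous calls would be read(2), write(2), and close(2)):

```python
import os
import threading

# Hypothetical built-in using raw fds only: copies input to output,
# then closes both descriptors so the next stage sees EOF.
def cat_builtin(r_fd, w_fd):
    while True:
        chunk = os.read(r_fd, 4096)
        if not chunk:            # an empty read means EOF on the pipe
            break
        os.write(w_fd, chunk)
    os.close(r_fd)
    os.close(w_fd)

r1, w1 = os.pipe()
r2, w2 = os.pipe()
t = threading.Thread(target=cat_builtin, args=(r1, w2))
t.start()

os.write(w1, b"raw fds, no stdin/stdout\n")
os.close(w1)                     # EOF for the thread
t.join()
out = os.read(r2, 4096)
os.close(r2)
print(out.decode(), end="")      # prints "raw fds, no stdin/stdout"
```

The thread never looks at the process-wide stdin/stdout at all, which sidesteps the synchronization problem I was worried about.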

This topic was automatically closed 5 days after the last reply. New replies are no longer allowed.