diff options
author | John Denker <jsd@av8n.com> | 2012-07-23 12:42:23 -0700 |
---|---|---|
committer | John Denker <jsd@av8n.com> | 2012-07-23 12:42:23 -0700 |
commit | fdaeef07dffa8894672da5e51f63d467d452c7c9 (patch) | |
tree | f0f2e77d7ff0520dc9d34ee0fecb67ea2c311bc7 /tools | |
parent | 8eeff9d54790fdc8b3d9e069d191487417102476 (diff) |
much more logical about keeping track of pipes and how they are used
Diffstat (limited to 'tools')
-rw-r--r-- | tools/hi-q.c | 195 | ||||
-rw-r--r-- | tools/hi-test.c | 33 | ||||
-rwxr-xr-x | tools/hi-test.conf | 7 | ||||
-rwxr-xr-x | tools/hi-test5.conf | 6 |
4 files changed, 160 insertions, 81 deletions
diff --git a/tools/hi-q.c b/tools/hi-q.c index 114570f..5ee7688 100644 --- a/tools/hi-q.c +++ b/tools/hi-q.c @@ -64,6 +64,7 @@ foo_sa(CONFIG, 78, "configuration error") ;\ foo_sa(TOOBIG, 98, "message was too big to process (see --max-size)" +typedef enum {MSG, ENV} channeler; #define bufsize 16384 @@ -198,6 +199,7 @@ void slurp(const int inch, const int ouch){ ssize_t todo; for (;;) { ssize_t got = read(inch, buf, bufsize); + //xx cerr << "slurp: read returns " << got << endl; if (got == 0) { // EoF break; } @@ -210,6 +212,7 @@ void slurp(const int inch, const int ouch){ todo = got; while (todo) { ssize_t sent = write(ouch, buf, todo); + //xx cerr << "slurp: write returns " << sent << endl; if (sent < 0 && errno != EINTR) { fprintf(stderr, "hi-q: output error on fd%d : ", ouch); perror(0); @@ -282,6 +285,22 @@ string basename(const string path){ return path; } +void attach(const int pipe_end, const int fd, const int kidno){ + cerr << "attaching current pipe_end " << pipe_end + << " to " << fd + << " for " << kidno << endl; + if (pipe_end != fd) { + int rslt = dup2(pipe_end, fd); + if (rslt < 0) { + fprintf(stderr, "hi-q: dup2(%d,%d) failed for kid %d : ", pipe_end, fd, kidno); + perror(0); + exit(ex_syserr); + } + close(pipe_end); + } + +} + int main(int argc, char** argv) { { progname = *argv; @@ -299,7 +318,6 @@ bar int kidstatus; int rslt; - int loose_end = 0; // our original stdin typedef vector<string> VS; vector<jobber> filter; @@ -395,11 +413,28 @@ bar // to close it and dup() something useful onto it. map<int,int> iiofpid; - + map<channeler,int> next_read; + next_read[MSG] = 0; // our original stdin + next_read[ENV] = -1; // no kid is (yet) empowered to read envelope info + int slurp_read(1); // our original non-standard input + int slurp_write = -1; // effectively next_write[ENV]; + map<channeler,int> current_read; + map<channeler,int> cur_write; // current kid writes here + cur_write[MSG] = -1; + cur_write[ENV] = -1; + +// important loop to start all kids for (unsigned int ii=0; ii < nkids; ii++){ /* loop starting all kids */ - //xx cerr << "top of loop ... loose end " << loose_end << " for " << ii << endl; - if (loose_end > 20) exit(99); - int kid_end; + current_read = next_read; + + cerr << "top of loop: " + << " cr.MSG: " << current_read[MSG] + << " cr.ENV: " << current_read[ENV] + << " w.MSG: " << cur_write[MSG] + << " w.ENV: " << cur_write[ENV] + << " for " << ii << endl; + if (current_read[MSG] > 20) exit(99); + if (current_read[ENV] > 20) exit(99); int datapipe[2]; @@ -407,17 +442,15 @@ bar case series: case qq: case sa: -// connect *old* loose end to this kid's stdin - //xx cerr << "moving old loose end " << loose_end << " to 0 for " << ii << endl; - if (loose_end) { - close(0); - dup2(loose_end, 0); - close(loose_end); - } -// Create a pipe, which will be used to connect -// this child's fd1 to the next child's fd0 ... -// except for the last kid, which reads both fd0 and fd1, +// Create a new pipe. +// Pipe must be created here (in the parent). +// The intended bindings must be figured out shortly below. +// Some of the bindings must be hooked up later (in the child), +// while others are used by the parent (e.g. envelope slurp). +// This pipe will be used (by the children) to connect +// this child's output to the next child's input ... +// except for the special kid, which reads both fd0 and fd1, // while writing nothing. rslt = pipe(datapipe); if (rslt < 0) { @@ -425,6 +458,10 @@ bar perror(0); exeunt(ex_syserr); } + if (1) cerr << "new pipe" + << " reading: " << datapipe[rEnd] + << " writing: " << datapipe[wEnd] + << endl; break; case postspam: case stub: @@ -435,20 +472,23 @@ bar exeunt(ex_syserr); } -// For N-1 kids, the loose end feeds forward. -// It will be written by this kid and read by the next kid. -// For the special kid, the loose end will be its nonstandard input. -// It will be written by us (hi-q) and read by the last kid. - +// figure out the intended bindings: switch (filter[ii].mode) { - case series: case sa: - loose_end = datapipe[rEnd]; - kid_end = datapipe[wEnd]; + case series: + cur_write[MSG] = datapipe[wEnd]; + next_read[MSG] = datapipe[rEnd]; break; case qq: - loose_end = datapipe[wEnd]; // reverse of normal "series" case - kid_end = datapipe[rEnd]; // reverse of normal "series" case + if (slurp_write >= 0){ + cerr << "???? multiple qq jobs?" << endl; + } + slurp_write= datapipe[wEnd]; + current_read[ENV] = datapipe[rEnd]; + next_read[ENV] = -1; + next_read[MSG] = -1; + cur_write[ENV] = -1; + cur_write[MSG] = -1; break; case postspam: case stub: @@ -467,7 +507,7 @@ bar } iiofpid[kidpid[ii]] = ii; if (!kidpid[ii]) { /*** child code ***/ - if (verbose) cerr << "top of kid ... loose end " << loose_end << " for " << ii << endl; + if (verbose) cerr << "top of kid ... loose end " << current_read[MSG] << " for " << ii << endl; pid_t kidgroup(0); // process group for all kids is // equal to pid of kid#0 @@ -513,31 +553,19 @@ bar } } - if (0) cerr << "before closing loose end " << loose_end - << " and kid end " << kid_end - << " for " << ii << endl; switch (filter[ii].mode){ - case sa: case qq: + attach(current_read[MSG], 0, ii); + attach(current_read[ENV], 1, ii); + break; + case sa: case series: - close(loose_end); // the reading end is none of this kid's business - // except last kid: writing end - - // Note this does an implicit close on the previously-open fd1: - rslt = dup2(kid_end, 1); // the writing end is stdout for this kid - // except last kid: nonstandard input - if (rslt < 0) { - fprintf(stderr, "hi-q: kid %d: dup2(%d,1) failed: ", ii, kid_end); - perror(0); - exit(ex_syserr); - } - close(kid_end); // use fd1 instead now - // OK, at this point this kid is set up to read fd0 and write fd1 - // (except last kid reads fd1 as well as fd0). + attach(current_read[MSG], 0, ii); + attach(cur_write[MSG], 1, ii); break; case stub: case postspam: - // nothing to do + // nothing to hook up; no pipe was even created. break; case fail: cerr << "should never happen: invalid filter" << endl; @@ -545,6 +573,12 @@ bar break; } +// in all modes: +// close envelope channel in kid space +// (leaving it open in parent space) + close(current_read[ENV]); + close(slurp_write); + //// probe_fd(); int ntok = filter[ii].cmd.size(); @@ -576,7 +610,10 @@ bar perror(0); exeunt(ex_syserr); } - close(kid_end); + +// these tricks are for kid: + close(cur_write[MSG]); + close(cur_write[ENV]); // Let kid #0 run a little ways: if (ii==0) { @@ -598,6 +635,7 @@ bar } /* end loop starting all kids */ // here with the whole pipeline of kids launched +// parent program continues close(resync[wEnd]); // important, so that block gets released close(resync[rEnd]); // less important, just housecleaning @@ -730,35 +768,46 @@ bar // Here if all filters agree this is not spam. // Now it is safe to transfer the envelope information: - slurp(1, loose_end); - close(1); - close(loose_end); + + if (0) cerr << "about to slurp: " + << " cr.MSG: " << current_read[MSG] + << " cr.ENV: " << current_read[ENV] + << " w.MSG: " << cur_write[MSG] + << " w.ENV: " << cur_write[ENV] + << " slurp_read: " << slurp_read + << " slurp_write: " << slurp_write + << endl; + + slurp(slurp_read, slurp_write); + close(slurp_write); + close(slurp_read); // now that the envelope information has been transfered, // wait for the last kid in the usual way - { - for(;;) { - waitpid(special_pid, &kidstatus, WUNTRACED); - if (WIFEXITED(kidstatus)) { - int sts = WEXITSTATUS(kidstatus); - cerr << progid - << " says: qq program" - << " i.e. " << basename(filter[nkids-1].cmd[0]) - << "[" << kidpid[nkids-1] << "]" - << " returned status " << sts - << endl; - return sts; - } else if (WIFSIGNALED(kidstatus)) { - cerr << progid - << " says: qq program" - << " i.e. " << basename(filter[nkids-1].cmd[0]) - << "[" << kidpid[nkids-1] << "]" - << " was killed by signal " << WTERMSIG(kidstatus) - << endl; - return ex_syserr; - } else { - /* paused, not dead */ - } + + for(;;) { + waitpid(special_pid, &kidstatus, WUNTRACED); + if (WIFEXITED(kidstatus)) { + int sts = WEXITSTATUS(kidstatus); + cerr << progid + << " says: qq program" + << " i.e. " << basename(filter[nkids-1].cmd[0]) + << "[" << kidpid[nkids-1] << "]" + << " returned status " << sts + << endl; + return sts; + } else if (WIFSIGNALED(kidstatus)) { + cerr << progid + << " says: qq program" + << " i.e. " << basename(filter[nkids-1].cmd[0]) + << "[" << kidpid[nkids-1] << "]" + << " was killed by signal " << WTERMSIG(kidstatus) + << endl; + return ex_syserr; + } else { + /* paused, not dead */ } - } + } /* loop until all kids accounted for */ + // should never get here; + // exit from within loop is the only way out } diff --git a/tools/hi-test.c b/tools/hi-test.c index e2626cc..0661ada 100644 --- a/tools/hi-test.c +++ b/tools/hi-test.c @@ -3,6 +3,7 @@ #include <stdlib.h> #include <string> #include <signal.h> +#include <sstream> #include <stdio.h> /* perror() */ @@ -13,6 +14,8 @@ const int sa_good(0); const int sa_spam(1); const int sa_usage(64); +int verbosity(0); + //////////////// // little utility to help with argument parsing: // @@ -42,10 +45,12 @@ void exeunt(const int sts){ using namespace std; string progname; +string progid; +int mypid; void dump(const string var){ char* str = getenv(var.c_str()); - cerr << progname << ": " << var; + cerr << progid << " " << var; if (str) cerr << " is set to '" << str << "'" << endl; else cerr << " is not set." << endl; } @@ -55,10 +60,19 @@ void countsome(const int unit){ int total(0); for (;;) { int rslt = read(unit, buf, sizeof(buf)); + if (verbosity) cerr << "hi-test: count: unit " << unit + << " read returns " << rslt << endl; if (rslt <= 0) break; total += rslt; } - cerr << "read " << total << " bytes from unit " << unit << endl; + cerr << progid + << " read " << total << " bytes from unit " << unit << endl; +} + +string basename(const string path){ + size_t where = path.rfind("/"); + if (where != string::npos) return path.substr(1+where); + return path; } int main(int _argc, const char** _argv){ @@ -68,7 +82,16 @@ int main(int _argc, const char** _argv){ int countmode(0); int argc(_argc); const char **argv(_argv); - progname = *argv; argv++; argc--; + + { + progname = *argv; + mypid = getpid(); + stringstream binder; + binder << "+++++ " << basename(progname) << "[" << mypid << "]"; + progid = binder.str(); + } + + argv++; argc--; while (argc) { string arg(*argv); argv++; argc--; @@ -113,8 +136,8 @@ int main(int _argc, const char** _argv){ exit(sa_usage); } } - - cerr << "++++ hi-test pid: " << getpid() << " group: " << getpgid(0); + + cerr << progid << " group: " << getpgid(0); char* foo = getenv("HI_Q_GROUP"); if (foo) cerr << " HI_Q_GROUP: " << foo; cerr << endl; diff --git a/tools/hi-test.conf b/tools/hi-test.conf index f692f37..20df5a7 100755 --- a/tools/hi-test.conf +++ b/tools/hi-test.conf @@ -1,6 +1,7 @@ -#! /usr/local/bin/bash-c set -x ; </dev/null TCPREMOTEHOST=asf TCPREMOTEIP=1.2.3.4 hi-q $0 ; echo $? +#! /usr/local/bin/bash-c set -x ; 1</tmp/a TCPREMOTEHOST=asf TCPREMOTEIP=1.2.3.4 hi-q $0 ; echo $? # another comment, with blank line between -series hi-test x0 -snooze 10 -stub hi-test x1 -snooze 1 -exit 21 -kill +series /bin/echo asdf +series hi-test x0 -snooze 2 +# stub hi-test x1 -snooze 1 -exit 21 -kill qq hi-test x2 -snooze 10 diff --git a/tools/hi-test5.conf b/tools/hi-test5.conf new file mode 100755 index 0000000..524c954 --- /dev/null +++ b/tools/hi-test5.conf @@ -0,0 +1,6 @@ +#! /usr/local/bin/bash-c set -x ; 1</tmp/a TCPREMOTEHOST=asf TCPREMOTEIP=1.2.3.4 hi-q $0 ; echo $? + +# another comment, with blank line between +series /bin/echo asdfasdfasdf +series /bin/cat +qq hi-test -count |