diff options
-rw-r--r-- | tools/hi-q.c | 119 | ||||
-rwxr-xr-x | tools/hi-test4.conf | 2 |
2 files changed, 93 insertions, 28 deletions
diff --git a/tools/hi-q.c b/tools/hi-q.c index 5ee7688..3d654c5 100644 --- a/tools/hi-q.c +++ b/tools/hi-q.c @@ -22,10 +22,15 @@ using namespace std; #include <fstream> #include <sstream> #include <string> +#include <list> #include <vector> #include <sstream> #include <map> +#include <sys/types.h> /* for fstat */ +#include <sys/stat.h> /* .. */ +#include <unistd.h> /* .. */ + // error exit codes, mostly as stated in qmail.c #define bar \ foo(good, 0) ;\ @@ -63,6 +68,13 @@ foo_sa(NOPERM, 77, "permission denied") ;\ foo_sa(CONFIG, 78, "configuration error") ;\ foo_sa(TOOBIG, 98, "message was too big to process (see --max-size)" +string progname; +pid_t mypid; +string progid; + +extern char** environ; +const int rEnd(0); // end of a pipe for reading +const int wEnd(1); // end of a pipe for writing typedef enum {MSG, ENV} channeler; @@ -204,7 +216,9 @@ void slurp(const int inch, const int ouch){ break; } if (got < 0) { - fprintf(stderr, "hi-q: input error: "); + cerr << progid + << " slurp: input error on fd " << inch + << " : "; perror(0); exeunt(ex_comerr); } @@ -214,7 +228,9 @@ void slurp(const int inch, const int ouch){ ssize_t sent = write(ouch, buf, todo); //xx cerr << "slurp: write returns " << sent << endl; if (sent < 0 && errno != EINTR) { - fprintf(stderr, "hi-q: output error on fd%d : ", ouch); + cerr << progid + << " slurp: output rror on fd " << ouch + << " : "; perror(0); exeunt(ex_comerr); } @@ -261,10 +277,6 @@ void usage() { // we have data coming in on fd 0. // and envelope / control information coming in on fd 1. -string progname; -pid_t mypid; -string progid; - void dump(const string var){ char* str = getenv(var.c_str()); cerr << progid << var; @@ -277,7 +289,39 @@ int xclose(int arg){ return close(arg); } -extern char** environ; +typedef list<int> LI; +void block_fd(const LI todo){ + int blocker(-1); + int inplace(0); + + for (LI::const_iterator ptr = todo.begin(); + ptr != todo.end(); ptr++) { + int fd = *ptr; + struct stat statbuf; + int rslt = fstat(fd, &statbuf); + if (rslt) { + if (0) { + cerr << "definitely needed to block_fd unit " << fd << " : "; + perror(0); + } + if (blocker < 0) { + int blockex[2]; + pipe(blockex); + close(blockex[rEnd]); + blocker = blockex[wEnd]; + } + if (blocker != fd){ + dup2(blocker, fd); + close(blocker); + } else { + inplace++; + } + } else { + if (0) cerr << "unit " << fd << " already blocked" << endl; + } + } + if (!inplace) close(blocker); +} string basename(const string path){ size_t where = path.rfind("/"); @@ -286,7 +330,7 @@ string basename(const string path){ } void attach(const int pipe_end, const int fd, const int kidno){ - cerr << "attaching current pipe_end " << pipe_end + cerr << "attaching current pipe_end " << pipe_end << " to " << fd << " for " << kidno << endl; if (pipe_end != fd) { @@ -298,7 +342,7 @@ void attach(const int pipe_end, const int fd, const int kidno){ } close(pipe_end); } - + } int main(int argc, char** argv) { @@ -397,8 +441,6 @@ bar vector<pid_t> kidpid(nkids); // indexed by kid number - const int rEnd(0); // end of a pipe for reading - const int wEnd(1); // end of a pipe for writing int sync[2]; int resync[2]; if (pipe(sync) != 0) cerr << "sync pipe failed" << endl; @@ -419,19 +461,30 @@ bar int slurp_read(1); // our original non-standard input int slurp_write = -1; // effectively next_write[ENV]; map<channeler,int> current_read; - map<channeler,int> cur_write; // current kid writes here - cur_write[MSG] = -1; - cur_write[ENV] = -1; + map<channeler,int> current_write; // current kid writes here + current_write[MSG] = -1; + current_write[ENV] = -1; + list<int> blockme; + blockme.push_back(0); + blockme.push_back(1); // important loop to start all kids for (unsigned int ii=0; ii < nkids; ii++){ /* loop starting all kids */ + string kidid; + { + stringstream foo; + foo << ii + << " mode " << filter[ii].mode + << " " << filter[ii].cmd[0]; + kidid = foo.str(); + } current_read = next_read; cerr << "top of loop: " - << " cr.MSG: " << current_read[MSG] + << " cr.MSG: " << current_read[MSG] << " cr.ENV: " << current_read[ENV] - << " w.MSG: " << cur_write[MSG] - << " w.ENV: " << cur_write[ENV] + << " w.MSG: " << current_write[MSG] + << " w.ENV: " << current_write[ENV] << " for " << ii << endl; if (current_read[MSG] > 20) exit(99); if (current_read[ENV] > 20) exit(99); @@ -452,6 +505,7 @@ bar // this child's output to the next child's input ... // except for the special kid, which reads both fd0 and fd1, // while writing nothing. + block_fd(blockme); rslt = pipe(datapipe); if (rslt < 0) { fprintf(stderr, "hi-q: could not create datapipe: "); @@ -473,10 +527,13 @@ bar } // figure out the intended bindings: + int pardang1(-1), pardang2(-1); // used by current kid, + // but dangling, from parent's point of view switch (filter[ii].mode) { case sa: case series: - cur_write[MSG] = datapipe[wEnd]; + pardang1 = current_write[MSG] = datapipe[wEnd]; + pardang2 = current_read[MSG]; next_read[MSG] = datapipe[rEnd]; break; case qq: @@ -484,11 +541,11 @@ bar cerr << "???? multiple qq jobs?" << endl; } slurp_write= datapipe[wEnd]; - current_read[ENV] = datapipe[rEnd]; + pardang1 = current_read[ENV] = datapipe[rEnd]; next_read[ENV] = -1; next_read[MSG] = -1; - cur_write[ENV] = -1; - cur_write[MSG] = -1; + current_write[ENV] = -1; + current_write[MSG] = -1; break; case postspam: case stub: @@ -561,7 +618,7 @@ bar case sa: case series: attach(current_read[MSG], 0, ii); - attach(cur_write[MSG], 1, ii); + attach(current_write[MSG], 1, ii); break; case stub: case postspam: @@ -612,8 +669,16 @@ bar } // these tricks are for kid: - close(cur_write[MSG]); - close(cur_write[ENV]); + close(current_write[MSG]); + close(current_write[ENV]); + close(current_read[ENV]); + if (0) cerr << "closing " << pardang1 + << " for parent of " << kidid << endl; + + close(pardang1); + if (0) cerr << "closing " << pardang2 + << " for parent of " << kidid << endl; + close(pardang2); // Let kid #0 run a little ways: if (ii==0) { @@ -770,10 +835,10 @@ bar // Now it is safe to transfer the envelope information: if (0) cerr << "about to slurp: " - << " cr.MSG: " << current_read[MSG] + << " cr.MSG: " << current_read[MSG] << " cr.ENV: " << current_read[ENV] - << " w.MSG: " << cur_write[MSG] - << " w.ENV: " << cur_write[ENV] + << " w.MSG: " << current_write[MSG] + << " w.ENV: " << current_write[ENV] << " slurp_read: " << slurp_read << " slurp_write: " << slurp_write << endl; diff --git a/tools/hi-test4.conf b/tools/hi-test4.conf index 850784e..c0ef589 100755 --- a/tools/hi-test4.conf +++ b/tools/hi-test4.conf @@ -5,7 +5,7 @@ #! /usr/local/bin/bash-c set -x ; /bin/echo "a b c " | 1</tmp/a TCPREMOTEHOST=asf TCPREMOTEIP=1.2.3.4 /var/qmail/bin/hi-q $0 ; echo $? series /bin/echo "a b c" -series /bin/cat +sa /bin/cat stub hi-test x0 -snooze 1 series /bin/cat qq hi-test -count |