diff options
author | John Denker <jsd@av8n.com> | 2012-11-23 11:51:38 -0800 |
---|---|---|
committer | John Denker <jsd@av8n.com> | 2012-11-23 11:51:38 -0800 |
commit | 4134b154839c91f44bb39547af9297f4b6a353d3 (patch) | |
tree | f8b7bee0f7596f3db63a4f44cb8a0f2dc2dfd5e9 /tools | |
parent | 2db619bf6bb01efdc01ce9b3729575ae21abcf5f (diff) |
pipette ... for playing around with pipes and other i/o stuff
Diffstat (limited to 'tools')
-rw-r--r-- | tools/makefile | 5 | ||||
-rw-r--r-- | tools/pipette.c | 981 | ||||
-rw-r--r-- | tools/utils.c | 19 | ||||
-rw-r--r-- | tools/utils.h | 3 |
4 files changed, 1007 insertions, 1 deletions
diff --git a/tools/makefile b/tools/makefile index 38d35ce..ec9c101 100644 --- a/tools/makefile +++ b/tools/makefile @@ -17,7 +17,7 @@ qmain = pido.c hi-q.c skrewt.c hi-test.c mail-scan.c \ qprogs = $(qmain:%.c=%) # sources for other main programs: -moremain = wripper.c bash-c.c ltgrey.c fixown.c +moremain = wripper.c bash-c.c ltgrey.c fixown.c pipette.c moreprogs = $(moremain:%.c=%) nonmain = libltgrey.c @@ -49,6 +49,9 @@ fixown2: fixown.o utils.o chmod o-rwx $@ ./fixown $@ +pipette: pipette.o utils.o + $(CC) $^ -o $@ + skrewt: skrewt.o utils.o sepofra.o $(CC) $^ -lboost_filesystem-mt -lboost_system -lspf2 -o $@ ./fixown $@ diff --git a/tools/pipette.c b/tools/pipette.c new file mode 100644 index 0000000..1eda2d0 --- /dev/null +++ b/tools/pipette.c @@ -0,0 +1,981 @@ +/////////////// +// lightweight connection from qmail to filters e.g. spamassassin +// (hi-q filter, get it?) + +// Hint: For testing, see also hi-test.conf which invokes ./hi-test: +// ./hi-q hi-test.conf + +// TODO: Exeunt stop should signal all children. +// TODO: Possibly: Wait for all kids in parallel? +// That's because they might finish out of order. + +#include <unistd.h> +#include <stdlib.h> /* for exit(), getenv() */ +#include <stdio.h> /* for perror */ +#include <errno.h> +#include <sys/types.h> /* for fork(), wait() */ +#include <sys/stat.h> +#include <sys/wait.h> + +using namespace std; +#include <iostream> +#include <fstream> +#include <sstream> +#include <string> +#include <list> +#include <vector> +#include <sstream> +#include <map> + +#include <sys/types.h> /* for fstat */ +#include <sys/stat.h> /* .. */ +#include <unistd.h> /* .. 
*/ +#include <iomanip> /* for setw() */ +#include "utils.h" /* for strError() */ + +// error exit codes, mostly as stated in qmail.c +#define bar \ +foo(good, 0) ;\ +foo(spam, 21) ;\ +foo(penaltybox, 22) ;\ +foo(permerr, 31) ;\ +foo(greylisting, 70) ;\ +foo(syserr, 71) ;\ +foo(comerr, 74) ; + +#define foo(name, num) const int ex_ ## name = num +bar +#undef foo + +map<int,string> codemap; + + +#define bar_sa \ +foo_sa(GOOD, 0, "ham") ;\ +foo_sa(SPAM, 1, "spam") ;\ +foo_sa(USAGE, 64, "command line usage error") ;\ +foo_sa(DATAERR, 65, "data format error") ;\ +foo_sa(NOINPUT, 66, "cannot open input") ;\ +foo_sa(NOUSER, 67, "addressee unknown") ;\ +foo_sa(NOHOST, 68, "host name unknown") ;\ +foo_sa(UNAVAILABLE, 69, "service unavailable") ;\ +foo_sa(SOFTWARE, 70, "internal software error") ;\ +foo_sa(OSERR, 71, "system error (e.g., can't fork)") ;\ +foo_sa(OSFILE, 72, "critical OS file missing") ;\ +foo_sa(CANTCREAT, 73, "can't create (user) output file") ;\ +foo_sa(IOERR, 74, "input/output error") ;\ +foo_sa(TEMPFAIL, 75, "temp failure; user is invited to retry") ;\ +foo_sa(PROTOCOL, 76, "remote error in protocol") ;\ +foo_sa(NOPERM, 77, "permission denied") ;\ +foo_sa(CONFIG, 78, "configuration error") ;\ +foo_sa(TOOBIG, 98, "message was too big to process (see --max-size)" + +string progname; +pid_t mypid; +string progid; + +extern char** environ; +const int rEnd(0); // end of a pipe for reading +const int wEnd(1); // end of a pipe for writing + +typedef enum {MSG, ENV} channeler; + +#define bufsize 16384 + +// meanings: +// sa is a filter, using not-very-expressive exit codes: 0=ham 1=spam. +// stub is not a filter; no stdin or stdout; just looks at environment. +// series is a filter. +// qq is not a filter, just an absorber. +// +// Note that series and stub use the same exit codes as qq. 
+// + +/* +Notation for future use: + 0< &kb 1> &scr 2> &scr prog1 # stand-alone + 0< &kb 1> &redpipe 2> &scr prog2 # upstream end of pipe + 0< &redpipe 1> &scr 2> &scr prog3 # downstream end of pipe + + 0< &msg 1< &envelope 2> &log qmail-queue + + 0< &kb 1> &scr 2> &scr 7> &up 8< &down parent + 0< &up 1> &down 2> &scr childprocess + +Simple case: + 0< &msg 1> &msg2 2> &log skrewt + 0< &msg2 1> &msg3 2> &log spamc + 0< &msg3 1< &envelope 2> &log qmail-queue + +Fancier "triangular piping" case: + 0< &msg 1< &env 2> &log 7> &msg2 8> &env2 skrewt + 0< &msg2 1> &msg3 2> &log spamc + 0< &msg3 1< &env2 2> &log qmail-queue + +Note that units 7 and 8 are arbitrary and could be renumbered. In +contrast, many of the others are fixed by standards and/or traditions. + +Some questions: + +How hard is it to detect a /dry/ pipe segment, i.e. one where all the +writing-ends have been closed, and all the bytes have been read? This +means the reading ends can be closed, freeing up FD units. + +Similarly, how hard is it to detect a /broken/ pipe segment, i.e. one +where all the reading ends have been closed? This means the writing +ends can be closed, freeing up FD units. 
+ +*/ + +typedef enum {series, stub, sa, qq, postspam, fail} moder; + +class jobber{ +public: + moder mode; + vector<string> cmd; + + jobber(const moder _mode, const vector<string> _cmd) + : mode(_mode), cmd(_cmd) + {} + + jobber(const string _mode, const vector<string> _cmd) + : mode(fail), cmd(_cmd){ + setmode(_mode); + } + + jobber() + : mode(fail), cmd(0) + {} + + void setmode(const string _mode) { + if (0) {} + else if (_mode == "sa") mode = sa; + else if (_mode == "stub") mode = stub; + else if (_mode == "series") mode = series; + else if (_mode == "qq") mode = qq; + else if (_mode == "postspam") mode = postspam; + else { + cerr << "jobber: bad mode: " << _mode << endl; + mode = fail; + } + } +}; + +// klugey global variable: +vector<jobber> post; + +// We are fussy about the argument types because we want +// this to compile cleanly under g++ as well as gcc, +// and each is strict about different things, such that +// one or the other will complain unless everything is +// done just right. 
+ +// This is the way execve really behaves: +// the characters are held constant +// and the (char*) pointers are held constant: +int Execve(char const * fn, + char const * const * argv, + char const * const * env) { +// coerce the arg types to match the unwise declaration in unistd.h : + return execve(fn, (char*const*) argv, (char*const*) env); +} + +int fork_and_wait(const jobber job){ + pid_t kidpid = fork(); + if (kidpid == -1) { + cerr << progid << " fork failed : "; + perror(0); + exit(ex_syserr); + } + int ntok = job.cmd.size(); + const char* prog[1+ntok]; + for (int jj = 0; jj < ntok; jj++){ + prog[jj] = job.cmd[jj].c_str(); + } + prog[ntok] = 0; + + if (!kidpid){ + /*** child code ***/ + Execve(prog[0], prog, environ); + cerr << progid << " failed to exec '" + << prog[0] << "' : " << endl; + perror(0); + exit(ex_syserr); + } else { + /*** parent code ***/ + int kidstatus; + pid_t somekid; + somekid = waitpid(kidpid, &kidstatus, WUNTRACED); + if (somekid < 0) { + cerr << progid << " ??? waitpid failed : "; + perror(0); + return(ex_syserr); + } + if (WIFEXITED(kidstatus)) { + int sts = WEXITSTATUS(kidstatus); + if (sts != ex_good && sts != ex_spam) { + cerr << "hi-q: job " << prog[0] + << " unexpectedly returns status: " << sts + << endl; + exit(sts); + } + return 0; + } else if (WIFSIGNALED(kidstatus)) { + int sig = WTERMSIG(kidstatus); + if (sig == SIGUSR1) {/* normal, no logging required */} + else cerr << progid << " job " << prog[0] + << " killed by signal " << sig << endl; + return(ex_syserr); + } else { + /* paused, not dead */ + } + } + return 0; +} + +int fork_and_wait(vector<jobber> post){ + for(vector<jobber>::const_iterator foo = post.begin(); + foo != post.end(); foo++) { + int rslt = fork_and_wait(*foo); + if (rslt) return rslt; + } + return 0; +} + +void exeunt(const int sts) { + // FIXME: stop other children, maybe? 
+ //xxxx cerr << progid << " exeunt called with " << sts << endl; + if (sts == ex_spam) fork_and_wait(post); + if (sts == ex_penaltybox) exit(ex_spam); + exit(sts); +} + +void slurp(const int inch, const int ouch){ + char buf[bufsize]; + ssize_t todo; + for (;;) { + ssize_t got = read(inch, buf, bufsize); + //xx cerr << "slurp: read returns " << got << endl; + if (got == 0) { // EoF + break; + } + if (got < 0) { + cerr << progid + << " slurp: input error on fd " << inch + << " : "; + perror(0); + exeunt(ex_comerr); + } + + todo = got; + while (todo) { + ssize_t sent = write(ouch, buf, todo); + //xx cerr << "slurp: write returns " << sent << endl; + if (sent < 0 && errno != EINTR) { + cerr << progid + << " slurp: output rror on fd " << ouch + << " : "; + perror(0); + exeunt(ex_comerr); + } + todo -= sent; + } + } +} + + +void probe_fd(){ + int ii; + struct stat buf; + for (ii = 0; ii < 16; ii++) { + int rslt = fstat(ii, &buf); + fprintf(stderr, "fd %2d status %2d", ii, rslt); + if (rslt==0) + fprintf(stderr, " : %d", (int)buf.st_dev); + fprintf(stderr, "\n"); + } + fprintf(stderr, "============\n"); +} + + +void blurb(const int ii, const pid_t* kidpid) { + int kidstatus; + /*pid_t somekid = */ waitpid(kidpid[ii], &kidstatus, WUNTRACED); + if (WIFEXITED(kidstatus)) + fprintf(stderr, "kid #%d (%d) exited with status %d\n", + ii, kidpid[ii], WEXITSTATUS(kidstatus)); + if (WIFSIGNALED(kidstatus)) + fprintf(stderr, "kid #%d (%d) killed by signal %d\n", + ii, kidpid[ii], WTERMSIG(kidstatus)); + +} + +void usage() { + cerr << "Usage:\n" +" hi-q filter.conf\n" +"or\n" +" HI_Q_CONF=filter.conf hi-q\n"; +} + +//////////////////////////////////////// +// we have data coming in on fd 0. +// and envelope / control information coming in on fd 1. + +void dump(const string var){ + char* str = getenv(var.c_str()); + cerr << progid << var; + if (str) cerr << " is set to '" << str << "'" << endl; + else cerr << " is not set." 
<< endl; +} + +int xclose(int arg){ + cerr << "closing " << arg << endl; + return close(arg); +} + +typedef list<int> LI; +void block_fd(const LI todo){ + int blocker(-1); + int inplace(0); + + for (LI::const_iterator ptr = todo.begin(); + ptr != todo.end(); ptr++) { + int fd = *ptr; + struct stat statbuf; + int rslt = fstat(fd, &statbuf); + if (rslt) { + if (0) { + cerr << "**** definitely needed to block_fd unit " << fd << " : "; + perror(0); + } + if (blocker < 0) { + int blockex[2]; + pipe(blockex); + close(blockex[rEnd]); + blocker = blockex[wEnd]; + } + if (blocker != fd){ + dup2(blocker, fd); + close(blocker); + } else { + inplace++; + } + } else { + if (0) cerr << "unit " << fd << " already blocked" << endl; + } + } + if (!inplace) close(blocker); +} + +void attach(const int pipe_end, const int fd, const int kidno){ + if (0) cerr << "attaching current pipe_end " << pipe_end + << " to " << fd + << " for " << kidno << endl; + if (pipe_end != fd) { + int rslt = dup2(pipe_end, fd); + if (rslt < 0) { + cerr << progid << " dup2(" << pipe_end + << "," << fd << ")" + " failed for kid " << kidno << " : "; + perror(0); + exit(ex_syserr); + } + close(pipe_end); + } + +} + +void dumpstat(const int fd) { + cout << setw(4) << fd; + struct stat statbuf; + int rslt = fstat(fd, &statbuf); + if (rslt) { + cerr << " " << progid + << " fstat() failed: " << strError() << endl; + return; + } +#if 0 + dev_t st_dev; /* ID of device containing file */ + ino_t st_ino; /* inode number */ + mode_t st_mode; /* protection */ + nlink_t st_nlink; /* number of hard links */ + uid_t st_uid; /* user ID of owner */ + gid_t st_gid; /* group ID of owner */ + dev_t st_rdev; /* device ID (if special file) */ + off_t st_size; /* total size, in bytes */ + blksize_t st_blksize; /* blocksize for file system I/O */ + blkcnt_t st_blocks; /* number of 512B blocks allocated */ + time_t st_atime; /* time of last access */ + time_t st_mtime; /* time of last modification */ + time_t st_ctime; /* time of 
last status change */ +#endif + cout << setw(4) << statbuf.st_dev + << setw(12) << statbuf.st_ino + << setw(8) << setbase(8) << statbuf.st_mode + << setbase(10) + << setw(4) << statbuf.st_nlink + << setw(8) << statbuf.st_uid + << setw(8) << statbuf.st_gid + << setw(4) << statbuf.st_rdev + << setw(4) << statbuf.st_size + << setw(8) << statbuf.st_blksize + << setw(4) << statbuf.st_blocks + << setw(12) << statbuf.st_atime + << setw(12) << statbuf.st_mtime + << setw(12) << statbuf.st_ctime + ;;; + cout << endl; +} + +void play() { + int tube[2]; + int rslt = pipe(tube); + if (rslt) { + cerr << progid << " pipe() failed"; + perror(0); + exit(ex_syserr); + } + dumpstat(tube[rEnd]); + dumpstat(tube[wEnd]); + sleep(1.5); + close(tube[rEnd]); + dumpstat(tube[rEnd]); + dumpstat(tube[wEnd]); +} + +int main(int argc, char** argv) { + { + progname = *argv; + mypid = getpid(); + stringstream binder; + binder << basename(progname) << "[" << mypid << "]"; + progid = binder.str(); + } + + play(); + exit(0); + +#define foo(name, num) codemap[num] = #name ; +bar +#undef foo + + int verbose(0); + int kidstatus; + + int rslt; + + typedef vector<string> VS; + vector<jobber> filter; + string conf_var = "HI_Q_CONF"; + char* auth = getenv("QMAIL_AUTHORIZED"); + if (auth && *auth) conf_var = "HI_Q_AUCONF"; + + char* conf_name; + if (argc == 1) { + conf_name = getenv(conf_var.c_str()); + if (!conf_name) { + usage(); + exit(1); + } + } + + if (argc >= 2) { + conf_name = argv[1]; + } + + if (argc >= 3) { + if (auth && *auth) conf_name = argv[2]; + } + + if (argc > 3) { + usage(); + exit(1); + } + + ifstream conf; + conf.open(conf_name); + if (! 
conf.good()) { + cerr << progid << " could not open filter.conf file '" + << conf_name << "'" << endl; + exit(1); + } + for (;;) { + string line; + if (!getline(conf, line).good()) break; + istringstream parse(line); + jobber job; + while (parse.good()){ + string token; + parse >> token; + if (parse.fail()) break; + if (token[0] == '#') break; + job.cmd.push_back(token); + } + if (job.cmd.size()) { + job.setmode(job.cmd.front()); + job.cmd.erase(job.cmd.begin()); + } + // here with a properly built job descriptor + if (job.cmd.size()) { + if (job.mode == postspam) { + post.push_back(job); + } else { + filter.push_back(job); + } + } + } + unsigned int nkids = filter.size(); + +// Check for nothing to do. +// This is important, because the "last kid" is a special case. +// This makes it safe to assume that nkids-1 is non-negative. + if (nkids == 0) exit(0); // nothing to do + + if (verbose) for (unsigned int ii = 0; ii < nkids; ii++) { + cerr << progid << " filter[" << ii << "] :; "; + for (VS::const_iterator token = filter[ii].cmd.begin(); + token != filter[ii].cmd.end(); token++){ + cerr << *token << " "; + } + cerr << endl; + } + + vector<pid_t> kidpid(nkids); // indexed by kid number + + int sync[2]; + int resync[2]; + if (pipe(sync) != 0) cerr << "sync pipe failed" << endl; + if (pipe(resync) != 0) cerr << "resync pipe failed" << endl; + +// At this point, there are some loop invariants; +// (a) fd0 is open (standard input) and has the email msg, +// ready for the next child to read, and +// (b) fd1 is open (nonstandard input) and has envelope information. +// We need it to be open, so that pipe() +// doesn't choose it. That allows N-1 of the kids +// to close it and dup() something useful onto it. 
+ + map<int,int> iiofpid; + map<channeler,int> next_read; + next_read[MSG] = 0; // our original stdin + next_read[ENV] = -1; // no kid is (yet) empowered to read envelope info + int slurp_read(1); // our original non-standard input + int slurp_write = -1; // effectively next_write[ENV]; + map<channeler,int> current_read; + map<channeler,int> current_write; // current kid writes here + current_write[MSG] = -1; + current_write[ENV] = -1; + list<int> blockme; + blockme.push_back(0); + blockme.push_back(1); + +// important loop to start all kids + for (unsigned int ii=0; ii < nkids; ii++){ /* loop starting all kids */ + string kidid; + { + stringstream foo; + foo << ii + << " mode " << filter[ii].mode + << " " << filter[ii].cmd[0]; + kidid = foo.str(); + } + current_read = next_read; + + if (verbose) cerr << "top of loop: " + << " cr.MSG: " << current_read[MSG] + << " cr.ENV: " << current_read[ENV] + << " w.MSG: " << current_write[MSG] + << " w.ENV: " << current_write[ENV] + << " for " << ii << endl; + if (current_read[MSG] > 20) exit(99); + if (current_read[ENV] > 20) exit(99); + + int datapipe[2]; + + switch (filter[ii].mode) { + case series: + case qq: + case sa: + +// Create a new pipe. +// Pipe must be created here (in the parent). +// The intended bindings must be figured out shortly below. +// Some of the bindings must be hooked up later (in the child), +// while others are used by the parent (e.g. envelope slurp). +// This pipe will be used (by the children) to connect +// this child's output to the next child's input ... +// except for the special kid, which reads both fd0 and fd1, +// while writing nothing. 
+ block_fd(blockme); + rslt = pipe(datapipe); + if (rslt < 0) { + cerr << progid << " could not create datapipe : "; + perror(0); + exeunt(ex_syserr); + } + if (0) cerr << "new pipe" + << " reading: " << datapipe[rEnd] + << " writing: " << datapipe[wEnd] + << endl; + break; + case postspam: + case stub: + // do not need to create a pipe + break; + case fail: + cerr << "should never happen: invalid filter" << endl; + exeunt(ex_syserr); + } + +// figure out the intended bindings: + list<int> pardang; + switch (filter[ii].mode) { + case sa: + case series: + current_write[MSG] = datapipe[wEnd]; + pardang.push_back(current_write[MSG]); + pardang.push_back(current_read[MSG]); + next_read[MSG] = datapipe[rEnd]; + break; + case qq: + if (slurp_write >= 0){ + cerr << "???? multiple qq jobs?" << endl; + } + slurp_write= datapipe[wEnd]; + current_read[ENV] = datapipe[rEnd]; + pardang.push_back(current_read[ENV]); + next_read[ENV] = -1; + next_read[MSG] = -1; + current_write[ENV] = -1; + current_write[MSG] = -1; + break; + case postspam: + case stub: + // no pipe even got created. + break; + case fail: + cerr << "should never happen:: invalid filter" << endl; + exeunt(ex_syserr); + } + + kidpid[ii] = fork(); + if (kidpid[ii] == -1) { + cerr << progid << " fork failed : "; + perror(0); + exit(ex_syserr); + } + iiofpid[kidpid[ii]] = ii; + if (!kidpid[ii]) { /*** child code ***/ + if (verbose) cerr << "top of kid ... loose end " << current_read[MSG] << " for " << ii << endl; + + pid_t kidgroup(0); // process group for all kids is + // equal to pid of kid#0 + if (ii) kidgroup = kidpid[0]; + if (setpgid(0, kidgroup) != 0) { + cerr << "*** kid " << ii + << " setpgid failed! " << errno << " ... "; + perror(0); + } else { + // cerr << "*** kid " << ii << " setpgid OK" << endl; + } + +// ... everybody else has to wait for us to get this far ... +// ... so that the new process group will be valid ... +// Write-a-byte synchronization is released when the *first* guy writes. 
+ if (ii == 0) { + int junk(1); + write(sync[wEnd], &junk, 1); + //cerr << "sync sent" << endl; + } +#if 0 + cerr << "kid [" << ii << "] " << getpid() + << " kidpid[0]: " << kidpid[0] + << " pgid: " << getpgid(0) + << " starts" << endl; +#endif + + close(resync[wEnd]); // send resync + //xx cerr << "after sending resync " << ii << endl; + +// ... now we must wait for everybody else, because ... +// ... if we do the exec(), the new process group becomes invalid ... +// Close synchronization is released when the *last* guy closes. + if (ii==0) { + int junk; + //cerr << "about to read resync" << endl; + ssize_t rslt = read(resync[rEnd], &junk, 1); + if (rslt < 0 ) { + cerr << "bad sync ... " << rslt << endl; + // FIXME (maybe?) should this be fatal? + } else { + // cerr << "back from read resync, good: " << rslt << endl; + } + } + + switch (filter[ii].mode){ + case qq: + attach(current_read[MSG], 0, ii); + attach(current_read[ENV], 1, ii); + break; + case sa: + case series: + attach(current_read[MSG], 0, ii); + attach(current_write[MSG], 1, ii); + break; + case stub: + case postspam: + // nothing to hook up; no pipe was even created. 
+ break; + case fail: + cerr << "should never happen: invalid filter" << endl; + exeunt(ex_syserr); + break; + } + +// in all modes: +// close envelope channel in kid space +// (leaving it open in parent space) + close(current_read[ENV]); + close(slurp_write); + +//// probe_fd(); + + int ntok = filter[ii].cmd.size(); + const char* prog[1+ntok]; + for (int jj = 0; jj < ntok; jj++){ + prog[jj] = filter[ii].cmd[jj].c_str(); + } + prog[ntok] = 0; + close(resync[rEnd]); + close(sync[rEnd]); + close(sync[wEnd]); + + stringstream convert; + convert << getpgid(0); + const string grouper("HI_Q_GROUP=" + convert.str()); + if (putenv((char*)grouper.c_str()) != 0) { + cerr << "putenv failed" << endl; + exit(1); + } + rslt = Execve(prog[0], prog, environ); + cerr << progid << " failed to exec '" + << prog[0] << "' : "; + perror(0); + exit(ex_syserr); + } + + /*** parent code ***/ + if (kidpid[ii] < 0) { + cerr << " failure to fork kid#" << ii << " : "; + perror(0); + exeunt(ex_syserr); + } + +// these tricks are for kid: + close(current_write[MSG]); + close(current_write[ENV]); + close(current_read[ENV]); + for (LI::const_iterator ptr = pardang.begin(); + ptr != pardang.end(); ptr++) { + if (0) cerr << "closing " << *ptr + << " for parent of " << kidid << endl; + + close(*ptr); + } + +// Let kid #0 run a little ways: + if (ii==0) { + int junk; + //cerr << "about to read sync" << endl; + ssize_t rslt = read(sync[rEnd], &junk, 1); + if (rslt != 1) { + cerr << "bad sync ... 
1 != " << rslt << endl; + } else { + //cerr << "back from read sync, good: " << rslt << endl; + } + } + +#if 0 + cerr << "apparent kid #" << ii + << " (" << kidpid[ii] << ") " + << endl; +#endif + } /* end loop starting all kids */ + +// here with the whole pipeline of kids launched +// parent program continues + + close(resync[wEnd]); // important, so that block gets released + close(resync[rEnd]); // less important, just housecleaning + + close(sync[wEnd]); // more housecleaning + close(sync[rEnd]); + + close(0); // Housecleaning: the reading end of stdin was + // delegated to the first child, + // so we don't need it. + + if (verbose) for (unsigned int ii = 0; ii < nkids; ii++) { + cerr << progid << " filter[" << ii << "] " + << kidpid[ii] + << " :; "; + for (VS::const_iterator token = filter[ii].cmd.begin(); + token != filter[ii].cmd.end(); token++){ + cerr << *token << " "; + } + cerr << endl; + } + + pid_t special_pid = kidpid[nkids-1]; + int alive(nkids-1); // not counting the special kid + int best_blame(0); // best reason, even if not a great reason + pid_t argbest_blame(-1); // kid# associated with best blame + + for (;;) { + if (alive == 0) break; + pid_t somekid = waitpid(-1, &kidstatus, WUNTRACED); + if (somekid == special_pid){ + // do not decrement the "alive" counter + // since that only applies to non-special kids + if (WIFEXITED(kidstatus)) { + cerr << progid << " special kid exited early, status " + << WEXITSTATUS(kidstatus) + << " with " << alive << " kids still alive" + << endl; + return(ex_syserr); + } else if (WIFSIGNALED(kidstatus)) { + int sig = WTERMSIG(kidstatus); + if (sig == SIGUSR1) {/* normal, no logging required */} + else { + cerr << progid << " special kid killed by signal " + << sig << endl; + // this is not normal + return(ex_syserr); + } + } else { + /* paused, not dead */ + } + continue; + } +// here if somekid is not the special kid + if (WIFEXITED(kidstatus)) { + alive--; + int sts = WEXITSTATUS(kidstatus); +#ifndef 
PENALIZE_SPAMMERS + // ignore penalties for the moment + // to see whether there are any false positives + if (sts == ex_penaltybox) sts = ex_good; +#endif + if (sts) { + argbest_blame = somekid; + best_blame = kidstatus; + break; + } + } else if (WIFSIGNALED(kidstatus)) { + alive--; + argbest_blame = somekid; + best_blame = kidstatus; + if (WTERMSIG(kidstatus) != SIGUSR1) break; + } else { + /* kid is paused, not dead */ + /* not a problem */ + } + } +// here if all kids have exited normally +// *or* if there is a great reason for quitting early + +/////////////////// +// decode the best reason why the filter-chain terminated + //xx cerr << "cleanup: " << best_blame << endl; + if (best_blame) { + string short_name(""); + int kidno(iiofpid[argbest_blame]); + if (WIFEXITED(best_blame)) { + string exword = "???"; // default, should never happen + int excode = ex_syserr; // default, should never happen + int sts = WEXITSTATUS(best_blame); + if (sts == 0){ + // should never get here + // should be no accounting for blame if there was no blame + cerr << progid << " should never happen: no child to blame" << endl; + exeunt(ex_syserr); + } + + if (filter[kidno].mode != sa) { + exword = codemap[sts]; + excode = sts; + } else { // here to translate spamc results + if (sts == 1) { + excode = ex_spam; + exword = "spam"; + } else { + excode = ex_syserr; + stringstream foo; + foo << "bad status: " << sts; + exword = foo.str(); + } + } + cerr << progid + << " concludes: kid[" << kidno << "]" + << " i.e. " << basename(filter[kidno].cmd[0]) + << "[" << argbest_blame << "]" + << " reports " << exword << endl; + exeunt(excode); + } else if (WIFSIGNALED(best_blame)) { + int sig = WTERMSIG(best_blame); + cerr << progid + << " concludes: kid[" << kidno << "]" + << " i.e. 
" << basename(filter[kidno].cmd[0]) + << "[" << argbest_blame << "]" + << " was killed by signal " << sig + << endl; + // if the *best* blame is a kill, that's not normal + exeunt(ex_syserr); + } + } + +// Here if all filters agree this is not spam. +// Now it is safe to transfer the envelope information: + + if (0) cerr << "about to slurp: " + << " cr.MSG: " << current_read[MSG] + << " cr.ENV: " << current_read[ENV] + << " w.MSG: " << current_write[MSG] + << " w.ENV: " << current_write[ENV] + << " slurp_read: " << slurp_read + << " slurp_write: " << slurp_write + << endl; + + slurp(slurp_read, slurp_write); + close(slurp_write); + close(slurp_read); + +// now that the envelope information has been transfered, +// wait for the last kid in the usual way + + for(;;) { + waitpid(special_pid, &kidstatus, WUNTRACED); + if (WIFEXITED(kidstatus)) { + int sts = WEXITSTATUS(kidstatus); + cerr << progid + << " says: qq program" + << " i.e. " << basename(filter[nkids-1].cmd[0]) + << "[" << kidpid[nkids-1] << "]" + << " returned status " << sts + << endl; + return sts; + } else if (WIFSIGNALED(kidstatus)) { + cerr << progid + << " says: qq program" + << " i.e. 
" << basename(filter[nkids-1].cmd[0]) + << "[" << kidpid[nkids-1] << "]" + << " was killed by signal " << WTERMSIG(kidstatus) + << endl; + return ex_syserr; + } else { + /* paused, not dead */ + } + } /* loop until all kids accounted for */ + // should never get here; + // exit from within loop is the only way out +} diff --git a/tools/utils.c b/tools/utils.c index 691070e..3895215 100644 --- a/tools/utils.c +++ b/tools/utils.c @@ -2,6 +2,7 @@ #include <sstream> #include <iomanip> #include <stdlib.h> /* for abs() */ +#include <string.h> /* for strerror_r() */ ///// <cmath> would not be an improvement ///// due to lack of interger abs() ///// and ambiguous (and inefficient) promotion @@ -124,3 +125,21 @@ string join(const string sep, const list<string> stuff){ } return rslt; } + +string strError(const int errnum){ + char buf[1000]; + char* rslt = strerror_r(errnum, buf, sizeof(buf)); + return rslt; +#ifdef XSI_not_gnu + if (rslt) { + cerr << "strerror_r() failed: " << rslt << " " + perror(0); + exit(1); + } +#endif +} + +extern int errno; +string strError(){ + return strError(errno); +} diff --git a/tools/utils.h b/tools/utils.h index 0ef0fca..ea6d9ba 100644 --- a/tools/utils.h +++ b/tools/utils.h @@ -88,3 +88,6 @@ public: return current_arg == longer.substr(0, current_arg.length()); } }; + +std::string strError(const int); +std::string strError(); |