summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Denker <jsd@av8n.com>2012-11-23 11:51:38 -0800
committerJohn Denker <jsd@av8n.com>2012-11-23 11:51:38 -0800
commit4134b154839c91f44bb39547af9297f4b6a353d3 (patch)
treef8b7bee0f7596f3db63a4f44cb8a0f2dc2dfd5e9
parent2db619bf6bb01efdc01ce9b3729575ae21abcf5f (diff)
pipette ... for playing around with pipes and other i/o stuff
-rw-r--r--tools/makefile5
-rw-r--r--tools/pipette.c981
-rw-r--r--tools/utils.c19
-rw-r--r--tools/utils.h3
4 files changed, 1007 insertions, 1 deletions
diff --git a/tools/makefile b/tools/makefile
index 38d35ce..ec9c101 100644
--- a/tools/makefile
+++ b/tools/makefile
@@ -17,7 +17,7 @@ qmain = pido.c hi-q.c skrewt.c hi-test.c mail-scan.c \
qprogs = $(qmain:%.c=%)
# sources for other main programs:
-moremain = wripper.c bash-c.c ltgrey.c fixown.c
+moremain = wripper.c bash-c.c ltgrey.c fixown.c pipette.c
moreprogs = $(moremain:%.c=%)
nonmain = libltgrey.c
@@ -49,6 +49,9 @@ fixown2: fixown.o utils.o
chmod o-rwx $@
./fixown $@
+pipette: pipette.o utils.o
+ $(CC) $^ -o $@
+
skrewt: skrewt.o utils.o sepofra.o
$(CC) $^ -lboost_filesystem-mt -lboost_system -lspf2 -o $@
./fixown $@
diff --git a/tools/pipette.c b/tools/pipette.c
new file mode 100644
index 0000000..1eda2d0
--- /dev/null
+++ b/tools/pipette.c
@@ -0,0 +1,981 @@
+///////////////
+// lightweight connection from qmail to filters e.g. spamassassin
+// (hi-q filter, get it?)
+
+// Hint: For testing, see also hi-test.conf which invokes ./hi-test:
+// ./hi-q hi-test.conf
+
+// TODO: Exeunt stop should signal all children.
+// TODO: Possibly: Wait for all kids in parallel?
+// That's because they might finish out of order.
+
+#include <unistd.h>
+#include <stdlib.h> /* for exit(), getenv() */
+#include <stdio.h> /* for perror */
+#include <errno.h>
+#include <sys/types.h> /* for fork(), wait() */
+#include <sys/stat.h>
+#include <sys/wait.h>
+
+using namespace std;
+#include <iostream>
+#include <fstream>
+#include <sstream>
+#include <string>
+#include <list>
+#include <vector>
+#include <sstream>
+#include <map>
+
+#include <sys/types.h> /* for fstat */
+#include <sys/stat.h> /* .. */
+#include <unistd.h> /* .. */
+#include <iomanip> /* for setw() */
+#include "utils.h" /* for strError() */
+
+// error exit codes, mostly as stated in qmail.c
+#define bar \
+foo(good, 0) ;\
+foo(spam, 21) ;\
+foo(penaltybox, 22) ;\
+foo(permerr, 31) ;\
+foo(greylisting, 70) ;\
+foo(syserr, 71) ;\
+foo(comerr, 74) ;
+
+#define foo(name, num) const int ex_ ## name = num
+bar
+#undef foo
+
+map<int,string> codemap;
+
+
+#define bar_sa \
+foo_sa(GOOD, 0, "ham") ;\
+foo_sa(SPAM, 1, "spam") ;\
+foo_sa(USAGE, 64, "command line usage error") ;\
+foo_sa(DATAERR, 65, "data format error") ;\
+foo_sa(NOINPUT, 66, "cannot open input") ;\
+foo_sa(NOUSER, 67, "addressee unknown") ;\
+foo_sa(NOHOST, 68, "host name unknown") ;\
+foo_sa(UNAVAILABLE, 69, "service unavailable") ;\
+foo_sa(SOFTWARE, 70, "internal software error") ;\
+foo_sa(OSERR, 71, "system error (e.g., can't fork)") ;\
+foo_sa(OSFILE, 72, "critical OS file missing") ;\
+foo_sa(CANTCREAT, 73, "can't create (user) output file") ;\
+foo_sa(IOERR, 74, "input/output error") ;\
+foo_sa(TEMPFAIL, 75, "temp failure; user is invited to retry") ;\
+foo_sa(PROTOCOL, 76, "remote error in protocol") ;\
+foo_sa(NOPERM, 77, "permission denied") ;\
+foo_sa(CONFIG, 78, "configuration error") ;\
+foo_sa(TOOBIG, 98, "message was too big to process (see --max-size)"
+
+string progname;
+pid_t mypid;
+string progid;
+
+extern char** environ;
+const int rEnd(0); // end of a pipe for reading
+const int wEnd(1); // end of a pipe for writing
+
+typedef enum {MSG, ENV} channeler;
+
+#define bufsize 16384
+
+// meanings:
+// sa is a filter, using not-very-expressive exit codes: 0=ham 1=spam.
+// stub is not a filter; no stdin or stdout; just looks at environment.
+// series is a filter.
+// qq is not a filter, just an absorber.
+//
+// Note that series and stub use the same exit codes as qq.
+//
+
+/*
+Notation for future use:
+ 0< &kb 1> &scr 2> &scr prog1 # stand-alone
+ 0< &kb 1> &redpipe 2> &scr prog2 # upstream end of pipe
+ 0< &redpipe 1> &scr 2> &scr prog3 # downstream end of pipe
+
+ 0< &msg 1< &envelope 2> &log qmail-queue
+
+ 0< &kb 1> &scr 2> &scr 7> &up 8< &down parent
+ 0< &up 1> &down 2> &scr childprocess
+
+Simple case:
+ 0< &msg 1> &msg2 2> &log skrewt
+ 0< &msg2 1> &msg3 2> &log spamc
+ 0< &msg3 1< &envelope 2> &log qmail-queue
+
+Fancier "triangular piping" case:
+ 0< &msg 1< &env 2> &log 7> &msg2 8> &env2 skrewt
+ 0< &msg2 1> &msg3 2> &log spamc
+ 0< &msg3 1< &env2 2> &log qmail-queue
+
+Note that units 7 and 8 are arbitrary and could be renumbered. In
+contrast, many of the other are fixed by standards and/or traditions.
+
+Some questions:
+
+How hard is it to detect a /dry/ pipe segment, i.e. one where all the
+writing-ends have been closed, and all the bytes have been read? This
+means the reading ends can be closed, freeing up FD units.
+
+Similarly, now hard is it to detect a /broken/ pipe segment, i.e. one
+where all the reading ends have been closed? This means the writing
+ends can be closed, freeing up FD units.
+
+*/
+
+typedef enum {series, stub, sa, qq, postspam, fail} moder;
+
+class jobber{
+public:
+ moder mode;
+ vector<string> cmd;
+
+ jobber(const moder _mode, const vector<string> _cmd)
+ : mode(_mode), cmd(_cmd)
+ {}
+
+ jobber(const string _mode, const vector<string> _cmd)
+ : mode(fail), cmd(_cmd){
+ setmode(_mode);
+ }
+
+ jobber()
+ : mode(fail), cmd(0)
+ {}
+
+ void setmode(const string _mode) {
+ if (0) {}
+ else if (_mode == "sa") mode = sa;
+ else if (_mode == "stub") mode = stub;
+ else if (_mode == "series") mode = series;
+ else if (_mode == "qq") mode = qq;
+ else if (_mode == "postspam") mode = postspam;
+ else {
+ cerr << "jobber: bad mode: " << _mode << endl;
+ mode = fail;
+ }
+ }
+};
+
+// klugey global variable:
+vector<jobber> post;
+
+// We are fussy about the argument types because we want
+// this to compile cleanly under g++ as well as gcc,
+// and each is strict about different things, such that
+// one or the other will complain unless everything is
+// done just right.
+
+// This is the way execve really behaves:
+// the characters are held constant
+// and the (char*) pointers are held constant:
+int Execve(char const * fn,
+ char const * const * argv,
+ char const * const * env) {
+// coerce the arg types to match the unwise declaration in unistd.h :
+ return execve(fn, (char*const*) argv, (char*const*) env);
+}
+
+int fork_and_wait(const jobber job){
+ pid_t kidpid = fork();
+ if (kidpid == -1) {
+ cerr << progid << " fork failed : ";
+ perror(0);
+ exit(ex_syserr);
+ }
+ int ntok = job.cmd.size();
+ const char* prog[1+ntok];
+ for (int jj = 0; jj < ntok; jj++){
+ prog[jj] = job.cmd[jj].c_str();
+ }
+ prog[ntok] = 0;
+
+ if (!kidpid){
+ /*** child code ***/
+ Execve(prog[0], prog, environ);
+ cerr << progid << " failed to exec '"
+ << prog[0] << "' : " << endl;
+ perror(0);
+ exit(ex_syserr);
+ } else {
+ /*** parent code ***/
+ int kidstatus;
+ pid_t somekid;
+ somekid = waitpid(kidpid, &kidstatus, WUNTRACED);
+ if (somekid < 0) {
+ cerr << progid << " ??? waitpid failed : ";
+ perror(0);
+ return(ex_syserr);
+ }
+ if (WIFEXITED(kidstatus)) {
+ int sts = WEXITSTATUS(kidstatus);
+ if (sts != ex_good && sts != ex_spam) {
+ cerr << "hi-q: job " << prog[0]
+ << " unexpectedly returns status: " << sts
+ << endl;
+ exit(sts);
+ }
+ return 0;
+ } else if (WIFSIGNALED(kidstatus)) {
+ int sig = WTERMSIG(kidstatus);
+ if (sig == SIGUSR1) {/* normal, no logging required */}
+ else cerr << progid << " job " << prog[0]
+ << " killed by signal " << sig << endl;
+ return(ex_syserr);
+ } else {
+ /* paused, not dead */
+ }
+ }
+ return 0;
+}
+
+int fork_and_wait(vector<jobber> post){
+ for(vector<jobber>::const_iterator foo = post.begin();
+ foo != post.end(); foo++) {
+ int rslt = fork_and_wait(*foo);
+ if (rslt) return rslt;
+ }
+ return 0;
+}
+
+void exeunt(const int sts) {
+ // FIXME: stop other children, maybe?
+ //xxxx cerr << progid << " exeunt called with " << sts << endl;
+ if (sts == ex_spam) fork_and_wait(post);
+ if (sts == ex_penaltybox) exit(ex_spam);
+ exit(sts);
+}
+
+void slurp(const int inch, const int ouch){
+ char buf[bufsize];
+ ssize_t todo;
+ for (;;) {
+ ssize_t got = read(inch, buf, bufsize);
+ //xx cerr << "slurp: read returns " << got << endl;
+ if (got == 0) { // EoF
+ break;
+ }
+ if (got < 0) {
+ cerr << progid
+ << " slurp: input error on fd " << inch
+ << " : ";
+ perror(0);
+ exeunt(ex_comerr);
+ }
+
+ todo = got;
+ while (todo) {
+ ssize_t sent = write(ouch, buf, todo);
+ //xx cerr << "slurp: write returns " << sent << endl;
+ if (sent < 0 && errno != EINTR) {
+ cerr << progid
+ << " slurp: output rror on fd " << ouch
+ << " : ";
+ perror(0);
+ exeunt(ex_comerr);
+ }
+ todo -= sent;
+ }
+ }
+}
+
+
+void probe_fd(){
+ int ii;
+ struct stat buf;
+ for (ii = 0; ii < 16; ii++) {
+ int rslt = fstat(ii, &buf);
+ fprintf(stderr, "fd %2d status %2d", ii, rslt);
+ if (rslt==0)
+ fprintf(stderr, " : %d", (int)buf.st_dev);
+ fprintf(stderr, "\n");
+ }
+ fprintf(stderr, "============\n");
+}
+
+
+void blurb(const int ii, const pid_t* kidpid) {
+ int kidstatus;
+ /*pid_t somekid = */ waitpid(kidpid[ii], &kidstatus, WUNTRACED);
+ if (WIFEXITED(kidstatus))
+ fprintf(stderr, "kid #%d (%d) exited with status %d\n",
+ ii, kidpid[ii], WEXITSTATUS(kidstatus));
+ if (WIFSIGNALED(kidstatus))
+ fprintf(stderr, "kid #%d (%d) killed by signal %d\n",
+ ii, kidpid[ii], WTERMSIG(kidstatus));
+
+}
+
+void usage() {
+ cerr << "Usage:\n"
+" hi-q filter.conf\n"
+"or\n"
+" HI_Q_CONF=filter.conf hi-q\n";
+}
+
+////////////////////////////////////////
+// we have data coming in on fd 0.
+// and envelope / control information coming in on fd 1.
+
+void dump(const string var){
+ char* str = getenv(var.c_str());
+ cerr << progid << var;
+ if (str) cerr << " is set to '" << str << "'" << endl;
+ else cerr << " is not set." << endl;
+}
+
+int xclose(int arg){
+ cerr << "closing " << arg << endl;
+ return close(arg);
+}
+
+typedef list<int> LI;
+void block_fd(const LI todo){
+ int blocker(-1);
+ int inplace(0);
+
+ for (LI::const_iterator ptr = todo.begin();
+ ptr != todo.end(); ptr++) {
+ int fd = *ptr;
+ struct stat statbuf;
+ int rslt = fstat(fd, &statbuf);
+ if (rslt) {
+ if (0) {
+ cerr << "**** definitely needed to block_fd unit " << fd << " : ";
+ perror(0);
+ }
+ if (blocker < 0) {
+ int blockex[2];
+ pipe(blockex);
+ close(blockex[rEnd]);
+ blocker = blockex[wEnd];
+ }
+ if (blocker != fd){
+ dup2(blocker, fd);
+ close(blocker);
+ } else {
+ inplace++;
+ }
+ } else {
+ if (0) cerr << "unit " << fd << " already blocked" << endl;
+ }
+ }
+ if (!inplace) close(blocker);
+}
+
+void attach(const int pipe_end, const int fd, const int kidno){
+ if (0) cerr << "attaching current pipe_end " << pipe_end
+ << " to " << fd
+ << " for " << kidno << endl;
+ if (pipe_end != fd) {
+ int rslt = dup2(pipe_end, fd);
+ if (rslt < 0) {
+ cerr << progid << " dup2(" << pipe_end
+ << "," << fd << ")"
+ " failed for kid " << kidno << " : ";
+ perror(0);
+ exit(ex_syserr);
+ }
+ close(pipe_end);
+ }
+
+}
+
+void dumpstat(const int fd) {
+ cout << setw(4) << fd;
+ struct stat statbuf;
+ int rslt = fstat(fd, &statbuf);
+ if (rslt) {
+ cerr << " " << progid
+ << " fstat() failed: " << strError() << endl;
+ return;
+ }
+#if 0
+ dev_t st_dev; /* ID of device containing file */
+ ino_t st_ino; /* inode number */
+ mode_t st_mode; /* protection */
+ nlink_t st_nlink; /* number of hard links */
+ uid_t st_uid; /* user ID of owner */
+ gid_t st_gid; /* group ID of owner */
+ dev_t st_rdev; /* device ID (if special file) */
+ off_t st_size; /* total size, in bytes */
+ blksize_t st_blksize; /* blocksize for file system I/O */
+ blkcnt_t st_blocks; /* number of 512B blocks allocated */
+ time_t st_atime; /* time of last access */
+ time_t st_mtime; /* time of last modification */
+ time_t st_ctime; /* time of last status change */
+#endif
+ cout << setw(4) << statbuf.st_dev
+ << setw(12) << statbuf.st_ino
+ << setw(8) << setbase(8) << statbuf.st_mode
+ << setbase(10)
+ << setw(4) << statbuf.st_nlink
+ << setw(8) << statbuf.st_uid
+ << setw(8) << statbuf.st_gid
+ << setw(4) << statbuf.st_rdev
+ << setw(4) << statbuf.st_size
+ << setw(8) << statbuf.st_blksize
+ << setw(4) << statbuf.st_blocks
+ << setw(12) << statbuf.st_atime
+ << setw(12) << statbuf.st_mtime
+ << setw(12) << statbuf.st_ctime
+ ;;;
+ cout << endl;
+}
+
+void play() {
+ int tube[2];
+ int rslt = pipe(tube);
+ if (rslt) {
+ cerr << progid << " pipe() failed";
+ perror(0);
+ exit(ex_syserr);
+ }
+ dumpstat(tube[rEnd]);
+ dumpstat(tube[wEnd]);
+ sleep(1.5);
+ close(tube[rEnd]);
+ dumpstat(tube[rEnd]);
+ dumpstat(tube[wEnd]);
+}
+
+int main(int argc, char** argv) {
+ {
+ progname = *argv;
+ mypid = getpid();
+ stringstream binder;
+ binder << basename(progname) << "[" << mypid << "]";
+ progid = binder.str();
+ }
+
+ play();
+ exit(0);
+
+#define foo(name, num) codemap[num] = #name ;
+bar
+#undef foo
+
+ int verbose(0);
+ int kidstatus;
+
+ int rslt;
+
+ typedef vector<string> VS;
+ vector<jobber> filter;
+ string conf_var = "HI_Q_CONF";
+ char* auth = getenv("QMAIL_AUTHORIZED");
+ if (auth && *auth) conf_var = "HI_Q_AUCONF";
+
+ char* conf_name;
+ if (argc == 1) {
+ conf_name = getenv(conf_var.c_str());
+ if (!conf_name) {
+ usage();
+ exit(1);
+ }
+ }
+
+ if (argc >= 2) {
+ conf_name = argv[1];
+ }
+
+ if (argc >= 3) {
+ if (auth && *auth) conf_name = argv[2];
+ }
+
+ if (argc > 3) {
+ usage();
+ exit(1);
+ }
+
+ ifstream conf;
+ conf.open(conf_name);
+ if (! conf.good()) {
+ cerr << progid << " could not open filter.conf file '"
+ << conf_name << "'" << endl;
+ exit(1);
+ }
+ for (;;) {
+ string line;
+ if (!getline(conf, line).good()) break;
+ istringstream parse(line);
+ jobber job;
+ while (parse.good()){
+ string token;
+ parse >> token;
+ if (parse.fail()) break;
+ if (token[0] == '#') break;
+ job.cmd.push_back(token);
+ }
+ if (job.cmd.size()) {
+ job.setmode(job.cmd.front());
+ job.cmd.erase(job.cmd.begin());
+ }
+ // here with a properly built job descriptor
+ if (job.cmd.size()) {
+ if (job.mode == postspam) {
+ post.push_back(job);
+ } else {
+ filter.push_back(job);
+ }
+ }
+ }
+ unsigned int nkids = filter.size();
+
+// Check for nothing to do.
+// This is important, because the "last kid" is a special case.
+// This makes it safe to assume that nkids-1 is non-negative.
+ if (nkids == 0) exit(0); // nothing to do
+
+ if (verbose) for (unsigned int ii = 0; ii < nkids; ii++) {
+ cerr << progid << " filter[" << ii << "] :; ";
+ for (VS::const_iterator token = filter[ii].cmd.begin();
+ token != filter[ii].cmd.end(); token++){
+ cerr << *token << " ";
+ }
+ cerr << endl;
+ }
+
+ vector<pid_t> kidpid(nkids); // indexed by kid number
+
+ int sync[2];
+ int resync[2];
+ if (pipe(sync) != 0) cerr << "sync pipe failed" << endl;
+ if (pipe(resync) != 0) cerr << "resync pipe failed" << endl;
+
+// At this point, there are some loop invariants;
+// (a) fd0 is open (standard input) and has the email msg,
+// ready for the next child to read, and
+// (b) fd1 is open (nonstandard input) and has envelope information.
+// We need it to be open, so that pipe()
+// doesn't choose it. That allows N-1 of the kids
+// to close it and dup() something useful onto it.
+
+ map<int,int> iiofpid;
+ map<channeler,int> next_read;
+ next_read[MSG] = 0; // our original stdin
+ next_read[ENV] = -1; // no kid is (yet) empowered to read envelope info
+ int slurp_read(1); // our original non-standard input
+ int slurp_write = -1; // effectively next_write[ENV];
+ map<channeler,int> current_read;
+ map<channeler,int> current_write; // current kid writes here
+ current_write[MSG] = -1;
+ current_write[ENV] = -1;
+ list<int> blockme;
+ blockme.push_back(0);
+ blockme.push_back(1);
+
+// important loop to start all kids
+ for (unsigned int ii=0; ii < nkids; ii++){ /* loop starting all kids */
+ string kidid;
+ {
+ stringstream foo;
+ foo << ii
+ << " mode " << filter[ii].mode
+ << " " << filter[ii].cmd[0];
+ kidid = foo.str();
+ }
+ current_read = next_read;
+
+ if (verbose) cerr << "top of loop: "
+ << " cr.MSG: " << current_read[MSG]
+ << " cr.ENV: " << current_read[ENV]
+ << " w.MSG: " << current_write[MSG]
+ << " w.ENV: " << current_write[ENV]
+ << " for " << ii << endl;
+ if (current_read[MSG] > 20) exit(99);
+ if (current_read[ENV] > 20) exit(99);
+
+ int datapipe[2];
+
+ switch (filter[ii].mode) {
+ case series:
+ case qq:
+ case sa:
+
+// Create a new pipe.
+// Pipe must be created here (in the parent).
+// The intended bindings must be figured out shortly below.
+// Some of the bindings must be hooked up later (in the child),
+// while others are used by the parent (e.g. envelope slurp).
+// This pipe will be used (by the children) to connect
+// this child's output to the next child's input ...
+// except for the special kid, which reads both fd0 and fd1,
+// while writing nothing.
+ block_fd(blockme);
+ rslt = pipe(datapipe);
+ if (rslt < 0) {
+ cerr << progid << " could not create datapipe : ";
+ perror(0);
+ exeunt(ex_syserr);
+ }
+ if (0) cerr << "new pipe"
+ << " reading: " << datapipe[rEnd]
+ << " writing: " << datapipe[wEnd]
+ << endl;
+ break;
+ case postspam:
+ case stub:
+ // do not need to create a pipe
+ break;
+ case fail:
+ cerr << "should never happen: invalid filter" << endl;
+ exeunt(ex_syserr);
+ }
+
+// figure out the intended bindings:
+ list<int> pardang;
+ switch (filter[ii].mode) {
+ case sa:
+ case series:
+ current_write[MSG] = datapipe[wEnd];
+ pardang.push_back(current_write[MSG]);
+ pardang.push_back(current_read[MSG]);
+ next_read[MSG] = datapipe[rEnd];
+ break;
+ case qq:
+ if (slurp_write >= 0){
+ cerr << "???? multiple qq jobs?" << endl;
+ }
+ slurp_write= datapipe[wEnd];
+ current_read[ENV] = datapipe[rEnd];
+ pardang.push_back(current_read[ENV]);
+ next_read[ENV] = -1;
+ next_read[MSG] = -1;
+ current_write[ENV] = -1;
+ current_write[MSG] = -1;
+ break;
+ case postspam:
+ case stub:
+ // no pipe even got created.
+ break;
+ case fail:
+ cerr << "should never happen:: invalid filter" << endl;
+ exeunt(ex_syserr);
+ }
+
+ kidpid[ii] = fork();
+ if (kidpid[ii] == -1) {
+ cerr << progid << " fork failed : ";
+ perror(0);
+ exit(ex_syserr);
+ }
+ iiofpid[kidpid[ii]] = ii;
+ if (!kidpid[ii]) { /*** child code ***/
+ if (verbose) cerr << "top of kid ... loose end " << current_read[MSG] << " for " << ii << endl;
+
+ pid_t kidgroup(0); // process group for all kids is
+ // equal to pid of kid#0
+ if (ii) kidgroup = kidpid[0];
+ if (setpgid(0, kidgroup) != 0) {
+ cerr << "*** kid " << ii
+ << " setpgid failed! " << errno << " ... ";
+ perror(0);
+ } else {
+ // cerr << "*** kid " << ii << " setpgid OK" << endl;
+ }
+
+// ... everybody else has to wait for us to get this far ...
+// ... so that the new process group will be valid ...
+// Write-a-byte synchronization is released when the *first* guy writes.
+ if (ii == 0) {
+ int junk(1);
+ write(sync[wEnd], &junk, 1);
+ //cerr << "sync sent" << endl;
+ }
+#if 0
+ cerr << "kid [" << ii << "] " << getpid()
+ << " kidpid[0]: " << kidpid[0]
+ << " pgid: " << getpgid(0)
+ << " starts" << endl;
+#endif
+
+ close(resync[wEnd]); // send resync
+ //xx cerr << "after sending resync " << ii << endl;
+
+// ... now we must wait for everybody else, because ...
+// ... if we do the exec(), the new process group becomes invalid ...
+// Close synchronization is released when the *last* guy closes.
+ if (ii==0) {
+ int junk;
+ //cerr << "about to read resync" << endl;
+ ssize_t rslt = read(resync[rEnd], &junk, 1);
+ if (rslt < 0 ) {
+ cerr << "bad sync ... " << rslt << endl;
+ // FIXME (maybe?) should this be fatal?
+ } else {
+ // cerr << "back from read resync, good: " << rslt << endl;
+ }
+ }
+
+ switch (filter[ii].mode){
+ case qq:
+ attach(current_read[MSG], 0, ii);
+ attach(current_read[ENV], 1, ii);
+ break;
+ case sa:
+ case series:
+ attach(current_read[MSG], 0, ii);
+ attach(current_write[MSG], 1, ii);
+ break;
+ case stub:
+ case postspam:
+ // nothing to hook up; no pipe was even created.
+ break;
+ case fail:
+ cerr << "should never happen: invalid filter" << endl;
+ exeunt(ex_syserr);
+ break;
+ }
+
+// in all modes:
+// close envelope channel in kid space
+// (leaving it open in parent space)
+ close(current_read[ENV]);
+ close(slurp_write);
+
+//// probe_fd();
+
+ int ntok = filter[ii].cmd.size();
+ const char* prog[1+ntok];
+ for (int jj = 0; jj < ntok; jj++){
+ prog[jj] = filter[ii].cmd[jj].c_str();
+ }
+ prog[ntok] = 0;
+ close(resync[rEnd]);
+ close(sync[rEnd]);
+ close(sync[wEnd]);
+
+ stringstream convert;
+ convert << getpgid(0);
+ const string grouper("HI_Q_GROUP=" + convert.str());
+ if (putenv((char*)grouper.c_str()) != 0) {
+ cerr << "putenv failed" << endl;
+ exit(1);
+ }
+ rslt = Execve(prog[0], prog, environ);
+ cerr << progid << " failed to exec '"
+ << prog[0] << "' : ";
+ perror(0);
+ exit(ex_syserr);
+ }
+
+ /*** parent code ***/
+ if (kidpid[ii] < 0) {
+ cerr << " failure to fork kid#" << ii << " : ";
+ perror(0);
+ exeunt(ex_syserr);
+ }
+
+// these tricks are for kid:
+ close(current_write[MSG]);
+ close(current_write[ENV]);
+ close(current_read[ENV]);
+ for (LI::const_iterator ptr = pardang.begin();
+ ptr != pardang.end(); ptr++) {
+ if (0) cerr << "closing " << *ptr
+ << " for parent of " << kidid << endl;
+
+ close(*ptr);
+ }
+
+// Let kid #0 run a little ways:
+ if (ii==0) {
+ int junk;
+ //cerr << "about to read sync" << endl;
+ ssize_t rslt = read(sync[rEnd], &junk, 1);
+ if (rslt != 1) {
+ cerr << "bad sync ... 1 != " << rslt << endl;
+ } else {
+ //cerr << "back from read sync, good: " << rslt << endl;
+ }
+ }
+
+#if 0
+ cerr << "apparent kid #" << ii
+ << " (" << kidpid[ii] << ") "
+ << endl;
+#endif
+ } /* end loop starting all kids */
+
+// here with the whole pipeline of kids launched
+// parent program continues
+
+ close(resync[wEnd]); // important, so that block gets released
+ close(resync[rEnd]); // less important, just housecleaning
+
+ close(sync[wEnd]); // more housecleaning
+ close(sync[rEnd]);
+
+ close(0); // Housecleaning: the reading end of stdin was
+ // delegated to the first child,
+ // so we don't need it.
+
+ if (verbose) for (unsigned int ii = 0; ii < nkids; ii++) {
+ cerr << progid << " filter[" << ii << "] "
+ << kidpid[ii]
+ << " :; ";
+ for (VS::const_iterator token = filter[ii].cmd.begin();
+ token != filter[ii].cmd.end(); token++){
+ cerr << *token << " ";
+ }
+ cerr << endl;
+ }
+
+ pid_t special_pid = kidpid[nkids-1];
+ int alive(nkids-1); // not counting the special kid
+ int best_blame(0); // best reason, even if not a great reason
+ pid_t argbest_blame(-1); // kid# associated with best blame
+
+ for (;;) {
+ if (alive == 0) break;
+ pid_t somekid = waitpid(-1, &kidstatus, WUNTRACED);
+ if (somekid == special_pid){
+ // do not decrement the "alive" counter
+ // since that only applies to non-special kids
+ if (WIFEXITED(kidstatus)) {
+ cerr << progid << " special kid exited early, status "
+ << WEXITSTATUS(kidstatus)
+ << " with " << alive << " kids still alive"
+ << endl;
+ return(ex_syserr);
+ } else if (WIFSIGNALED(kidstatus)) {
+ int sig = WTERMSIG(kidstatus);
+ if (sig == SIGUSR1) {/* normal, no logging required */}
+ else {
+ cerr << progid << " special kid killed by signal "
+ << sig << endl;
+ // this is not normal
+ return(ex_syserr);
+ }
+ } else {
+ /* paused, not dead */
+ }
+ continue;
+ }
+// here if somekid is not the special kid
+ if (WIFEXITED(kidstatus)) {
+ alive--;
+ int sts = WEXITSTATUS(kidstatus);
+#ifndef PENALIZE_SPAMMERS
+ // ignore penalties for the moment
+ // to see whether there are any false positives
+ if (sts == ex_penaltybox) sts = ex_good;
+#endif
+ if (sts) {
+ argbest_blame = somekid;
+ best_blame = kidstatus;
+ break;
+ }
+ } else if (WIFSIGNALED(kidstatus)) {
+ alive--;
+ argbest_blame = somekid;
+ best_blame = kidstatus;
+ if (WTERMSIG(kidstatus) != SIGUSR1) break;
+ } else {
+ /* kid is paused, not dead */
+ /* not a problem */
+ }
+ }
+// here if all kids have exited normally
+// *or* if there is a great reason for quitting early
+
+///////////////////
+// decode the best reason why the filter-chain terminated
+ //xx cerr << "cleanup: " << best_blame << endl;
+ if (best_blame) {
+ string short_name("");
+ int kidno(iiofpid[argbest_blame]);
+ if (WIFEXITED(best_blame)) {
+ string exword = "???"; // default, should never happen
+ int excode = ex_syserr; // default, should never happen
+ int sts = WEXITSTATUS(best_blame);
+ if (sts == 0){
+ // should never get here
+ // should be no accounting for blame if there was no blame
+ cerr << progid << " should never happen: no child to blame" << endl;
+ exeunt(ex_syserr);
+ }
+
+ if (filter[kidno].mode != sa) {
+ exword = codemap[sts];
+ excode = sts;
+ } else { // here to translate spamc results
+ if (sts == 1) {
+ excode = ex_spam;
+ exword = "spam";
+ } else {
+ excode = ex_syserr;
+ stringstream foo;
+ foo << "bad status: " << sts;
+ exword = foo.str();
+ }
+ }
+ cerr << progid
+ << " concludes: kid[" << kidno << "]"
+ << " i.e. " << basename(filter[kidno].cmd[0])
+ << "[" << argbest_blame << "]"
+ << " reports " << exword << endl;
+ exeunt(excode);
+ } else if (WIFSIGNALED(best_blame)) {
+ int sig = WTERMSIG(best_blame);
+ cerr << progid
+ << " concludes: kid[" << kidno << "]"
+ << " i.e. " << basename(filter[kidno].cmd[0])
+ << "[" << argbest_blame << "]"
+ << " was killed by signal " << sig
+ << endl;
+ // if the *best* blame is a kill, that's not normal
+ exeunt(ex_syserr);
+ }
+ }
+
+// Here if all filters agree this is not spam.
+// Now it is safe to transfer the envelope information:
+
+ if (0) cerr << "about to slurp: "
+ << " cr.MSG: " << current_read[MSG]
+ << " cr.ENV: " << current_read[ENV]
+ << " w.MSG: " << current_write[MSG]
+ << " w.ENV: " << current_write[ENV]
+ << " slurp_read: " << slurp_read
+ << " slurp_write: " << slurp_write
+ << endl;
+
+ slurp(slurp_read, slurp_write);
+ close(slurp_write);
+ close(slurp_read);
+
+// now that the envelope information has been transfered,
+// wait for the last kid in the usual way
+
+ for(;;) {
+ waitpid(special_pid, &kidstatus, WUNTRACED);
+ if (WIFEXITED(kidstatus)) {
+ int sts = WEXITSTATUS(kidstatus);
+ cerr << progid
+ << " says: qq program"
+ << " i.e. " << basename(filter[nkids-1].cmd[0])
+ << "[" << kidpid[nkids-1] << "]"
+ << " returned status " << sts
+ << endl;
+ return sts;
+ } else if (WIFSIGNALED(kidstatus)) {
+ cerr << progid
+ << " says: qq program"
+ << " i.e. " << basename(filter[nkids-1].cmd[0])
+ << "[" << kidpid[nkids-1] << "]"
+ << " was killed by signal " << WTERMSIG(kidstatus)
+ << endl;
+ return ex_syserr;
+ } else {
+ /* paused, not dead */
+ }
+ } /* loop until all kids accounted for */
+ // should never get here;
+ // exit from within loop is the only way out
+}
diff --git a/tools/utils.c b/tools/utils.c
index 691070e..3895215 100644
--- a/tools/utils.c
+++ b/tools/utils.c
@@ -2,6 +2,7 @@
#include <sstream>
#include <iomanip>
#include <stdlib.h> /* for abs() */
+#include <string.h> /* for strerror_r() */
///// <cmath> would not be an improvement
///// due to lack of interger abs()
///// and ambiguous (and inefficient) promotion
@@ -124,3 +125,21 @@ string join(const string sep, const list<string> stuff){
}
return rslt;
}
+
+string strError(const int errnum){
+ char buf[1000];
+ char* rslt = strerror_r(errnum, buf, sizeof(buf));
+ return rslt;
+#ifdef XSI_not_gnu
+ if (rslt) {
+ cerr << "strerror_r() failed: " << rslt << " "
+ perror(0);
+ exit(1);
+ }
+#endif
+}
+
+extern int errno;
+string strError(){
+ return strError(errno);
+}
diff --git a/tools/utils.h b/tools/utils.h
index 0ef0fca..ea6d9ba 100644
--- a/tools/utils.h
+++ b/tools/utils.h
@@ -88,3 +88,6 @@ public:
return current_arg == longer.substr(0, current_arg.length());
}
};
+
+std::string strError(const int);
+std::string strError();