///////////////
// lightweight connection from qmail to filters e.g. spamassassin
// (hi-q filter, get it?)

// Hint: For testing, see also hi-test.conf which invokes ./hi-test:
//    ./hi-q hi-test.conf

// TODO: Exeunt stop should signal all children.
// TODO: Possibly: Wait for all kids in parallel?
//   That's because they might finish out of order.

#include <iostream>
#include <stdlib.h>             /* for exit(), getenv(), setenv() */
#include <stdio.h>              /* for perror */
#include <signal.h>             /* for SIGUSR1 */
#include <sys/wait.h>           /* for waitpid() and the W* status macros */
#include <string>
#include <vector>

using namespace std;

#include <algorithm>
#include <errno.h>
#include <fstream>
#include <list>
#include <map>
#include <sstream>

#include <sys/types.h>          /* for fstat */
#include <sys/stat.h>           /* .. */
#include <unistd.h>             /* .. and fork(), pipe(), dup2(), execve() */

// error exit codes, mostly as stated in qmail.c
#define bar \
  foo(good, 0) ;\
  foo(spam, 21) ;\
  foo(penaltybox, 22) ;\
  foo(permerr, 31) ;\
  foo(greylisting, 70) ;\
  foo(syserr, 71) ;\
  foo(comerr, 74) ;

// Expand the table above into constants ex_good, ex_spam, ...
#define foo(name, num) const int ex_ ## name = num
bar
#undef foo

// exit code -> symbolic name; filled in by main() from the same table.
map<int, string> codemap;

// Exit codes used by spamc (sysexits.h style), kept for reference.
// (This macro is not expanded anywhere in this file.)
#define bar_sa \
  foo_sa(GOOD, 0, "ham") ;\
  foo_sa(SPAM, 1, "spam") ;\
  foo_sa(USAGE, 64, "command line usage error") ;\
  foo_sa(DATAERR, 65, "data format error") ;\
  foo_sa(NOINPUT, 66, "cannot open input") ;\
  foo_sa(NOUSER, 67, "addressee unknown") ;\
  foo_sa(NOHOST, 68, "host name unknown") ;\
  foo_sa(UNAVAILABLE, 69, "service unavailable") ;\
  foo_sa(SOFTWARE, 70, "internal software error") ;\
  foo_sa(OSERR, 71, "system error (e.g., can't fork)") ;\
  foo_sa(OSFILE, 72, "critical OS file missing") ;\
  foo_sa(CANTCREAT, 73, "can't create (user) output file") ;\
  foo_sa(IOERR, 74, "input/output error") ;\
  foo_sa(TEMPFAIL, 75, "temp failure; user is invited to retry") ;\
  foo_sa(PROTOCOL, 76, "remote error in protocol") ;\
  foo_sa(NOPERM, 77, "permission denied") ;\
  foo_sa(CONFIG, 78, "configuration error") ;\
  foo_sa(TOOBIG, 98, "message was too big to process (see --max-size)") ;

string progname;
pid_t mypid;
string progid;                  // "basename[pid]", prefix for diagnostics
extern char** environ;

const int rEnd(0);              // end of a pipe for reading
const int wEnd(1);              // end of a pipe for writing

typedef enum {MSG, ENV} channeler;

#define bufsize 16384

// meanings:
//   sa is a filter, using not-very-expressive exit codes: 0=ham 1=spam.
//   stub is not a filter; no stdin or stdout; just looks at environment.
//   series is a filter.
//   qq is not a filter, just an absorber.
//
// Note that series and stub use the same exit codes as qq.
//

/*
Notation for future use:

  0< &kb 1> &scr 2> &scr prog1              # stand-alone
  0< &kb 1> &redpipe 2> &scr prog2          # upstream end of pipe
  0< &redpipe 1> &scr 2> &scr prog3         # downstream end of pipe
  0< &msg 1< &envelope 2> &log qmail-queue
  0< &kb 1> &scr 2> &scr 7> &up 8< &down parent
  0< &up 1> &down 2> &scr childprocess

Simple case:
  0< &msg 1> &msg2 2> &log skrewt
  0< &msg2 1> &msg3 2> &log spamc
  0< &msg3 1< &envelope 2> &log qmail-queue

Fancier "triangular piping" case:
  0< &msg 1< &env 2> &log 7> &msg2 8> &env2 skrewt
  0< &msg2 1> &msg3 2> &log spamc
  0< &msg3 1< &env2 2> &log qmail-queue

Note that units 7 and 8 are arbitrary and could be renumbered.
In contrast, many of the others are fixed by standards and/or
traditions.

Some questions:
How hard is it to detect a /dry/ pipe segment, i.e. one where all
the writing-ends have been closed, and all the bytes have been read?
This means the reading ends can be closed, freeing up FD units.

Similarly, how hard is it to detect a /broken/ pipe segment, i.e.
one where all the reading ends have been closed?  This means the
writing ends can be closed, freeing up FD units.
*/

typedef enum {series, stub, sa, qq, postspam, fail} moder;

// One line of the config file: a mode keyword plus the argv to exec.
class jobber{
public:
  moder mode;
  vector<string> cmd;           // cmd[0] is the program path given to execve

  jobber(const moder _mode, const vector<string>& _cmd)
  : mode(_mode), cmd(_cmd) {}

  jobber(const string& _mode, const vector<string>& _cmd)
  : mode(fail), cmd(_cmd) {
    setmode(_mode);
  }

  jobber() : mode(fail), cmd() {}

  // Translate a mode keyword; unknown keywords are reported and yield `fail`.
  void setmode(const string& _mode) {
    if (_mode == "sa") mode = sa;
    else if (_mode == "stub") mode = stub;
    else if (_mode == "series") mode = series;
    else if (_mode == "qq") mode = qq;
    else if (_mode == "postspam") mode = postspam;
    else {
      cerr << "jobber: bad mode: " << _mode << endl;
      mode = fail;
    }
  }
};

// klugey global variable: jobs to run (in order) when a message is spam.
vector<jobber> post;

// We are fussy about the argument types because we want
// this to compile cleanly under g++ as well as gcc,
// and each is strict about different things, such that
// one or the other will complain unless everything is
// done just right.

// This is the way execve really behaves:
//  the characters are held constant
//  and the (char*) pointers are held constant:
int Execve(char const * fn,
        char const * const * argv,
        char const * const * env) {
  // coerce the arg types to match the unwise declaration in unistd.h :
  return execve(fn, (char*const*) argv, (char*const*) env);
}

// Run one job to completion: fork, exec job.cmd, and wait for it.
// Returns 0 if the job exited with ex_good or ex_spam, and ex_syserr if
// it was killed by a signal or could not be waited for.  Any other exit
// status is treated as fatal: this process exits with that status.
int fork_and_wait(const jobber& job){
  // Build the NULL-terminated argv before forking; the child execs it
  // and the parent uses prog[0] in its diagnostics.
  vector<const char*> prog;
  for (vector<string>::const_iterator tok = job.cmd.begin();
       tok != job.cmd.end(); ++tok) {
    prog.push_back(tok->c_str());
  }
  prog.push_back(0);

  pid_t kidpid = fork();
  if (kidpid == -1) {
    cerr << progid << " fork failed : ";
    perror(0);
    exit(ex_syserr);
  }

  if (!kidpid) { /*** child code ***/
    Execve(prog[0], &prog[0], environ);
    cerr << progid << " failed to exec '" << prog[0] << "' : " << endl;
    perror(0);
    exit(ex_syserr);
  }

  /*** parent code ***/
  for (;;) {
    int kidstatus;
    pid_t somekid = waitpid(kidpid, &kidstatus, WUNTRACED);
    if (somekid < 0) {
      if (errno == EINTR) continue;     // interrupted; keep waiting
      cerr << progid << " ??? waitpid failed : ";
      perror(0);
      return(ex_syserr);
    }
    if (WIFEXITED(kidstatus)) {
      int sts = WEXITSTATUS(kidstatus);
      if (sts != ex_good && sts != ex_spam) {
        cerr << "hi-q: job " << prog[0]
             << " unexpectedly returns status: " << sts << endl;
        exit(sts);
      }
      return 0;
    }
    if (WIFSIGNALED(kidstatus)) {
      int sig = WTERMSIG(kidstatus);
      if (sig == SIGUSR1) {/* normal, no logging required */}
      else cerr << progid << " job " << prog[0]
                << " killed by signal " << sig << endl;
      return(ex_syserr);
    }
    // Stopped (reported because of WUNTRACED), not dead:
    // keep waiting until it actually finishes.
  }
}

// Run every job in `jobs` in order; stop at the first nonzero result.
int fork_and_wait(const vector<jobber>& jobs){
  for (vector<jobber>::const_iterator job = jobs.begin();
       job != jobs.end(); ++job) {
    int rslt = fork_and_wait(*job);
    if (rslt) return rslt;
  }
  return 0;
}

// Exit with `sts`.  For spam, the postspam jobs run first;
// penaltybox is reported to the caller as spam.
void exeunt(const int sts) {
  // FIXME: stop other children, maybe?
  if (sts == ex_spam) fork_and_wait(post);
  if (sts == ex_penaltybox) exit(ex_spam);
  exit(sts);
}

// Copy everything from fd `inch` to fd `ouch` until end of file.
// Short writes are continued from where they stopped, and EINTR is
// retried on both sides; any other I/O error exits via exeunt(ex_comerr).
void slurp(const int inch, const int ouch){
  char buf[bufsize];
  for (;;) {
    ssize_t got = read(inch, buf, bufsize);
    if (got == 0) {             // EoF
      break;
    }
    if (got < 0) {
      if (errno == EINTR) continue;
      cerr << progid << " slurp: input error on fd " << inch << " : ";
      perror(0);
      exeunt(ex_comerr);
    }
    ssize_t done = 0;
    while (done < got) {
      // Resume after the bytes already written, not at the start of buf.
      ssize_t sent = write(ouch, buf + done, got - done);
      if (sent < 0) {
        if (errno == EINTR) continue;
        cerr << progid << " slurp: output error on fd " << ouch << " : ";
        perror(0);
        exeunt(ex_comerr);
      }
      done += sent;
    }
  }
}

// Debugging aid: report fstat() status (and st_dev) of fds 0..15 on stderr.
void probe_fd(){
  struct stat buf;
  for (int ii = 0; ii < 16; ii++) {
    int rslt = fstat(ii, &buf);
    fprintf(stderr, "fd %2d status %2d", ii, rslt);
    if (rslt == 0) fprintf(stderr, " : %d", (int)buf.st_dev);
    fprintf(stderr, "\n");
  }
  fprintf(stderr, "============\n");
}

// Debugging aid: wait for kid #ii and describe how it ended.
void blurb(const int ii, const pid_t* kidpid) {
  int kidstatus;
  if (waitpid(kidpid[ii], &kidstatus, WUNTRACED) < 0) {
    fprintf(stderr, "kid #%d (%d) could not be waited for: ",
            ii, (int)kidpid[ii]);
    perror(0);
    return;
  }
  if (WIFEXITED(kidstatus)) fprintf(stderr,
        "kid #%d (%d) exited with status %d\n",
        ii, (int)kidpid[ii], WEXITSTATUS(kidstatus));
  if (WIFSIGNALED(kidstatus)) fprintf(stderr,
        "kid #%d (%d) killed by signal %d\n",
        ii, (int)kidpid[ii], WTERMSIG(kidstatus));
}

void usage() {
  cerr << "Usage:\n"
"    hi-q filter.conf\n"
"or\n"
"    HI_Q_CONF=filter.conf hi-q\n";
}

////////////////////////////////////////
// we have data coming in on fd 0.
// and envelope / control information coming in on fd 1.

// Debugging aid: report whether environment variable `var` is set.
void dump(const string var){
  char* str = getenv(var.c_str());
  cerr << progid << " " << var;
  if (str) cerr << " is set to '" << str << "'" << endl;
  else cerr << " is not set." << endl;
}

// close() with a trace line on stderr.
int xclose(int arg){
  cerr << "closing " << arg << endl;
  return close(arg);
}

typedef list<int> LI;

// Make sure every fd number in `todo` is open, so that a later pipe()
// cannot be handed one of those numbers.  Closed units are filled with
// copies of the write end of a pipe whose read end has been closed.
void block_fd(const LI& todo){
  int blocker(-1);              // spare descriptor used to fill holes
  for (LI::const_iterator ptr = todo.begin(); ptr != todo.end(); ptr++) {
    int fd = *ptr;
    struct stat statbuf;
    if (fstat(fd, &statbuf) == 0) continue;     // already open: nothing to do
    if (blocker < 0) {
      int blockex[2];
      if (pipe(blockex) != 0) {
        cerr << progid << " block_fd: pipe failed : ";
        perror(0);
        exeunt(ex_syserr);
      }
      close(blockex[rEnd]);
      blocker = blockex[wEnd];
    }
    // Keep `blocker` open so it can fill further holes; it may itself
    // have landed on this unit, in which case nothing needs copying.
    if (blocker != fd && dup2(blocker, fd) < 0) {
      cerr << progid << " block_fd: dup2(" << blocker << "," << fd
           << ") failed : ";
      perror(0);
      exeunt(ex_syserr);
    }
  }
  // Release the spare descriptor unless it occupies a unit we were
  // asked to keep open.
  if (blocker >= 0 && find(todo.begin(), todo.end(), blocker) == todo.end())
    close(blocker);
}

// Final path component of `path` (everything after the last '/').
string basename(const string path){
  size_t where = path.rfind("/");
  if (where != string::npos) return path.substr(1+where);
  return path;
}

// Move descriptor `pipe_end` onto unit `fd` (no-op if they are equal).
void attach(const int pipe_end, const int fd, const int kidno){
  if (pipe_end != fd) {
    int rslt = dup2(pipe_end, fd);
    if (rslt < 0) {
      cerr << progid << " dup2(" << pipe_end << "," << fd << ")"
        " failed for kid " << kidno << " : ";
      perror(0);
      exit(ex_syserr);
    }
    close(pipe_end);
  }
}

int main(int argc, char** argv) {
  {
    // argc may be 0, in which case argv[0] is a null pointer.
    progname = (argc > 0 && argv[0]) ? argv[0] : "hi-q";
    mypid = getpid();
    stringstream binder;
    binder << basename(progname) << "[" << mypid << "]";
    progid = binder.str();
  }

  // Fill codemap (exit code -> name) from the table that defines ex_*.
#define foo(name, num) codemap[num] = #name ;
  bar
#undef foo

  int verbose(0);
  int kidstatus;
  int rslt;
  typedef vector<string> VS;
  vector<jobber> filter;

  // With QMAIL_AUTHORIZED set and non-empty, the config comes from
  // HI_Q_AUCONF (or argv[2]) instead of HI_Q_CONF (or argv[1]).
  string conf_var = "HI_Q_CONF";
  char* auth = getenv("QMAIL_AUTHORIZED");
  if (auth && *auth) conf_var = "HI_Q_AUCONF";

  char* conf_name(0);
  if (argc <= 1) {
    conf_name = getenv(conf_var.c_str());
    if (!conf_name) {
      usage();
      exit(1);
    }
  }
  if (argc >= 2) {
    conf_name = argv[1];
  }
  if (argc >= 3) {
    if (auth && *auth) conf_name = argv[2];
  }
  if (argc > 3) {
    usage();
    exit(1);
  }

  ifstream conf;
  conf.open(conf_name);
  if (! conf.good()) {
    cerr << progid << " could not open filter.conf file '"
         << conf_name << "'" << endl;
    exit(1);
  }

  // Each config line is:   mode program [args...]   (# starts a comment)
  // while(getline(...)) also keeps a last line that has no trailing
  // newline; testing .good() would drop it because eofbit gets set.
  string line;
  while (getline(conf, line)) {
    istringstream parse(line);
    jobber job;
    string token;
    while (parse >> token) {
      if (token[0] == '#') break;
      job.cmd.push_back(token);
    }
    if (job.cmd.size()) {
      job.setmode(job.cmd.front());
      job.cmd.erase(job.cmd.begin());
    }
    // here with a properly built job descriptor
    if (job.cmd.size()) {
      if (job.mode == fail) {
        // setmode() has already complained.  Refuse before any kid is
        // forked, rather than discovering this halfway through launch.
        cerr << progid << " invalid filter in '" << conf_name << "'" << endl;
        exeunt(ex_syserr);
      }
      if (job.mode == postspam) {
        post.push_back(job);
      } else {
        filter.push_back(job);
      }
    }
  }

  unsigned int nkids = filter.size();
  // Check for nothing to do.
  // This is important, because the "last kid" is a special case.
  // This makes it safe to assume that nkids-1 is non-negative.
  if (nkids == 0) exit(0);      // nothing to do

  if (verbose) for (unsigned int ii = 0; ii < nkids; ii++) {
    cerr << progid << " filter[" << ii << "] :; ";
    for (VS::const_iterator token = filter[ii].cmd.begin();
           token != filter[ii].cmd.end(); token++){
      cerr << *token << " ";
    }
    cerr << endl;
  }

  vector<pid_t> kidpid(nkids);          // indexed by kid number
  int sync_pipe[2];
  int resync_pipe[2];
  if (pipe(sync_pipe) != 0) {
    cerr << "sync pipe failed" << endl;
    exeunt(ex_syserr);
  }
  if (pipe(resync_pipe) != 0) {
    cerr << "resync pipe failed" << endl;
    exeunt(ex_syserr);
  }

  // At this point, there are some loop invariants;
  // (a) fd0 is open (standard input) and has the email msg,
  //  ready for the next child to read, and
  // (b) fd1 is open (nonstandard input) and has envelope information.
  //  We need it to be open, so that pipe()
  //  doesn't choose it.  That allows N-1 of the kids
  //  to close it and dup() something useful onto it.

  map<pid_t, int> iiofpid;              // pid -> kid number
  map<channeler, int> next_read;
  next_read[MSG] = 0;           // our original stdin
  next_read[ENV] = -1;          // no kid is (yet) empowered to read envelope info
  int slurp_read(1);            // our original non-standard input
  int slurp_write = -1;         // effectively next_write[ENV];

  map<channeler, int> current_read;
  map<channeler, int> current_write;    // current kid writes here
  current_write[MSG] = -1;
  current_write[ENV] = -1;
  LI blockme;
  blockme.push_back(0);
  blockme.push_back(1);

  // important loop to start all kids
  for (unsigned int ii = 0; ii < nkids; ii++) { /* loop starting all kids */
    current_read = next_read;
    if (verbose) cerr << "top of loop: "
         << " cr.MSG: " << current_read[MSG]
         << " cr.ENV: " << current_read[ENV]
         << " w.MSG: " << current_write[MSG]
         << " w.ENV: " << current_write[ENV]
         << " for " << ii << endl;
    // Sanity check: give up (status 99) if descriptor numbers have
    // grown unexpectedly large.
    if (current_read[MSG] > 20) exit(99);
    if (current_read[ENV] > 20) exit(99);

    int datapipe[2] = {-1, -1};
    switch (filter[ii].mode) {
    case series:
    case qq:
    case sa:
      // Create a new pipe.
      // Pipe must be created here (in the parent).
      // The intended bindings must be figured out shortly below.
      // Some of the bindings must be hooked up later (in the child),
      // while others are used by the parent (e.g. envelope slurp).

      // This pipe will be used (by the children) to connect
      // this child's output to the next child's input ...
      // except for the special kid, which reads both fd0 and fd1,
      // while writing nothing.
      block_fd(blockme);
      rslt = pipe(datapipe);
      if (rslt < 0) {
        cerr << progid << " could not create datapipe : ";
        perror(0);
        exeunt(ex_syserr);
      }
      break;
    case postspam:
    case stub:
      // do not need to create a pipe
      break;
    case fail:
      cerr << "should never happen: invalid filter" << endl;
      exeunt(ex_syserr);
    }

    // Figure out the intended bindings.  `pardang` collects every
    // descriptor the parent hands to this kid and must close after fork.
    LI pardang;
    switch (filter[ii].mode) {
    case sa:
    case series:
      current_write[MSG] = datapipe[wEnd];
      pardang.push_back(current_write[MSG]);
      pardang.push_back(current_read[MSG]);
      next_read[MSG] = datapipe[rEnd];
      break;
    case qq:
      if (slurp_write >= 0){
        cerr << "???? multiple qq jobs?" << endl;
      }
      slurp_write = datapipe[wEnd];
      current_read[ENV] = datapipe[rEnd];
      pardang.push_back(current_read[ENV]);
      // The parent never reads the message itself, so release its copy
      // of the message read end too, as for sa/series kids.
      pardang.push_back(current_read[MSG]);
      next_read[ENV] = -1;
      next_read[MSG] = -1;
      current_write[ENV] = -1;
      current_write[MSG] = -1;
      break;
    case postspam:
    case stub:
      // no pipe even got created.
      break;
    case fail:
      cerr << "should never happen:: invalid filter" << endl;
      exeunt(ex_syserr);
    }

    kidpid[ii] = fork();
    if (kidpid[ii] == -1) {
      cerr << progid << " fork failed : ";
      perror(0);
      exit(ex_syserr);
    }
    iiofpid[kidpid[ii]] = ii;

    if (!kidpid[ii]) { /*** child code ***/
      if (verbose) cerr << "top of kid ... loose end "
                << current_read[MSG] << " for " << ii << endl;
      pid_t kidgroup(0);        // process group for all kids is
                                // equal to pid of kid#0
      if (ii) kidgroup = kidpid[0];
      if (setpgid(0, kidgroup) != 0) {
        cerr << "*** kid " << ii << " setpgid failed! " << errno << " ... ";
        perror(0);
      }

      // ... everybody else has to wait for us to get this far ...
      // ... so that the new process group will be valid ...
      // Write-a-byte synchronization is released when the *first* guy writes.
      if (ii == 0) {
        char junk(1);
        write(sync_pipe[wEnd], &junk, 1);
      }

      close(resync_pipe[wEnd]);         // send resync

      // ... now we must wait for everybody else, because ...
      // ... if we do the exec(), the new process group becomes invalid ...
      // Close synchronization is released when the *last* guy closes.
      if (ii == 0) {
        char junk;
        ssize_t got;
        do {
          got = read(resync_pipe[rEnd], &junk, 1);
        } while (got < 0 && errno == EINTR);
        if (got < 0) {
          cerr << "bad sync ... " << got << endl;
          // FIXME (maybe?) should this be fatal?
        }
      }

      switch (filter[ii].mode){
      case qq:
        attach(current_read[MSG], 0, ii);
        attach(current_read[ENV], 1, ii);
        break;
      case sa:
      case series:
        attach(current_read[MSG], 0, ii);
        attach(current_write[MSG], 1, ii);
        // NOTE(review): this kid also inherits datapipe[rEnd] (the read
        // end of its own output) and keeps it open, so it will not see
        // EPIPE if the next stage exits early.  Confirm intent before
        // changing, since closing it alters failure behavior.
        break;
      case stub:
      case postspam:
        // nothing to hook up; no pipe was even created.
        break;
      case fail:
        cerr << "should never happen: invalid filter" << endl;
        exeunt(ex_syserr);
        break;
      }

      // in all modes:
      // close envelope channel in kid space
      // (leaving it open in parent space)
      close(current_read[ENV]);
      close(slurp_write);

      vector<const char*> prog;
      for (VS::const_iterator tok = filter[ii].cmd.begin();
           tok != filter[ii].cmd.end(); ++tok) {
        prog.push_back(tok->c_str());
      }
      prog.push_back(0);

      close(resync_pipe[rEnd]);
      close(sync_pipe[rEnd]);
      close(sync_pipe[wEnd]);

      // Tell the filter which process group it belongs to.
      stringstream convert;
      convert << getpgid(0);
      if (setenv("HI_Q_GROUP", convert.str().c_str(), 1) != 0) {
        cerr << "setenv failed" << endl;
        exit(1);
      }

      Execve(prog[0], &prog[0], environ);
      cerr << progid << " failed to exec '" << prog[0] << "' : ";
      perror(0);
      exit(ex_syserr);
    }

    /*** parent code ***/
    for (LI::const_iterator ptr = pardang.begin();
         ptr != pardang.end(); ptr++) {
      close(*ptr);
    }

    // Let kid #0 run a little ways:
    if (ii == 0) {
      char junk;
      ssize_t got;
      do {
        got = read(sync_pipe[rEnd], &junk, 1);
      } while (got < 0 && errno == EINTR);
      if (got != 1) {
        cerr << "bad sync ... 1 != " << got << endl;
      }
    }
  } /* end loop starting all kids */

  // here with the whole pipeline of kids launched
  // parent program continues

  close(resync_pipe[wEnd]);     // important, so that block gets released
  close(resync_pipe[rEnd]);     // less important, just housecleaning
  close(sync_pipe[wEnd]);       // more housecleaning
  close(sync_pipe[rEnd]);
  close(0);                     // Housecleaning: the reading end of stdin was
                                // delegated to the first child,
                                // so we don't need it.

  if (verbose) for (unsigned int ii = 0; ii < nkids; ii++) {
    cerr << progid << " filter[" << ii << "] "
         << kidpid[ii]
         << " :; ";
    for (VS::const_iterator token = filter[ii].cmd.begin();
           token != filter[ii].cmd.end(); token++){
      cerr << *token << " ";
    }
    cerr << endl;
  }

  pid_t special_pid = kidpid[nkids-1];
  bool special_reaped(false);   // special kid already waited for?
  int special_status(0);        // its wait status, valid if special_reaped
  int alive(nkids-1);           // not counting the special kid
  int best_blame(0);            // best reason, even if not a great reason
  pid_t argbest_blame(-1);      // kid# associated with best blame

  for (;;) {
    if (alive == 0) break;
    pid_t somekid = waitpid(-1, &kidstatus, WUNTRACED);
    if (somekid < 0) {
      if (errno == EINTR) continue;     // interrupted; try again
      cerr << progid << " waitpid failed with "
           << alive << " kids still alive : ";
      perror(0);
      exeunt(ex_syserr);
    }
    if (somekid == special_pid){
      // do not decrement the "alive" counter
      // since that only applies to non-special kids
      if (WIFEXITED(kidstatus)) {
        cerr << progid << " special kid exited early, status "
             << WEXITSTATUS(kidstatus)
             << " with " << alive << " kids still alive" << endl;
        return(ex_syserr);
      } else if (WIFSIGNALED(kidstatus)) {
        int sig = WTERMSIG(kidstatus);
        if (sig == SIGUSR1) {
          /* normal, no logging required */
          // It has been reaped now; remember how it ended so the
          // final wait below does not wait on a pid that is gone.
          special_reaped = true;
          special_status = kidstatus;
        } else {
          cerr << progid << " special kid killed by signal "
               << sig << endl;
          // this is not normal
          return(ex_syserr);
        }
      } else {
        /* paused, not dead */
      }
      continue;
    }

    // here if somekid is not the special kid
    if (WIFEXITED(kidstatus)) {
      alive--;
      int sts = WEXITSTATUS(kidstatus);
#ifndef PENALIZE_SPAMMERS
      // ignore penalties for the moment
      // to see whether there are any false positives
      if (sts == ex_penaltybox) sts = ex_good;
#endif
      if (sts) {
        argbest_blame = somekid;
        best_blame = kidstatus;
        break;
      }
    } else if (WIFSIGNALED(kidstatus)) {
      alive--;
      argbest_blame = somekid;
      best_blame = kidstatus;
      if (WTERMSIG(kidstatus) != SIGUSR1) break;
    } else {
      /* kid is paused, not dead */
      /* not a problem */
    }
  }

  // here if all kids have exited normally
  // *or* if there is a great reason for quitting early

  ///////////////////
  // decode the best reason why the filter-chain terminated
  if (best_blame) {
    int kidno(iiofpid[argbest_blame]);
    if (WIFEXITED(best_blame)) {
      string exword = "???";            // default, should never happen
      int excode = ex_syserr;           // default, should never happen
      int sts = WEXITSTATUS(best_blame);
      if (sts == 0){
        // should never get here
        // should be no accounting for blame if there was no blame
        cerr << progid << " should never happen: no child to blame" << endl;
        exeunt(ex_syserr);
      }
      if (filter[kidno].mode != sa) {
        excode = sts;
        // Look up without inserting empty names for unknown codes.
        map<int, string>::const_iterator known = codemap.find(sts);
        if (known != codemap.end()) {
          exword = known->second;
        } else {
          stringstream describe;
          describe << "status " << sts;
          exword = describe.str();
        }
      } else {
        // here to translate spamc results
        if (sts == 1) {
          excode = ex_spam;
          exword = "spam";
        } else {
          excode = ex_syserr;
          stringstream describe;
          describe << "bad status: " << sts;
          exword = describe.str();
        }
      }
      cerr << progid << " concludes: kid[" << kidno << "]"
           << " i.e. " << basename(filter[kidno].cmd[0])
           << "[" << argbest_blame << "]"
           << " reports " << exword << endl;
      exeunt(excode);
    } else if (WIFSIGNALED(best_blame)) {
      int sig = WTERMSIG(best_blame);
      cerr << progid << " concludes: kid[" << kidno << "]"
           << " i.e. " << basename(filter[kidno].cmd[0])
           << "[" << argbest_blame << "]"
           << " was killed by signal " << sig << endl;
      // if the *best* blame is a kill, that's not normal
      exeunt(ex_syserr);
    }
  }

  // Here if all filters agree this is not spam.
  // Now it is safe to transfer the envelope information,
  // unless the qq program is already gone (nobody left to read it).
  if (!special_reaped) slurp(slurp_read, slurp_write);
  close(slurp_write);
  close(slurp_read);

  // now that the envelope information has been transferred,
  // wait for the last kid in the usual way
  while (!special_reaped) {
    pid_t got = waitpid(special_pid, &kidstatus, WUNTRACED);
    if (got < 0) {
      if (errno == EINTR) continue;
      cerr << progid << " waitpid for qq program failed : ";
      perror(0);
      return ex_syserr;
    }
    if (WIFEXITED(kidstatus) || WIFSIGNALED(kidstatus)) {
      special_reaped = true;
      special_status = kidstatus;
    }
    /* otherwise paused, not dead: keep waiting */
  }

  if (WIFEXITED(special_status)) {
    int sts = WEXITSTATUS(special_status);
    cerr << progid << " says: qq program"
         << " i.e. " << basename(filter[nkids-1].cmd[0])
         << "[" << kidpid[nkids-1] << "]"
         << " returned status " << sts
         << endl;
    return sts;
  }
  cerr << progid << " says: qq program"
       << " i.e. " << basename(filter[nkids-1].cmd[0])
       << "[" << kidpid[nkids-1] << "]"
       << " was killed by signal " << WTERMSIG(special_status)
       << endl;
  return ex_syserr;
}