summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Denker <jsd@av8n.com>2012-07-23 12:42:23 -0700
committerJohn Denker <jsd@av8n.com>2012-07-29 15:32:36 -0700
commit1c7804ee064dadadb06e786efcf4992e2a3b2028 (patch)
treee2d31905dd747e5334861514e24d2578168cc59d
parente2778bb957309e8d66f4a385c45e403125b40696 (diff)
much more logical about keeping track of pipes and how they are used
-rw-r--r--.gitignore1
-rw-r--r--tools/hi-q.c195
-rw-r--r--tools/hi-test.c33
-rwxr-xr-xtools/hi-test.conf7
-rwxr-xr-xtools/hi-test5.conf6
5 files changed, 161 insertions, 81 deletions
diff --git a/.gitignore b/.gitignore
index 574561f..ad1d359 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,7 @@
*~
*.a
*.o
+*.d
*.lib
*.0
*.orig
diff --git a/tools/hi-q.c b/tools/hi-q.c
index 114570f..5ee7688 100644
--- a/tools/hi-q.c
+++ b/tools/hi-q.c
@@ -64,6 +64,7 @@ foo_sa(CONFIG, 78, "configuration error") ;\
foo_sa(TOOBIG, 98, "message was too big to process (see --max-size)"
+typedef enum {MSG, ENV} channeler;
#define bufsize 16384
@@ -198,6 +199,7 @@ void slurp(const int inch, const int ouch){
ssize_t todo;
for (;;) {
ssize_t got = read(inch, buf, bufsize);
+ //xx cerr << "slurp: read returns " << got << endl;
if (got == 0) { // EoF
break;
}
@@ -210,6 +212,7 @@ void slurp(const int inch, const int ouch){
todo = got;
while (todo) {
ssize_t sent = write(ouch, buf, todo);
+ //xx cerr << "slurp: write returns " << sent << endl;
if (sent < 0 && errno != EINTR) {
fprintf(stderr, "hi-q: output error on fd%d : ", ouch);
perror(0);
@@ -282,6 +285,22 @@ string basename(const string path){
return path;
}
+void attach(const int pipe_end, const int fd, const int kidno){
+ cerr << "attaching current pipe_end " << pipe_end
+ << " to " << fd
+ << " for " << kidno << endl;
+ if (pipe_end != fd) {
+ int rslt = dup2(pipe_end, fd);
+ if (rslt < 0) {
+ fprintf(stderr, "hi-q: dup2(%d,%d) failed for kid %d : ", pipe_end, fd, kidno);
+ perror(0);
+ exit(ex_syserr);
+ }
+ close(pipe_end);
+ }
+
+}
+
int main(int argc, char** argv) {
{
progname = *argv;
@@ -299,7 +318,6 @@ bar
int kidstatus;
int rslt;
- int loose_end = 0; // our original stdin
typedef vector<string> VS;
vector<jobber> filter;
@@ -395,11 +413,28 @@ bar
// to close it and dup() something useful onto it.
map<int,int> iiofpid;
-
+ map<channeler,int> next_read;
+ next_read[MSG] = 0; // our original stdin
+ next_read[ENV] = -1; // no kid is (yet) empowered to read envelope info
+ int slurp_read(1); // our original non-standard input
+ int slurp_write = -1; // effectively next_write[ENV];
+ map<channeler,int> current_read;
+ map<channeler,int> cur_write; // current kid writes here
+ cur_write[MSG] = -1;
+ cur_write[ENV] = -1;
+
+// important loop to start all kids
for (unsigned int ii=0; ii < nkids; ii++){ /* loop starting all kids */
- //xx cerr << "top of loop ... loose end " << loose_end << " for " << ii << endl;
- if (loose_end > 20) exit(99);
- int kid_end;
+ current_read = next_read;
+
+ cerr << "top of loop: "
+ << " cr.MSG: " << current_read[MSG]
+ << " cr.ENV: " << current_read[ENV]
+ << " w.MSG: " << cur_write[MSG]
+ << " w.ENV: " << cur_write[ENV]
+ << " for " << ii << endl;
+ if (current_read[MSG] > 20) exit(99);
+ if (current_read[ENV] > 20) exit(99);
int datapipe[2];
@@ -407,17 +442,15 @@ bar
case series:
case qq:
case sa:
-// connect *old* loose end to this kid's stdin
- //xx cerr << "moving old loose end " << loose_end << " to 0 for " << ii << endl;
- if (loose_end) {
- close(0);
- dup2(loose_end, 0);
- close(loose_end);
- }
-// Create a pipe, which will be used to connect
-// this child's fd1 to the next child's fd0 ...
-// except for the last kid, which reads both fd0 and fd1,
+// Create a new pipe.
+// Pipe must be created here (in the parent).
+// The intended bindings must be figured out shortly below.
+// Some of the bindings must be hooked up later (in the child),
+// while others are used by the parent (e.g. envelope slurp).
+// This pipe will be used (by the children) to connect
+// this child's output to the next child's input ...
+// except for the special kid, which reads both fd0 and fd1,
// while writing nothing.
rslt = pipe(datapipe);
if (rslt < 0) {
@@ -425,6 +458,10 @@ bar
perror(0);
exeunt(ex_syserr);
}
+ if (1) cerr << "new pipe"
+ << " reading: " << datapipe[rEnd]
+ << " writing: " << datapipe[wEnd]
+ << endl;
break;
case postspam:
case stub:
@@ -435,20 +472,23 @@ bar
exeunt(ex_syserr);
}
-// For N-1 kids, the loose end feeds forward.
-// It will be written by this kid and read by the next kid.
-// For the special kid, the loose end will be its nonstandard input.
-// It will be written by us (hi-q) and read by the last kid.
-
+// figure out the intended bindings:
switch (filter[ii].mode) {
- case series:
case sa:
- loose_end = datapipe[rEnd];
- kid_end = datapipe[wEnd];
+ case series:
+ cur_write[MSG] = datapipe[wEnd];
+ next_read[MSG] = datapipe[rEnd];
break;
case qq:
- loose_end = datapipe[wEnd]; // reverse of normal "series" case
- kid_end = datapipe[rEnd]; // reverse of normal "series" case
+ if (slurp_write >= 0){
+ cerr << "???? multiple qq jobs?" << endl;
+ }
+ slurp_write= datapipe[wEnd];
+ current_read[ENV] = datapipe[rEnd];
+ next_read[ENV] = -1;
+ next_read[MSG] = -1;
+ cur_write[ENV] = -1;
+ cur_write[MSG] = -1;
break;
case postspam:
case stub:
@@ -467,7 +507,7 @@ bar
}
iiofpid[kidpid[ii]] = ii;
if (!kidpid[ii]) { /*** child code ***/
- if (verbose) cerr << "top of kid ... loose end " << loose_end << " for " << ii << endl;
+ if (verbose) cerr << "top of kid ... loose end " << current_read[MSG] << " for " << ii << endl;
pid_t kidgroup(0); // process group for all kids is
// equal to pid of kid#0
@@ -513,31 +553,19 @@ bar
}
}
- if (0) cerr << "before closing loose end " << loose_end
- << " and kid end " << kid_end
- << " for " << ii << endl;
switch (filter[ii].mode){
- case sa:
case qq:
+ attach(current_read[MSG], 0, ii);
+ attach(current_read[ENV], 1, ii);
+ break;
+ case sa:
case series:
- close(loose_end); // the reading end is none of this kid's business
- // except last kid: writing end
-
- // Note this does an implicit close on the previously-open fd1:
- rslt = dup2(kid_end, 1); // the writing end is stdout for this kid
- // except last kid: nonstandard input
- if (rslt < 0) {
- fprintf(stderr, "hi-q: kid %d: dup2(%d,1) failed: ", ii, kid_end);
- perror(0);
- exit(ex_syserr);
- }
- close(kid_end); // use fd1 instead now
- // OK, at this point this kid is set up to read fd0 and write fd1
- // (except last kid reads fd1 as well as fd0).
+ attach(current_read[MSG], 0, ii);
+ attach(cur_write[MSG], 1, ii);
break;
case stub:
case postspam:
- // nothing to do
+ // nothing to hook up; no pipe was even created.
break;
case fail:
cerr << "should never happen: invalid filter" << endl;
@@ -545,6 +573,12 @@ bar
break;
}
+// in all modes:
+// close envelope channel in kid space
+// (leaving it open in parent space)
+ close(current_read[ENV]);
+ close(slurp_write);
+
//// probe_fd();
int ntok = filter[ii].cmd.size();
@@ -576,7 +610,10 @@ bar
perror(0);
exeunt(ex_syserr);
}
- close(kid_end);
+
+// these tricks are for kid:
+ close(cur_write[MSG]);
+ close(cur_write[ENV]);
// Let kid #0 run a little ways:
if (ii==0) {
@@ -598,6 +635,7 @@ bar
} /* end loop starting all kids */
// here with the whole pipeline of kids launched
+// parent program continues
close(resync[wEnd]); // important, so that block gets released
close(resync[rEnd]); // less important, just housecleaning
@@ -730,35 +768,46 @@ bar
// Here if all filters agree this is not spam.
// Now it is safe to transfer the envelope information:
- slurp(1, loose_end);
- close(1);
- close(loose_end);
+
+ if (0) cerr << "about to slurp: "
+ << " cr.MSG: " << current_read[MSG]
+ << " cr.ENV: " << current_read[ENV]
+ << " w.MSG: " << cur_write[MSG]
+ << " w.ENV: " << cur_write[ENV]
+ << " slurp_read: " << slurp_read
+ << " slurp_write: " << slurp_write
+ << endl;
+
+ slurp(slurp_read, slurp_write);
+ close(slurp_write);
+ close(slurp_read);
// now that the envelope information has been transfered,
// wait for the last kid in the usual way
- {
- for(;;) {
- waitpid(special_pid, &kidstatus, WUNTRACED);
- if (WIFEXITED(kidstatus)) {
- int sts = WEXITSTATUS(kidstatus);
- cerr << progid
- << " says: qq program"
- << " i.e. " << basename(filter[nkids-1].cmd[0])
- << "[" << kidpid[nkids-1] << "]"
- << " returned status " << sts
- << endl;
- return sts;
- } else if (WIFSIGNALED(kidstatus)) {
- cerr << progid
- << " says: qq program"
- << " i.e. " << basename(filter[nkids-1].cmd[0])
- << "[" << kidpid[nkids-1] << "]"
- << " was killed by signal " << WTERMSIG(kidstatus)
- << endl;
- return ex_syserr;
- } else {
- /* paused, not dead */
- }
+
+ for(;;) {
+ waitpid(special_pid, &kidstatus, WUNTRACED);
+ if (WIFEXITED(kidstatus)) {
+ int sts = WEXITSTATUS(kidstatus);
+ cerr << progid
+ << " says: qq program"
+ << " i.e. " << basename(filter[nkids-1].cmd[0])
+ << "[" << kidpid[nkids-1] << "]"
+ << " returned status " << sts
+ << endl;
+ return sts;
+ } else if (WIFSIGNALED(kidstatus)) {
+ cerr << progid
+ << " says: qq program"
+ << " i.e. " << basename(filter[nkids-1].cmd[0])
+ << "[" << kidpid[nkids-1] << "]"
+ << " was killed by signal " << WTERMSIG(kidstatus)
+ << endl;
+ return ex_syserr;
+ } else {
+ /* paused, not dead */
}
- }
+ } /* loop until all kids accounted for */
+ // should never get here;
+ // exit from within loop is the only way out
}
diff --git a/tools/hi-test.c b/tools/hi-test.c
index e2626cc..0661ada 100644
--- a/tools/hi-test.c
+++ b/tools/hi-test.c
@@ -3,6 +3,7 @@
#include <stdlib.h>
#include <string>
#include <signal.h>
+#include <sstream>
#include <stdio.h> /* perror() */
@@ -13,6 +14,8 @@ const int sa_good(0);
const int sa_spam(1);
const int sa_usage(64);
+int verbosity(0);
+
////////////////
// little utility to help with argument parsing:
//
@@ -42,10 +45,12 @@ void exeunt(const int sts){
using namespace std;
string progname;
+string progid;
+int mypid;
void dump(const string var){
char* str = getenv(var.c_str());
- cerr << progname << ": " << var;
+ cerr << progid << " " << var;
if (str) cerr << " is set to '" << str << "'" << endl;
else cerr << " is not set." << endl;
}
@@ -55,10 +60,19 @@ void countsome(const int unit){
int total(0);
for (;;) {
int rslt = read(unit, buf, sizeof(buf));
+ if (verbosity) cerr << "hi-test: count: unit " << unit
+ << " read returns " << rslt << endl;
if (rslt <= 0) break;
total += rslt;
}
- cerr << "read " << total << " bytes from unit " << unit << endl;
+ cerr << progid
+ << " read " << total << " bytes from unit " << unit << endl;
+}
+
+string basename(const string path){
+ size_t where = path.rfind("/");
+ if (where != string::npos) return path.substr(1+where);
+ return path;
}
int main(int _argc, const char** _argv){
@@ -68,7 +82,16 @@ int main(int _argc, const char** _argv){
int countmode(0);
int argc(_argc);
const char **argv(_argv);
- progname = *argv; argv++; argc--;
+
+ {
+ progname = *argv;
+ mypid = getpid();
+ stringstream binder;
+ binder << "+++++ " << basename(progname) << "[" << mypid << "]";
+ progid = binder.str();
+ }
+
+ argv++; argc--;
while (argc) {
string arg(*argv); argv++; argc--;
@@ -113,8 +136,8 @@ int main(int _argc, const char** _argv){
exit(sa_usage);
}
}
-
- cerr << "++++ hi-test pid: " << getpid() << " group: " << getpgid(0);
+
+ cerr << progid << " group: " << getpgid(0);
char* foo = getenv("HI_Q_GROUP");
if (foo) cerr << " HI_Q_GROUP: " << foo;
cerr << endl;
diff --git a/tools/hi-test.conf b/tools/hi-test.conf
index f692f37..20df5a7 100755
--- a/tools/hi-test.conf
+++ b/tools/hi-test.conf
@@ -1,6 +1,7 @@
-#! /usr/local/bin/bash-c set -x ; </dev/null TCPREMOTEHOST=asf TCPREMOTEIP=1.2.3.4 hi-q $0 ; echo $?
+#! /usr/local/bin/bash-c set -x ; 1</tmp/a TCPREMOTEHOST=asf TCPREMOTEIP=1.2.3.4 hi-q $0 ; echo $?
# another comment, with blank line between
-series hi-test x0 -snooze 10
-stub hi-test x1 -snooze 1 -exit 21 -kill
+series /bin/echo asdf
+series hi-test x0 -snooze 2
+# stub hi-test x1 -snooze 1 -exit 21 -kill
qq hi-test x2 -snooze 10
diff --git a/tools/hi-test5.conf b/tools/hi-test5.conf
new file mode 100755
index 0000000..524c954
--- /dev/null
+++ b/tools/hi-test5.conf
@@ -0,0 +1,6 @@
+#! /usr/local/bin/bash-c set -x ; 1</tmp/a TCPREMOTEHOST=asf TCPREMOTEIP=1.2.3.4 hi-q $0 ; echo $?
+
+# another comment, with blank line between
+series /bin/echo asdfasdfasdf
+series /bin/cat
+qq hi-test -count