commit 9ddbee573566ad0106d1583a3862e15417480ee6
parent 839210cbb387a66eb3b06143923be6e283ddca0b
Author: Antoine Amarilli <a3nm@a3nm.net>
Date: Sun, 17 Jun 2012 00:13:19 +0200
preliminary attempts for fifos
Diffstat:
irctk.c | | | 178 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------- |
1 file changed, 141 insertions(+), 37 deletions(-)
diff --git a/irctk.c b/irctk.c
@@ -5,6 +5,7 @@
#include <argp.h>
+#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdarg.h>
@@ -29,6 +30,7 @@ enum event_tos {NOTHING, COMMAND, MESSAGE};
// TODO get rid of that
#define MAX_LEN 4096
#define MAX_NICK_LEN 4096
+#define LINE_BUFFER 4096
#define max(a, b) ((b) > (a) ? (b) : (a))
@@ -447,6 +449,82 @@ void debug_args()
}
+typedef struct line {
+ char* line;
+ char* last_in;
+ char* last_out;
+} line;
+
+typedef struct fifo {
+ int hd;
+ int tl;
+ line queue[LINE_BUFFER];
+ pthread_cond_t empty;
+ pthread_cond_t full;
+ pthread_mutex_t mutex;
+} fifo;
+
+int full(fifo f) {
+ return f.hd == f.tl;
+}
+int empty(fifo f) {
+ return ((f.tl - f.hd + LINE_BUFFER) % LINE_BUFFER) == 1;
+}
+
+line pop(fifo f) {
+ line result;
+ int was_full;
+ pthread_mutex_lock(&f.mutex);
+ while (empty(f))
+ pthread_cond_wait(&f.empty, &f.mutex);
+ // now fifo isn't empty
+ was_full = full(f);
+ result = f.queue[f.hd];
+ f.hd++;
+ f.hd %= LINE_BUFFER;
+ if (was_full)
+ pthread_cond_signal(&f.full);
+ pthread_mutex_unlock(&f.mutex);
+ return result;
+}
+
+void push(fifo f, char* l, char* li, char* lo) {
+ int was_empty;
+ pthread_mutex_lock(&f.mutex);
+ while (full(f))
+ pthread_cond_wait(&f.full, &f.mutex);
+ // now fifo isn't full
+ was_empty = empty(f);
+ f.queue[f.tl].line = l;
+ f.queue[f.tl].last_in = li;
+ f.queue[f.tl].last_out = lo;
+ f.tl++;
+ f.tl %= LINE_BUFFER;
+ if (was_empty)
+ pthread_cond_signal(&f.empty);
+ pthread_mutex_unlock(&f.mutex);
+}
+
+void init(fifo f) {
+ f.hd = 0;
+ f.tl = 1;
+ assert(!pthread_cond_init(&f.empty, NULL));
+ assert(!pthread_cond_init(&f.full, NULL));
+ assert(!pthread_mutex_init(&f.mutex, NULL));
+}
+
+void destroy(fifo f) {
+ pthread_cond_destroy(&f.empty);
+ pthread_cond_destroy(&f.full);
+ pthread_mutex_destroy(&f.mutex);
+}
+
+// fifos for input and output
+// we want to read lines at the time we get them even if we are delaying writes
+// on the irc channel, and we want to output lines without blocking
+fifo fifo_in, fifo_out;
+
+
/* Our main chan is either the first CLI chan or our own chan if there are no
* CLI chans */
char* first_chan()
@@ -576,7 +654,7 @@ int cmd_msg_chan(irc_session_t *s, char *target, char* line)
}
// Handle a command request which might not specify a destination
-int cmd_msg(irc_session_t *s, char* target, char* line)
+int cmd_msg(irc_session_t *s, char* target, line l)
{
int i = 0;
int ret;
@@ -588,33 +666,33 @@ int cmd_msg(irc_session_t *s, char* target, char* line)
switch(args.default_destination)
{
case DEFAULT_FIRST:
- return cmd_msg_chan(s, first_chan(), line);
+ return cmd_msg_chan(s, first_chan(), l.line);
case DEFAULT_LAST_IN:
if (args.show_nick_prefix && args.last_nick_in[0] && args.last_chan_in[0] == '#')
{
- if (line[0] != '\n')
+ if (l.line[0] != '\n')
{
msg[0] = 0;
strncat((char*) msg, args.last_nick_in, MAX_LEN-1);
strcat((char*) msg, ": ");
- strncat((char*) msg, line, MAX_LEN-1);
- line = (char*) msg;
+ strncat((char*) msg, l.line, MAX_LEN-1);
+ l.line = (char*) msg;
} else {
args.last_nick_in[0] = 0;
}
}
- return cmd_msg_chan(s, args.last_chan_in, line);
+ return cmd_msg_chan(s, l.last_in, l.line);
case DEFAULT_LAST_OUT:
- return cmd_msg_chan(s, args.last_chans_out, line);
+ return cmd_msg_chan(s, l.last_out, l.line);
case DEFAULT_ALL:
//TODO also for chans joined at runtime
//TODO2: ugly, could have generated the comma-separated string
//like for last_chans_out!
for (i = 0; i < args.n_channels; i++)
- ret = max(ret, cmd_msg_chan(s, args.channels[i], line));
+ ret = max(ret, cmd_msg_chan(s, args.channels[i], l.line));
/* TODO return value */
return ret;
@@ -622,7 +700,7 @@ int cmd_msg(irc_session_t *s, char* target, char* line)
return 42; // won't happen
}
} else {
- return cmd_msg_chan(s, target, line);
+ return cmd_msg_chan(s, target, l.line);
}
}
@@ -900,7 +978,7 @@ irc_session_t* do_connect()
}
// The IRC thread entry point
-static void* irc_listen (void *arg)
+static void* irc_thread (void *arg)
{
irc_session_t * sp = (irc_session_t *) arg;
// TODO, also do you need '#' or not?
@@ -909,13 +987,33 @@ static void* irc_listen (void *arg)
return 0;
}
+// The fifo_in thread entry point
+static void* fifo_in_thread (void *arg) {
+ char *line = NULL;
+ int res, size=100;
+ line = (char*) malloc(size+1);
+
+ while ((res = getline((char**) &line, (size_t*) &size, stdin)) != -1) {
+ push(fifo_in, line, args.last_chan_in, args.last_chans_out);
+ }
+ // sentinel
+ push(fifo_in, NULL, NULL, NULL);
+ free(line);
+ return 0;
+}
+
+// The fifo_out thread entry point
+static void* fifo_out_thread (void *arg) {
+ return 0;
+}
+
+
// Start the IRC thread and monitor stdin
int start (int max_wait)
{
irc_session_t * s;
- pthread_t tid;
- char *line = NULL;
- int res, size=100;
+ pthread_t tid_irc, tid_in, tid_out;
+ line l;
struct timeval tp1, tp2;
long tp;
int first = 1;
@@ -925,15 +1023,16 @@ int start (int max_wait)
debug("Connection request successful!");
- debug("Starting thread...");
-
- if (pthread_create (&tid, 0, irc_listen, (void*) s) != 0)
+ debug("Starting threads...");
+ if (pthread_create (&tid_irc, 0, irc_thread, (void*) s) != 0)
+ die(E_THREAD, "Could not create thread: %s", strerror(errno));
+ if (pthread_create (&tid_in, 0, fifo_in_thread, (void*) NULL) != 0)
+ die(E_THREAD, "Could not create thread: %s", strerror(errno));
+ if (pthread_create (&tid_out, 0, fifo_out_thread, (void*) NULL) != 0)
die(E_THREAD, "Could not create thread: %s", strerror(errno));
debug("Thread started!");
- line = (char*) malloc(size+1);
-
if (args.show_inferred && args.default_destination == DEFAULT_LAST_OUT)
fprintf(stderr, "[%s] ", args.last_chans_out);
@@ -959,49 +1058,52 @@ int start (int max_wait)
}
}
- while ((res = getline((char**) &line, (size_t*) &size, stdin)) != -1) {
- debug("startloop : got %s, waiting", line);
+ while ((l = pop(fifo_in)).line) {
+ debug("startloop : got %s, waiting", l.line);
if (!args.ready) {
info("Connection lost, reconnecting...");
return start(max_wait);
}
- debug("ready", line);
-
- /* TODO don't do that but have a buffer of things to send, so we can save to
- * whom we are sending what despite the delays */
+ debug("ready", l.line);
/* TODO no wait on empty lines */
-
+ /* TODO delay per channel */
gettimeofday(&tp2, NULL);
tp = 1000000*(tp2.tv_sec - tp1.tv_sec) + tp2.tv_usec - tp1.tv_usec;
if (tp < args.interval && !first)
usleep(args.interval - tp);
first = 0;
- if (line[0] == '[' && args.destination_prefix)
+ if (l.line[0] == '[' && args.destination_prefix)
{
int i=0;
char *msg;
- char *target = line+1;
+ char *tmp;
+ char *target = l.line+1;
+ if (!l.line[0])
+ target = NULL;
- while (line[i] != ']' && line[i])
+ while (l.line[i] != ']' && l.line[i])
i++;
- if (!line[i])
+ if (!l.line[i])
die(E_BADLINE, "Malformed address prefix");
- msg = line + i + 1;
+ msg = l.line + i + 1;
if (msg[0] == ' ')
msg++;
- line[i] = 0;
- cmd_msg(s, target, msg);
- strncpy(args.last_chans_out, target, MAX_LEN-1);
+ l.line[i] = 0;
+ tmp = l.line;
+ l.line = msg;
+ cmd_msg(s, target, l);
+ l.line = tmp;
+ strncpy(l.last_out, target, MAX_LEN-1);
} else {
/* No target specified, we attempt the default */
- cmd_msg(s, "", line);
+ cmd_msg(s, "", l);
}
if (args.show_inferred && args.default_destination == DEFAULT_LAST_OUT)
- fprintf(stderr, "[%s] ", args.last_chans_out);
+ fprintf(stderr, "[%s] ", l.last_out);
usleep(args.interval);
debug("endloop");
@@ -1009,10 +1111,8 @@ int start (int max_wait)
debug("exiting");
- line = NULL;
usleep(2*args.interval);
irc_disconnect(s);
- free(line);
return 0;
}
@@ -1029,6 +1129,10 @@ int main (int argc, char **argv)
strncpy(args.last_chan_in, first_chan(), MAX_LEN-1);
strncpy(args.last_chans_out, first_chan(), MAX_LEN-1);
+ // initialize the fifos
+ init(fifo_in);
+ init(fifo_out);
+
// start trying to connet with the initial retry interval
return start(args.retry_after);
}