/* See LICENSE for copyright, license, and disclaimer. */

static const char *RCSID = "@(#) $Header: /u/cvsroot/nsd-modules/dqd_threadpool/threadpool.c,v 1.2 2000/11/23 01:14:36 mayoff Exp $, compiled: " __DATE__ " " __TIME__;

#include "ns.h"

typedef struct Job Job;
typedef struct PoolThread PoolThread;
typedef struct ThreadPool ThreadPool;

struct Job {
    Job *nextJobPtr;
    char jobid[22];
    int size;
    char *script;
    char *result;
};

struct PoolThread {
    PoolThread *nextThreadPtr;

    ThreadPool *poolPtr;
    unsigned long serial;
    int busy;
};

struct ThreadPool {

    char *server;
    char *module;

    int trace;

    /*
     * Maximum total size of the scripts in the queue.  If the size
     * would exceed this, then dqd_threadpool queue blocks.  Zero
     * means unlimited.
     */
    int maxSize;

    int minThreads;
    int maxThreads;
    int maxIdleSeconds;

    int shutdownPending;

    Ns_Mutex lock;
    Ns_Cond jobCond;
    Ns_Cond resultCond;

    int curThreads;
    int idleThreads;
    unsigned long serial;

    PoolThread *firstFreeThreadPtr;

    /* Current total size of the scripts in the queue. */
    int curSize;

    /* Number of threads blocked trying to enqueue jobs. */
    int jobPosters;

    Job *firstJobPtr;
    Job *lastJobPtr;

    Tcl_HashTable jobTable;
};

int Ns_ModuleInit(char *server, char *module);
static Ns_TclInterpInitProc InterpInit;
static int ConfigIntDefault(char *module, char *path, char *name, int def);
int ConfigBoolDefault(char *module, char *path, char *name, int def);
static void AtStartup(ThreadPool *tp);
static void CreateThread(ThreadPool *tp);
static void AtExit(ThreadPool *tp);
static void WarmUpThread(ThreadPool *tp);
static void ThreadPoolProc(PoolThread *pt);
static void DoJob(ThreadPool *tp, Job *jobPtr);
static void ThreadPoolArgProc(Tcl_DString *dsPtr, PoolThread *pt);
static ThreadPool *GetPoolByName(char *name);
static int ThreadPoolCmd(ClientData dummy, Tcl_Interp *interp, int argc,
    char *argv[]);
static int CreateCmd(Tcl_Interp *interp, ThreadPool *tp, char *script,
    int detach);
static int GetCmd(Tcl_Interp *interp, ThreadPool *tp, char *jobid);
static int WaitCmd(Tcl_Interp *interp, ThreadPool *tp, char *idlist);

int Ns_ModuleVersion = 1;
Ns_Mutex lock;

static int initialized = 0;
static Tcl_HashTable poolTable;

int
Ns_ModuleInit(char *server, char *module)
{
    ThreadPool *tp;
    char *path;
    Tcl_HashEntry *ePtr;
    int new;
    int i;

    if (!initialized) {
	initialized = 1;

	Ns_Log(Notice, "dqd_threadpool release 1.1", NULL);

	Tcl_InitHashTable(&poolTable, TCL_STRING_KEYS);
	Ns_MutexSetName2(&lock, "dqd_threadpool.master", NULL);
	Ns_RegisterProcInfo(ThreadPoolProc, "threadpool",
	    (Ns_ArgProc *) ThreadPoolArgProc);
	Ns_TclInitInterps(server, InterpInit, NULL);
    }

    path = Ns_ConfigGetPath(server, module, NULL);
    tp = (ThreadPool *) ns_calloc(1, sizeof *tp);
    tp->server = server;
    tp->module = module;

    tp->trace = ConfigBoolDefault(module, path, "trace", NS_FALSE);
    tp->maxSize = ConfigIntDefault(module, path, "maxsize", 0);

    tp->minThreads = ConfigIntDefault(module, path, "minthreads", 0);
    tp->maxThreads = ConfigIntDefault(module, path, "maxthreads", 4);
    if (tp->maxThreads < tp->minThreads) {
	tp->maxThreads = tp->minThreads;
	Ns_Log(Notice, "%s: maxthreads < minthreads; increased to %d",
	    module, tp->maxThreads);
    }

    if (tp->maxThreads < 1) {
	Ns_Log(Notice, "%s: maxthreads < 1: thread pool disabled", module);
	ns_free(tp);
	return NS_OK;
    }

    tp->maxIdleSeconds = ConfigIntDefault(module, path, "maxidle", 30);

    Ns_MutexSetName2(&tp->lock, module, NULL);

    if (tp->minThreads > 0) {
	Ns_RegisterAtStartup((Ns_Callback *) AtStartup, tp);
    }

    Ns_RegisterAtExit((Ns_Callback *) AtExit, tp);

    tp->firstFreeThreadPtr = (PoolThread *) ns_calloc(tp->maxThreads,
	sizeof *tp->firstFreeThreadPtr);
    for (i = 0; i < tp->maxThreads; i++) {
	tp->firstFreeThreadPtr[i].poolPtr = tp;
	tp->firstFreeThreadPtr[i].nextThreadPtr =
	    tp->firstFreeThreadPtr + i + 1;
    }
    tp->firstFreeThreadPtr[i - 1].nextThreadPtr = NULL;

    Tcl_InitHashTable(&tp->jobTable, TCL_STRING_KEYS);

    ePtr = Tcl_CreateHashEntry(&poolTable, module, &new);
    Tcl_SetHashValue(ePtr, tp);

    return NS_OK;
}

static int
InterpInit(Tcl_Interp *interp, void *arg)
{
    Tcl_CreateCommand(interp, "dqd_threadpool", ThreadPoolCmd, NULL, NULL);
    return NS_OK;
}

static int
ConfigIntDefault(char *module, char *path, char *name, int def)
{
    int value;
    if (Ns_ConfigGetInt(path, name, &value) == NS_FALSE) {
        value = def;
    }
    Ns_Log(Notice, "%s: %s = %d", module, name, value);
    return value;
}

int
ConfigBoolDefault(char *module, char *path, char *name, int def)
{
    int value;
    if (Ns_ConfigGetBool(path, name, &value) == NS_FALSE) {
        value = def;
    }
    Ns_Log(Notice, "%s: %s = %d", module, name, value);
    return value;
}

static void
CreateThread(ThreadPool *tp)
{
    PoolThread *pt;

    if (tp->curThreads < tp->maxThreads) {
	pt = tp->firstFreeThreadPtr;
	tp->firstFreeThreadPtr = pt->nextThreadPtr;
	pt->serial = ++tp->serial;
	pt->busy = 1;
	if (tp->trace) {
	    Ns_Log(Notice, "%s: creating thread %d",
		tp->module, pt->serial);
	}
	Ns_ThreadCreate((Ns_ThreadProc *) ThreadPoolProc, pt, 0, 0);
	tp->curThreads++;
    } else if (tp->trace) {
	Ns_Log(Notice, "%s: not creating new thread", tp->module);
    }
}

static void
AtStartup(ThreadPool *tp)
{
    Ns_MutexLock(&tp->lock);
    while (tp->curThreads < tp->minThreads) {
	CreateThread(tp);
    }
    Ns_MutexUnlock(&tp->lock);
}

static void
AtExit(ThreadPool *tp)
{
    tp->shutdownPending = 1;
    Ns_CondBroadcast(&tp->jobCond);
}

static void
WarmUpThread(ThreadPool *tp)
{
    /*
     * Creating the Tcl interpreter can result in a lot of calls to
     * malloc.  If lots of threads try to start up simultaneously,
     * they can spend a lot of time waiting for the system malloc.
     * The global lock prevents that.
     */
    Ns_MutexLock(&lock);
    Ns_TclDeAllocateInterp(Ns_TclAllocateInterp(tp->server));
    Ns_MutexUnlock(&lock);
}

static void
ThreadPoolProc(PoolThread *pt)
{
    ThreadPool *tp;
    char buf[100];
    Ns_Time idleTimeout;
    Ns_Time *idleTimeoutPtr;
    int status;
    Job *jobPtr;
    Tcl_Interp *interp;
    char *script;
    int size;
    int freeJob;

    tp = pt->poolPtr;

    sprintf(buf, "%s.%lu", tp->module, pt->serial);
    Ns_ThreadSetName(buf);

    if (tp->trace) Ns_Log(Notice, "initializing");
    WarmUpThread(tp);

    if (tp->trace) Ns_Log(Notice, "starting");

    Ns_MutexLock(&tp->lock);
    tp->idleThreads++;
    pt->busy = 0;

    while (1) {

	if (tp->curThreads > tp->minThreads && tp->maxIdleSeconds > 0) {
	    Ns_GetTime(&idleTimeout);
	    Ns_IncrTime(&idleTimeout, tp->maxIdleSeconds, 0);
	    idleTimeoutPtr = &idleTimeout;
	} else {
	    idleTimeoutPtr = NULL;
	}

	status = NS_OK;
	while (
	    !tp->shutdownPending
	    && status == NS_OK
	    && tp->firstJobPtr == NULL
	) {
	    if (tp->trace) Ns_Log(Notice, "waiting");
	    status = Ns_CondTimedWait(&tp->jobCond, &tp->lock, idleTimeoutPtr);
	}

	if (tp->firstJobPtr == NULL) {
	    break;
	}

	pt->busy = 1;
	tp->idleThreads--;

	jobPtr = tp->firstJobPtr;
	tp->firstJobPtr = jobPtr->nextJobPtr;
	if (tp->firstJobPtr == NULL) {
	    tp->lastJobPtr = NULL;
	}

	if (tp->trace) {
	    Ns_Log(Notice, "got job %s", jobPtr->jobid);
	    Ns_Log(Notice, "tp->firstJobPtr = %p", tp->firstJobPtr);
	    Ns_Log(Notice, "idleThreads = %d", tp->idleThreads);
	}

	Ns_MutexUnlock(&tp->lock);

	/*
	 * Save these away, because by the time DoJob returns,
	 * jobPtr may have been freed.
	 */
	freeJob = jobPtr->jobid[0] == '\000';
	size = jobPtr->size;
	script = jobPtr->script;

	DoJob(tp, jobPtr);

	if (tp->trace) Ns_Log(Notice, "done with job");

	ns_free(script);
	if (freeJob) {
	    ns_free(jobPtr);
	}

	Ns_MutexLock(&tp->lock);

	tp->idleThreads++;
	tp->curSize -= size;
	if (tp->jobPosters > 0) {
	    Ns_CondBroadcast(&tp->jobCond);
	}

	if (tp->trace) {
	    Ns_Log(Notice, "idleThreads = %d", tp->idleThreads);
	    Ns_Log(Notice, "curSize = %d", tp->curSize);
	    Ns_Log(Notice, "jobPosters = %d", tp->jobPosters);
	}

	pt->busy = 0;
    }

    pt->nextThreadPtr = tp->firstFreeThreadPtr;
    tp->firstFreeThreadPtr = pt;
    tp->curThreads--;
    tp->idleThreads--;

    Ns_MutexUnlock(&tp->lock);

    if (tp->trace) Ns_Log(Notice, "exiting");
    Ns_ThreadExit(0);
}

static void
DoJob(ThreadPool *tp, Job *jobPtr)
{
    Tcl_Interp *interp;
    int status;
    char buf[22];
    Tcl_DString ds;
    Tcl_HashEntry *ePtr;
    int new;

    interp = Ns_TclAllocateInterp(tp->server);
    status = Tcl_GlobalEval(interp, jobPtr->script);

    if (jobPtr->jobid[0] != '\000') {
	Tcl_DStringInit(&ds);
	sprintf(buf, "%d", status);
	Tcl_DStringAppendElement(&ds, buf);
	Tcl_DStringAppendElement(&ds, interp->result);
	jobPtr->result = ns_strdup(Tcl_DStringValue(&ds));
	Tcl_DStringFree(&ds);

	Ns_MutexLock(&tp->lock);
	Ns_CondBroadcast(&tp->resultCond);
	Ns_MutexUnlock(&tp->lock);
    }

    Ns_TclDeAllocateInterp(interp);
}

static void
ThreadPoolArgProc(Tcl_DString *dsPtr, PoolThread *pt)
{
    Tcl_DStringAppendElement(dsPtr, pt->busy ? "busy" : "idle");
}

ThreadPool *
GetPoolByName(char *name)
{
    Tcl_HashEntry *ePtr;

    ePtr = Tcl_FindHashEntry(&poolTable, name);
    return (ePtr == NULL) ? NULL : (ThreadPool *) Tcl_GetHashValue(ePtr);
}

static int
ThreadPoolCmd(ClientData dummy, Tcl_Interp *interp, int argc, char *argv[])
{
    ThreadPool *tp;

    if (argc == 4) {
	tp = GetPoolByName(argv[2]);
	if (tp == NULL) {
	    Tcl_AppendResult(interp, "invalid thread pool name \"",
		argv[2], "\"", NULL);
	    return TCL_ERROR;
	}

	switch (argv[1][0]) {
	    case 'c':
		if (strcmp(argv[1], "create") != 0) {
		    break;
		}
		return CreateCmd(interp, tp, argv[3], 0);

	    case 'd':
		if (strcmp(argv[1], "detach") != 0) {
		    break;
		}
		return CreateCmd(interp, tp, argv[3], 1);

	    case 'w':
		if (strcmp(argv[1], "wait") != 0) {
		    break;
		}
		return WaitCmd(interp, tp, argv[3]);

	    case 'g':
		if (strcmp(argv[1], "get") != 0) {
		    break;
		}
		return GetCmd(interp, tp, argv[3]);
	}
    }

    Tcl_AppendResult(interp, "usage: ", argv[0],
	" command threadpool ?args?\n"
	"    command is one of: create detach wait get", NULL);
    return TCL_ERROR;
}

static int
CreateCmd(Tcl_Interp *interp, ThreadPool *tp, char *script, int detach)
{
    Tcl_HashEntry *ePtr;
    Job *jobPtr;
    int size;
    int new;

    jobPtr = (Job *) ns_calloc(1, sizeof *jobPtr);
    jobPtr->size = strlen(script);
    jobPtr->script = ns_strdup(script);

    Ns_MutexLock(&tp->lock);

	if (!detach) {
	    sprintf(jobPtr->jobid, "%lu", ++tp->serial);
	    Tcl_SetResult(interp, jobPtr->jobid, TCL_VOLATILE);

	    ePtr = Tcl_CreateHashEntry(&tp->jobTable, jobPtr->jobid, &new);
	    if (!new) {
		Ns_Fatal("%s: collision on job id %s",
		    tp->module, jobPtr->jobid);
	    }
	    Tcl_SetHashValue(ePtr, jobPtr);
	}

	if (tp->maxSize > 0) {
	    tp->jobPosters++;
	    while (
		tp->curSize > 0
		&& tp->curSize + jobPtr->size > tp->maxSize
	    ) {
		Ns_CondWait(&tp->jobCond, &tp->lock);
	    }
	    tp->jobPosters--;
	}

	tp->curSize += jobPtr->size;

	if (tp->firstJobPtr == NULL) {
	    tp->firstJobPtr = jobPtr;
	} else {
	    tp->lastJobPtr->nextJobPtr = jobPtr;
	}
	tp->lastJobPtr = jobPtr;
	jobPtr->nextJobPtr = NULL;

	if (tp->idleThreads == 0) {
	    CreateThread(tp);
	}

	Ns_CondBroadcast(&tp->jobCond);

    Ns_MutexUnlock(&tp->lock);

    return TCL_OK;
}

static int
GetCmd(Tcl_Interp *interp, ThreadPool *tp, char *jobid)
{
    Tcl_HashEntry *ePtr;
    Job *jobPtr = NULL;
    int status;

    Ns_MutexLock(&tp->lock);

    ePtr = Tcl_FindHashEntry(&tp->jobTable, jobid);
    if (ePtr == NULL) {
	Tcl_AppendResult(interp, "invalid job id \"", jobid, "\"", NULL);
	status = TCL_ERROR;
    } else {
	jobPtr = Tcl_GetHashValue(ePtr);

	if (jobPtr->result == NULL) {
	    Tcl_AppendResult(interp, "job ", jobid, " not ready", NULL);
	    status = TCL_ERROR;
	} else {
	    Tcl_DeleteHashEntry(ePtr);
	    status = TCL_OK;
	}
    }

    Ns_MutexUnlock(&tp->lock);

    if (status == TCL_OK) {
	Tcl_SetResult(interp, jobPtr->result, TCL_DYNAMIC);
	ns_free(jobPtr);
    }

    return status;
}

static int
WaitCmd(Tcl_Interp *interp, ThreadPool *tp, char *idlist)
{
    int idc;
    char **idv;
    char *mergePtr;
    void *splitPtr;
    Tcl_DString ds;
    int i;
    Tcl_HashEntry *ePtr;
    Job *jobPtr;

    if (Tcl_SplitList(interp, idlist, &idc, &idv) != TCL_OK) {
	return TCL_ERROR;
    }

    splitPtr = idv;

    Tcl_DStringInit(&ds);

    Ns_MutexLock(&tp->lock);

    while (1) {
	for (i = 0; i < idc; ) {
	    ePtr = Tcl_FindHashEntry(&tp->jobTable, idv[i]);
	    if (ePtr == NULL) {
		idv[i] = idv[idc - 1];
		idc--;
	    } else {
		jobPtr = Tcl_GetHashValue(ePtr);
		if (jobPtr->result == NULL) { 
		    i++;
		} else {
		    Tcl_DStringAppendElement(&ds, idv[i]);
		    idv[i] = idv[idc - 1];
		    idc--;
		}
	    }
	}

	if (idc == 0 || Tcl_DStringLength(&ds) > 0) {
	    break;
	}

	Ns_CondWait(&tp->resultCond, &tp->lock);
    }

    Ns_MutexUnlock(&tp->lock);

    Tcl_AppendElement(interp, Tcl_DStringValue(&ds));
    Tcl_DStringFree(&ds);
    mergePtr = Tcl_Merge(idc, idv);
    Tcl_AppendElement(interp, mergePtr);

    ns_free(splitPtr);
    ns_free(mergePtr);

    return TCL_OK;
}

