Implementing the Threading Models
The three threading models can be achieved as follows:
*The Single Threading model is in operation when an application has only one thread and uses no ALLOCATE/DEALLOCATE SQLCONTEXT statements. A single SQL Context is allocated transparently by the client support libraries.
*The Bound Threading model is in operation when an application has multiple threads and uses no ALLOCATE/DEALLOCATE SQLCONTEXT statements. Multiple SQL Contexts (one per thread) are allocated transparently by the client support libraries. Each SQL Context is bound to one thread.
*The Free Threading model is in operation when an application has multiple threads and uses the ALLOCATE/DEALLOCATE SQLCONTEXT statements. The SQL application is responsible for allocating all SQL Contexts, and for managing their use by the threads. Any thread can use any SQL Context, as permitted by the application.
Note: 
It is also possible to have a single-threaded application that uses multiple SQL Contexts; this is a variation of the Free Threading model.
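The rules above reduce to two questions: how many threads the application has, and whether it uses the ALLOCATE/DEALLOCATE SQLCONTEXT statements. The following is a minimal sketch of that decision in plain C. The names ThreadingModel and threadingModel() are hypothetical and exist only for illustration; they are not part of the client support libraries.

```c
#include <assert.h>

/* Hypothetical names for illustration only; not a CONNX API. */
typedef enum { MODEL_SINGLE, MODEL_BOUND, MODEL_FREE } ThreadingModel;

/*
 * threadCount         - number of threads in the application (>= 1)
 * usesSqlContextStmts - non-zero if the application uses
 *                       ALLOCATE/DEALLOCATE SQLCONTEXT statements
 */
ThreadingModel threadingModel(int threadCount, int usesSqlContextStmts)
{
    assert(threadCount >= 1);

    if (usesSqlContextStmts)
    {
        /* The application manages its own SQL Contexts. This also
           covers the single-threaded variation described in the note. */
        return MODEL_FREE;
    }

    /* No explicit contexts: the libraries allocate one per thread. */
    return (threadCount > 1) ? MODEL_BOUND : MODEL_SINGLE;
}
```

For instance, threadingModel(1, 1) yields MODEL_FREE, which corresponds to the single-threaded variation mentioned in the note.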
Example 1: Simple Use of ALLOCATE/DEALLOCATE SQLCONTEXT
For clarity, the first example is a single-threaded application that simply connects to a database and then disconnects from it. The purpose is to demonstrate the basic use of the ALLOCATE SQLCONTEXT and DEALLOCATE SQLCONTEXT statements. Subsequent examples will demonstrate multi-threaded use of SQL Contexts.
#include <stdio.h>
#include <stdlib.h> /* exit() */
#include <string.h> /* memset() */
int main(void)
{
exec sql begin declare section;
int SQLCODE;
exec sql end declare section;
SAGContext sqlCtx;

/* Zero the SQL context, then identify it in ALLOCATE SQLCONTEXT. */
memset(&sqlCtx, 0, sizeof(SAGContext));
exec sql allocate sqlcontext as :&sqlCtx;

exec sql connect to 'testDB' user 'DBA' password 'dba';
printf("connect returned SQLCODE %i\n", SQLCODE);
if (SQLCODE != 0) exit(1);
exec sql disconnect;
printf("disconnect returned SQLCODE %i\n", SQLCODE);
if (SQLCODE != 0) exit(2);
/* Deallocate the SQL context. */
exec sql deallocate sqlcontext as :&sqlCtx;

exit(0);
}
In this example, one SQL Context is defined, initialized, and identified in an ALLOCATE SQLCONTEXT statement. The program connects to a database and immediately disconnects. The SQL Context is then deallocated.
Things to note about this example:
*Storage for the SQL Context (sqlCtx) is allocated by the user application, in this case by defining the context as a local (automatic) variable.
*It is very important that the SQL Context is set to zero before use. The application will fail if this is not done.
*The type of the host variable that is supplied in the ALLOCATE SQLCONTEXT statement must evaluate to (SAGContext *), hence :&sqlCtx.
*All connections must be closed before the DEALLOCATE SQLCONTEXT statement is executed. Otherwise, an error will be returned to the user.
*In this example, the ALLOCATE SQLCONTEXT and DEALLOCATE SQLCONTEXT statements are actually not needed. The application would function identically if they were removed, since the client support libraries will transparently allocate one SQL Context for the single thread. Their purpose in this example is simply to illustrate their use.
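The ordering rule in the notes above (zero the context first, and close every connection before deallocating) can be modelled in plain C without the precompiler. This is only a sketch under stated assumptions: DemoContext is a hypothetical stand-in for SAGContext, whose real layout is not shown in this document, and the demo* functions and the error value are invented for illustration rather than taken from the client support libraries.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical stand-in for SAGContext (the real layout is not shown). */
typedef struct {
    int openConnections;
} DemoContext;

#define DEMO_OK                    0
#define DEMO_ERR_STILL_CONNECTED (-1)   /* invented value, not a real SQLCODE */
#define DEMO_ERR_NOT_CONNECTED   (-2)   /* invented value, not a real SQLCODE */

/* Models CONNECT: the context now holds one more open connection. */
int demoConnect(DemoContext *ctx)
{
    assert(ctx != NULL);
    ctx->openConnections++;
    return DEMO_OK;
}

/* Models DISCONNECT: fails if there is nothing to close. */
int demoDisconnect(DemoContext *ctx)
{
    assert(ctx != NULL);
    if (ctx->openConnections == 0)
    {
        return DEMO_ERR_NOT_CONNECTED;
    }
    ctx->openConnections--;
    return DEMO_OK;
}

/* Models DEALLOCATE SQLCONTEXT: refused while connections are open. */
int demoDeallocate(DemoContext *ctx)
{
    assert(ctx != NULL);
    if (ctx->openConnections > 0)
    {
        return DEMO_ERR_STILL_CONNECTED;
    }
    memset(ctx, 0, sizeof(*ctx));   /* leave the storage zeroed for reuse */
    return DEMO_OK;
}
```

A caller would zero the structure with memset(), call demoConnect(), and observe that demoDeallocate() is refused until demoDisconnect() has been called, which mirrors the order of statements in Example 1.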
Example 2: Multiple SQL Contexts, Multiple Threads
The second example illustrates an online book sales application that uses the Free Threading model.
In this simplified example, the application has three threads that serve a customer population of ten. Each customer requests a book search, buys one book, and logs off. Each work thread has a main loop, in which it gets some work from a queue and does the work.
Therefore, any work packet could potentially be processed by any thread. There is no pre-ordained sequence of which thread processes which work packet - it is simply a matter of which thread becomes available first.
The ten customers are represented by an array of the structure AppContext. The AppContext structure encapsulates all the application data pertaining to one user. Note that this contains the SQL Context structure, as well as other application status data. The work queue is simulated by a statically initialized array of the structure WorkPacket.
The main() function starts three threads, each of which runs the workThread() function, and then waits for all of them to finish. The main loop of workThread() gets work from the commandQueue array and, depending on the commandCode in the WorkPacket, calls listBooks(), buyBook() or logoffUser().
The example uses threading and locking functions that are declared in kbthreads.h. For clarity, their definitions are not listed here.
#include <stdio.h>
#include <stdlib.h>
#include <string.h> /* strcpy(), memset() */
#include <kbthreads.h>
static int debug_print = 1;
#define TESTAPP_DBNAME "testDB"
#define TESTAPP_USERNAME "DBA"
#define TESTAPP_PASSWORD "dba"
#define NUM_THREADS (3) /* Number of threads to start. */
#define NUM_CONTEXTS (10) /* Number of contexts in use. */
enum CommandCodes { LISTBOOKS, BUYBOOK, LOGOFF, TERMINATE, num_cmds };
/* Thread count & mutex. */
mutex_t thread_count_mutex = NULL ;
int thread_count = 0 ;
/* Application Context (includes SQL Context). */
typedef struct {
int userID;
int appFlags;
mutex_t appContext_mutex;
SAGContext sqlContext;
} AppContext;
#define APPFLAG_CONNECTED (1 << 0)
#define APPFLAG_WORK_DONE (1 << 1)
AppContext userTable[] = {
{ 1000, 0, 0, 0 }, { 1001, 0, 0, 0 }, { 1002, 0, 0, 0 },
{ 1003, 0, 0, 0 }, { 1004, 0, 0, 0 }, { 1005, 0, 0, 0 },
{ 1006, 0, 0, 0 }, { 1007, 0, 0, 0 }, { 1008, 0, 0, 0 },
{ 1009, 0, 0, 0 }
};
/* Command Queue & mutex. */
typedef struct {
int appContext;
int commandCode;
char userArg[20];
char results[10][100];
} WorkPacket;
WorkPacket commandQueue[] = {
{ 1, LISTBOOKS, "computers" }, { 2, LISTBOOKS, "languages" },
{ 0, LISTBOOKS, "politics" }, { 2, BUYBOOK, "0-002-90009-0" },
{ 0, BUYBOOK, "0-003-90009-0" }, { 1, BUYBOOK, "0-001-90009-0" },
{ 0, LOGOFF }, { 1, LOGOFF },
{ 2, LOGOFF }, { 3, LISTBOOKS, "travel" },
{ 3, BUYBOOK, "0-004-90009-0" }, { 3, LOGOFF },
{ 4, LISTBOOKS, "sport" }, { 4, BUYBOOK, "0-005-90009-0" },
{ 5, LISTBOOKS, "cinema" }, { 5, BUYBOOK, "0-006-90009-0" },
{ 6, LISTBOOKS, "health" }, { 6, BUYBOOK, "0-007-90009-0" },
{ 6, LOGOFF }, { 9, LISTBOOKS, "humour" },
{ 7, LISTBOOKS, "poetry" }, { 8, LISTBOOKS, "history" },
{ 8, BUYBOOK, "0-010-90009-0" }, { 7, BUYBOOK, "0-009-90009-0" },
{ 9, BUYBOOK, "0-008-90009-0" }, { 7, LOGOFF },
{ 5, LOGOFF }, { 4, LOGOFF },
{ 9, LOGOFF }, { 8, LOGOFF },
{ 0, TERMINATE }
};
mutex_t commandQueue_mutex = NULL;
int cqIndex = 0;
int cqLength = 31;
exec sql begin declare section;
char testDb[20];
char testUser[20];
char testPass[20];
exec sql end declare section;
int sqlError(SQL_INTEGER sqlcodeOrig,
char * stmt,
AppContext * pAppContext,
int appCtxNo)
{
exec sql begin declare section;
int SQLCODE;
int i;
int errCnt = 0;
char errTxt[128];
exec sql end declare section;
SAGContext * pSqlCtx = &pAppContext->sqlContext;
if (sqlcodeOrig < 0)
{
/* Error */
printf("C%i: %s: returns SQLCODE=%i\n", appCtxNo, stmt, sqlcodeOrig);
exec sql allocate sqlcontext as :pSqlCtx;
exec sql get diagnostics :errCnt = number;
for (i = 1; i <= errCnt; i++)
{
memset(errTxt, 0, 128);
exec sql get diagnostics exception :i :errTxt = MESSAGE_TEXT;
printf("C%i: %s: SQLCODE=%i, %s\n",
appCtxNo, stmt, sqlcodeOrig, errTxt);
}
/* Tidy-up work/connections. */
if (pAppContext->appFlags & APPFLAG_WORK_DONE)
{
pAppContext->appFlags &= ~APPFLAG_WORK_DONE;
exec sql rollback work;
sqlError(SQLCODE, "rollback work", pAppContext, appCtxNo);
}
if (pAppContext->appFlags & APPFLAG_CONNECTED)
{
pAppContext->appFlags &= ~APPFLAG_CONNECTED;
exec sql disconnect;
sqlError(SQLCODE, "disconnect", pAppContext, appCtxNo);
}
return(-1);
}
else
{
return(0);
}
}
int listBooks(WorkPacket * pWork, int threadNo, int workPacketNo)
{
exec sql begin declare section;
int SQLCODE = 0;
char userChoice[20] = { 0 };
char theISBN[20] = { 0 };
char theTitle[60] = { 0 };
int thePrice = 0;
exec sql end declare section;
int rowCount;
AppContext * pAppCtx = &userTable[pWork->appContext];
SAGContext * pSqlCtx = &pAppCtx->sqlContext;
/* Set up the SQL context host variable. */
exec sql allocate sqlcontext as :pSqlCtx;
if (!(pAppCtx->appFlags & APPFLAG_CONNECTED))
{
exec sql connect to :testDb user :testUser password :testPass;
if (sqlError(SQLCODE, "connect", pAppCtx, pWork->appContext))
{
return(-1);
}
else
{
pAppCtx->appFlags |= APPFLAG_CONNECTED;
}
}
strcpy(userChoice, pWork->userArg);
exec sql declare listCur cursor for
select isbn, title, price from books where category = :userChoice;
exec sql open listCur;
if (sqlError(SQLCODE, "open", pAppCtx, pWork->appContext))
{
return(-1);
}
/* Stop after 10 rows: results[] in WorkPacket holds only 10 entries. */
for (SQLCODE = 0, rowCount = 0; SQLCODE == 0 && rowCount < 10; rowCount++)
{
exec sql fetch listCur into :theISBN, :theTitle, :thePrice;
if (SQLCODE == 100)
{
break;
}
else if (sqlError(SQLCODE, "fetch", pAppCtx, pWork->appContext))
{
return(-1);
}
else
{
sprintf(pWork->results[rowCount],
"%s,%s,%i\n", theISBN, theTitle, thePrice);
}
}
exec sql close listCur;
if (sqlError(SQLCODE, "close", pAppCtx, pWork->appContext))
{
return(-1);
}
if (debug_print)
{
printf(" listBooks(): T%i C%i WP%i - success\n",
threadNo, pWork->appContext, workPacketNo);
}
return (0);
}
int buyBook(WorkPacket * pWork, int threadNo, int workPacketNo)
{
exec sql begin declare section;
int SQLCODE = 0;
int thisOrdnum = 0;
int thisUser = 0;
char theISBN[20] = { 0 };
exec sql end declare section;
AppContext * pAppCtx = &userTable[pWork->appContext];
SAGContext * pSqlCtx = &pAppCtx->sqlContext;
/* Set up the SQL context host variable. */
exec sql allocate sqlcontext as :pSqlCtx;
thisOrdnum = workPacketNo;
thisUser = pAppCtx->userID;
strcpy(theISBN, pWork->userArg);
exec sql insert into orders (ordnum, userid, isbn)
values (:thisOrdnum, :thisUser, :theISBN);
if (sqlError(SQLCODE, "insert", pAppCtx, pWork->appContext))
{
return(-1);
}
exec sql commit work;
if (sqlError(SQLCODE, "commit", pAppCtx, pWork->appContext))
{
return(-1);
}
if (debug_print)
{
printf(" buyBook(): T%i C%i WP%i - success\n",
threadNo, pWork->appContext, workPacketNo);
}
return (0);
}
int logoffUser(WorkPacket * pWork, int threadNo, int workPacketNo)
{
exec sql begin declare section;
int SQLCODE = 0;
exec sql end declare section;
AppContext * pAppCtx = &userTable[pWork->appContext];
SAGContext * pSqlCtx = &pAppCtx->sqlContext;
/* Set up the SQL context host variable. */
exec sql allocate sqlcontext as :pSqlCtx;
exec sql disconnect current;
if (sqlError(SQLCODE, "disconnect", pAppCtx, pWork->appContext))
{
return(-1);
}
else
{
pAppCtx->appFlags &= ~APPFLAG_CONNECTED;
if (debug_print)
{
printf("logoffUser(): T%i C%i WP%i - success\n",
threadNo, pWork->appContext, workPacketNo);
}
return (0);
}
}
static DWORD WINAPI workThread(LPVOID threadArg)
{
int threadNo = (int) threadArg;
int retval = 0;
WorkPacket * pWork;
AppContext * pAppCtx;
int workCode;
int workPacketNo;
Sleep(THREAD_INIT_SLEEP_TIME) ;
do
{
if (!lock_mutex(commandQueue_mutex, INFINITE))
{
printf("FAILURE: workThread(%i): lock_mutex(cq) FAILS\n", threadNo);
}
workPacketNo = cqIndex;
if (cqIndex < (cqLength - 1))
{
cqIndex++;
}
pWork = &commandQueue[workPacketNo];
workCode = pWork->commandCode;
if (workCode != TERMINATE)
{
pAppCtx = &userTable[pWork->appContext];
if (!lock_mutex(pAppCtx->appContext_mutex, INFINITE))
{
printf("FAILURE: workThread(%i): lock_mutex(aq %i) FAILS\n",
threadNo, pWork->appContext);
}
}
/* Safe to unlock command queue now. */
if (!unlock_mutex(commandQueue_mutex))
{
printf("FAILURE: workThread(%i): unlock_mutex(cq) FAILS\n",
threadNo);
}
switch (workCode)
{
case LISTBOOKS:
retval = listBooks(pWork, threadNo, workPacketNo);
break;
case BUYBOOK:
retval = buyBook(pWork, threadNo, workPacketNo);
break;
case LOGOFF:
retval = logoffUser(pWork, threadNo, workPacketNo);
break;
case TERMINATE:
printf(" <TERMINATE>: T%i C%i WP%i - success\n",
threadNo, pWork->appContext, workPacketNo);
}
if (workCode != TERMINATE)
{
if (!unlock_mutex(pAppCtx->appContext_mutex))
{
printf("FAILURE: workThread(%i): unlock_mutex(aq %i) FAILS\n",
threadNo, pWork->appContext);
}
}
} while ((workCode != TERMINATE) && (retval == 0));
/* Reduce thread_count. */
if (!lock_mutex(thread_count_mutex, INFINITE))
{
printf("FAILURE: workThread(%i): lock_mutex(tc) FAILS\n", threadNo);
}
thread_count--;
if (!unlock_mutex(thread_count_mutex))
{
printf("FAILURE: workThread(%i): unlock_mutex(tc) FAILS\n", threadNo);
}
exit_thread(EXIT_THREAD_CODE);
return(0);
}
int main(int argc, char ** argv)
{
int threadNo, ctxCount, old_count = 0;
bool_t finished = FALSE;
thread_t thread_h[NUM_THREADS];
SAGContext * pSqlCtx = 0;
exec sql begin declare section;
int SQLCODE;
exec sql end declare section;
/* Set up the db, user & passwd. */
strcpy(testDb, TESTAPP_DBNAME);
strcpy(testUser, TESTAPP_USERNAME);
strcpy(testPass, TESTAPP_PASSWORD);
/* Create thread-count and command queue mutex locks. */
if (!create_mutex(FALSE, &thread_count_mutex)
|| !create_mutex(FALSE, &commandQueue_mutex))
{
printf("Can't create TC/CQ mutex lock\n");
exit(ABORT_THREAD_CODE);
}
/* Create mutex locks for application contexts. */
for (ctxCount = 0; ctxCount < NUM_CONTEXTS; ctxCount++)
{
if (!create_mutex(FALSE, &userTable[ctxCount].appContext_mutex))
{
printf("Can't create AC mutex lock\n");
exit(ABORT_THREAD_CODE);
}
}
/* Start the worker threads. */
for (threadNo = 0; threadNo < NUM_THREADS; threadNo++)
{
if (!lock_mutex(thread_count_mutex, INFINITE))
{
printf("FAILURE: main: lock_mutex 1 FAILS\n");
}
thread_count++;
create_thread(workThread, (void *)threadNo, &thread_h[threadNo]);
if (!unlock_mutex(thread_count_mutex))
{
printf("FAILURE: main: unlock_mutex 1 FAILS\n");
}
}
/* Wait for threads to finish. */
while (!finished)
{
if (!lock_mutex(thread_count_mutex, INFINITE))
{
printf("FAILURE: main: lock_mutex 2 FAILS\n");
}
if (thread_count != old_count)
{
old_count = thread_count;
printf("%d threads running\n", thread_count);
}
finished = (thread_count <= 0);
if (!unlock_mutex(thread_count_mutex))
{
printf("FAILURE: main: unlock_mutex 2 FAILS\n");
}
if (!finished)
{
Sleep(TIMEOUT_SLEEP);
}
}
/* Deallocate all SQL contexts. */
exec sql allocate sqlcontext as :pSqlCtx;
for (ctxCount = 0; ctxCount < NUM_CONTEXTS; ctxCount++)
{
pSqlCtx = &userTable[ctxCount].sqlContext;
exec sql deallocate sqlcontext as :pSqlCtx;
}
for (threadNo = 0; threadNo < NUM_THREADS; threadNo++)
{
CloseHandle(thread_h[threadNo]);
}
printf("Finished.\n");
return(0);
}
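One detail of workThread() that is easy to miss is how a work packet is claimed: the queue index advances after each claim, except when it already points at the last entry. Because the last entry is the TERMINATE packet, every thread eventually claims it and stops. The following plain C sketch isolates that step. The function name claimPacket() is hypothetical, and the sketch assumes the caller already holds the command queue lock, as workThread() does.

```c
#include <assert.h>

/*
 * Returns the index of the packet to process and advances *pIndex,
 * unless *pIndex already refers to the last packet in the queue.
 * The caller is assumed to hold the queue lock.
 */
int claimPacket(int *pIndex, int length)
{
    int claimed;

    assert(pIndex != 0);
    assert(length > 0 && *pIndex >= 0 && *pIndex < length);

    claimed = *pIndex;
    if (*pIndex < (length - 1))
    {
        (*pIndex)++;    /* more packets remain: move to the next one */
    }
    /* Otherwise stay on the final packet so that every later caller
       also receives it. */
    return claimed;
}
```

With a queue of length 3, successive calls return 0, 1, 2, 2, and so on; the index never moves past the final entry.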
Things to note:
*The initialization of the userTable structure ensures that the SAGContext structure (the SQL Context) is initialized to zeroes before it is used. It is essential that an SQL application does this.
*The AppContext structure contains a lock mechanism (appContext_mutex) which is used to serialize access to a customer's SQL Context. This avoids errors being returned by the client support libraries, which will detect whether an SQL Context is already in use. Applications are responsible for managing access to SQL Contexts. They must either ensure that only one thread can access an SQL Context at a time (as this example does), or be prepared to handle errors from the SQL interface.
*SQL diagnostic information is held in the SQL Context. Therefore, the error reporting function sqlError() also specifies which SQL Context it is using.
*Note that all the SQL Contexts are deallocated by the main thread after all the work threads have finished. Alternatively, the active context could be deallocated after the disconnect statement in the logoffUser() function. The application can deallocate an SQL Context whenever it has finished with it, provided that it contains no active SQL connections.
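The example above serializes access by blocking on appContext_mutex. An application that prefers not to block could instead guard each context with a non-blocking "in use" flag and handle the busy case itself. The following is a minimal sketch of that alternative using a C11 atomic_flag. GuardedContext, ctxInit(), ctxEnter(), ctxLeave() and the return values are all hypothetical names chosen for illustration; this is not how the client support libraries detect concurrent use internally.

```c
#include <assert.h>
#include <stdatomic.h>

/* Hypothetical wrapper; the SQL Context itself would sit beside the flag. */
typedef struct {
    atomic_flag inUse;
    int         userID;
} GuardedContext;

#define CTX_OK     0
#define CTX_BUSY (-1)   /* invented value, not a real SQLCODE */

/* Marks the context as free and records its owner. */
void ctxInit(GuardedContext *ctx, int userID)
{
    assert(ctx != 0);
    atomic_flag_clear(&ctx->inUse);
    ctx->userID = userID;
}

/* Tries to take exclusive use of the context without blocking. */
int ctxEnter(GuardedContext *ctx)
{
    assert(ctx != 0);
    /* test_and_set returns the previous state: true means another
       thread already holds the context. */
    return atomic_flag_test_and_set(&ctx->inUse) ? CTX_BUSY : CTX_OK;
}

/* Releases the context so that another thread may use it. */
void ctxLeave(GuardedContext *ctx)
{
    assert(ctx != 0);
    atomic_flag_clear(&ctx->inUse);
}
```

A thread that receives CTX_BUSY could requeue the work packet or pick another one, instead of waiting as workThread() does.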