2 #include <netinet/tcp.h>
5 inline int test_and_set(volatile unsigned int *addr) {
7 /* Note: the "xchg" instruction does not need a "lock" prefix */
8 __asm__ __volatile__ ("xchgl %0, %1"
9 : "=r" (oldval), "=m" (*(addr))
10 : "0" (1), "m" (*(addr)));
13 inline void UnLock(volatile unsigned int *addr) {
15 /* Note: the "xchg" instruction does not need a "lock" prefix */
16 __asm__ __volatile__ ("xchgl %0, %1"
17 : "=r" (oldval), "=m" (*(addr))
18 : "0" (0), "m" (*(addr)));
21 # error need implementation of test_and_set
26 inline void Lock(volatile unsigned int *s) {
27 while(test_and_set(s)) {
38 sockPoolHashTable_t *createSockPool(sockPoolHashTable_t * sockhash, unsigned int size) {
39 if((sockhash = calloc(1, sizeof(sockPoolHashTable_t))) == NULL) {
40 printf("Calloc error at %s line %d\n", __FILE__, __LINE__);
44 socknode_t **nodelist;
45 if ((nodelist = calloc(size, sizeof(socknode_t *))) < 0) {
46 printf("Calloc error at %s line %d\n", __FILE__, __LINE__);
51 sockhash->table = nodelist;
52 sockhash->inuse = NULL;
53 sockhash->size = size;
54 sockhash->mask = size - 1;
60 int createNewSocket(unsigned int mid) {
63 if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
64 printf("%s() Error: In creating socket at %s, %d\n", __func__, __FILE__, __LINE__);
67 setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag));
68 struct sockaddr_in remoteAddr;
69 bzero(&remoteAddr, sizeof(remoteAddr));
70 remoteAddr.sin_family = AF_INET;
71 remoteAddr.sin_port = htons(LISTEN_PORT);
72 remoteAddr.sin_addr.s_addr = htonl(mid);
73 if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
74 perror("socket connect: ");
75 printf("%s(): Error %d connecting to %s:%d\n", __func__, errno, inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
82 int getSockWithLock(sockPoolHashTable_t *sockhash, unsigned int mid) {
84 int key = mid&(sockhash->mask);
87 Lock(&sockhash->mylock);
88 ptr=&(sockhash->table[key]);
91 if (mid == (*ptr)->mid) {
95 tmp->next=sockhash->inuse;
97 UnLock(&sockhash->mylock);
102 UnLock(&sockhash->mylock);
103 if((sd = createNewSocket(mid)) != -1) {
104 socknode_t *inusenode = calloc(1, sizeof(socknode_t));
106 inusenode->mid = mid;
107 insToListWithLock(sockhash, inusenode);
114 int getSock(sockPoolHashTable_t *sockhash, unsigned int mid) {
116 int key = mid&(sockhash->mask);
119 ptr=&(sockhash->table[key]);
122 if (mid == (*ptr)->mid) {
123 socknode_t *tmp=*ptr;
126 tmp->next=sockhash->inuse;
132 if((sd = createNewSocket(mid)) != -1) {
133 socknode_t *inusenode = calloc(1, sizeof(socknode_t));
134 inusenode->next=sockhash->inuse;
135 sockhash->inuse=inusenode;
142 int getSock2(sockPoolHashTable_t *sockhash, unsigned int mid) {
144 int key = mid&(sockhash->mask);
147 ptr=&(sockhash->table[key]);
150 if (mid == (*ptr)->mid) {
155 if((sd = createNewSocket(mid)) != -1) {
156 *ptr=calloc(1, sizeof(socknode_t));
165 /*socket pool with multiple TR threads asking to connect to same machine */
166 int getSock2WithLock(sockPoolHashTable_t *sockhash, unsigned int mid) {
168 int key = mid&(sockhash->mask);
171 Lock(&sockhash->mylock);
172 ptr=&(sockhash->table[key]);
174 if (mid == (*ptr)->mid) {
175 UnLock(&sockhash->mylock);
180 UnLock(&sockhash->mylock);
181 if((sd = createNewSocket(mid)) != -1) {
182 socknode_t *inusenode = calloc(1, sizeof(socknode_t));
184 inusenode->mid = mid;
185 addSockWithLock(sockhash, inusenode);
192 void addSockWithLock(sockPoolHashTable_t *sockhash, socknode_t *ptr) {
193 int key = ptr->mid&(sockhash->mask);
194 Lock(&sockhash->mylock);
195 ptr->next = sockhash->table[key];
196 sockhash->table[key] = ptr;
197 UnLock(&sockhash->mylock);
200 void insToListWithLock(sockPoolHashTable_t *sockhash, socknode_t *inusenode) {
201 Lock(&sockhash->mylock);
202 inusenode->next = sockhash->inuse;
203 sockhash->inuse = inusenode;
204 UnLock(&sockhash->mylock);
207 void freeSock(sockPoolHashTable_t *sockhash, unsigned int mid, int sd) {
208 int key = mid&(sockhash->mask);
209 socknode_t *ptr = sockhash->inuse;
210 sockhash->inuse = ptr->next;
213 ptr->next = sockhash->table[key];
214 sockhash->table[key] = ptr;
217 void freeSockWithLock(sockPoolHashTable_t *sockhash, unsigned int mid, int sd) {
218 int key = mid&(sockhash->mask);
220 Lock(&sockhash->mylock);
221 ptr = sockhash->inuse;
222 sockhash->inuse = ptr->next;
225 ptr->next = sockhash->table[key];
226 sockhash->table[key] = ptr;
227 UnLock(&sockhash->mylock);
231 /***************************************
232 * Array Implementation for socket reuse
233 ****************************************/
237 sock_pool_t *initSockPool(unsigned int *mid, int machines) {
238 sock_pool_t *sockpool;
239 num_machines = machines;
240 if ((sockpool = calloc(num_machines, sizeof(sock_pool_t))) < 0) {
241 printf("%s(), Calloc error at %s, line %d\n", __func__, __FILE__, __LINE__);
245 for (i = 0; i < num_machines; i++) {
246 if ((sockpool[i].sd = calloc(MAX_CONN_PER_MACHINE, sizeof(int))) < 0) {
247 printf("%s(), Calloc error at %s, line %d\n", __func__, __FILE__, __LINE__);
250 if ((sockpool[i].inuse = calloc(MAX_CONN_PER_MACHINE, sizeof(char))) < 0) {
251 printf("%s(), Calloc error at %s, line %d\n", __func__, __FILE__, __LINE__);
254 sockpool[i].mid = mid[i];
256 for(j = 0; j < MAX_CONN_PER_MACHINE; j++) {
257 sockpool[i].sd[j] = -1;
264 int getSock(sock_pool_t *sockpool, unsigned int mid) {
266 for (i = 0; i < num_machines; i++) {
267 if (sockpool[i].mid == mid) {
269 for (j = 0; j < MAX_CONN_PER_MACHINE; j++) {
270 if (sockpool[i].sd[j] != -1 && (sockpool[i].inuse[j] == 0)) {
271 sockpool[i].inuse[j] = 1;
272 return sockpool[i].sd[j];
274 if (sockpool[i].sd[j] == -1) {
277 if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
278 printf("%s() Error: In creating socket at %s, %d\n", __func__, __FILE__, __LINE__);
281 struct sockaddr_in remoteAddr;
282 bzero(&remoteAddr, sizeof(remoteAddr));
283 remoteAddr.sin_family = AF_INET;
284 remoteAddr.sin_port = htons(LISTEN_PORT);
285 remoteAddr.sin_addr.s_addr = htonl(mid);
287 if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
288 printf("%s(): Error %d connecting to %s:%d\n", __func__, errno, inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
292 sockpool[i].sd[j] = sd;
293 sockpool[i].inuse[j] = 1;
294 return sockpool[i].sd[j];
297 printf("%s()->Error: Less number of MAX_CONN_PER_MACHINE\n", __func__);
301 printf("%s()-> Error: Machine id not found\n", __func__);
306 int freeSock(sock_pool_t *sockpool, int sd) {
308 for (i = 0; i < num_machines; i++) {
310 for (j = 0; j < MAX_CONN_PER_MACHINE; j++) {
311 if (sockpool[i].sd[j] == sd) {
312 sockpool[i].inuse[j] = 0;
317 printf("%s() Error: Illegal socket descriptor %d\n", __func__, sd);