Whamcloud - gitweb
land v0.9.1 on HEAD, in preparation for a 1.0.x branch
[fs/lustre-release.git] / lnet / klnds / iblnd / uagent.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <pthread.h>
4
5
6 #include <linux/shm.h>
7 #include <linux/ipc.h>
8 #include <linux/stat.h>
9 #include <linux/types.h>
10
11 #include <sys/socket.h>
12 #include <netinet/in.h>
13 #include <arpa/inet.h>
14 #include <unistd.h>
15
16 // Infiniband VAPI/EVAPI header files Mellanox MT23108 VAPI
17 #include <vapi.h>
18 #include <vapi_types.h>
19 #include <vapi_common.h>
20 #include <evapi.h>
21
22 // Remote HCA Info information
23  typedef struct Remote_HCA_Info {
24        unsigned long     opcode;
25        unsigned long     length;
26        IB_lid_t          dlid[256];
27        VAPI_qp_num_t     rqp_num[256];
28        VAPI_rkey_t       rkey;   // for remote RDAM request
29        unsigned long     vaddr1; // virtual address fisrt 4 bytes
30        unsigned long     vaddr2; // virtual address second 4 bytes
31        u_int32_t         size;   // size of RDMA memory buffer
32        char              dest_ip[256]; //destination server IP address 
33  } Remote_HCA_Info;
34
35 #define SHARED_SEGMENT_SIZE  0x10000 // 16KB shared memory between U and K
36
37 // some internals opcodes for IB operations used in IBNAL
38 #define SEND_QP_INFO          0X00000001
39 #define RECV_QP_INFO          0X00000010
40 #define DEFAULT_SOCKET_PORT   11211 
41 #define LISTEN_QUEUE_SIZE     2048 
42 #define DEST_IP               "10.128.105.26"
43
44 // server_thread
45 // + wait for an incoming connection from remote node 
46 // + receive remote HCA's data 
47 //
48 //
49 //
50 //
51 // 
52 void *server_thread(void *vargp)
53 {
54   Remote_HCA_Info   *hca_data;
55   Remote_HCA_Info   hca_data_buffer;
56   
57   int    serverfd;
58   int    infd;
59   struct hostent  *hp;
60   struct sockaddr_in serveraddr;
61   struct sockaddr_in clientaddr;
62   int    sin_size=sizeof(struct sockaddr_in);
63   int    bytes_recv;
64   int    i;
65
66
67   hca_data = (Remote_HCA_Info *) vargp;
68   
69   if((serverfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
70     printf("server_thread couldnot create a socket \n");
71     pthread_exit((void *) 0);
72   }
73  
74   printf("server_thread create a socket \n");
75
76   bzero((char *) &serveraddr, sizeof(serveraddr));
77
78   serveraddr.sin_family = AF_INET;
79   serveraddr.sin_addr.s_addr = htons(INADDR_ANY);
80   serveraddr.sin_port = htons((unsigned short) DEFAULT_SOCKET_PORT);
81   
82   if(bind(serverfd,(struct sockaddr *)&serveraddr,sizeof(struct sockaddr)) < 0) {
83     printf("server_thread couldnot bind to a socket \n");
84     pthread_exit((void *) 0);
85   }
86
87   printf("server_thread bind to a socket \n");
88
89   if(listen(serverfd, LISTEN_QUEUE_SIZE) < 0) {
90     printf("server_thread couldnot listen to a socket \n");
91     pthread_exit((void *) 0);
92   }
93
94   printf("server_thread listen to a socket \n");
95
96   //
97   // I only expect to receive one HCA data from a remote HCA 
98   //
99   printf("server_thread: Waiting for a connection\n");
100   infd= accept(serverfd,(struct sockaddr*)&clientaddr,&sin_size);
101   printf("server_thread: Got an incoming connection");
102
103   /* receive data from socket into buffer */
104   bytes_recv = recv(infd,
105                     &hca_data_buffer,  
106                     sizeof(Remote_HCA_Info),
107                     0);
108
109   if(bytes_recv > 0) {
110 /*        
111       printf("server_thread receive data\n");
112       printf("opcode is 0x%X\n", hca_data_buffer.opcode);
113       printf("length is 0x%X\n", hca_data_buffer.length);
114
115       for(i=0; i < 256; i++) {
116         printf("dlid %d is 0x%X\n", i, hca_data_buffer.dlid[i]);
117         printf("rqp_num %d is 0x%X\n", hca_data_buffer.rqp_num[i]);
118       }
119
120       printf("rkey is 0x%X\n", hca_data_buffer.rkey);
121       printf("vaddr1 is 0x%X\n", hca_data_buffer.vaddr1);
122       printf("vaddr2 is 0x%X\n", hca_data_buffer.vaddr2);
123       printf("size is 0x%X\n", hca_data_buffer.size);
124       printf("After conversion hton \n");
125       printf("opcode is 0x%X\n", htonl(hca_data_buffer.opcode));
126       printf("length is 0x%X\n", htonl(hca_data_buffer.length));
127
128       for(i=0; i < 256; i++) {
129         printf("dlid %d is 0x%X\n", htons(hca_data_buffer.dlid[i]));
130         printf("rqp_num %d is 0x%X\n", htonl(hca_data_buffer.rqp_num[i]));
131       }
132
133       printf("rkey is 0x%X\n", htonl(hca_data_buffer.rkey));
134       printf("vaddr1 is 0x%X\n", htonl(hca_data_buffer.vaddr1));
135       printf("vaddr2 is 0x%X\n", htonl(hca_data_buffer.vaddr2));
136       printf("size is 0x%X\n", htonl(hca_data_buffer.size));
137 */     
138
139       hca_data->opcode  = ntohl(hca_data_buffer.opcode); // long 
140       hca_data->length  = ntohl(hca_data_buffer.length); // long
141
142       for(i=0; i < 256; i++) {
143         hca_data->dlid[i]    = ntohs(hca_data_buffer.dlid[i]);   // u_int16
144         hca_data->rqp_num[i] = ntohl(hca_data_buffer.rqp_num[i]);// u_int32
145       }
146
147       hca_data->rkey    = ntohl(hca_data_buffer.rkey);   // u_int32
148       hca_data->vaddr1  = ntohl(hca_data_buffer.vaddr1); // first word u_int32
149       hca_data->vaddr2  = ntohl(hca_data_buffer.vaddr2); // second word u_int32
150       hca_data->size    = ntohl(hca_data_buffer.size);   // u_int32
151     }
152     else {
153       printf("server_thread receive ERROR bytes_recv = %d\n", bytes_recv);
154     }
155
156     close(infd);
157     close(serverfd);
158
159   printf("server_thread EXIT \n");
160       
161   pthread_exit((void *) 0);
162
163 }
164
165 //
166 // client_thread 
167 // + connect to a remote server_thread
168 // + send local HCA's data to remote server_thread
169 //
170 void *client_thread(void *vargp)
171 {
172
173   Remote_HCA_Info   *hca_data;
174   Remote_HCA_Info   hca_data_buffer;
175
176   int    clientfd;
177   struct hostent  *hp;
178   struct sockaddr_in clientaddr;
179   int    bytes_send;
180   int    i;
181   
182   hca_data = (Remote_HCA_Info *) vargp;
183
184   if((clientfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
185     printf("client_thread couldnot create a socket \n");
186     pthread_exit((void *) 0);
187   }
188  
189   printf("client_thread create a socket \n");
190   
191   bzero((char *) &clientaddr, sizeof(clientaddr));
192
193   clientaddr.sin_family = AF_INET;
194   clientaddr.sin_addr.s_addr = inet_addr(hca_data->dest_ip);
195   printf("client_thread get server Ip address = %s\n", hca_data->dest_ip);
196   clientaddr.sin_port = htons((unsigned short) DEFAULT_SOCKET_PORT);
197   memset(&(clientaddr.sin_zero), '\0', 8);
198
199   connect(clientfd, (struct sockaddr *) &clientaddr, sizeof(struct sockaddr));
200
201   printf("client_thread connect to  server Ip address = %s\n", hca_data->dest_ip);
202
203   hca_data_buffer.opcode  = htonl(hca_data->opcode); // long 
204   hca_data_buffer.length  = htonl(hca_data->length); // long
205
206   for(i=0; i < 256; i++) {
207     hca_data_buffer.dlid[i]    = htons(hca_data->dlid[i]);   // u_int16
208     hca_data_buffer.rqp_num[i] = htonl(hca_data->rqp_num[i]);// u_int32
209   }
210
211   hca_data_buffer.rkey    = htonl(hca_data->rkey);   // u_int32
212   hca_data_buffer.vaddr1  = htonl(hca_data->vaddr1); // first word u_int32
213   hca_data_buffer.vaddr2  = htonl(hca_data->vaddr2); // second word u_int32
214   hca_data_buffer.size    = htonl(hca_data->size);   // u_int32
215  
216   bytes_send = send(clientfd, & hca_data_buffer, sizeof(Remote_HCA_Info), 0); 
217   
218   if(bytes_send == sizeof(Remote_HCA_Info)) {
219     printf("client_thread: send successfully \n");
220   }
221   else {
222     printf("client_thread: send failed \n");
223   }
224
225   printf("client_thread EXIT \n");
226
227   pthread_exit((void *) 0);
228 }
229
230
231 //
232 //  main 
233 //  + create a shared-memory between this main()/user address and
234 //    a kernel thread/kernel address space associated with inbal 
235 //    kernel module 
236 //  + access local HCA's data through this shared memory 
237 //
238 //  + create a server_thread for receiving remote HCA's data
239 //  + create a client_thread for sending out local HCA's data
240 //  + after receiving remote HCA's data update this shared memory
241 //
242 int  main(int argc , char *argv[])
243 {
244   int              segment_id;
245   struct shmid_ds  shmbuffer;
246   int              segment_size;
247   const int        shared_segment_size = sizeof(Remote_HCA_Info);
248   key_t            key = 999;
249   unsigned long    raddr;
250   Remote_HCA_Info  *shared_memory;
251   Remote_HCA_Info  exchange_hca_data;
252   Remote_HCA_Info  remote_hca_data;
253   int i; 
254
255   /* pthread */
256   pthread_t          sid;
257   pthread_t          cid;
258   pthread_attr_t     attr; 
259   int                rc, status;
260
261   char dest_ip[256];
262
263   if(argc != 2) {
264           printf("USAGE:   uagent   server_ip_address\n");
265           printf("argc = %d \n", argc);
266           exit(1);
267   }
268
269   strcpy(&exchange_hca_data.dest_ip[0], argv[1]);
270   printf("the destinational server IP address = %s\n", 
271                                        &exchange_hca_data.dest_ip); 
272
273   segment_id =  shmget(key, shared_segment_size, IPC_CREAT | 0666);
274
275   printf("sys_shmget is done segment_id = %d\n", segment_id);
276
277   shared_memory = (Remote_HCA_Info *) shmat(segment_id, 0, 0);
278
279   if(shared_memory == (char *) -1) {
280     printf("Shared memory attach failed shared_memory=%p\n",shared_memory);
281     exit(0);
282   }
283
284   printf("shared menory attached at address %p\n", shared_memory);
285
286   while (1) {
287     if(shared_memory->opcode ==  SEND_QP_INFO) {
288       printf("Local HCA data received from kernel thread\n");
289       break;
290     }
291     usleep(1000);
292     continue;
293   }
294
295   printf("Local HCA data received from kernel thread\n");
296
297   // save local HCA's data in exchange_hca_data
298   //
299   exchange_hca_data.opcode  = shared_memory->opcode;
300   exchange_hca_data.length  = shared_memory->length;
301
302   for(i=0; i < 256; i++) {
303     exchange_hca_data.dlid[i]    = shared_memory->dlid[i];
304     exchange_hca_data.rqp_num[i] = shared_memory->rqp_num[i];
305   }
306
307   exchange_hca_data.rkey    = shared_memory->rkey;
308   exchange_hca_data.vaddr1  = shared_memory->vaddr1;
309   exchange_hca_data.vaddr2  = shared_memory->vaddr2;
310   exchange_hca_data.size    = shared_memory->size;
311
312   /* Initialize and set thread detached attribute */
313   pthread_attr_init(&attr);
314   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
315
316   /* create a server thread for procsssing incoming remote node socket data */
317   // 
318   pthread_create(&sid, 
319                   &attr, 
320                   server_thread,
321                   (Remote_HCA_Info *) &remote_hca_data);
322
323   printf("Main: created a server thread \n");
324
325   sleep(10);
326   
327   /* create a clint thread to send out local HCA data to remote node */
328   pthread_create(&cid, 
329                   &attr, 
330                   client_thread,
331                   (Remote_HCA_Info *) &exchange_hca_data);
332
333   printf("Main: created a client  thread \n");
334
335   /* synchronization between server_thread and client_thread */
336   pthread_attr_destroy(&attr);
337
338   rc = pthread_join(sid, (void **) &status);
339   if(rc) {
340     printf("Error: return code from pthread_join() is %d\n", rc);
341     exit(-1);
342   }
343
344   printf("completed join with thread %d status = %d\n", sid, status);
345
346   rc = pthread_join(cid, (void **) &status);
347   if(rc) {
348     printf("Error: return code from pthread_join() is %d\n", rc);
349     exit(-1);
350   }
351   printf("completed join with thread %d status = %d\n", cid, status);
352
353   // update shared memory with remote HCA's data 
354
355   shared_memory->opcode = RECV_QP_INFO;
356   shared_memory->length = remote_hca_data.length;
357   for(i=0; i < 256; i++) {
358     shared_memory->dlid[i]   = remote_hca_data.dlid[i];
359     shared_memory->rqp_num[i]= remote_hca_data.rqp_num[i];
360   }
361   shared_memory->rkey   = remote_hca_data.rkey;
362   shared_memory->vaddr1 = remote_hca_data.vaddr1;
363   shared_memory->vaddr2 = remote_hca_data.vaddr2;
364   shared_memory->size   = remote_hca_data.size;
365
366   sleep(5);
367
368   shared_memory->opcode = RECV_QP_INFO;
369   shared_memory->length = remote_hca_data.length;
370   for(i=0; i < 256; i++) {
371     shared_memory->dlid[i]   = remote_hca_data.dlid[i];
372     shared_memory->rqp_num[i]= remote_hca_data.rqp_num[i];
373   }
374   
375   shared_memory->rkey   = remote_hca_data.rkey;
376   shared_memory->vaddr1 = remote_hca_data.vaddr1;
377   shared_memory->vaddr2 = remote_hca_data.vaddr2;
378   shared_memory->size   = remote_hca_data.size;
379
380   sleep(10);
381   
382 //  shmdt(shared_memory);
383    
384   printf("uagent is DONE \n");
385   
386  
387
388   exit(0);
389
390 }
391