Whamcloud - gitweb
land v0.9.1 on HEAD, in preparation for a 1.0.x branch
[fs/lustre-release.git] / lustre / portals / knals / ibnal / ibnal_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Based on ksocknal and qswnal
5  *
6  *  Author: Hsing-bung Chen <hbchen@lanl.gov>
7  *
8  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
9  *
10  *   Portals is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Portals is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Portals; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24
25 #include "ibnal.h"
26
27
28
29
30 RDMA_Info_Exchange   Rdma_nfo;
31 int  Cts_Msg_Arrived = NO;
32
33
34 /*
35  *  LIB functions follow
36  */
37
38 //
39 // read
40 // copy a block of data from scr_addr to dst_addr 
41 // it all happens in kernel space - dst_addr and src_addr 
42 //
43 // original definition is to read a block od data from a 
44 // specified user address  
45 // 
46 // cb_read
47
48 int kibnal_read (nal_cb_t *nal, 
49                  void     *private, 
50                  void     *dst_addr, 
51                  user_ptr src_addr, 
52                  size_t   len)
53 {
54         CDEBUG(D_NET, "kibnal_read: 0x%Lx: reading %ld bytes from %p -> %p\n",
55                nal->ni.nid, (long)len, src_addr, dst_addr );
56
57         memcpy( dst_addr, src_addr, len );
58
59         return 0;
60 }
61
62 //
63 // it seems that read and write are doing the same thing
64 // because they all happen in kernel space 
65 // why do we need two functions like read and write 
66 // to make PORTALS API compatable 
67 //
68
69 //
70 // write 
71 // copy a block of data from scr_addr to dst_addr 
72 // it all happens in kernel space - dst_addr and src_addr 
73 //
74 // original definition is to write a block od data to a 
75 // specified user address  
76 // 
77 // cb_write
78
79 int kibnal_write(nal_cb_t   *nal, 
80                  void       *private, 
81                  user_ptr   dst_addr, 
82                  void       *src_addr, 
83                  size_t     len)
84 {
85         CDEBUG(D_NET, "kibnal_write: 0x%Lx: writing %ld bytes from %p -> %p\n",
86                nal->ni.nid, (long)len, src_addr, dst_addr );
87
88
89         memcpy( dst_addr, src_addr, len );
90
91         return 0;
92 }
93
94 //
95 // malloc
96 //
97 // either vmalloc or kmalloc is used 
98 // dynamically allocate a block of memory based on the size of buffer  
99 //
100 // cb_malloc
101
102 void * kibnal_malloc(nal_cb_t *nal, size_t length)
103 {
104         void *buffer;
105
106         // PORTAL_ALLOC will do the job 
107         // allocate a buffer with size "length"
108         PORTAL_ALLOC(buffer, length);
109
110         return buffer;
111 }
112
113 //
114 // free
115 // release a dynamically allocated memory pointed by buffer pointer 
116 //
117 // cb_free
118
119 void kibnal_free(nal_cb_t *nal, void *buffer, size_t length)
120 {
121         //
122         // release allocated buffer to system 
123         //
124         PORTAL_FREE(buffer, length);
125 }
126
127 //
128 // invalidate 
129 // because evernthing is in kernel space (LUSTRE)
130 // there is no need to mark a piece of user memory as no longer in use by
131 // the system
132 //
133 // cb_invalidate
134
135 void kibnal_invalidate(nal_cb_t      *nal, 
136                               void          *base, 
137                               size_t        extent, 
138                               void          *addrkey)
139 {
140   // do nothing 
141   CDEBUG(D_NET, "kibnal_invalidate: 0x%Lx: invalidating %p : %d\n", 
142                                         nal->ni.nid, base, extent);
143   return;
144 }
145
146
147 //
148 // validate 
149 // because everything is in kernel space (LUSTRE)
150 // there is no need to mark a piece of user memory in use by
151 // the system
152 //
153 // cb_validate
154
155 int kibnal_validate(nal_cb_t        *nal,  
156                            void            *base, 
157                            size_t          extent, 
158                            void            **addrkey)
159 {
160   // do nothing 
161   CDEBUG(D_NET, "kibnal_validate: 0x%Lx: validating %p : %d\n", 
162                                         nal->ni.nid, base, extent);
163
164   return 0;
165 }
166
167
168 //
169 // log messages from kernel space 
170 // printk() is used 
171 //
172 // cb_printf
173
174 void kibnal_printf(nal_cb_t *nal, const char *fmt, ...)
175 {
176         va_list ap;
177         char    msg[256];
178
179         if (portal_debug & D_NET) {
180                 va_start( ap, fmt );
181                 vsnprintf( msg, sizeof(msg), fmt, ap );
182                 va_end( ap );
183
184                 printk("CPUId: %d %s",smp_processor_id(), msg);
185         }
186 }
187
188 //
189 // clear interrupt
190 // use spin_lock to lock protected area such as MD, ME...
191 // so a process can enter a protected area and do some works
192 // this won't physicall disable interrup but use a software 
193 // spin-lock to control some protected areas 
194 //
195 // cb_cli 
196
197 void kibnal_cli(nal_cb_t *nal, unsigned long *flags) 
198
199         kibnal_data_t *data= nal->nal_data;
200
201         CDEBUG(D_NET, "kibnal_cli \n");
202
203         spin_lock_irqsave(&data->kib_dispatch_lock,*flags);
204
205 }
206
207 //
208 // set interrupt
209 // use spin_lock to unlock protected area such as MD, ME...
210 // this won't physicall enable interrup but use a software 
211 // spin-lock to control some protected areas 
212 //
213 // cb_sti
214
215 void kibnal_sti(nal_cb_t *nal, unsigned long *flags)
216 {
217         kibnal_data_t *data= nal->nal_data;
218
219         CDEBUG(D_NET, "kibnal_sti \n");
220
221         spin_unlock_irqrestore(&data->kib_dispatch_lock,*flags);
222 }
223
224
225
226 //
227 // nic distance 
228 // 
229 // network distance doesn't mean much for this nal 
230 // here we only indicate 
231 //      0 - operation is happened on the same node 
232 //      1 - operation is happened on different nodes 
233 //          router will handle the data routing 
234 //
235 // cb_dist
236
237 int kibnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
238 {
239         CDEBUG(D_NET, "kibnal_dist \n");
240
241         if ( nal->ni.nid == nid ) {
242                 *dist = 0;
243         } 
244         else {
245                 *dist = 1;
246         }
247
248         return 0; // always retrun 0 
249 }
250
251
252 //
253 // This is the cb_send() on IB based interconnect system
254 // prepare a data package and use VAPI_post_sr() to send it
255 // down-link out-going message 
256 //
257
258
259 int
260 kibnal_send(nal_cb_t        *nal,
261             void            *private,
262             lib_msg_t       *cookie,
263             ptl_hdr_t       *hdr,
264             int              type,
265             ptl_nid_t        nid,
266             ptl_pid_t        pid,
267             unsigned int     niov,
268             ptl_kiov_t      *iov,
269             size_t           len)
270 {
271         
272         int           rc=0;
273         void         *buf = NULL; 
274         unsigned long buf_length = sizeof(ptl_hdr_t) + len;
275         int           expected_buf_size = 0;
276         VAPI_ret_t    vstat;
277
278         PROF_START(kibnal_send); // time stamp send start 
279
280         CDEBUG(D_NET,"kibnal_send: sending %d bytes from %p to nid: 0x%Lx pid %d\n",
281                buf_length, iov, nid, HCA_PORT_1);
282
283
284         // do I need to check the gateway information
285         // do I have problem to send direct 
286         // do I have to forward a data packet to gateway
287         // 
288         // The current connection is back-to-back 
289         // I always know that data will be send from one-side to
290         // the other side
291         //
292         
293         //
294         //  check data buffer size 
295         //
296         //  MSG_SIZE_SMALL 
297         //      regular post send 
298         //  
299         //  MSG_SIZE_LARGE
300         //      rdma write
301         
302         if(buf_length <= SMALL_MSG_SIZE) {  
303            expected_buf_size = MSG_SIZE_SMALL;
304         } 
305         else { 
306           if(buf_length > MAX_MSG_SIZE) { 
307              CERROR("kibnal_send:request exceeds Transmit data size (%d).\n",
308                       MAX_MSG_SIZE);
309              rc = -1;
310              return rc;
311           }
312           else {
313              expected_buf_size = MSG_SIZE_LARGE; // this is a large data package 
314           } 
315         }
316                 
317         // prepare data packet for send operation 
318         //
319         // allocate a data buffer "buf" with size of buf_len(header + payload)
320         //                 ---------------
321         //  buf            | hdr         |  size = sizeof(ptl_hdr_t)
322         //                 --------------
323         //                 |payload data |  size = len
324         //                 ---------------
325         
326         // copy header to buf 
327         memcpy(buf, hdr, sizeof(ptl_hdr_t));
328
329         // copy payload data from iov to buf
330         // use portals library function lib_copy_iov2buf()
331         
332         if (len != 0)
333            lib_copy_iov2buf(((char *)buf) + sizeof (ptl_hdr_t),
334                             niov, 
335                             iov, 
336                             len);
337
338         // buf is ready to do a post send 
339         // the send method is base on the buf_size 
340
341         CDEBUG(D_NET,"ib_send %d bytes (size %d) from %p to nid: 0x%Lx "
342                " port %d\n", buf_length, expected_buf_size, iov, nid, HCA_PORT_1);
343
344         switch(expected_buf_size) {
345           case MSG_SIZE_SMALL:
346             // send small message 
347             if((vstat = Send_Small_Msg(buf, buf_length)) != VAPI_OK){
348                 CERROR("Send_Small_Msg() is failed\n");
349             } 
350             break;
351
352           case MSG_SIZE_LARGE:
353             // send small message 
354             if((vstat = Send_Large_Msg(buf, buf_length)) != VAPI_OK){
355                 CERROR("Send_Large_Msg() is failed\n");
356             } 
357             break;
358
359           default:
360             CERROR("Unknown message size %d\n", expected_buf_size);
361             break;
362         }
363
364         PROF_FINISH(kibnal_send); // time stapm of send operation 
365
366         rc = 1;
367
368         return rc; 
369 }
370
371 //
372 // kibnal_send_pages
373 //
374 // no support 
375 //
376 // do you need this 
377 //
378 int kibnal_send_pages(nal_cb_t * nal, 
379                       void *private, 
380                       lib_msg_t * cookie,
381                       ptl_hdr_t * hdr, 
382                       int type, 
383                       ptl_nid_t nid, 
384                       ptl_pid_t pid,
385                       unsigned int niov, 
386                       ptl_kiov_t *iov, 
387                       size_t mlen)
388 {
389    int rc = 1;
390
391    CDEBUG(D_NET, "kibnal_send_pages\n");
392
393    // do nothing now for Infiniband 
394    
395    return rc;
396 }
397
398
399
400
401
402 //
403 // kibnal_fwd_packet 
404 //
405 // no support 
406 //
407 // do you need this 
408 //
409 void kibnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
410 {
411         CDEBUG(D_NET, "forwarding not implemented\n");
412         return;
413       
414 }
415
416 //
417 // kibnal_callback 
418 //
419 // no support 
420 //
421 // do you need this 
422 //
423 int kibnal_callback(nal_cb_t * nal, 
424                            void *private, 
425                            lib_eq_t *eq,
426                            ptl_event_t *ev)
427 {
428         CDEBUG(D_NET,  "callback not implemented\n");
429         return PTL_OK;
430 }
431
432
433 /* Process a received portals packet */
434 //
435 //  conver receiving data in to PORTALS header 
436 //
437
438 void kibnal_rx(kibnal_data_t    *kib, 
439                       VAPI_virt_addr_t buffer_addr,
440                       u_int32_t        buffer_len,
441                       u_int32_t        buffer_size,
442                       unsigned int     priority) 
443 {
444         ptl_hdr_t  *hdr = (ptl_hdr_t *)  buffer_addr; // case to ptl header format 
445         kibnal_rx_t krx;
446
447         CDEBUG(D_NET,"kibnal_rx: buf %p, len %ld\n", buffer_addr, buffer_len);
448
449         if ( buffer_len < sizeof( ptl_hdr_t ) ) {
450                 /* XXX what's this for? */
451                 if (kib->kib_shuttingdown)
452                         return;
453                 CERROR("kibnal_rx: did not receive complete portal header, "
454                        "len= %ld", buffer_len);
455
456                 return;
457         }
458
459        // typedef struct {
460        //         char             *krx_buffer; // pointer to receiving buffer
461        //         unsigned long     krx_len;  // length of buffer
462        //         unsigned int      krx_size; //
463        //         unsigned int      krx_priority; // do we need this
464        //         struct list_head  krx_item;
465        // } kibnal_rx_t;
466        //
467         krx.krx_buffer    = hdr;
468         krx.krx_len       = buffer_len;
469         krx.krx_size      = buffer_size;
470         krx.krx_priority  = priority;
471
472         if ( hdr->dest_nid == kibnal_lib.ni.nid ) {
473            // this is my data 
474            PROF_START(lib_parse);
475
476            lib_parse(&kibnal_lib, (ptl_hdr_t *)krx.krx_buffer, &krx);
477
478            PROF_FINISH(lib_parse);
479         } else {
480            /* forward to gateway */
481            // Do we expect this happened ?
482            //      
483            CERROR("kibnal_rx: forwarding not implemented yet");
484         }
485
486         return;
487 }
488
489
490
491
492 //
493 // kibnal_recv_pages 
494 //
495 // no support 
496 //
497 // do you need this 
498 //
499 int
500 kibnal_recv_pages(nal_cb_t * nal, 
501                   void *private, 
502                   lib_msg_t * cookie,
503                   unsigned int niov, 
504                   ptl_kiov_t *iov, 
505                   size_t mlen,
506                   size_t rlen)
507 {
508
509   CDEBUG(D_NET, "recv_pages not implemented\n");
510   return PTL_OK;
511        
512 }
513
514
515 int 
516 kibnal_recv(nal_cb_t     *nal,
517             void         *private,
518             lib_msg_t    *cookie,
519             unsigned int  niov,
520             struct iovec *iov,
521             size_t        mlen,
522             size_t        rlen)
523 {
524         kibnal_rx_t *krx = private;
525
526         CDEBUG(D_NET,"kibnal_recv: mlen=%d, rlen=%d\n", mlen, rlen);
527
528         /* What was actually received must be >= what sender claims to
529          * have sent.  This is an LASSERT, since lib-move doesn't
530          * check cb return code yet. */
531         LASSERT (krx->krx_len >= sizeof (ptl_hdr_t) + rlen);
532         LASSERT (mlen <= rlen);
533
534         PROF_START(kibnal_recv);
535
536         if(mlen != 0) {
537                 PROF_START(memcpy);
538                 lib_copy_buf2iov (niov, iov, krx->krx_buffer +
539                                   sizeof (ptl_hdr_t), mlen);
540                 PROF_FINISH(memcpy);
541         }
542
543         PROF_START(lib_finalize);
544         
545         lib_finalize(nal, private, cookie);
546         
547         PROF_FINISH(lib_finalize);
548         PROF_FINISH(kibnal_recv);
549
550         return rlen;
551 }
552
553 //
554 // kibnal_map 
555 // no support 
556 // do you need this 
557 //
558 int kibnal_map(nal_cb_t * nal, 
559                unsigned int niov, 
560                struct iovec *iov,
561                void **addrkey)
562 {
563   CDEBUG(D_NET, "map not implemented\n");
564   return PTL_OK; 
565 }
566
567
568
569 //
570 // kibnal_unmap
571 //
572 // no support 
573 //
574 // do you need this 
575 //
576 void kibnal_unmap(nal_cb_t * nal, 
577                   unsigned int niov, 
578                   struct iovec *iov,
579                   void **addrkey)
580 {
581   CDEBUG(D_NET, "unmap not implemented\n");
582   return;
583 }
584
585
586
587 //
588 // kibnal_map_pages 
589 // no support 
590 // do you need this 
591 /* as (un)map, but with a set of page fragments */
592 int kibnal_map_pages(nal_cb_t * nal, 
593                      unsigned int niov, 
594                      ptl_kiov_t *iov,
595                      void **addrkey)
596 {
597   CDEBUG(D_NET, "map_pages not implemented\n");
598   return PTL_OK;
599 }
600
601
602
603 //
604 // kibnal_unmap_pages 
605 //
606 // no support 
607 //
608 // do you need this 
609 //
610 void kibnal_unmap_pages(nal_cb_t * nal, 
611                                unsigned int niov, 
612                                ptl_kiov_t *iov,
613                                void **addrkey)
614 {
615   CDEBUG(D_NET, "unmap_pages not implemented\n");
616   return ;
617 }
618
619
620 int kibnal_end(kibnal_data_t *kib)
621 {
622
623   /* wait for sends to finish ? */
624   /* remove receive buffers */
625   /* shutdown receive thread */
626
627   CDEBUG(D_NET, "kibnal_end\n");
628   IB_Close_HCA();
629
630   return 0;
631 }
632
633
634 //
635 //
636 //  asynchronous event handler: response to some unexpetced operation errors 
637 //    
638 //  void async_event_handler(VAPI_hca_hndl_t      hca_hndl,
639 //                           VAPI_event_record_t *event_record_p,
640 //                           void*                private_data)
641 //  the HCA drive will prepare evetn_record_p                        
642 //
643 //  this handler is registered with VAPI_set_async_event_handler()
644 //  VAPI_set_async_event_handler() is issued when an HCA is created 
645 //
646 //
647 void async_event_handler(VAPI_hca_hndl_t      hca_hndl,
648                          VAPI_event_record_t *event_record_p,  
649                          void*                private_data)
650 {
651   //
652   // * event_record_p is prepared by the system when an async
653   //   event happened
654   // * what to do with private_data 
655   // * do we expect more async events happened if so what are they 
656   //
657   //   only log ERROR message now 
658
659   switch (event_record_p->type) {
660     case VAPI_PORT_ERROR:
661          printk("Got PORT_ERROR event. port number=%d\n", 
662                  event_record_p->modifier.port_num);
663          break;
664     case VAPI_PORT_ACTIVE:
665          printk("Got PORT_ACTIVE event. port number=%d\n", 
666                  event_record_p->modifier.port_num);
667          break;
668     case VAPI_QP_PATH_MIGRATED:    /*QP*/
669          printk("Got P_PATH_MIGRATED event. qp_hndl=%lu\n", 
670                  event_record_p->modifier.qp_hndl);
671          break;
672     case VAPI_EEC_PATH_MIGRATED:   /*EEC*/
673          printk("Got EEC_PATH_MIGRATED event. eec_hndl=%d\n", 
674                  event_record_p->modifier.eec_hndl);
675          break;
676     case VAPI_QP_COMM_ESTABLISHED: /*QP*/
677          printk("Got QP_COMM_ESTABLISHED event. qp_hndl=%lu\n", 
678                  event_record_p->modifier.qp_hndl);
679          break;
680     case VAPI_EEC_COMM_ESTABLISHED: /*EEC*/
681          printk("Got EEC_COMM_ESTABLISHED event. eec_hndl=%d\n",
682                  event_record_p->modifier.eec_hndl);
683          break;
684     case VAPI_SEND_QUEUE_DRAINED:  /*QP*/
685          printk("Got SEND_QUEUE_DRAINED event. qp_hndl=%lu\n", 
686                  event_record_p->modifier.qp_hndl);
687          break;
688     case VAPI_CQ_ERROR:            /*CQ*/
689          printk("Got CQ_ERROR event. cq_hndl=%lu\n", 
690                  event_record_p->modifier.cq_hndl);
691          break;
692     case VAPI_LOCAL_WQ_INV_REQUEST_ERROR: /*QP*/
693          printk("Got LOCAL_WQ_INV_REQUEST_ERROR event. qp_hndl=%lu\n", 
694                  event_record_p->modifier.qp_hndl);
695          break;
696     case VAPI_LOCAL_WQ_ACCESS_VIOL_ERROR: /*QP*/
697          printk("Got LOCAL_WQ_ACCESS_VIOL_ERROR event. qp_hndl=%lu\n", 
698                  event_record_p->modifier.qp_hndl);
699          break;
700     case VAPI_LOCAL_WQ_CATASTROPHIC_ERROR: /*QP*/
701          printk("Got LOCAL_WQ_CATASTROPHIC_ERROR event. qp_hndl=%lu\n", 
702                  event_record_p->modifier.qp_hndl);
703          break;
704     case VAPI_PATH_MIG_REQ_ERROR:  /*QP*/
705          printk("Got PATH_MIG_REQ_ERROR event. qp_hndl=%lu\n", 
706                  event_record_p->modifier.qp_hndl);
707          break;
708     case VAPI_LOCAL_CATASTROPHIC_ERROR: /*none*/
709          printk("Got LOCAL_CATASTROPHIC_ERROR event. \n");
710          break;
711     default:
712          printk(":got non-valid event type=%d. IGNORING\n",
713                     event_record_p->type);
714   }
715
716 }
717
718
719
720
721 VAPI_wr_id_t 
722 search_send_buf(int buf_length)
723 {
724   VAPI_wr_id_t send_id = -1;
725   u_int32_t    i;
726   int          flag = NO;
727   int          loop_count = 0;  
728
729   CDEBUG(D_NET, "search_send_buf \n");
730   
731   while((flag == NO) && (loop_count < MAX_LOOP_COUNT)) {
732     for(i=0; i < NUM_ENTRY; i++) {
733       // problem about using spinlock
734       spin_lock(&MSB_mutex[i]);
735       if(MSbuf_list[i].status == BUF_REGISTERED)  {
736         MSbuf_list[i].status = BUF_INUSE;// make send buf as inuse
737         flag =  YES;
738         spin_unlock(&MSB_mutex[i]);
739         break;
740       }
741       else
742         spin_unlock(&MSB_mutex[i]); 
743     }
744
745     loop_count++;
746     schedule_timeout(200); // wait for a while 
747   }
748    
749   if(flag == NO)  {
750     CDEBUG(D_NET, "search_send_buf: could not locate an entry in MSbuf_list\n");
751   }
752
753   send_id = (VAPI_wr_id_t ) i;
754
755   return send_id;
756 }
757
758
759
760 VAPI_wr_id_t 
761 search_RDMA_recv_buf(int buf_length)
762 {
763   VAPI_wr_id_t recv_id = -1;
764   u_int32_t    i;
765   int          flag = NO;
766   int          loop_count = 0;  
767
768   CDEBUG(D_NET, "search_RDMA_recv_buf\n");
769
770   while((flag == NO) && (loop_count < MAX_LOOP_COUNT)) {
771
772     for(i=NUM_ENTRY; i < NUM_MBUF; i++) {
773
774       spin_lock(&MSB_mutex[i]);
775
776       if((MRbuf_list[i].status == BUF_REGISTERED)  &&
777          (MRbuf_list[i].buf_size >= buf_length)) {
778           MSbuf_list[i].status = BUF_INUSE;// make send buf as inuse
779           flag =  YES;
780           spin_unlock(&MSB_mutex[i]);
781           break;
782       }
783       else
784         spin_unlock(&MSB_mutex[i]);
785     }
786
787     loop_count++;
788
789     schedule_timeout(200); // wait for a while 
790   }
791    
792   if(flag == NO)  {
793     CERROR("search_RDMA_recv_buf: could not locate an entry in MBbuf_list\n");
794   }
795
796   recv_id = (VAPI_wr_id_t ) i;
797
798   return recv_id;
799
800 }
801
802
803
804
805
806
807
808 VAPI_ret_t Send_Small_Msg(char *buf, int buf_length)
809 {
810  VAPI_ret_t           vstat;
811  VAPI_sr_desc_t       sr_desc;
812  VAPI_sg_lst_entry_t  sr_sg;
813  QP_info              *qp;
814  VAPI_wr_id_t         send_id;
815
816  CDEBUG(D_NET, "Send_Small_Msg\n");
817
818  send_id = search_send_buf(buf_length); 
819
820  if(send_id < 0){
821    CERROR("Send_Small_Msg: Can not find a QP \n");
822    return(~VAPI_OK);
823  }
824
825  qp = &QP_list[(int) send_id];
826
827  // find a suitable/registered send_buf from MSbuf_list
828  CDEBUG(D_NET, "Send_Small_Msg: current send id  %d \n", send_id);
829
830  sr_desc.opcode    = VAPI_SEND;
831  sr_desc.comp_type = VAPI_SIGNALED;
832  sr_desc.id        =  send_id;
833
834
835  // scatter and gather info 
836  sr_sg.len  = buf_length;
837  sr_sg.lkey = MSbuf_list[send_id].mr.l_key; // use send MR 
838
839  sr_sg.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) MSbuf_list[send_id].buf_addr;
840
841  // copy data to register send buffer 
842  memcpy(&sr_sg.addr, buf, buf_length);
843
844  sr_desc.sg_lst_p = &sr_sg;
845  sr_desc.sg_lst_len = 1; // only 1 entry is used 
846  sr_desc.fence = TRUE;
847  sr_desc.set_se = FALSE;
848
849  // call VAPI_post_sr to send out this data 
850  vstat = VAPI_post_sr(qp->hca_hndl, qp->qp_hndl, &sr_desc);
851
852  if (vstat != VAPI_OK) {
853     CERROR("VAPI_post_sr failed (%s).\n",VAPI_strerror(vstat));
854  }
855
856  CDEBUG(D_NET, "VAPI_post_sr success.\n");
857
858  return (vstat);
859
860 }
861
862
863
864
865 VAPI_wr_id_t
866 RTS_handshaking_protocol(int buf_length) 
867 {
868
869  VAPI_ret_t           vstat;
870  VAPI_sr_desc_t       sr_desc;
871  VAPI_sg_lst_entry_t  sr_sg;
872  VAPI_wr_id_t         send_id;
873
874  RDMA_Info_Exchange   rdma_info;
875
876  rdma_info.opcode     = Ready_To_send;
877  rdma_info.buf_length = buf_length; 
878  rdma_info.raddr      = (VAPI_virt_addr_t) 0;
879  rdma_info.rkey       = (VAPI_rkey_t) 0 ; 
880
881  QP_info              *qp;
882
883  CDEBUG(D_NET, "RTS_handshaking_protocol\n");
884
885  // find a suitable/registered send_buf from MSbuf_list
886  send_id = search_send_buf(sizeof(RDMA_Info_Exchange));   
887
888  qp = &QP_list[(int) send_id];
889
890  CDEBUG(D_NET, "RTS_CTS: current send id  %d \n", send_id);
891  sr_desc.opcode    = VAPI_SEND;
892  sr_desc.comp_type = VAPI_SIGNALED;
893  sr_desc.id        = send_id + RDMA_RTS_ID;// this RTS mesage ID 
894
895  // scatter and gather info 
896  sr_sg.len  = sizeof(RDMA_Info_Exchange);
897  sr_sg.lkey = MSbuf_list[send_id].mr.l_key; // use send MR 
898  sr_sg.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) MSbuf_list[send_id].buf_addr;
899
900  // copy data to register send buffer 
901  memcpy(&sr_sg.addr, &rdma_info, sizeof(RDMA_Info_Exchange));
902
903  sr_desc.sg_lst_p = &sr_sg;
904  sr_desc.sg_lst_len = 1; // only 1 entry is used 
905  sr_desc.fence = TRUE;
906  sr_desc.set_se = FALSE;
907
908  // call VAPI_post_sr to send out this RTS message data 
909  vstat = VAPI_post_sr(qp->hca_hndl, qp->qp_hndl, &sr_desc);
910
911  if (vstat != VAPI_OK) {
912     CERROR("RTS: VAPI_post_sr failed (%s).\n",VAPI_strerror_sym(vstat));
913  }
914
915  return send_id;
916
917 }
918
919
920
921 // create local receiving Memory Region for a HCA
922 VAPI_ret_t
923 createMemRegion_RDMA(VAPI_hca_hndl_t  hca_hndl,
924                      VAPI_pd_hndl_t   pd_hndl,
925                      char            *bufptr,
926                      int              buf_length,
927                      VAPI_mr_hndl_t   *rep_mr_hndl,
928                      VAPI_mrw_t       *rep_mr)
929 {
930   VAPI_ret_t      vstat;
931   VAPI_mrw_t      mrw;
932   
933   CDEBUG(D_NET, "createMemRegion_RDMA\n");
934
935   // memory region address and size of memory region
936   // allocate a block of memory for this HCA 
937   // RDMA data buffer
938   
939   
940   if(bufptr == NULL) {
941     // need to allcate a local buffer to receive data from a
942     // remore VAPI_RDMA_WRITE_IMM
943     PORTAL_ALLOC(bufptr, buf_length);
944   }
945
946   if(bufptr == NULL) {
947     CDEBUG(D_MALLOC, "Failed to malloc a block of RDMA receiving memory, size %d\n",
948                                     buf_length);
949     return(VAPI_ENOMEM);
950   }
951
952   /* Register RDAM data Memory region */
953   CDEBUG(D_NET, "Register a RDMA data memory region\n");
954
955   mrw.type   = VAPI_MR;
956   mrw.pd_hndl= pd_hndl;
957   mrw.start  = (VAPI_virt_addr_t )(MT_virt_addr_t )bufptr;
958   mrw.size   = buf_length;
959   mrw.acl    = VAPI_EN_LOCAL_WRITE  | 
960                VAPI_EN_REMOTE_WRITE | 
961                VAPI_EN_REMOTE_READ;
962
963   // register send memory region
964   vstat = VAPI_register_mr(hca_hndl,
965                            &mrw,
966                            rep_mr_hndl,
967                            rep_mr);
968
969   // this memory region is going to be reused until deregister is called
970   if (vstat != VAPI_OK) {
971      CERROR("Failed registering a mem region Addr=%p, Len=%d. %s\n",
972              bufptr, buf_length, VAPI_strerror(vstat));
973   }
974
975   return(vstat);
976
977 }
978
979
980
981 RDMA_Info_Exchange  Local_rdma_info;
982
983 int insert_MRbuf_list(int buf_lenght)
984 {
985   int  recv_id = NUM_ENTRY;      
986
987   CDEBUG(D_NET, "insert_MRbuf_list\n");
988
989   for(recv_id= NUM_ENTRY; recv_id < NUM_MBUF; recv_id++){
990        if(BUF_UNREGISTERED == MRbuf_list[recv_id].status)  {
991          MRbuf_list[recv_id].status   = BUF_UNREGISTERED;
992          MRbuf_list[recv_id].buf_size = buf_lenght;
993          break;
994        }
995   }
996
997   return recv_id;
998
999 }  
1000
1001 VAPI_wr_id_t
1002 CTS_handshaking_protocol(RDMA_Info_Exchange *rdma_info) 
1003 {
1004
1005  VAPI_ret_t           vstat;
1006  VAPI_sr_desc_t       sr_desc;
1007  VAPI_sg_lst_entry_t  sr_sg;
1008  QP_info             *qp;
1009  VAPI_wr_id_t         send_id;
1010  VAPI_mr_hndl_t       rep_mr_hndl;
1011  VAPI_mrw_t           rep_mr;
1012  int                  recv_id;
1013  char                *bufptr = NULL;
1014
1015  // search MRbuf_list for an available entry that
1016  // has registered data buffer with size equal to rdma_info->buf_lenght
1017
1018  CDEBUG(D_NET, "CTS_handshaking_protocol\n");
1019
1020  // register memory buffer for RDAM operation
1021
1022  vstat = createMemRegion_RDMA(Hca_hndl,
1023                               Pd_hndl,
1024                               bufptr, 
1025                               rdma_info->buf_length,
1026                               &rep_mr_hndl,
1027                               &rep_mr);
1028
1029
1030  Local_rdma_info.opcode            = Clear_To_send;
1031  Local_rdma_info.recv_rdma_mr      = rep_mr;
1032  Local_rdma_info.recv_rdma_mr_hndl = rep_mr_hndl;
1033
1034  if (vstat != VAPI_OK) {
1035     CERROR("CST_handshaking_protocol: Failed registering a mem region"
1036            "Len=%d. %s\n", rdma_info->buf_length, VAPI_strerror(vstat));
1037     Local_rdma_info.flag = RDMA_BUFFER_UNAVAILABLE;
1038  }
1039  else {
1040     // successfully allcate reserved RDAM data buffer 
1041     recv_id = insert_MRbuf_list(rdma_info->buf_length);   
1042
1043     if(recv_id >=  NUM_ENTRY) { 
1044       MRbuf_list[recv_id].buf_addr     = rep_mr.start;
1045       MRbuf_list[recv_id].mr           = rep_mr;
1046       MRbuf_list[recv_id].mr_hndl      = rep_mr_hndl;
1047       MRbuf_list[recv_id].ref_count    = 0;
1048       Local_rdma_info.flag             = RDMA_BUFFER_RESERVED;
1049       Local_rdma_info.buf_length       = rdma_info->buf_length; 
1050       Local_rdma_info.raddr            = rep_mr.start;
1051       Local_rdma_info.rkey             = rep_mr.r_key; 
1052     }
1053     else {
1054       CERROR("Can not find an entry in MRbuf_list - how could this happen\n");  
1055     }
1056  }
1057
1058  // find a suitable/registered send_buf from MSbuf_list
1059  send_id = search_send_buf(sizeof(RDMA_Info_Exchange)); 
1060  CDEBUG(D_NET, "CTS: current send id  %d \n", send_id);
1061  sr_desc.opcode    = VAPI_SEND;
1062  sr_desc.comp_type = VAPI_SIGNALED;
1063  sr_desc.id        = send_id + RDMA_CTS_ID; // this CST message ID 
1064
1065  // scatter and gather info 
1066  sr_sg.len  = sizeof(RDMA_Info_Exchange);
1067  sr_sg.lkey = MSbuf_list[send_id].mr.l_key; // use send MR 
1068  sr_sg.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) MSbuf_list[send_id].buf_addr;
1069
1070  // copy data to register send buffer 
1071  memcpy(&sr_sg.addr, &Local_rdma_info, sizeof(RDMA_Info_Exchange));
1072
1073  sr_desc.sg_lst_p   = &sr_sg;
1074  sr_desc.sg_lst_len = 1; // only 1 entry is used 
1075  sr_desc.fence = TRUE;
1076  sr_desc.set_se = FALSE;
1077
1078  // call VAPI_post_sr to send out this RTS message data 
1079  vstat = VAPI_post_sr(qp->hca_hndl, qp->qp_hndl, &sr_desc);
1080
1081  if (vstat != VAPI_OK) {
1082     CERROR("CTS: VAPI_post_sr failed (%s).\n",VAPI_strerror(vstat));
1083  }
1084
1085
1086 }
1087
1088
1089
1090 VAPI_ret_t Send_Large_Msg(char *buf, int buf_length)
1091 {
1092   VAPI_ret_t           vstat;
1093   VAPI_sr_desc_t       sr_desc;
1094   VAPI_sg_lst_entry_t  sr_sg;
1095   QP_info             *qp;
1096   VAPI_mrw_t           rep_mr; 
1097   VAPI_mr_hndl_t       rep_mr_hndl;
1098   int                  send_id;
1099   VAPI_imm_data_t      imm_data = 0XAAAA5555;
1100
1101
1102   CDEBUG(D_NET, "Send_Large_Msg: Enter\n");
1103
1104   // register this large buf 
1105   // don't need to copy this buf to send buffer
1106   vstat = createMemRegion_RDMA(Hca_hndl,
1107                                Pd_hndl,
1108                                buf,
1109                                buf_length,
1110                                &rep_mr_hndl,
1111                                &rep_mr);
1112
1113   if (vstat != VAPI_OK) {
1114     CERROR("Send_Large_M\sg:  createMemRegion_RDMAi() failed (%s).\n",
1115                         VAPI_strerror(vstat));
1116   }
1117   
1118
1119   Local_rdma_info.send_rdma_mr      = rep_mr;
1120   Local_rdma_info.send_rdma_mr_hndl = rep_mr_hndl;
1121
1122   //
1123   //     Prepare descriptor for send queue
1124   //
1125  
1126   // ask for a remote rdma buffer with size buf_lenght
1127   send_id = RTS_handshaking_protocol(buf_length); 
1128
1129   qp = &QP_list[send_id];
1130
1131   // wait for CTS message receiving from remote node 
1132   while(1){
1133      if(YES == Cts_Message_arrived) {
1134         // receive CST message from remote node 
1135         // Rdma_info is available for use
1136         break;
1137      }
1138      schedule_timeout(RTS_CTS_TIMEOUT);
1139   }
1140   
1141   sr_desc.id        = send_id + RDMA_OP_ID;
1142   sr_desc.opcode    = VAPI_RDMA_WRITE_WITH_IMM;
1143   sr_desc.comp_type = VAPI_SIGNALED;
1144
1145   // scatter and gather info 
1146   sr_sg.len  = buf_length;
1147
1148   // rdma mr 
1149   sr_sg.lkey = rep_mr.l_key;  
1150   sr_sg.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) rep_mr.start;
1151   sr_desc.sg_lst_p = &sr_sg;
1152   sr_desc.sg_lst_len = 1; // only 1 entry is used 
1153
1154   // immediate data - not used here 
1155   sr_desc.imm_data = imm_data;
1156   sr_desc.fence = TRUE;
1157   sr_desc.set_se = FALSE;
1158
1159   // RDAM operation only
1160   // raddr and rkey is receiving from remote node  
1161   sr_desc.remote_addr = Rdma_info.raddr;
1162   sr_desc.r_key       = Rdma_info.rkey;
1163
1164   // call VAPI_post_sr to send out this data 
1165   vstat = VAPI_post_sr(qp->hca_hndl, qp->qp_hndl, &sr_desc);
1166
1167   if (vstat != VAPI_OK) {
1168      CERROR("VAPI_post_sr failed (%s).\n",VAPI_strerror_sym(vstat));
1169   }
1170
1171 }
1172
1173
1174
1175
1176
1177
1178 //
1179 //  repost_recv_buf
1180 //  post a used recv buffer back to recv WQE list 
1181 //  wrq_id is used to indicate the starting position of recv-buffer 
1182 //
1183 VAPI_ret_t 
1184 repost_recv_buf(QP_info      *qp,
1185                 VAPI_wr_id_t  wrq_id) 
1186 {
1187   VAPI_rr_desc_t       rr;
1188   VAPI_sg_lst_entry_t  sg_entry;
1189   VAPI_ret_t           ret;
1190
1191   CDEBUG(D_NET, "repost_recv_buf\n");
1192
1193   sg_entry.lkey = MRbuf_list[wrq_id].mr.l_key;
1194   sg_entry.len  = MRbuf_list[wrq_id].buf_size;
1195   sg_entry.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) MRbuf_list[wrq_id].buf_addr;
1196   rr.opcode     = VAPI_RECEIVE;
1197   rr.comp_type  = VAPI_SIGNALED; /* All with CQE (IB compliant) */
1198   rr.sg_lst_len = 1; /* single buffers */
1199   rr.sg_lst_p   = &sg_entry;
1200   rr.id         = wrq_id; /* WQE id used is the index to buffers ptr array */
1201
1202   ret= VAPI_post_rr(qp->hca_hndl,qp->qp_hndl,&rr);
1203      
1204   if (ret != VAPI_OK){
1205      CERROR("failed reposting RQ WQE (%s) buffer \n",VAPI_strerror_sym(ret));
1206      return ret;
1207   }
1208
1209   CDEBUG(D_NET, "Successfully reposting an RQ WQE %d recv bufer \n", wrq_id);
1210
1211   return ret ;
1212 }
1213                         
1214 //
1215 // post_recv_bufs
1216 //      post "num_o_bufs" for receiving data
1217 //      each receiving buf (buffer starting address, size of buffer)
1218 //      each buffer is associated with an id 
1219 //
1220 int 
1221 post_recv_bufs(VAPI_wr_id_t  start_id)
1222 {
1223   int i;
1224   VAPI_rr_desc_t       rr;
1225   VAPI_sg_lst_entry_t  sg_entry;
1226   VAPI_ret_t           ret;
1227
1228   CDEBUG(D_NET, "post_recv_bufs\n");
1229
1230   for(i=0; i< NUM_ENTRY; i++) {
1231     sg_entry.lkey = MRbuf_list[i].mr.l_key;
1232     sg_entry.len  = MRbuf_list[i].buf_size;
1233     sg_entry.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) MRbuf_list[i].buf_addr;
1234     rr.opcode     = VAPI_RECEIVE;
1235     rr.comp_type  = VAPI_SIGNALED;  /* All with CQE (IB compliant) */
1236     rr.sg_lst_len = 1; /* single buffers */
1237     rr.sg_lst_p   = &sg_entry;
1238     rr.id         = start_id+i; /* WQE id used is the index to buffers ptr array */
1239
1240     ret= VAPI_post_rr(QP_list[i].hca_hndl,QP_list[i].qp_hndl, &rr);
1241     if (ret != VAPI_OK) {
1242        CERROR("failed posting RQ WQE (%s)\n",VAPI_strerror_sym(ret));
1243        return i;
1244     } 
1245   }
1246
1247   return i; /* num of buffers posted */
1248 }
1249                         
1250 int 
1251 post_RDMA_bufs(QP_info      *qp, 
1252                void         *buf_array,
1253                unsigned int  num_bufs,
1254                unsigned int  buf_size,
1255                VAPI_wr_id_t  start_id)
1256 {
1257
1258   CDEBUG(D_NET, "post_RDMA_bufs \n");
1259   return YES;
1260 }
1261
1262
1263
1264 //
1265 // LIB NAL
1266 // assign function pointers to theirs corresponding entries
1267 //
1268
1269 nal_cb_t kibnal_lib = {
1270         nal_data:       &kibnal_data,  /* NAL private data */
1271         cb_send:        kibnal_send,
1272         cb_send_pages:  NULL, // not implemented  
1273         cb_recv:        kibnal_recv,
1274         cb_recv_pages:  NULL, // not implemented 
1275         cb_read:        kibnal_read,
1276         cb_write:       kibnal_write,
1277         cb_callback:    NULL, // not implemented 
1278         cb_malloc:      kibnal_malloc,
1279         cb_free:        kibnal_free,
1280         cb_map:         NULL, // not implemented 
1281         cb_unmap:       NULL, // not implemented 
1282         cb_map_pages:   NULL, // not implemented 
1283         cb_unmap_pages: NULL, // not implemented 
1284         cb_printf:      kibnal_printf,
1285         cb_cli:         kibnal_cli,
1286         cb_sti:         kibnal_sti,
1287         cb_dist:        kibnal_dist // no used at this moment 
1288 };