Whamcloud - gitweb
file xnu_types.h was initially added on branch b_port_step.
[fs/lustre-release.git] / lnet / klnds / iblnd / ibnal_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Based on ksocknal and qswnal
5  *
6  *  Author: Hsing-bung Chen <hbchen@lanl.gov>
7  *
8  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
9  *
10  *   Portals is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Portals is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Portals; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24
25 #include "ibnal.h"
26
27
28
29
30 RDMA_Info_Exchange   Rdma_nfo;
31 int  Cts_Msg_Arrived = NO;
32
33
34 /*
35  *  LIB functions follow
36  */
37
38 //
39 // read
40 // copy a block of data from scr_addr to dst_addr 
41 // it all happens in kernel space - dst_addr and src_addr 
42 //
43 // original definition is to read a block od data from a 
44 // specified user address  
45 // 
46 // cb_read
47
48 int kibnal_read (nal_cb_t *nal, 
49                  void     *private, 
50                  void     *dst_addr, 
51                  user_ptr src_addr, 
52                  size_t   len)
53 {
54         CDEBUG(D_NET, "kibnal_read: 0x%Lx: reading %ld bytes from %p -> %p\n",
55                nal->ni.nid, (long)len, src_addr, dst_addr );
56
57         memcpy( dst_addr, src_addr, len );
58
59         return 0;
60 }
61
62 //
63 // it seems that read and write are doing the same thing
64 // because they all happen in kernel space 
65 // why do we need two functions like read and write 
66 // to make PORTALS API compatable 
67 //
68
69 //
70 // write 
71 // copy a block of data from scr_addr to dst_addr 
72 // it all happens in kernel space - dst_addr and src_addr 
73 //
74 // original definition is to write a block od data to a 
75 // specified user address  
76 // 
77 // cb_write
78
79 int kibnal_write(nal_cb_t   *nal, 
80                  void       *private, 
81                  user_ptr   dst_addr, 
82                  void       *src_addr, 
83                  size_t     len)
84 {
85         CDEBUG(D_NET, "kibnal_write: 0x%Lx: writing %ld bytes from %p -> %p\n",
86                nal->ni.nid, (long)len, src_addr, dst_addr );
87
88
89         memcpy( dst_addr, src_addr, len );
90
91         return 0;
92 }
93
94 //
95 // malloc
96 //
97 // either vmalloc or kmalloc is used 
98 // dynamically allocate a block of memory based on the size of buffer  
99 //
100 // cb_malloc
101
102 void * kibnal_malloc(nal_cb_t *nal, size_t length)
103 {
104         void *buffer;
105
106         // PORTAL_ALLOC will do the job 
107         // allocate a buffer with size "length"
108         PORTAL_ALLOC(buffer, length);
109
110         return buffer;
111 }
112
113 //
114 // free
115 // release a dynamically allocated memory pointed by buffer pointer 
116 //
117 // cb_free
118
119 void kibnal_free(nal_cb_t *nal, void *buffer, size_t length)
120 {
121         //
122         // release allocated buffer to system 
123         //
124         PORTAL_FREE(buffer, length);
125 }
126
127 //
128 // invalidate 
129 // because evernthing is in kernel space (LUSTRE)
130 // there is no need to mark a piece of user memory as no longer in use by
131 // the system
132 //
133 // cb_invalidate
134
135 void kibnal_invalidate(nal_cb_t      *nal, 
136                               void          *base, 
137                               size_t        extent, 
138                               void          *addrkey)
139 {
140   // do nothing 
141   CDEBUG(D_NET, "kibnal_invalidate: 0x%Lx: invalidating %p : %d\n", 
142                                         nal->ni.nid, base, extent);
143   return;
144 }
145
146
147 //
148 // validate 
149 // because everything is in kernel space (LUSTRE)
150 // there is no need to mark a piece of user memory in use by
151 // the system
152 //
153 // cb_validate
154
155 int kibnal_validate(nal_cb_t        *nal,  
156                            void            *base, 
157                            size_t          extent, 
158                            void            **addrkey)
159 {
160   // do nothing 
161   CDEBUG(D_NET, "kibnal_validate: 0x%Lx: validating %p : %d\n", 
162                                         nal->ni.nid, base, extent);
163
164   return 0;
165 }
166
167
168 //
169 // log messages from kernel space 
170 // printk() is used 
171 //
172 // cb_printf
173
174 void kibnal_printf(nal_cb_t *nal, const char *fmt, ...)
175 {
176         va_list ap;
177         char    msg[256];
178
179         if (portal_debug & D_NET) {
180                 va_start( ap, fmt );
181                 vsnprintf( msg, sizeof(msg), fmt, ap );
182                 va_end( ap );
183
184                 printk("CPUId: %d %s",smp_processor_id(), msg);
185         }
186 }
187
188 //
189 // clear interrupt
190 // use spin_lock to lock protected area such as MD, ME...
191 // so a process can enter a protected area and do some works
192 // this won't physicall disable interrup but use a software 
193 // spin-lock to control some protected areas 
194 //
195 // cb_cli 
196
197 void kibnal_cli(nal_cb_t *nal, unsigned long *flags) 
198
199         kibnal_data_t *data= nal->nal_data;
200
201         CDEBUG(D_NET, "kibnal_cli \n");
202
203         spin_lock_irqsave(&data->kib_dispatch_lock,*flags);
204
205 }
206
207 //
208 // set interrupt
209 // use spin_lock to unlock protected area such as MD, ME...
210 // this won't physicall enable interrup but use a software 
211 // spin-lock to control some protected areas 
212 //
213 // cb_sti
214
215 void kibnal_sti(nal_cb_t *nal, unsigned long *flags)
216 {
217         kibnal_data_t *data= nal->nal_data;
218
219         CDEBUG(D_NET, "kibnal_sti \n");
220
221         spin_unlock_irqrestore(&data->kib_dispatch_lock,*flags);
222 }
223
224 //
225 // A new event has just been created
226 //
227 void kibnal_callback(nal_cb_t *nal, void *private, lib_eq_t *eq, ptl_event_t *ev)
228 {
229         /* holding kib_dispatch_lock */
230
231         if (eq->event_callback != NULL)
232                 eq->event_callback(ev);
233
234         /* We will wake theads sleeping in yield() here, AFTER the
235          * callback, when we implement blocking yield */
236 }
237
238 //
239 // nic distance 
240 // 
241 // network distance doesn't mean much for this nal 
242 // here we only indicate 
243 //      0 - operation is happened on the same node 
244 //      1 - operation is happened on different nodes 
245 //          router will handle the data routing 
246 //
247 // cb_dist
248
249 int kibnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
250 {
251         CDEBUG(D_NET, "kibnal_dist \n");
252
253         if ( nal->ni.nid == nid ) {
254                 *dist = 0;
255         } 
256         else {
257                 *dist = 1;
258         }
259
260         return 0; // always retrun 0 
261 }
262
263
264 //
265 // This is the cb_send() on IB based interconnect system
266 // prepare a data package and use VAPI_post_sr() to send it
267 // down-link out-going message 
268 //
269
270
271 int
272 kibnal_send(nal_cb_t        *nal,
273             void            *private,
274             lib_msg_t       *cookie,
275             ptl_hdr_t       *hdr,
276             int              type,
277             ptl_nid_t        nid,
278             ptl_pid_t        pid,
279             unsigned int     niov,
280             ptl_kiov_t      *iov,
281             size_t           len)
282 {
283         
284         int           rc=0;
285         void         *buf = NULL; 
286         unsigned long buf_length = sizeof(ptl_hdr_t) + len;
287         int           expected_buf_size = 0;
288         VAPI_ret_t    vstat;
289
290         PROF_START(kibnal_send); // time stamp send start 
291
292         CDEBUG(D_NET,"kibnal_send: sending %d bytes from %p to nid: 0x%Lx pid %d\n",
293                buf_length, iov, nid, HCA_PORT_1);
294
295
296         // do I need to check the gateway information
297         // do I have problem to send direct 
298         // do I have to forward a data packet to gateway
299         // 
300         // The current connection is back-to-back 
301         // I always know that data will be send from one-side to
302         // the other side
303         //
304         
305         //
306         //  check data buffer size 
307         //
308         //  MSG_SIZE_SMALL 
309         //      regular post send 
310         //  
311         //  MSG_SIZE_LARGE
312         //      rdma write
313         
314         if(buf_length <= SMALL_MSG_SIZE) {  
315            expected_buf_size = MSG_SIZE_SMALL;
316         } 
317         else { 
318           if(buf_length > MAX_MSG_SIZE) { 
319              CERROR("kibnal_send:request exceeds Transmit data size (%d).\n",
320                       MAX_MSG_SIZE);
321              rc = PTL_FAIL;
322              return rc;
323           }
324           else {
325              expected_buf_size = MSG_SIZE_LARGE; // this is a large data package 
326           } 
327         }
328                 
329         // prepare data packet for send operation 
330         //
331         // allocate a data buffer "buf" with size of buf_len(header + payload)
332         //                 ---------------
333         //  buf            | hdr         |  size = sizeof(ptl_hdr_t)
334         //                 --------------
335         //                 |payload data |  size = len
336         //                 ---------------
337         
338         // copy header to buf 
339         memcpy(buf, hdr, sizeof(ptl_hdr_t));
340
341         // copy payload data from iov to buf
342         // use portals library function lib_copy_iov2buf()
343         
344         if (len != 0)
345            lib_copy_iov2buf(((char *)buf) + sizeof (ptl_hdr_t),
346                             niov, 
347                             iov, 
348                             len);
349
350         // buf is ready to do a post send 
351         // the send method is base on the buf_size 
352
353         CDEBUG(D_NET,"ib_send %d bytes (size %d) from %p to nid: 0x%Lx "
354                " port %d\n", buf_length, expected_buf_size, iov, nid, HCA_PORT_1);
355
356         switch(expected_buf_size) {
357           case MSG_SIZE_SMALL:
358             // send small message 
359             if((vstat = Send_Small_Msg(buf, buf_length)) != VAPI_OK){
360                 CERROR("Send_Small_Msg() is failed\n");
361             } 
362             break;
363
364           case MSG_SIZE_LARGE:
365             // send small message 
366             if((vstat = Send_Large_Msg(buf, buf_length)) != VAPI_OK){
367                 CERROR("Send_Large_Msg() is failed\n");
368             } 
369             break;
370
371           default:
372             CERROR("Unknown message size %d\n", expected_buf_size);
373             break;
374         }
375
376         PROF_FINISH(kibnal_send); // time stapm of send operation 
377
378         rc = PTL_OK;
379
380         return rc; 
381 }
382
383 //
384 // kibnal_send_pages
385 //
386 // no support 
387 //
388 // do you need this 
389 //
390 int kibnal_send_pages(nal_cb_t * nal, 
391                       void *private, 
392                       lib_msg_t * cookie,
393                       ptl_hdr_t * hdr, 
394                       int type, 
395                       ptl_nid_t nid, 
396                       ptl_pid_t pid,
397                       unsigned int niov, 
398                       ptl_kiov_t *iov, 
399                       size_t mlen)
400 {
401    int rc = PTL_FAIL;
402
403    CDEBUG(D_NET, "kibnal_send_pages\n");
404
405    // do nothing now for Infiniband 
406    
407    return rc;
408 }
409
410
411
412
413
414 //
415 // kibnal_fwd_packet 
416 //
417 // no support 
418 //
419 // do you need this 
420 //
421 void kibnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
422 {
423         CDEBUG(D_NET, "forwarding not implemented\n");
424         return;
425       
426 }
427
428 //
429 // kibnal_callback 
430 //
431 // no support 
432 //
433 // do you need this 
434 //
435 void kibnal_callback(nal_cb_t * nal, 
436                            void *private, 
437                            lib_eq_t *eq,
438                            ptl_event_t *ev)
439 {
440         CDEBUG(D_NET,  "callback not implemented\n");
441         return PTL_OK;
442 }
443
444
445 /* Process a received portals packet */
446 //
447 //  conver receiving data in to PORTALS header 
448 //
449
450 void kibnal_rx(kibnal_data_t    *kib, 
451                       VAPI_virt_addr_t buffer_addr,
452                       u_int32_t        buffer_len,
453                       u_int32_t        buffer_size,
454                       unsigned int     priority) 
455 {
456         ptl_hdr_t  *hdr = (ptl_hdr_t *)  buffer_addr; // case to ptl header format 
457         kibnal_rx_t krx;
458
459         CDEBUG(D_NET,"kibnal_rx: buf %p, len %ld\n", buffer_addr, buffer_len);
460
461         if ( buffer_len < sizeof( ptl_hdr_t ) ) {
462                 /* XXX what's this for? */
463                 if (kib->kib_shuttingdown)
464                         return;
465                 CERROR("kibnal_rx: did not receive complete portal header, "
466                        "len= %ld", buffer_len);
467
468                 return;
469         }
470
471        // typedef struct {
472        //         char             *krx_buffer; // pointer to receiving buffer
473        //         unsigned long     krx_len;  // length of buffer
474        //         unsigned int      krx_size; //
475        //         unsigned int      krx_priority; // do we need this
476        //         struct list_head  krx_item;
477        // } kibnal_rx_t;
478        //
479         krx.krx_buffer    = hdr;
480         krx.krx_len       = buffer_len;
481         krx.krx_size      = buffer_size;
482         krx.krx_priority  = priority;
483
484         if ( hdr->dest_nid == kibnal_lib.ni.nid ) {
485            // this is my data 
486            PROF_START(lib_parse);
487
488            lib_parse(&kibnal_lib, (ptl_hdr_t *)krx.krx_buffer, &krx);
489
490            PROF_FINISH(lib_parse);
491         } else {
492            /* forward to gateway */
493            // Do we expect this happened ?
494            //      
495            CERROR("kibnal_rx: forwarding not implemented yet");
496         }
497
498         return;
499 }
500
501
502
503
504 //
505 // kibnal_recv_pages 
506 //
507 // no support 
508 //
509 // do you need this 
510 //
511 int
512 kibnal_recv_pages(nal_cb_t * nal, 
513                   void *private, 
514                   lib_msg_t * cookie,
515                   unsigned int niov, 
516                   ptl_kiov_t *iov, 
517                   size_t mlen,
518                   size_t rlen)
519 {
520
521   CDEBUG(D_NET, "recv_pages not implemented\n");
522   return PTL_FAIL;
523        
524 }
525
526
527 int 
528 kibnal_recv(nal_cb_t     *nal,
529             void         *private,
530             lib_msg_t    *cookie,
531             unsigned int  niov,
532             struct iovec *iov,
533             size_t        mlen,
534             size_t        rlen)
535 {
536         kibnal_rx_t *krx = private;
537
538         CDEBUG(D_NET,"kibnal_recv: mlen=%d, rlen=%d\n", mlen, rlen);
539
540         /* What was actually received must be >= what sender claims to
541          * have sent. */
542         LASSERT (mlen <= rlen);
543
544         if (krx->krx_len < sizeof (ptl_hdr_t) + rlen)
545                 return (PTL_FAIL);
546
547         PROF_START(kibnal_recv);
548
549         if(mlen != 0) {
550                 PROF_START(memcpy);
551                 lib_copy_buf2iov (niov, iov, krx->krx_buffer +
552                                   sizeof (ptl_hdr_t), mlen);
553                 PROF_FINISH(memcpy);
554         }
555
556         PROF_START(lib_finalize);
557         
558         lib_finalize(nal, private, cookie, PTL_OK);
559         
560         PROF_FINISH(lib_finalize);
561         PROF_FINISH(kibnal_recv);
562
563         return PTL_OK;
564 }
565
566 //
567 // kibnal_map 
568 // no support 
569 // do you need this 
570 //
571 int kibnal_map(nal_cb_t * nal, 
572                unsigned int niov, 
573                struct iovec *iov,
574                void **addrkey)
575 {
576   CDEBUG(D_NET, "map not implemented\n");
577   return PTL_OK; 
578 }
579
580
581
582 //
583 // kibnal_unmap
584 //
585 // no support 
586 //
587 // do you need this 
588 //
589 void kibnal_unmap(nal_cb_t * nal, 
590                   unsigned int niov, 
591                   struct iovec *iov,
592                   void **addrkey)
593 {
594   CDEBUG(D_NET, "unmap not implemented\n");
595   return;
596 }
597
598
599
600 //
601 // kibnal_map_pages 
602 // no support 
603 // do you need this 
604 /* as (un)map, but with a set of page fragments */
605 int kibnal_map_pages(nal_cb_t * nal, 
606                      unsigned int niov, 
607                      ptl_kiov_t *iov,
608                      void **addrkey)
609 {
610   CDEBUG(D_NET, "map_pages not implemented\n");
611   return PTL_OK;
612 }
613
614
615
616 //
617 // kibnal_unmap_pages 
618 //
619 // no support 
620 //
621 // do you need this 
622 //
623 void kibnal_unmap_pages(nal_cb_t * nal, 
624                                unsigned int niov, 
625                                ptl_kiov_t *iov,
626                                void **addrkey)
627 {
628   CDEBUG(D_NET, "unmap_pages not implemented\n");
629   return ;
630 }
631
632
633 int kibnal_end(kibnal_data_t *kib)
634 {
635
636   /* wait for sends to finish ? */
637   /* remove receive buffers */
638   /* shutdown receive thread */
639
640   CDEBUG(D_NET, "kibnal_end\n");
641   IB_Close_HCA();
642
643   return 0;
644 }
645
646
647 //
648 //
649 //  asynchronous event handler: response to some unexpetced operation errors 
650 //    
651 //  void async_event_handler(VAPI_hca_hndl_t      hca_hndl,
652 //                           VAPI_event_record_t *event_record_p,
653 //                           void*                private_data)
654 //  the HCA drive will prepare evetn_record_p                        
655 //
656 //  this handler is registered with VAPI_set_async_event_handler()
657 //  VAPI_set_async_event_handler() is issued when an HCA is created 
658 //
659 //
660 void async_event_handler(VAPI_hca_hndl_t      hca_hndl,
661                          VAPI_event_record_t *event_record_p,  
662                          void*                private_data)
663 {
664   //
665   // * event_record_p is prepared by the system when an async
666   //   event happened
667   // * what to do with private_data 
668   // * do we expect more async events happened if so what are they 
669   //
670   //   only log ERROR message now 
671
672   switch (event_record_p->type) {
673     case VAPI_PORT_ERROR:
674          printk("Got PORT_ERROR event. port number=%d\n", 
675                  event_record_p->modifier.port_num);
676          break;
677     case VAPI_PORT_ACTIVE:
678          printk("Got PORT_ACTIVE event. port number=%d\n", 
679                  event_record_p->modifier.port_num);
680          break;
681     case VAPI_QP_PATH_MIGRATED:    /*QP*/
682          printk("Got P_PATH_MIGRATED event. qp_hndl=%lu\n", 
683                  event_record_p->modifier.qp_hndl);
684          break;
685     case VAPI_EEC_PATH_MIGRATED:   /*EEC*/
686          printk("Got EEC_PATH_MIGRATED event. eec_hndl=%d\n", 
687                  event_record_p->modifier.eec_hndl);
688          break;
689     case VAPI_QP_COMM_ESTABLISHED: /*QP*/
690          printk("Got QP_COMM_ESTABLISHED event. qp_hndl=%lu\n", 
691                  event_record_p->modifier.qp_hndl);
692          break;
693     case VAPI_EEC_COMM_ESTABLISHED: /*EEC*/
694          printk("Got EEC_COMM_ESTABLISHED event. eec_hndl=%d\n",
695                  event_record_p->modifier.eec_hndl);
696          break;
697     case VAPI_SEND_QUEUE_DRAINED:  /*QP*/
698          printk("Got SEND_QUEUE_DRAINED event. qp_hndl=%lu\n", 
699                  event_record_p->modifier.qp_hndl);
700          break;
701     case VAPI_CQ_ERROR:            /*CQ*/
702          printk("Got CQ_ERROR event. cq_hndl=%lu\n", 
703                  event_record_p->modifier.cq_hndl);
704          break;
705     case VAPI_LOCAL_WQ_INV_REQUEST_ERROR: /*QP*/
706          printk("Got LOCAL_WQ_INV_REQUEST_ERROR event. qp_hndl=%lu\n", 
707                  event_record_p->modifier.qp_hndl);
708          break;
709     case VAPI_LOCAL_WQ_ACCESS_VIOL_ERROR: /*QP*/
710          printk("Got LOCAL_WQ_ACCESS_VIOL_ERROR event. qp_hndl=%lu\n", 
711                  event_record_p->modifier.qp_hndl);
712          break;
713     case VAPI_LOCAL_WQ_CATASTROPHIC_ERROR: /*QP*/
714          printk("Got LOCAL_WQ_CATASTROPHIC_ERROR event. qp_hndl=%lu\n", 
715                  event_record_p->modifier.qp_hndl);
716          break;
717     case VAPI_PATH_MIG_REQ_ERROR:  /*QP*/
718          printk("Got PATH_MIG_REQ_ERROR event. qp_hndl=%lu\n", 
719                  event_record_p->modifier.qp_hndl);
720          break;
721     case VAPI_LOCAL_CATASTROPHIC_ERROR: /*none*/
722          printk("Got LOCAL_CATASTROPHIC_ERROR event. \n");
723          break;
724     default:
725          printk(":got non-valid event type=%d. IGNORING\n",
726                     event_record_p->type);
727   }
728
729 }
730
731
732
733
734 VAPI_wr_id_t 
735 search_send_buf(int buf_length)
736 {
737   VAPI_wr_id_t send_id = -1;
738   u_int32_t    i;
739   int          flag = NO;
740   int          loop_count = 0;  
741
742   CDEBUG(D_NET, "search_send_buf \n");
743   
744   while((flag == NO) && (loop_count < MAX_LOOP_COUNT)) {
745     for(i=0; i < NUM_ENTRY; i++) {
746       // problem about using spinlock
747       spin_lock(&MSB_mutex[i]);
748       if(MSbuf_list[i].status == BUF_REGISTERED)  {
749         MSbuf_list[i].status = BUF_INUSE;// make send buf as inuse
750         flag =  YES;
751         spin_unlock(&MSB_mutex[i]);
752         break;
753       }
754       else
755         spin_unlock(&MSB_mutex[i]); 
756     }
757
758     loop_count++;
759     schedule_timeout(200); // wait for a while 
760   }
761    
762   if(flag == NO)  {
763     CDEBUG(D_NET, "search_send_buf: could not locate an entry in MSbuf_list\n");
764   }
765
766   send_id = (VAPI_wr_id_t ) i;
767
768   return send_id;
769 }
770
771
772
773 VAPI_wr_id_t 
774 search_RDMA_recv_buf(int buf_length)
775 {
776   VAPI_wr_id_t recv_id = -1;
777   u_int32_t    i;
778   int          flag = NO;
779   int          loop_count = 0;  
780
781   CDEBUG(D_NET, "search_RDMA_recv_buf\n");
782
783   while((flag == NO) && (loop_count < MAX_LOOP_COUNT)) {
784
785     for(i=NUM_ENTRY; i < NUM_MBUF; i++) {
786
787       spin_lock(&MSB_mutex[i]);
788
789       if((MRbuf_list[i].status == BUF_REGISTERED)  &&
790          (MRbuf_list[i].buf_size >= buf_length)) {
791           MSbuf_list[i].status = BUF_INUSE;// make send buf as inuse
792           flag =  YES;
793           spin_unlock(&MSB_mutex[i]);
794           break;
795       }
796       else
797         spin_unlock(&MSB_mutex[i]);
798     }
799
800     loop_count++;
801
802     schedule_timeout(200); // wait for a while 
803   }
804    
805   if(flag == NO)  {
806     CERROR("search_RDMA_recv_buf: could not locate an entry in MBbuf_list\n");
807   }
808
809   recv_id = (VAPI_wr_id_t ) i;
810
811   return recv_id;
812
813 }
814
815
816
817
818
819
820
821 VAPI_ret_t Send_Small_Msg(char *buf, int buf_length)
822 {
823  VAPI_ret_t           vstat;
824  VAPI_sr_desc_t       sr_desc;
825  VAPI_sg_lst_entry_t  sr_sg;
826  QP_info              *qp;
827  VAPI_wr_id_t         send_id;
828
829  CDEBUG(D_NET, "Send_Small_Msg\n");
830
831  send_id = search_send_buf(buf_length); 
832
833  if(send_id < 0){
834    CERROR("Send_Small_Msg: Can not find a QP \n");
835    return(~VAPI_OK);
836  }
837
838  qp = &QP_list[(int) send_id];
839
840  // find a suitable/registered send_buf from MSbuf_list
841  CDEBUG(D_NET, "Send_Small_Msg: current send id  %d \n", send_id);
842
843  sr_desc.opcode    = VAPI_SEND;
844  sr_desc.comp_type = VAPI_SIGNALED;
845  sr_desc.id        =  send_id;
846
847
848  // scatter and gather info 
849  sr_sg.len  = buf_length;
850  sr_sg.lkey = MSbuf_list[send_id].mr.l_key; // use send MR 
851
852  sr_sg.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) MSbuf_list[send_id].buf_addr;
853
854  // copy data to register send buffer 
855  memcpy(&sr_sg.addr, buf, buf_length);
856
857  sr_desc.sg_lst_p = &sr_sg;
858  sr_desc.sg_lst_len = 1; // only 1 entry is used 
859  sr_desc.fence = TRUE;
860  sr_desc.set_se = FALSE;
861
862  // call VAPI_post_sr to send out this data 
863  vstat = VAPI_post_sr(qp->hca_hndl, qp->qp_hndl, &sr_desc);
864
865  if (vstat != VAPI_OK) {
866     CERROR("VAPI_post_sr failed (%s).\n",VAPI_strerror(vstat));
867  }
868
869  CDEBUG(D_NET, "VAPI_post_sr success.\n");
870
871  return (vstat);
872
873 }
874
875
876
877
878 VAPI_wr_id_t
879 RTS_handshaking_protocol(int buf_length) 
880 {
881
882  VAPI_ret_t           vstat;
883  VAPI_sr_desc_t       sr_desc;
884  VAPI_sg_lst_entry_t  sr_sg;
885  VAPI_wr_id_t         send_id;
886
887  RDMA_Info_Exchange   rdma_info;
888
889  rdma_info.opcode     = Ready_To_send;
890  rdma_info.buf_length = buf_length; 
891  rdma_info.raddr      = (VAPI_virt_addr_t) 0;
892  rdma_info.rkey       = (VAPI_rkey_t) 0 ; 
893
894  QP_info              *qp;
895
896  CDEBUG(D_NET, "RTS_handshaking_protocol\n");
897
898  // find a suitable/registered send_buf from MSbuf_list
899  send_id = search_send_buf(sizeof(RDMA_Info_Exchange));   
900
901  qp = &QP_list[(int) send_id];
902
903  CDEBUG(D_NET, "RTS_CTS: current send id  %d \n", send_id);
904  sr_desc.opcode    = VAPI_SEND;
905  sr_desc.comp_type = VAPI_SIGNALED;
906  sr_desc.id        = send_id + RDMA_RTS_ID;// this RTS mesage ID 
907
908  // scatter and gather info 
909  sr_sg.len  = sizeof(RDMA_Info_Exchange);
910  sr_sg.lkey = MSbuf_list[send_id].mr.l_key; // use send MR 
911  sr_sg.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) MSbuf_list[send_id].buf_addr;
912
913  // copy data to register send buffer 
914  memcpy(&sr_sg.addr, &rdma_info, sizeof(RDMA_Info_Exchange));
915
916  sr_desc.sg_lst_p = &sr_sg;
917  sr_desc.sg_lst_len = 1; // only 1 entry is used 
918  sr_desc.fence = TRUE;
919  sr_desc.set_se = FALSE;
920
921  // call VAPI_post_sr to send out this RTS message data 
922  vstat = VAPI_post_sr(qp->hca_hndl, qp->qp_hndl, &sr_desc);
923
924  if (vstat != VAPI_OK) {
925     CERROR("RTS: VAPI_post_sr failed (%s).\n",VAPI_strerror_sym(vstat));
926  }
927
928  return send_id;
929
930 }
931
932
933
934 // create local receiving Memory Region for a HCA
935 VAPI_ret_t
936 createMemRegion_RDMA(VAPI_hca_hndl_t  hca_hndl,
937                      VAPI_pd_hndl_t   pd_hndl,
938                      char            *bufptr,
939                      int              buf_length,
940                      VAPI_mr_hndl_t   *rep_mr_hndl,
941                      VAPI_mrw_t       *rep_mr)
942 {
943   VAPI_ret_t      vstat;
944   VAPI_mrw_t      mrw;
945   
946   CDEBUG(D_NET, "createMemRegion_RDMA\n");
947
948   // memory region address and size of memory region
949   // allocate a block of memory for this HCA 
950   // RDMA data buffer
951   
952   
953   if(bufptr == NULL) {
954     // need to allcate a local buffer to receive data from a
955     // remore VAPI_RDMA_WRITE_IMM
956     PORTAL_ALLOC(bufptr, buf_length);
957   }
958
959   if(bufptr == NULL) {
960     CDEBUG(D_MALLOC, "Failed to malloc a block of RDMA receiving memory, size %d\n",
961                                     buf_length);
962     return(VAPI_ENOMEM);
963   }
964
965   /* Register RDAM data Memory region */
966   CDEBUG(D_NET, "Register a RDMA data memory region\n");
967
968   mrw.type   = VAPI_MR;
969   mrw.pd_hndl= pd_hndl;
970   mrw.start  = (VAPI_virt_addr_t )(MT_virt_addr_t )bufptr;
971   mrw.size   = buf_length;
972   mrw.acl    = VAPI_EN_LOCAL_WRITE  | 
973                VAPI_EN_REMOTE_WRITE | 
974                VAPI_EN_REMOTE_READ;
975
976   // register send memory region
977   vstat = VAPI_register_mr(hca_hndl,
978                            &mrw,
979                            rep_mr_hndl,
980                            rep_mr);
981
982   // this memory region is going to be reused until deregister is called
983   if (vstat != VAPI_OK) {
984      CERROR("Failed registering a mem region Addr=%p, Len=%d. %s\n",
985              bufptr, buf_length, VAPI_strerror(vstat));
986   }
987
988   return(vstat);
989
990 }
991
992
993
994 RDMA_Info_Exchange  Local_rdma_info;
995
996 int insert_MRbuf_list(int buf_lenght)
997 {
998   int  recv_id = NUM_ENTRY;      
999
1000   CDEBUG(D_NET, "insert_MRbuf_list\n");
1001
1002   for(recv_id= NUM_ENTRY; recv_id < NUM_MBUF; recv_id++){
1003        if(BUF_UNREGISTERED == MRbuf_list[recv_id].status)  {
1004          MRbuf_list[recv_id].status   = BUF_UNREGISTERED;
1005          MRbuf_list[recv_id].buf_size = buf_lenght;
1006          break;
1007        }
1008   }
1009
1010   return recv_id;
1011
1012 }  
1013
1014 VAPI_wr_id_t
1015 CTS_handshaking_protocol(RDMA_Info_Exchange *rdma_info) 
1016 {
1017
1018  VAPI_ret_t           vstat;
1019  VAPI_sr_desc_t       sr_desc;
1020  VAPI_sg_lst_entry_t  sr_sg;
1021  QP_info             *qp;
1022  VAPI_wr_id_t         send_id;
1023  VAPI_mr_hndl_t       rep_mr_hndl;
1024  VAPI_mrw_t           rep_mr;
1025  int                  recv_id;
1026  char                *bufptr = NULL;
1027
1028  // search MRbuf_list for an available entry that
1029  // has registered data buffer with size equal to rdma_info->buf_lenght
1030
1031  CDEBUG(D_NET, "CTS_handshaking_protocol\n");
1032
1033  // register memory buffer for RDAM operation
1034
1035  vstat = createMemRegion_RDMA(Hca_hndl,
1036                               Pd_hndl,
1037                               bufptr, 
1038                               rdma_info->buf_length,
1039                               &rep_mr_hndl,
1040                               &rep_mr);
1041
1042
1043  Local_rdma_info.opcode            = Clear_To_send;
1044  Local_rdma_info.recv_rdma_mr      = rep_mr;
1045  Local_rdma_info.recv_rdma_mr_hndl = rep_mr_hndl;
1046
1047  if (vstat != VAPI_OK) {
1048     CERROR("CST_handshaking_protocol: Failed registering a mem region"
1049            "Len=%d. %s\n", rdma_info->buf_length, VAPI_strerror(vstat));
1050     Local_rdma_info.flag = RDMA_BUFFER_UNAVAILABLE;
1051  }
1052  else {
1053     // successfully allcate reserved RDAM data buffer 
1054     recv_id = insert_MRbuf_list(rdma_info->buf_length);   
1055
1056     if(recv_id >=  NUM_ENTRY) { 
1057       MRbuf_list[recv_id].buf_addr     = rep_mr.start;
1058       MRbuf_list[recv_id].mr           = rep_mr;
1059       MRbuf_list[recv_id].mr_hndl      = rep_mr_hndl;
1060       MRbuf_list[recv_id].ref_count    = 0;
1061       Local_rdma_info.flag             = RDMA_BUFFER_RESERVED;
1062       Local_rdma_info.buf_length       = rdma_info->buf_length; 
1063       Local_rdma_info.raddr            = rep_mr.start;
1064       Local_rdma_info.rkey             = rep_mr.r_key; 
1065     }
1066     else {
1067       CERROR("Can not find an entry in MRbuf_list - how could this happen\n");  
1068     }
1069  }
1070
1071  // find a suitable/registered send_buf from MSbuf_list
1072  send_id = search_send_buf(sizeof(RDMA_Info_Exchange)); 
1073  CDEBUG(D_NET, "CTS: current send id  %d \n", send_id);
1074  sr_desc.opcode    = VAPI_SEND;
1075  sr_desc.comp_type = VAPI_SIGNALED;
1076  sr_desc.id        = send_id + RDMA_CTS_ID; // this CST message ID 
1077
1078  // scatter and gather info 
1079  sr_sg.len  = sizeof(RDMA_Info_Exchange);
1080  sr_sg.lkey = MSbuf_list[send_id].mr.l_key; // use send MR 
1081  sr_sg.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) MSbuf_list[send_id].buf_addr;
1082
1083  // copy data to register send buffer 
1084  memcpy(&sr_sg.addr, &Local_rdma_info, sizeof(RDMA_Info_Exchange));
1085
1086  sr_desc.sg_lst_p   = &sr_sg;
1087  sr_desc.sg_lst_len = 1; // only 1 entry is used 
1088  sr_desc.fence = TRUE;
1089  sr_desc.set_se = FALSE;
1090
1091  // call VAPI_post_sr to send out this RTS message data 
1092  vstat = VAPI_post_sr(qp->hca_hndl, qp->qp_hndl, &sr_desc);
1093
1094  if (vstat != VAPI_OK) {
1095     CERROR("CTS: VAPI_post_sr failed (%s).\n",VAPI_strerror(vstat));
1096  }
1097
1098
1099 }
1100
1101
1102
1103 VAPI_ret_t Send_Large_Msg(char *buf, int buf_length)
1104 {
1105   VAPI_ret_t           vstat;
1106   VAPI_sr_desc_t       sr_desc;
1107   VAPI_sg_lst_entry_t  sr_sg;
1108   QP_info             *qp;
1109   VAPI_mrw_t           rep_mr; 
1110   VAPI_mr_hndl_t       rep_mr_hndl;
1111   int                  send_id;
1112   VAPI_imm_data_t      imm_data = 0XAAAA5555;
1113
1114
1115   CDEBUG(D_NET, "Send_Large_Msg: Enter\n");
1116
1117   // register this large buf 
1118   // don't need to copy this buf to send buffer
1119   vstat = createMemRegion_RDMA(Hca_hndl,
1120                                Pd_hndl,
1121                                buf,
1122                                buf_length,
1123                                &rep_mr_hndl,
1124                                &rep_mr);
1125
1126   if (vstat != VAPI_OK) {
1127     CERROR("Send_Large_M\sg:  createMemRegion_RDMAi() failed (%s).\n",
1128                         VAPI_strerror(vstat));
1129   }
1130   
1131
1132   Local_rdma_info.send_rdma_mr      = rep_mr;
1133   Local_rdma_info.send_rdma_mr_hndl = rep_mr_hndl;
1134
1135   //
1136   //     Prepare descriptor for send queue
1137   //
1138  
1139   // ask for a remote rdma buffer with size buf_lenght
1140   send_id = RTS_handshaking_protocol(buf_length); 
1141
1142   qp = &QP_list[send_id];
1143
1144   // wait for CTS message receiving from remote node 
1145   while(1){
1146      if(YES == Cts_Message_arrived) {
1147         // receive CST message from remote node 
1148         // Rdma_info is available for use
1149         break;
1150      }
1151      schedule_timeout(RTS_CTS_TIMEOUT);
1152   }
1153   
1154   sr_desc.id        = send_id + RDMA_OP_ID;
1155   sr_desc.opcode    = VAPI_RDMA_WRITE_WITH_IMM;
1156   sr_desc.comp_type = VAPI_SIGNALED;
1157
1158   // scatter and gather info 
1159   sr_sg.len  = buf_length;
1160
1161   // rdma mr 
1162   sr_sg.lkey = rep_mr.l_key;  
1163   sr_sg.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) rep_mr.start;
1164   sr_desc.sg_lst_p = &sr_sg;
1165   sr_desc.sg_lst_len = 1; // only 1 entry is used 
1166
1167   // immediate data - not used here 
1168   sr_desc.imm_data = imm_data;
1169   sr_desc.fence = TRUE;
1170   sr_desc.set_se = FALSE;
1171
1172   // RDAM operation only
1173   // raddr and rkey is receiving from remote node  
1174   sr_desc.remote_addr = Rdma_info.raddr;
1175   sr_desc.r_key       = Rdma_info.rkey;
1176
1177   // call VAPI_post_sr to send out this data 
1178   vstat = VAPI_post_sr(qp->hca_hndl, qp->qp_hndl, &sr_desc);
1179
1180   if (vstat != VAPI_OK) {
1181      CERROR("VAPI_post_sr failed (%s).\n",VAPI_strerror_sym(vstat));
1182   }
1183
1184 }
1185
1186
1187
1188
1189
1190
1191 //
1192 //  repost_recv_buf
1193 //  post a used recv buffer back to recv WQE list 
1194 //  wrq_id is used to indicate the starting position of recv-buffer 
1195 //
1196 VAPI_ret_t 
1197 repost_recv_buf(QP_info      *qp,
1198                 VAPI_wr_id_t  wrq_id) 
1199 {
1200   VAPI_rr_desc_t       rr;
1201   VAPI_sg_lst_entry_t  sg_entry;
1202   VAPI_ret_t           ret;
1203
1204   CDEBUG(D_NET, "repost_recv_buf\n");
1205
1206   sg_entry.lkey = MRbuf_list[wrq_id].mr.l_key;
1207   sg_entry.len  = MRbuf_list[wrq_id].buf_size;
1208   sg_entry.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) MRbuf_list[wrq_id].buf_addr;
1209   rr.opcode     = VAPI_RECEIVE;
1210   rr.comp_type  = VAPI_SIGNALED; /* All with CQE (IB compliant) */
1211   rr.sg_lst_len = 1; /* single buffers */
1212   rr.sg_lst_p   = &sg_entry;
1213   rr.id         = wrq_id; /* WQE id used is the index to buffers ptr array */
1214
1215   ret= VAPI_post_rr(qp->hca_hndl,qp->qp_hndl,&rr);
1216      
1217   if (ret != VAPI_OK){
1218      CERROR("failed reposting RQ WQE (%s) buffer \n",VAPI_strerror_sym(ret));
1219      return ret;
1220   }
1221
1222   CDEBUG(D_NET, "Successfully reposting an RQ WQE %d recv bufer \n", wrq_id);
1223
1224   return ret ;
1225 }
1226                         
1227 //
1228 // post_recv_bufs
1229 //      post "num_o_bufs" for receiving data
1230 //      each receiving buf (buffer starting address, size of buffer)
1231 //      each buffer is associated with an id 
1232 //
1233 int 
1234 post_recv_bufs(VAPI_wr_id_t  start_id)
1235 {
1236   int i;
1237   VAPI_rr_desc_t       rr;
1238   VAPI_sg_lst_entry_t  sg_entry;
1239   VAPI_ret_t           ret;
1240
1241   CDEBUG(D_NET, "post_recv_bufs\n");
1242
1243   for(i=0; i< NUM_ENTRY; i++) {
1244     sg_entry.lkey = MRbuf_list[i].mr.l_key;
1245     sg_entry.len  = MRbuf_list[i].buf_size;
1246     sg_entry.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) MRbuf_list[i].buf_addr;
1247     rr.opcode     = VAPI_RECEIVE;
1248     rr.comp_type  = VAPI_SIGNALED;  /* All with CQE (IB compliant) */
1249     rr.sg_lst_len = 1; /* single buffers */
1250     rr.sg_lst_p   = &sg_entry;
1251     rr.id         = start_id+i; /* WQE id used is the index to buffers ptr array */
1252
1253     ret= VAPI_post_rr(QP_list[i].hca_hndl,QP_list[i].qp_hndl, &rr);
1254     if (ret != VAPI_OK) {
1255        CERROR("failed posting RQ WQE (%s)\n",VAPI_strerror_sym(ret));
1256        return i;
1257     } 
1258   }
1259
1260   return i; /* num of buffers posted */
1261 }
1262                         
1263 int 
1264 post_RDMA_bufs(QP_info      *qp, 
1265                void         *buf_array,
1266                unsigned int  num_bufs,
1267                unsigned int  buf_size,
1268                VAPI_wr_id_t  start_id)
1269 {
1270
1271   CDEBUG(D_NET, "post_RDMA_bufs \n");
1272   return YES;
1273 }
1274
1275
1276
1277 //
1278 // LIB NAL
1279 // assign function pointers to theirs corresponding entries
1280 //
1281
1282 nal_cb_t kibnal_lib = {
1283         nal_data:       &kibnal_data,  /* NAL private data */
1284         cb_send:        kibnal_send,
1285         cb_send_pages:  NULL, // not implemented  
1286         cb_recv:        kibnal_recv,
1287         cb_recv_pages:  NULL, // not implemented 
1288         cb_read:        kibnal_read,
1289         cb_write:       kibnal_write,
1290         cb_callback:    NULL, // not implemented 
1291         cb_malloc:      kibnal_malloc,
1292         cb_free:        kibnal_free,
1293         cb_map:         NULL, // not implemented 
1294         cb_unmap:       NULL, // not implemented 
1295         cb_map_pages:   NULL, // not implemented 
1296         cb_unmap_pages: NULL, // not implemented 
1297         cb_printf:      kibnal_printf,
1298         cb_cli:         kibnal_cli,
1299         cb_sti:         kibnal_sti,
1300         cb_callback:    kibnal_callback,
1301         cb_dist:        kibnal_dist // no used at this moment 
1302 };