1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Based on ksocknal and qswnal
6 * Author: Hsing-bung Chen <hbchen@lanl.gov>
8 * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
10 * Portals is free software; you can redistribute it and/or
11 * modify it under the terms of version 2 of the GNU General Public
12 * License as published by the Free Software Foundation.
14 * Portals is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with Portals; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
30 RDMA_Info_Exchange Rdma_nfo;
31 int Cts_Msg_Arrived = NO;
35 * LIB functions follow
40 // copy a block of data from src_addr to dst_addr
41 // it all happens in kernel space - dst_addr and src_addr
43 // original definition is to read a block of data from a
44 // specified user address
/* cb_read callback: copy "len" bytes from src_addr to dst_addr.
 * Both pointers are kernel-space addresses for this NAL, so a plain
 * memcpy is all that is needed. */
48 int kibnal_read (nal_cb_t *nal,
54 CDEBUG(D_NET, "kibnal_read: 0x%Lx: reading %ld bytes from %p -> %p\n",
55 nal->ni.nid, (long)len, src_addr, dst_addr );
57 memcpy( dst_addr, src_addr, len );
63 // it seems that read and write are doing the same thing
64 // because they all happen in kernel space
65 // why do we need two functions like read and write
66 // to make PORTALS API compatible
71 // copy a block of data from src_addr to dst_addr
72 // it all happens in kernel space - dst_addr and src_addr
74 // original definition is to write a block of data to a
75 // specified user address
/* cb_write callback: copy "len" bytes from src_addr to dst_addr.
 * Identical in effect to kibnal_read -- both sides live in kernel
 * space -- but kept separate to satisfy the PORTALS cb interface. */
79 int kibnal_write(nal_cb_t *nal,
85 CDEBUG(D_NET, "kibnal_write: 0x%Lx: writing %ld bytes from %p -> %p\n",
86 nal->ni.nid, (long)len, src_addr, dst_addr );
89 memcpy( dst_addr, src_addr, len );
97 // either vmalloc or kmalloc is used
98 // dynamically allocate a block of memory based on the size of buffer
/* cb_malloc callback: allocate "length" bytes via PORTAL_ALLOC
 * (which picks kmalloc or vmalloc based on size) and return the
 * buffer pointer. */
102 void * kibnal_malloc(nal_cb_t *nal, size_t length)
106 // PORTAL_ALLOC will do the job
107 // allocate a buffer with size "length"
108 PORTAL_ALLOC(buffer, length);
115 // release a dynamically allocated memory pointed by buffer pointer
/* cb_free callback: release a buffer obtained from kibnal_malloc().
 * "length" must match the size used at allocation time -- PORTAL_FREE
 * needs it for its accounting. */
119 void kibnal_free(nal_cb_t *nal, void *buffer, size_t length)
122 // release allocated buffer to system
124 PORTAL_FREE(buffer, length);
129 // because everything is in kernel space (LUSTRE)
130 // there is no need to mark a piece of user memory as no longer in use by
/* cb_invalidate callback: effectively a no-op for this NAL -- all
 * memory is kernel memory, so there is nothing to unpin; just log. */
135 void kibnal_invalidate(nal_cb_t *nal,
141 CDEBUG(D_NET, "kibnal_invalidate: 0x%Lx: invalidating %p : %d\n",
142 nal->ni.nid, base, extent);
149 // because everything is in kernel space (LUSTRE)
150 // there is no need to mark a piece of user memory in use by
/* cb_validate callback: counterpart of kibnal_invalidate; nothing to
 * pin in kernel space, so just log the range and succeed. */
155 int kibnal_validate(nal_cb_t *nal,
161 CDEBUG(D_NET, "kibnal_validate: 0x%Lx: validating %p : %d\n",
162 nal->ni.nid, base, extent);
169 // log messages from kernel space
/* cb_printf callback: format a varargs message and emit it through
 * printk, tagged with the current CPU id.  Output is produced only
 * when D_NET debugging is enabled. */
174 void kibnal_printf(nal_cb_t *nal, const char *fmt, ...)
179 if (portal_debug & D_NET) {
181 vsnprintf( msg, sizeof(msg), fmt, ap );
184 printk("CPUId: %d %s",smp_processor_id(), msg);
190 // use spin_lock to lock protected areas such as MD, ME...
191 // so a process can enter a protected area and do some work
192 // this won't physically disable interrupts but uses a software
193 // spin-lock to control some protected areas
/* cb_cli callback: enter the NAL's critical section.  Takes the
 * per-NAL dispatch spinlock, saving the IRQ state into *flags; the
 * matching release is kibnal_sti(). */
197 void kibnal_cli(nal_cb_t *nal, unsigned long *flags)
199 kibnal_data_t *data= nal->nal_data;
201 CDEBUG(D_NET, "kibnal_cli \n");
203 spin_lock_irqsave(&data->kib_dispatch_lock,*flags);
209 // use spin_lock to unlock protected areas such as MD, ME...
210 // this won't physically enable interrupts but uses a software
211 // spin-lock to control some protected areas
/* cb_sti callback: leave the NAL's critical section.  Releases the
 * dispatch spinlock and restores the IRQ state saved by kibnal_cli()
 * into *flags. */
215 void kibnal_sti(nal_cb_t *nal, unsigned long *flags)
217 kibnal_data_t *data= nal->nal_data;
219 CDEBUG(D_NET, "kibnal_sti \n");
221 spin_unlock_irqrestore(&data->kib_dispatch_lock,*flags);
229 // network distance doesn't mean much for this nal
230 // here we only indicate
231 // 0 - operation happened on the same node
232 // 1 - operation happened on different nodes
233 // router will handle the data routing
/* cb_dist callback: report the network "distance" to nid through
 * *dist -- 0 when nid is this node, otherwise (presumably, on the
 * elided branch) a larger value.  The function result is always 0. */
237 int kibnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
239 CDEBUG(D_NET, "kibnal_dist \n");
241 if ( nal->ni.nid == nid ) {
248 return 0; // always return 0
253 // This is the cb_send() on IB based interconnect system
254 // prepare a data package and use VAPI_post_sr() to send it
255 // down-link out-going message
/* cb_send callback: transmit a portals header plus payload to "nid"
 * over InfiniBand.  The message (hdr + iov payload) is copied into a
 * contiguous buffer "buf"; payloads up to SMALL_MSG_SIZE go through a
 * pre-registered send buffer (Send_Small_Msg), larger ones use the
 * RTS/CTS handshake + RDMA-write path (Send_Large_Msg).  Requests
 * above MAX_MSG_SIZE are rejected.
 * NOTE(review): the CDEBUG calls print "unsigned long buf_length"
 * with %d -- format/argument mismatch; should be %lu. */
260 kibnal_send(nal_cb_t *nal,
274 unsigned long buf_length = sizeof(ptl_hdr_t) + len;
275 int expected_buf_size = 0;
278 PROF_START(kibnal_send); // time stamp send start
280 CDEBUG(D_NET,"kibnal_send: sending %d bytes from %p to nid: 0x%Lx pid %d\n",
281 buf_length, iov, nid, HCA_PORT_1);
// Routing is not implemented: the connection is assumed back-to-back,
// so no gateway/forwarding decision is made here.
284 // do I need to check the gateway information
285 // do I have problem to send direct
286 // do I have to forward a data packet to gateway
288 // The current connection is back-to-back
289 // I always know that data will be send from one-side to
// Classify the message by total size (header + payload).
294 // check data buffer size
302 if(buf_length <= SMALL_MSG_SIZE) {
303 expected_buf_size = MSG_SIZE_SMALL;
306 if(buf_length > MAX_MSG_SIZE) {
307 CERROR("kibnal_send:request exceeds Transmit data size (%d).\n",
313 expected_buf_size = MSG_SIZE_LARGE; // this is a large data package
317 // prepare data packet for send operation
319 // allocate a data buffer "buf" with size of buf_len(header + payload)
321 // buf | hdr | size = sizeof(ptl_hdr_t)
323 // |payload data | size = len
326 // copy header to buf
327 memcpy(buf, hdr, sizeof(ptl_hdr_t));
329 // copy payload data from iov to buf
330 // use portals library function lib_copy_iov2buf()
333 lib_copy_iov2buf(((char *)buf) + sizeof (ptl_hdr_t),
338 // buf is ready to do a post send
339 // the send method is base on the buf_size
341 CDEBUG(D_NET,"ib_send %d bytes (size %d) from %p to nid: 0x%Lx "
342 " port %d\n", buf_length, expected_buf_size, iov, nid, HCA_PORT_1);
344 switch(expected_buf_size) {
346 // send small message
347 if((vstat = Send_Small_Msg(buf, buf_length)) != VAPI_OK){
348 CERROR("Send_Small_Msg() is failed\n");
353 // send large message
354 if((vstat = Send_Large_Msg(buf, buf_length)) != VAPI_OK){
355 CERROR("Send_Large_Msg() is failed\n");
360 CERROR("Unknown message size %d\n", expected_buf_size);
364 PROF_FINISH(kibnal_send); // time stamp of send operation
/* cb_send_pages callback: page-vector send is not implemented for
 * this NAL (kibnal_lib below registers NULL for cb_send_pages). */
378 int kibnal_send_pages(nal_cb_t * nal,
391 CDEBUG(D_NET, "kibnal_send_pages\n");
393 // do nothing now for Infiniband
/* Router forwarding hook: not implemented -- this NAL assumes a
 * back-to-back connection, so packets are never relayed. */
409 void kibnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
411 CDEBUG(D_NET, "forwarding not implemented\n");
/* cb_callback: event-queue callback, not implemented (kibnal_lib
 * registers NULL for cb_callback). */
423 void kibnal_callback(nal_cb_t * nal,
428 CDEBUG(D_NET, "callback not implemented\n");
433 /* Process a received portals packet */
435 // convert received data into a PORTALS header
/* Handle one received IB message: treat the start of the buffer as a
 * portals header, wrap buffer/length/priority in a kibnal_rx_t
 * descriptor, and pass it to lib_parse() when the message is
 * addressed to this node.  Messages shorter than a portals header are
 * rejected; forwarding to other nids is not implemented.
 * NOTE(review): CDEBUG/CERROR print u_int32_t buffer_len with %ld and
 * buffer_addr (VAPI_virt_addr_t) with %p -- format mismatches. */
438 void kibnal_rx(kibnal_data_t *kib,
439 VAPI_virt_addr_t buffer_addr,
440 u_int32_t buffer_len,
441 u_int32_t buffer_size,
442 unsigned int priority)
444 ptl_hdr_t *hdr = (ptl_hdr_t *) buffer_addr; // cast to ptl header format
447 CDEBUG(D_NET,"kibnal_rx: buf %p, len %ld\n", buffer_addr, buffer_len);
449 if ( buffer_len < sizeof( ptl_hdr_t ) ) {
450 /* XXX what's this for? */
451 if (kib->kib_shuttingdown)
453 CERROR("kibnal_rx: did not receive complete portal header, "
454 "len= %ld", buffer_len);
// Fields of the receive descriptor handed to lib_parse()/kibnal_recv():
460 // char *krx_buffer; // pointer to receiving buffer
461 // unsigned long krx_len; // length of buffer
462 // unsigned int krx_size; //
463 // unsigned int krx_priority; // do we need this
464 // struct list_head krx_item;
467 krx.krx_buffer = hdr;
468 krx.krx_len = buffer_len;
469 krx.krx_size = buffer_size;
470 krx.krx_priority = priority;
472 if ( hdr->dest_nid == kibnal_lib.ni.nid ) {
474 PROF_START(lib_parse);
// lib_parse() will call back into kibnal_recv() with &krx as "private".
476 lib_parse(&kibnal_lib, (ptl_hdr_t *)krx.krx_buffer, &krx);
478 PROF_FINISH(lib_parse);
480 /* forward to gateway */
481 // Do we expect this happened ?
483 CERROR("kibnal_rx: forwarding not implemented yet");
/* cb_recv_pages callback: page-vector receive is not implemented
 * (kibnal_lib registers NULL for cb_recv_pages). */
500 kibnal_recv_pages(nal_cb_t * nal,
509 CDEBUG(D_NET, "recv_pages not implemented\n");
/* cb_recv callback: deliver the payload of a message previously
 * handed to lib_parse().  "private" is the kibnal_rx_t built in
 * kibnal_rx(); the payload starts immediately after the portals
 * header in krx_buffer and "mlen" bytes of it are scattered into the
 * caller's iov before lib_finalize() completes the operation. */
516 kibnal_recv(nal_cb_t *nal,
524 kibnal_rx_t *krx = private;
526 CDEBUG(D_NET,"kibnal_recv: mlen=%d, rlen=%d\n", mlen, rlen);
528 /* What was actually received must be >= what sender claims to
530 LASSERT (mlen <= rlen);
// Sanity check: the buffer must hold header + claimed payload.
532 if (krx->krx_len < sizeof (ptl_hdr_t) + rlen)
535 PROF_START(kibnal_recv);
539 lib_copy_buf2iov (niov, iov, krx->krx_buffer +
540 sizeof (ptl_hdr_t), mlen);
544 PROF_START(lib_finalize);
546 lib_finalize(nal, private, cookie, PTL_OK);
548 PROF_FINISH(lib_finalize);
549 PROF_FINISH(kibnal_recv);
/* cb_map callback: not implemented (kibnal_lib registers NULL). */
559 int kibnal_map(nal_cb_t * nal,
564 CDEBUG(D_NET, "map not implemented\n");
/* cb_unmap callback: not implemented (kibnal_lib registers NULL). */
577 void kibnal_unmap(nal_cb_t * nal,
582 CDEBUG(D_NET, "unmap not implemented\n");
592 /* as (un)map, but with a set of page fragments */
/* cb_map_pages callback: page-fragment variant of cb_map; not
 * implemented (kibnal_lib registers NULL). */
593 int kibnal_map_pages(nal_cb_t * nal,
598 CDEBUG(D_NET, "map_pages not implemented\n");
605 // kibnal_unmap_pages
/* cb_unmap_pages callback: page-fragment variant of cb_unmap; not
 * implemented (kibnal_lib registers NULL). */
611 void kibnal_unmap_pages(nal_cb_t * nal,
616 CDEBUG(D_NET, "unmap_pages not implemented\n");
/* Shut down the NAL.  Currently only logs; the TODOs below (drain
 * sends, free receive buffers, stop the receive thread) are not yet
 * implemented. */
621 int kibnal_end(kibnal_data_t *kib)
624 /* wait for sends to finish ? */
625 /* remove receive buffers */
626 /* shutdown receive thread */
628 CDEBUG(D_NET, "kibnal_end\n");
637 // asynchronous event handler: responds to some unexpected operation errors
639 // void async_event_handler(VAPI_hca_hndl_t hca_hndl,
640 // VAPI_event_record_t *event_record_p,
641 // void* private_data)
642 // the HCA driver will prepare event_record_p
644 // this handler is registered with VAPI_set_async_event_handler()
645 // VAPI_set_async_event_handler() is issued when an HCA is created
/* VAPI asynchronous event handler: logs each HCA async event type.
 * No recovery action is taken -- events are only reported.
 * NOTE(review): no "break;" statements are visible between the case
 * labels here -- confirm they exist on the elided lines, otherwise
 * every event falls through and logs all subsequent messages.
 * NOTE(review): eec_hndl is printed with %d while qp_hndl uses %lu --
 * verify the format specifiers against the VAPI handle types. */
648 void async_event_handler(VAPI_hca_hndl_t hca_hndl,
649 VAPI_event_record_t *event_record_p,
653 // * event_record_p is prepared by the system when an async
655 // * what to do with private_data
656 // * do we expect more async events happened if so what are they
658 // only log ERROR message now
660 switch (event_record_p->type) {
661 case VAPI_PORT_ERROR:
662 printk("Got PORT_ERROR event. port number=%d\n",
663 event_record_p->modifier.port_num);
665 case VAPI_PORT_ACTIVE:
666 printk("Got PORT_ACTIVE event. port number=%d\n",
667 event_record_p->modifier.port_num);
669 case VAPI_QP_PATH_MIGRATED: /*QP*/
670 printk("Got P_PATH_MIGRATED event. qp_hndl=%lu\n",
671 event_record_p->modifier.qp_hndl);
673 case VAPI_EEC_PATH_MIGRATED: /*EEC*/
674 printk("Got EEC_PATH_MIGRATED event. eec_hndl=%d\n",
675 event_record_p->modifier.eec_hndl);
677 case VAPI_QP_COMM_ESTABLISHED: /*QP*/
678 printk("Got QP_COMM_ESTABLISHED event. qp_hndl=%lu\n",
679 event_record_p->modifier.qp_hndl);
681 case VAPI_EEC_COMM_ESTABLISHED: /*EEC*/
682 printk("Got EEC_COMM_ESTABLISHED event. eec_hndl=%d\n",
683 event_record_p->modifier.eec_hndl);
685 case VAPI_SEND_QUEUE_DRAINED: /*QP*/
686 printk("Got SEND_QUEUE_DRAINED event. qp_hndl=%lu\n",
687 event_record_p->modifier.qp_hndl);
689 case VAPI_CQ_ERROR: /*CQ*/
690 printk("Got CQ_ERROR event. cq_hndl=%lu\n",
691 event_record_p->modifier.cq_hndl);
693 case VAPI_LOCAL_WQ_INV_REQUEST_ERROR: /*QP*/
694 printk("Got LOCAL_WQ_INV_REQUEST_ERROR event. qp_hndl=%lu\n",
695 event_record_p->modifier.qp_hndl);
697 case VAPI_LOCAL_WQ_ACCESS_VIOL_ERROR: /*QP*/
698 printk("Got LOCAL_WQ_ACCESS_VIOL_ERROR event. qp_hndl=%lu\n",
699 event_record_p->modifier.qp_hndl);
701 case VAPI_LOCAL_WQ_CATASTROPHIC_ERROR: /*QP*/
702 printk("Got LOCAL_WQ_CATASTROPHIC_ERROR event. qp_hndl=%lu\n",
703 event_record_p->modifier.qp_hndl);
705 case VAPI_PATH_MIG_REQ_ERROR: /*QP*/
706 printk("Got PATH_MIG_REQ_ERROR event. qp_hndl=%lu\n",
707 event_record_p->modifier.qp_hndl);
709 case VAPI_LOCAL_CATASTROPHIC_ERROR: /*none*/
710 printk("Got LOCAL_CATASTROPHIC_ERROR event. \n");
713 printk(":got non-valid event type=%d. IGNORING\n",
714 event_record_p->type);
/* Find a free pre-registered send buffer: scan MSbuf_list for an
 * entry in BUF_REGISTERED state, atomically mark it BUF_INUSE under
 * its per-entry lock, and return its index as the work-request id.
 * Retries up to MAX_LOOP_COUNT scans with a sleep in between;
 * returns -1 (the initial send_id) if none becomes available.
 * NOTE(review): buf_length is accepted but never compared against the
 * entry's buffer size -- confirm all send buffers are large enough.
 * NOTE(review): schedule_timeout() without setting the task state
 * first returns immediately -- verify the elided lines set it. */
723 search_send_buf(int buf_length)
725 VAPI_wr_id_t send_id = -1;
730 CDEBUG(D_NET, "search_send_buf \n");
732 while((flag == NO) && (loop_count < MAX_LOOP_COUNT)) {
733 for(i=0; i < NUM_ENTRY; i++) {
734 // problem about using spinlock
735 spin_lock(&MSB_mutex[i]);
736 if(MSbuf_list[i].status == BUF_REGISTERED) {
737 MSbuf_list[i].status = BUF_INUSE;// make send buf as inuse
739 spin_unlock(&MSB_mutex[i]);
743 spin_unlock(&MSB_mutex[i]);
747 schedule_timeout(200); // wait for a while
751 CDEBUG(D_NET, "search_send_buf: could not locate an entry in MSbuf_list\n");
754 send_id = (VAPI_wr_id_t ) i;
/* Find a registered RDMA receive buffer of at least "buf_length"
 * bytes: scan MRbuf_list entries (indices NUM_ENTRY..NUM_MBUF-1),
 * claim the first match, and return its index as the recv id.
 * BUG(review): the claim on the matched entry writes
 * MSbuf_list[i].status although the test was on MRbuf_list[i] --
 * this marks the wrong table (and indexes MSbuf_list past NUM_ENTRY);
 * it should almost certainly be MRbuf_list[i].status = BUF_INUSE.
 * NOTE(review): MSB_mutex[] (the send-buffer locks) is also used here
 * for receive entries -- confirm it is sized for NUM_MBUF entries. */
762 search_RDMA_recv_buf(int buf_length)
764 VAPI_wr_id_t recv_id = -1;
769 CDEBUG(D_NET, "search_RDMA_recv_buf\n");
771 while((flag == NO) && (loop_count < MAX_LOOP_COUNT)) {
773 for(i=NUM_ENTRY; i < NUM_MBUF; i++) {
775 spin_lock(&MSB_mutex[i]);
777 if((MRbuf_list[i].status == BUF_REGISTERED) &&
778 (MRbuf_list[i].buf_size >= buf_length)) {
779 MSbuf_list[i].status = BUF_INUSE;// make send buf as inuse
781 spin_unlock(&MSB_mutex[i]);
785 spin_unlock(&MSB_mutex[i]);
790 schedule_timeout(200); // wait for a while
794 CERROR("search_RDMA_recv_buf: could not locate an entry in MBbuf_list\n");
797 recv_id = (VAPI_wr_id_t ) i;
/* Send a small message (header+payload already contiguous in "buf")
 * through a pre-registered send buffer: claim a buffer/QP slot via
 * search_send_buf(), build a single-entry VAPI send descriptor, and
 * post it with VAPI_post_sr().
 * BUG(review): "memcpy(&sr_sg.addr, buf, buf_length)" copies the
 * payload over the 8-byte sg-entry address field (overrunning the
 * stack for any buf_length > sizeof(sr_sg.addr)) instead of copying
 * into the registered buffer.  It should copy to
 * MSbuf_list[send_id].buf_addr (the value just stored in sr_sg.addr).
 * The same pattern recurs in RTS/CTS_handshaking_protocol. */
809 VAPI_ret_t Send_Small_Msg(char *buf, int buf_length)
812 VAPI_sr_desc_t sr_desc;
813 VAPI_sg_lst_entry_t sr_sg;
815 VAPI_wr_id_t send_id;
817 CDEBUG(D_NET, "Send_Small_Msg\n");
819 send_id = search_send_buf(buf_length);
822 CERROR("Send_Small_Msg: Can not find a QP \n");
826 qp = &QP_list[(int) send_id];
828 // find a suitable/registered send_buf from MSbuf_list
829 CDEBUG(D_NET, "Send_Small_Msg: current send id %d \n", send_id);
831 sr_desc.opcode = VAPI_SEND;
832 sr_desc.comp_type = VAPI_SIGNALED;
833 sr_desc.id = send_id;
836 // scatter and gather info
837 sr_sg.len = buf_length;
838 sr_sg.lkey = MSbuf_list[send_id].mr.l_key; // use send MR
840 sr_sg.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) MSbuf_list[send_id].buf_addr;
842 // copy data to register send buffer
843 memcpy(&sr_sg.addr, buf, buf_length);
845 sr_desc.sg_lst_p = &sr_sg;
846 sr_desc.sg_lst_len = 1; // only 1 entry is used
847 sr_desc.fence = TRUE;
848 sr_desc.set_se = FALSE;
850 // call VAPI_post_sr to send out this data
851 vstat = VAPI_post_sr(qp->hca_hndl, qp->qp_hndl, &sr_desc);
853 if (vstat != VAPI_OK) {
854 CERROR("VAPI_post_sr failed (%s).\n",VAPI_strerror(vstat));
857 CDEBUG(D_NET, "VAPI_post_sr success.\n");
/* Ready-To-Send half of the large-message handshake: build an
 * RDMA_Info_Exchange record announcing "buf_length" and post it to
 * the peer through a registered send buffer, tagged with
 * RDMA_RTS_ID so the completion handler can recognize it.  The peer
 * answers with a CTS carrying its RDMA address/key.
 * BUG(review): as in Send_Small_Msg, "memcpy(&sr_sg.addr, ...)"
 * overwrites the sg-entry address field instead of copying the
 * record into the registered buffer at MSbuf_list[send_id].buf_addr.
 * NOTE(review): the result of search_send_buf() is not checked for
 * the -1 failure value before indexing MSbuf_list/QP_list. */
867 RTS_handshaking_protocol(int buf_length)
871 VAPI_sr_desc_t sr_desc;
872 VAPI_sg_lst_entry_t sr_sg;
873 VAPI_wr_id_t send_id;
875 RDMA_Info_Exchange rdma_info;
877 rdma_info.opcode = Ready_To_send;
878 rdma_info.buf_length = buf_length;
879 rdma_info.raddr = (VAPI_virt_addr_t) 0;
880 rdma_info.rkey = (VAPI_rkey_t) 0 ;
884 CDEBUG(D_NET, "RTS_handshaking_protocol\n");
886 // find a suitable/registered send_buf from MSbuf_list
887 send_id = search_send_buf(sizeof(RDMA_Info_Exchange));
889 qp = &QP_list[(int) send_id];
891 CDEBUG(D_NET, "RTS_CTS: current send id %d \n", send_id);
892 sr_desc.opcode = VAPI_SEND;
893 sr_desc.comp_type = VAPI_SIGNALED;
894 sr_desc.id = send_id + RDMA_RTS_ID;// this RTS mesage ID
896 // scatter and gather info
897 sr_sg.len = sizeof(RDMA_Info_Exchange);
898 sr_sg.lkey = MSbuf_list[send_id].mr.l_key; // use send MR
899 sr_sg.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) MSbuf_list[send_id].buf_addr;
901 // copy data to register send buffer
902 memcpy(&sr_sg.addr, &rdma_info, sizeof(RDMA_Info_Exchange));
904 sr_desc.sg_lst_p = &sr_sg;
905 sr_desc.sg_lst_len = 1; // only 1 entry is used
906 sr_desc.fence = TRUE;
907 sr_desc.set_se = FALSE;
909 // call VAPI_post_sr to send out this RTS message data
910 vstat = VAPI_post_sr(qp->hca_hndl, qp->qp_hndl, &sr_desc);
912 if (vstat != VAPI_OK) {
913 CERROR("RTS: VAPI_post_sr failed (%s).\n",VAPI_strerror_sym(vstat));
922 // create local receiving Memory Region for a HCA
/* Allocate a kernel buffer of "buf_length" bytes and register it with
 * the HCA as an RDMA-capable memory region (local + remote write
 * access), returning the MR handle/properties through the out
 * parameters.  The region stays registered until explicitly
 * deregistered by the caller. */
924 createMemRegion_RDMA(VAPI_hca_hndl_t hca_hndl,
925 VAPI_pd_hndl_t pd_hndl,
928 VAPI_mr_hndl_t *rep_mr_hndl,
934 CDEBUG(D_NET, "createMemRegion_RDMA\n");
936 // memory region address and size of memory region
937 // allocate a block of memory for this HCA
942 // need to allcate a local buffer to receive data from a
943 // remore VAPI_RDMA_WRITE_IMM
944 PORTAL_ALLOC(bufptr, buf_length);
948 CDEBUG(D_MALLOC, "Failed to malloc a block of RDMA receiving memory, size %d\n",
953 /* Register RDAM data Memory region */
954 CDEBUG(D_NET, "Register a RDMA data memory region\n");
957 mrw.pd_hndl= pd_hndl;
958 mrw.start = (VAPI_virt_addr_t )(MT_virt_addr_t )bufptr;
959 mrw.size = buf_length;
960 mrw.acl = VAPI_EN_LOCAL_WRITE |
961 VAPI_EN_REMOTE_WRITE |
964 // register send memory region
965 vstat = VAPI_register_mr(hca_hndl,
970 // this memory region is going to be reused until deregister is called
971 if (vstat != VAPI_OK) {
972 CERROR("Failed registering a mem region Addr=%p, Len=%d. %s\n",
973 bufptr, buf_length, VAPI_strerror(vstat));
/* Local side of the RDMA handshake state: filled by
 * CTS_handshaking_protocol()/Send_Large_Msg() with the locally
 * registered MR (address, r_key, handles) advertised to the peer. */
982 RDMA_Info_Exchange Local_rdma_info;
/* Claim a free slot in MRbuf_list (indices NUM_ENTRY..NUM_MBUF-1) for
 * a newly registered RDMA receive buffer of "buf_lenght" bytes and
 * return its index; callers treat a result >= NUM_ENTRY as success.
 * BUG(review): the claimed slot's status is re-assigned
 * BUF_UNREGISTERED -- the value it was just tested to already hold --
 * so the slot is never actually marked taken; it should presumably be
 * set to BUF_REGISTERED (or BUF_INUSE). */
984 int insert_MRbuf_list(int buf_lenght)
986 int recv_id = NUM_ENTRY;
988 CDEBUG(D_NET, "insert_MRbuf_list\n");
990 for(recv_id= NUM_ENTRY; recv_id < NUM_MBUF; recv_id++){
991 if(BUF_UNREGISTERED == MRbuf_list[recv_id].status) {
992 MRbuf_list[recv_id].status = BUF_UNREGISTERED;
993 MRbuf_list[recv_id].buf_size = buf_lenght;
/* Clear-To-Send half of the large-message handshake: in response to a
 * peer's RTS, register a local RDMA buffer of the requested length
 * (createMemRegion_RDMA), record it in MRbuf_list and in the global
 * Local_rdma_info (address + r_key the peer will RDMA-write to, or
 * flag RDMA_BUFFER_UNAVAILABLE on registration failure), then send
 * Local_rdma_info back tagged with RDMA_CTS_ID.
 * BUG(review): same memcpy defect as Send_Small_Msg --
 * "memcpy(&sr_sg.addr, ...)" clobbers the sg-entry address field
 * instead of copying into MSbuf_list[send_id].buf_addr.
 * NOTE(review): "qp" is used in VAPI_post_sr but no assignment to it
 * is visible in this function -- confirm the elided lines set
 * qp = &QP_list[send_id] as the RTS path does. */
1003 CTS_handshaking_protocol(RDMA_Info_Exchange *rdma_info)
1007 VAPI_sr_desc_t sr_desc;
1008 VAPI_sg_lst_entry_t sr_sg;
1010 VAPI_wr_id_t send_id;
1011 VAPI_mr_hndl_t rep_mr_hndl;
1014 char *bufptr = NULL;
1016 // search MRbuf_list for an available entry that
1017 // has registered data buffer with size equal to rdma_info->buf_lenght
1019 CDEBUG(D_NET, "CTS_handshaking_protocol\n");
1021 // register memory buffer for RDAM operation
1023 vstat = createMemRegion_RDMA(Hca_hndl,
1026 rdma_info->buf_length,
1031 Local_rdma_info.opcode = Clear_To_send;
1032 Local_rdma_info.recv_rdma_mr = rep_mr;
1033 Local_rdma_info.recv_rdma_mr_hndl = rep_mr_hndl;
1035 if (vstat != VAPI_OK) {
1036 CERROR("CST_handshaking_protocol: Failed registering a mem region"
1037 "Len=%d. %s\n", rdma_info->buf_length, VAPI_strerror(vstat));
1038 Local_rdma_info.flag = RDMA_BUFFER_UNAVAILABLE;
1041 // successfully allcate reserved RDAM data buffer
1042 recv_id = insert_MRbuf_list(rdma_info->buf_length);
1044 if(recv_id >= NUM_ENTRY) {
1045 MRbuf_list[recv_id].buf_addr = rep_mr.start;
1046 MRbuf_list[recv_id].mr = rep_mr;
1047 MRbuf_list[recv_id].mr_hndl = rep_mr_hndl;
1048 MRbuf_list[recv_id].ref_count = 0;
1049 Local_rdma_info.flag = RDMA_BUFFER_RESERVED;
1050 Local_rdma_info.buf_length = rdma_info->buf_length;
1051 Local_rdma_info.raddr = rep_mr.start;
1052 Local_rdma_info.rkey = rep_mr.r_key;
1055 CERROR("Can not find an entry in MRbuf_list - how could this happen\n");
1059 // find a suitable/registered send_buf from MSbuf_list
1060 send_id = search_send_buf(sizeof(RDMA_Info_Exchange));
1061 CDEBUG(D_NET, "CTS: current send id %d \n", send_id);
1062 sr_desc.opcode = VAPI_SEND;
1063 sr_desc.comp_type = VAPI_SIGNALED;
1064 sr_desc.id = send_id + RDMA_CTS_ID; // this CST message ID
1066 // scatter and gather info
1067 sr_sg.len = sizeof(RDMA_Info_Exchange);
1068 sr_sg.lkey = MSbuf_list[send_id].mr.l_key; // use send MR
1069 sr_sg.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) MSbuf_list[send_id].buf_addr;
1071 // copy data to register send buffer
1072 memcpy(&sr_sg.addr, &Local_rdma_info, sizeof(RDMA_Info_Exchange));
1074 sr_desc.sg_lst_p = &sr_sg;
1075 sr_desc.sg_lst_len = 1; // only 1 entry is used
1076 sr_desc.fence = TRUE;
1077 sr_desc.set_se = FALSE;
1079 // call VAPI_post_sr to send out this RTS message data
1080 vstat = VAPI_post_sr(qp->hca_hndl, qp->qp_hndl, &sr_desc);
1082 if (vstat != VAPI_OK) {
1083 CERROR("CTS: VAPI_post_sr failed (%s).\n",VAPI_strerror(vstat));
/* Send a large message via RDMA write: register "buf" itself as an
 * MR (no copy into a bounce buffer), send an RTS announcing the
 * length, wait for the peer's CTS (which fills the global Rdma_info
 * with the remote address/r_key), then post a
 * VAPI_RDMA_WRITE_WITH_IMM carrying the payload.
 * NOTE(review): the CERROR string contains "Send_Large_M\sg" -- "\s"
 * is not a valid escape; should read "Send_Large_Msg".
 * NOTE(review): the CTS wait tests "Cts_Message_arrived" while the
 * global at the top of the file is "Cts_Msg_Arrived" -- one spelling
 * must be wrong.  The wait also appears to be an unsynchronized
 * poll/sleep loop -- confirm the flag is written under a lock or as
 * an atomic by the receive path. */
1091 VAPI_ret_t Send_Large_Msg(char *buf, int buf_length)
1094 VAPI_sr_desc_t sr_desc;
1095 VAPI_sg_lst_entry_t sr_sg;
1098 VAPI_mr_hndl_t rep_mr_hndl;
1100 VAPI_imm_data_t imm_data = 0XAAAA5555;
1103 CDEBUG(D_NET, "Send_Large_Msg: Enter\n");
1105 // register this large buf
1106 // don't need to copy this buf to send buffer
1107 vstat = createMemRegion_RDMA(Hca_hndl,
1114 if (vstat != VAPI_OK) {
1115 CERROR("Send_Large_M\sg: createMemRegion_RDMAi() failed (%s).\n",
1116 VAPI_strerror(vstat));
1120 Local_rdma_info.send_rdma_mr = rep_mr;
1121 Local_rdma_info.send_rdma_mr_hndl = rep_mr_hndl;
1124 // Prepare descriptor for send queue
1127 // ask for a remote rdma buffer with size buf_lenght
1128 send_id = RTS_handshaking_protocol(buf_length);
1130 qp = &QP_list[send_id];
1132 // wait for CTS message receiving from remote node
1134 if(YES == Cts_Message_arrived) {
1135 // receive CST message from remote node
1136 // Rdma_info is available for use
1139 schedule_timeout(RTS_CTS_TIMEOUT);
1142 sr_desc.id = send_id + RDMA_OP_ID;
1143 sr_desc.opcode = VAPI_RDMA_WRITE_WITH_IMM;
1144 sr_desc.comp_type = VAPI_SIGNALED;
1146 // scatter and gather info
1147 sr_sg.len = buf_length;
1150 sr_sg.lkey = rep_mr.l_key;
1151 sr_sg.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) rep_mr.start;
1152 sr_desc.sg_lst_p = &sr_sg;
1153 sr_desc.sg_lst_len = 1; // only 1 entry is used
1155 // immediate data - used to signal completion to the peer
1156 sr_desc.imm_data = imm_data;
1157 sr_desc.fence = TRUE;
1158 sr_desc.set_se = FALSE;
1160 // RDAM operation only
1161 // raddr and rkey is receiving from remote node
1162 sr_desc.remote_addr = Rdma_info.raddr;
1163 sr_desc.r_key = Rdma_info.rkey;
1165 // call VAPI_post_sr to send out this data
1166 vstat = VAPI_post_sr(qp->hca_hndl, qp->qp_hndl, &sr_desc);
1168 if (vstat != VAPI_OK) {
1169 CERROR("VAPI_post_sr failed (%s).\n",VAPI_strerror_sym(vstat));
1181 // post a used recv buffer back to recv WQE list
1182 // wrq_id is used to indicate the starting position of recv-buffer
/* Return one consumed receive buffer to the QP's receive queue:
 * rebuild a single-entry receive WR from the MRbuf_list entry indexed
 * by wrq_id (which doubles as the WQE id) and post it with
 * VAPI_post_rr(). */
1185 repost_recv_buf(QP_info *qp,
1186 VAPI_wr_id_t wrq_id)
1189 VAPI_sg_lst_entry_t sg_entry;
1192 CDEBUG(D_NET, "repost_recv_buf\n");
1194 sg_entry.lkey = MRbuf_list[wrq_id].mr.l_key;
1195 sg_entry.len = MRbuf_list[wrq_id].buf_size;
1196 sg_entry.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) MRbuf_list[wrq_id].buf_addr;
1197 rr.opcode = VAPI_RECEIVE;
1198 rr.comp_type = VAPI_SIGNALED; /* All with CQE (IB compliant) */
1199 rr.sg_lst_len = 1; /* single buffers */
1200 rr.sg_lst_p = &sg_entry;
1201 rr.id = wrq_id; /* WQE id used is the index to buffers ptr array */
1203 ret= VAPI_post_rr(qp->hca_hndl,qp->qp_hndl,&rr);
1205 if (ret != VAPI_OK){
1206 CERROR("failed reposting RQ WQE (%s) buffer \n",VAPI_strerror_sym(ret));
1210 CDEBUG(D_NET, "Successfully reposting an RQ WQE %d recv bufer \n", wrq_id);
1217 // post "num_o_bufs" for receiving data
1218 // each receiving buf (buffer starting address, size of buffer)
1219 // each buffer is associated with an id
/* Initial posting of receive buffers: for each of the NUM_ENTRY
 * QP/buffer pairs, build a single-entry receive WR from MRbuf_list[i]
 * (WQE id = start_id + i, so completions map back to the buffer) and
 * post it on QP_list[i].  Returns the number of WRs successfully
 * posted; stops early on the first VAPI_post_rr failure. */
1222 post_recv_bufs(VAPI_wr_id_t start_id)
1226 VAPI_sg_lst_entry_t sg_entry;
1229 CDEBUG(D_NET, "post_recv_bufs\n");
1231 for(i=0; i< NUM_ENTRY; i++) {
1232 sg_entry.lkey = MRbuf_list[i].mr.l_key;
1233 sg_entry.len = MRbuf_list[i].buf_size;
1234 sg_entry.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) MRbuf_list[i].buf_addr;
1235 rr.opcode = VAPI_RECEIVE;
1236 rr.comp_type = VAPI_SIGNALED; /* All with CQE (IB compliant) */
1237 rr.sg_lst_len = 1; /* single buffers */
1238 rr.sg_lst_p = &sg_entry;
1239 rr.id = start_id+i; /* WQE id used is the index to buffers ptr array */
1241 ret= VAPI_post_rr(QP_list[i].hca_hndl,QP_list[i].qp_hndl, &rr);
1242 if (ret != VAPI_OK) {
1243 CERROR("failed posting RQ WQE (%s)\n",VAPI_strerror_sym(ret));
1248 return i; /* num of buffers posted */
/* Post RDMA buffers on a QP.  Only the debug trace is visible here;
 * the parameters suggest posting "num_bufs" buffers of "buf_size"
 * starting at WQE id "start_id" -- confirm against the elided body. */
1252 post_RDMA_bufs(QP_info *qp,
1254 unsigned int num_bufs,
1255 unsigned int buf_size,
1256 VAPI_wr_id_t start_id)
1259 CDEBUG(D_NET, "post_RDMA_bufs \n");
1267 // assign function pointers to theirs corresponding entries
/* NAL callback table wired into the portals library (lib_parse /
 * lib_finalize dispatch through these).  Note that cb_send_pages,
 * cb_recv_pages, cb_map(_pages) and cb_unmap(_pages) are registered
 * as NULL even though stub functions exist above. */
1270 nal_cb_t kibnal_lib = {
1271 nal_data: &kibnal_data, /* NAL private data */
1272 cb_send: kibnal_send,
1273 cb_send_pages: NULL, // not implemented
1274 cb_recv: kibnal_recv,
1275 cb_recv_pages: NULL, // not implemented
1276 cb_read: kibnal_read,
1277 cb_write: kibnal_write,
1278 cb_callback: NULL, // not implemented
1279 cb_malloc: kibnal_malloc,
1280 cb_free: kibnal_free,
1281 cb_map: NULL, // not implemented
1282 cb_unmap: NULL, // not implemented
1283 cb_map_pages: NULL, // not implemented
1284 cb_unmap_pages: NULL, // not implemented
1285 cb_printf: kibnal_printf,
1288 cb_dist: kibnal_dist // not used at this moment