1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Based on ksocknal and qswnal
6 * Author: Hsing-bung Chen <hbchen@lanl.gov>
8 * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
10 * Portals is free software; you can redistribute it and/or
11 * modify it under the terms of version 2 of the GNU General Public
12 * License as published by the Free Software Foundation.
14 * Portals is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with Portals; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
// Global RDMA info-exchange record filled in when a CTS message arrives,
// plus a flag telling a waiting sender that the CTS has been received.
// NOTE(review): declaration read "Rdma_nfo" while every use below
// (Send_Large_Msg reads Rdma_info.raddr / Rdma_info.rkey) says
// "Rdma_info" -- renamed to match the uses; confirm no other file
// references the old spelling.
30 RDMA_Info_Exchange Rdma_info;
31 int Cts_Msg_Arrived = NO;
35 * LIB functions follow
40 // copy a block of data from scr_addr to dst_addr
41 // it all happens in kernel space - dst_addr and src_addr
43 // original definition is to read a block of data from a
44 // specified user address
// cb_read callback: copy "len" bytes from src_addr to dst_addr.
// Both pointers are kernel-space, so a plain memcpy is all that is
// needed (no user-space copy_from_user).
// NOTE(review): fragmentary listing -- parameter list, braces and
// return statement are not visible here.
48 int kibnal_read (nal_cb_t *nal,
54 CDEBUG(D_NET, "kibnal_read: 0x%Lx: reading %ld bytes from %p -> %p\n",
55 nal->ni.nid, (long)len, src_addr, dst_addr );
57 memcpy( dst_addr, src_addr, len );
63 // it seems that read and write are doing the same thing
64 // because they all happen in kernel space
65 // why do we need two functions like read and write
66 // to make the PORTALS API compatible
71 // copy a block of data from scr_addr to dst_addr
72 // it all happens in kernel space - dst_addr and src_addr
74 // original definition is to write a block of data to a
75 // specified user address
// cb_write callback: copy "len" bytes from src_addr to dst_addr.
// Identical in effect to kibnal_read -- both exist only to satisfy
// the PORTALS nal_cb_t interface; everything is kernel-space.
// NOTE(review): fragmentary listing -- parameter list and return
// are not visible here.
79 int kibnal_write(nal_cb_t *nal,
85 CDEBUG(D_NET, "kibnal_write: 0x%Lx: writing %ld bytes from %p -> %p\n",
86 nal->ni.nid, (long)len, src_addr, dst_addr );
89 memcpy( dst_addr, src_addr, len );
97 // either vmalloc or kmalloc is used
98 // dynamically allocate a block of memory based on the size of buffer
// cb_malloc callback: allocate "length" bytes via the Portals
// PORTAL_ALLOC macro (which picks kmalloc or vmalloc based on size)
// and return the buffer pointer.
102 void * kibnal_malloc(nal_cb_t *nal, size_t length)
106 // PORTAL_ALLOC will do the job
107 // allocate a buffer with size "length"
108 PORTAL_ALLOC(buffer, length);
115 // release a dynamically allocated memory pointed by buffer pointer
// cb_free callback: release a buffer obtained from kibnal_malloc.
// "length" must match the size passed at allocation time, as
// PORTAL_FREE needs it for accounting.
119 void kibnal_free(nal_cb_t *nal, void *buffer, size_t length)
122 // release allocated buffer to system
124 PORTAL_FREE(buffer, length);
129 // because everything is in kernel space (LUSTRE)
130 // there is no need to mark a piece of user memory as no longer in use by
// cb_invalidate callback: no-op apart from logging -- all memory is
// kernel-space here, so there is no user mapping to invalidate.
135 void kibnal_invalidate(nal_cb_t *nal,
141 CDEBUG(D_NET, "kibnal_invalidate: 0x%Lx: invalidating %p : %d\n",
142 nal->ni.nid, base, extent);
149 // because everything is in kernel space (LUSTRE)
150 // there is no need to mark a piece of user memory in use by
// cb_validate callback: no-op apart from logging -- all memory is
// kernel-space here, so there is no user mapping to pin/validate.
// NOTE(review): return statement not visible in this listing.
155 int kibnal_validate(nal_cb_t *nal,
161 CDEBUG(D_NET, "kibnal_validate: 0x%Lx: validating %p : %d\n",
162 nal->ni.nid, base, extent);
169 // log messages from kernel space
// cb_printf callback: format a message into a local buffer with
// vsnprintf and emit it via printk, prefixed with the current CPU id.
// Only logs when D_NET debugging is enabled.
174 void kibnal_printf(nal_cb_t *nal, const char *fmt, ...)
179 if (portal_debug & D_NET) {
181 vsnprintf( msg, sizeof(msg), fmt, ap );
184 printk("CPUId: %d %s",smp_processor_id(), msg);
190 // use spin_lock to lock protected area such as MD, ME...
191 // so a process can enter a protected area and do some works
192 // this won't physically disable interrupts but uses a software
193 // spin-lock to control some protected areas
// cb_cli callback: enter the NAL critical section by taking the
// dispatch spinlock with IRQs saved into *flags.  Paired with
// kibnal_sti() below, which releases it.
197 void kibnal_cli(nal_cb_t *nal, unsigned long *flags)
199 kibnal_data_t *data= nal->nal_data;
201 CDEBUG(D_NET, "kibnal_cli \n");
203 spin_lock_irqsave(&data->kib_dispatch_lock,*flags);
209 // use spin_lock to unlock protected area such as MD, ME...
210 // this won't physically enable interrupts but uses a software
211 // spin-lock to control some protected areas
// cb_sti callback: leave the NAL critical section by releasing the
// dispatch spinlock and restoring the IRQ state saved by kibnal_cli().
215 void kibnal_sti(nal_cb_t *nal, unsigned long *flags)
217 kibnal_data_t *data= nal->nal_data;
219 CDEBUG(D_NET, "kibnal_sti \n");
221 spin_unlock_irqrestore(&data->kib_dispatch_lock,*flags);
229 // network distance doesn't mean much for this nal
230 // here we only indicate
231 // 0 - operation happens on the same node
232 // 1 - operation happens on different nodes
233 // router will handle the data routing
// cb_dist callback: report network "distance" to nid.  The visible
// code compares against our own nid (same-node case); the
// different-node branch is not visible in this listing.
237 int kibnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
239 CDEBUG(D_NET, "kibnal_dist \n");
241 if ( nal->ni.nid == nid ) {
248 return 0; // always return 0
253 // This is the cb_send() on IB based interconnect system
254 // prepare a data package and use VAPI_post_sr() to send it
255 // down-link out-going message
// cb_send callback: marshal a Portals header + payload into a send
// buffer and post it with VAPI_post_sr() (via Send_Small_Msg or
// Send_Large_Msg, chosen by total size).
// NOTE(review): fragmentary listing -- allocation of "buf", the
// small/large case labels and the return are not visible here.
260 kibnal_send(nal_cb_t *nal,
274 unsigned long buf_length = sizeof(ptl_hdr_t) + len;
275 int expected_buf_size = 0;
278 PROF_START(kibnal_send); // time stamp send start
// NOTE(review): buf_length is unsigned long but printed with %d --
// format/argument mismatch; confirm and fix with %lu.
280 CDEBUG(D_NET,"kibnal_send: sending %d bytes from %p to nid: 0x%Lx pid %d\n",
281 buf_length, iov, nid, HCA_PORT_1);
284 // do I need to check the gateway information
285 // do I have a problem sending direct
286 // do I have to forward a data packet to the gateway
288 // The current connection is back-to-back
289 // I always know that data will be sent from one side to
294 // check data buffer size
302 if(buf_length <= SMALL_MSG_SIZE) {
303 expected_buf_size = MSG_SIZE_SMALL;
306 if(buf_length > MAX_MSG_SIZE) {
307 CERROR("kibnal_send:request exceeds Transmit data size (%d).\n",
313 expected_buf_size = MSG_SIZE_LARGE; // this is a large data package
317 // prepare data packet for send operation
319 // allocate a data buffer "buf" with size of buf_len(header + payload)
321 // buf | hdr | size = sizeof(ptl_hdr_t)
323 // |payload data | size = len
326 // copy header to buf
327 memcpy(buf, hdr, sizeof(ptl_hdr_t));
329 // copy payload data from iov to buf
330 // use portals library function lib_copy_iov2buf()
333 lib_copy_iov2buf(((char *)buf) + sizeof (ptl_hdr_t),
338 // buf is ready to do a post send
339 // the send method is based on the buf_size
341 CDEBUG(D_NET,"ib_send %d bytes (size %d) from %p to nid: 0x%Lx "
342 " port %d\n", buf_length, expected_buf_size, iov, nid, HCA_PORT_1);
344 switch(expected_buf_size) {
346 // send small message
347 if((vstat = Send_Small_Msg(buf, buf_length)) != VAPI_OK){
348 CERROR("Send_Small_Msg() is failed\n");
353 // send large message
354 if((vstat = Send_Large_Msg(buf, buf_length)) != VAPI_OK){
355 CERROR("Send_Large_Msg() is failed\n");
360 CERROR("Unknown message size %d\n", expected_buf_size);
364 PROF_FINISH(kibnal_send); // time stamp of send operation
// cb_send_pages callback: intentionally a stub -- page-based sends
// are not implemented for this InfiniBand NAL (kibnal_lib wires
// cb_send_pages to NULL below).
378 int kibnal_send_pages(nal_cb_t * nal,
391 CDEBUG(D_NET, "kibnal_send_pages\n");
393 // do nothing now for Infiniband
// Router forwarding hook: stub -- packet forwarding through this NAL
// is not implemented; only logs.
409 void kibnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
411 CDEBUG(D_NET, "forwarding not implemented\n");
// cb_callback: stub -- event callbacks are not implemented
// (kibnal_lib wires cb_callback to NULL below); only logs.
423 int kibnal_callback(nal_cb_t * nal,
428 CDEBUG(D_NET, "callback not implemented\n");
433 /* Process a received portals packet */
435 // convert received data into a PORTALS header
// Receive path: interpret a completed receive buffer as a Portals
// packet.  Validates that at least a full ptl_hdr_t arrived, wraps
// the buffer in a kibnal_rx_t descriptor (krx) and hands it to
// lib_parse() when the packet is addressed to us; forwarding to a
// gateway is not implemented.
// NOTE(review): fragmentary listing -- the declaration of "krx" and
// several returns/braces are not visible here.
438 void kibnal_rx(kibnal_data_t *kib,
439 VAPI_virt_addr_t buffer_addr,
440 u_int32_t buffer_len,
441 u_int32_t buffer_size,
442 unsigned int priority)
444 ptl_hdr_t *hdr = (ptl_hdr_t *) buffer_addr; // cast to ptl header format
447 CDEBUG(D_NET,"kibnal_rx: buf %p, len %ld\n", buffer_addr, buffer_len);
449 if ( buffer_len < sizeof( ptl_hdr_t ) ) {
450 /* XXX what's this for? */
451 if (kib->kib_shuttingdown)
453 CERROR("kibnal_rx: did not receive complete portal header, "
454 "len= %ld", buffer_len);
460 // char *krx_buffer; // pointer to receiving buffer
461 // unsigned long krx_len; // length of buffer
462 // unsigned int krx_size; //
463 // unsigned int krx_priority; // do we need this
464 // struct list_head krx_item;
467 krx.krx_buffer = hdr;
468 krx.krx_len = buffer_len;
469 krx.krx_size = buffer_size;
470 krx.krx_priority = priority;
472 if ( hdr->dest_nid == kibnal_lib.ni.nid ) {
474 PROF_START(lib_parse);
476 lib_parse(&kibnal_lib, (ptl_hdr_t *)krx.krx_buffer, &krx);
478 PROF_FINISH(lib_parse);
480 /* forward to gateway */
481 // Is this expected to happen?
483 CERROR("kibnal_rx: forwarding not implemented yet");
// cb_recv_pages: stub -- page-based receives are not implemented
// (kibnal_lib wires cb_recv_pages to NULL below); only logs.
500 kibnal_recv_pages(nal_cb_t * nal,
509 CDEBUG(D_NET, "recv_pages not implemented\n");
// cb_recv callback: copy the payload (mlen bytes, skipping the
// Portals header) from the receive buffer described by "private"
// (a kibnal_rx_t) into the caller's iovec, then complete the
// operation with lib_finalize().
// NOTE(review): fragmentary listing -- parameter list and return
// are not visible here.
516 kibnal_recv(nal_cb_t *nal,
524 kibnal_rx_t *krx = private;
526 CDEBUG(D_NET,"kibnal_recv: mlen=%d, rlen=%d\n", mlen, rlen);
528 /* What was actually received must be >= what sender claims to
529 * have sent. This is an LASSERT, since lib-move doesn't
530 * check cb return code yet. */
531 LASSERT (krx->krx_len >= sizeof (ptl_hdr_t) + rlen);
532 LASSERT (mlen <= rlen);
534 PROF_START(kibnal_recv);
538 lib_copy_buf2iov (niov, iov, krx->krx_buffer +
539 sizeof (ptl_hdr_t), mlen);
543 PROF_START(lib_finalize);
545 lib_finalize(nal, private, cookie);
547 PROF_FINISH(lib_finalize);
548 PROF_FINISH(kibnal_recv);
// cb_map: stub -- memory mapping is not implemented for this NAL
// (kibnal_lib wires cb_map to NULL below); only logs.
558 int kibnal_map(nal_cb_t * nal,
563 CDEBUG(D_NET, "map not implemented\n");
// cb_unmap: stub -- counterpart of kibnal_map, also unimplemented;
// only logs.
576 void kibnal_unmap(nal_cb_t * nal,
581 CDEBUG(D_NET, "unmap not implemented\n");
591 /* as (un)map, but with a set of page fragments */
// cb_map_pages: stub -- as kibnal_map, but for a set of page
// fragments; unimplemented, only logs.
592 int kibnal_map_pages(nal_cb_t * nal,
597 CDEBUG(D_NET, "map_pages not implemented\n");
604 // kibnal_unmap_pages
// cb_unmap_pages: stub -- counterpart of kibnal_map_pages;
// unimplemented, only logs.
610 void kibnal_unmap_pages(nal_cb_t * nal,
615 CDEBUG(D_NET, "unmap_pages not implemented\n");
// NAL shutdown hook.  Currently only logs; the steps listed below
// (drain sends, free recv buffers, stop recv thread) are TODOs.
620 int kibnal_end(kibnal_data_t *kib)
623 /* wait for sends to finish ? */
624 /* remove receive buffers */
625 /* shutdown receive thread */
627 CDEBUG(D_NET, "kibnal_end\n");
636 // asynchronous event handler: responds to some unexpected operation errors
638 // void async_event_handler(VAPI_hca_hndl_t hca_hndl,
639 // VAPI_event_record_t *event_record_p,
640 // void* private_data)
641 // the HCA driver will prepare event_record_p
643 // this handler is registered with VAPI_set_async_event_handler()
644 // VAPI_set_async_event_handler() is issued when an HCA is created
// Asynchronous HCA event handler, registered via
// VAPI_set_async_event_handler() when the HCA is created.  The HCA
// driver fills in *event_record_p; we only log each event type.
// NOTE(review): fragmentary listing -- the "break" after each case
// is not visible here; confirm each case breaks, otherwise the
// switch falls through.
647 void async_event_handler(VAPI_hca_hndl_t hca_hndl,
648 VAPI_event_record_t *event_record_p,
652 // * event_record_p is prepared by the system when an async
654 // * what to do with private_data
655 // * do we expect more async events; if so, which ones
657 // only log ERROR message now
659 switch (event_record_p->type) {
660 case VAPI_PORT_ERROR:
661 printk("Got PORT_ERROR event. port number=%d\n",
662 event_record_p->modifier.port_num);
664 case VAPI_PORT_ACTIVE:
665 printk("Got PORT_ACTIVE event. port number=%d\n",
666 event_record_p->modifier.port_num);
668 case VAPI_QP_PATH_MIGRATED: /*QP*/
669 printk("Got P_PATH_MIGRATED event. qp_hndl=%lu\n",
670 event_record_p->modifier.qp_hndl);
672 case VAPI_EEC_PATH_MIGRATED: /*EEC*/
673 printk("Got EEC_PATH_MIGRATED event. eec_hndl=%d\n",
674 event_record_p->modifier.eec_hndl);
676 case VAPI_QP_COMM_ESTABLISHED: /*QP*/
677 printk("Got QP_COMM_ESTABLISHED event. qp_hndl=%lu\n",
678 event_record_p->modifier.qp_hndl);
680 case VAPI_EEC_COMM_ESTABLISHED: /*EEC*/
681 printk("Got EEC_COMM_ESTABLISHED event. eec_hndl=%d\n",
682 event_record_p->modifier.eec_hndl);
684 case VAPI_SEND_QUEUE_DRAINED: /*QP*/
685 printk("Got SEND_QUEUE_DRAINED event. qp_hndl=%lu\n",
686 event_record_p->modifier.qp_hndl);
688 case VAPI_CQ_ERROR: /*CQ*/
689 printk("Got CQ_ERROR event. cq_hndl=%lu\n",
690 event_record_p->modifier.cq_hndl);
692 case VAPI_LOCAL_WQ_INV_REQUEST_ERROR: /*QP*/
693 printk("Got LOCAL_WQ_INV_REQUEST_ERROR event. qp_hndl=%lu\n",
694 event_record_p->modifier.qp_hndl);
696 case VAPI_LOCAL_WQ_ACCESS_VIOL_ERROR: /*QP*/
697 printk("Got LOCAL_WQ_ACCESS_VIOL_ERROR event. qp_hndl=%lu\n",
698 event_record_p->modifier.qp_hndl);
700 case VAPI_LOCAL_WQ_CATASTROPHIC_ERROR: /*QP*/
701 printk("Got LOCAL_WQ_CATASTROPHIC_ERROR event. qp_hndl=%lu\n",
702 event_record_p->modifier.qp_hndl);
704 case VAPI_PATH_MIG_REQ_ERROR: /*QP*/
705 printk("Got PATH_MIG_REQ_ERROR event. qp_hndl=%lu\n",
706 event_record_p->modifier.qp_hndl);
708 case VAPI_LOCAL_CATASTROPHIC_ERROR: /*none*/
709 printk("Got LOCAL_CATASTROPHIC_ERROR event. \n");
712 printk(":got non-valid event type=%d. IGNORING\n",
713 event_record_p->type);
// Find a free registered entry in the send-buffer pool MSbuf_list,
// mark it BUF_INUSE under its per-entry spinlock, and return its
// index as a work-request id.  Retries up to MAX_LOOP_COUNT times,
// sleeping 200 jiffies between scans.
// NOTE(review): fragmentary listing -- flag/loop_count declarations,
// loop-exit logic and the failure return value are not visible.
722 search_send_buf(int buf_length)
724 VAPI_wr_id_t send_id = -1;
729 CDEBUG(D_NET, "search_send_buf \n");
731 while((flag == NO) && (loop_count < MAX_LOOP_COUNT)) {
732 for(i=0; i < NUM_ENTRY; i++) {
733 // problem about using spinlock
734 spin_lock(&MSB_mutex[i]);
735 if(MSbuf_list[i].status == BUF_REGISTERED) {
736 MSbuf_list[i].status = BUF_INUSE;// make send buf as inuse
738 spin_unlock(&MSB_mutex[i]);
742 spin_unlock(&MSB_mutex[i]);
746 schedule_timeout(200); // wait for a while
750 CDEBUG(D_NET, "search_send_buf: could not locate an entry in MSbuf_list\n");
753 send_id = (VAPI_wr_id_t ) i;
// Find a free registered entry in the RDMA receive-buffer pool
// MRbuf_list (indices NUM_ENTRY..NUM_MBUF) that is at least
// buf_length bytes, claim it as BUF_INUSE under its spinlock, and
// return its index as a work-request id.  Retries up to
// MAX_LOOP_COUNT times, sleeping 200 jiffies between scans.
// NOTE(review): fragmentary listing -- flag/loop_count declarations
// and the failure return are not visible.
761 search_RDMA_recv_buf(int buf_length)
763 VAPI_wr_id_t recv_id = -1;
768 CDEBUG(D_NET, "search_RDMA_recv_buf\n");
770 while((flag == NO) && (loop_count < MAX_LOOP_COUNT)) {
772 for(i=NUM_ENTRY; i < NUM_MBUF; i++) {
774 spin_lock(&MSB_mutex[i]);
776 if((MRbuf_list[i].status == BUF_REGISTERED) &&
777 (MRbuf_list[i].buf_size >= buf_length)) {
// BUG FIX: the original marked MSbuf_list[i] (the SEND pool) as
// in-use -- a copy/paste from search_send_buf -- so the recv entry
// it actually tested was never claimed.  Claim the recv entry.
778 MRbuf_list[i].status = BUF_INUSE; // claim this recv buf
780 spin_unlock(&MSB_mutex[i]);
784 spin_unlock(&MSB_mutex[i]);
789 schedule_timeout(200); // wait for a while
793 CERROR("search_RDMA_recv_buf: could not locate an entry in MBbuf_list\n");
796 recv_id = (VAPI_wr_id_t ) i;
// Send a small (<= SMALL_MSG_SIZE) message: grab a pre-registered
// send buffer from MSbuf_list, copy the payload into it, and post a
// signaled VAPI_SEND work request on the matching QP.
// NOTE(review): fragmentary listing -- vstat/qp declarations, braces
// and the return are not visible here.
808 VAPI_ret_t Send_Small_Msg(char *buf, int buf_length)
811 VAPI_sr_desc_t sr_desc;
812 VAPI_sg_lst_entry_t sr_sg;
814 VAPI_wr_id_t send_id;
816 CDEBUG(D_NET, "Send_Small_Msg\n");
818 send_id = search_send_buf(buf_length);
821 CERROR("Send_Small_Msg: Can not find a QP \n");
825 qp = &QP_list[(int) send_id];
827 // find a suitable/registered send_buf from MSbuf_list
828 CDEBUG(D_NET, "Send_Small_Msg: current send id %d \n", send_id);
830 sr_desc.opcode = VAPI_SEND;
831 sr_desc.comp_type = VAPI_SIGNALED;
832 sr_desc.id = send_id;
835 // scatter and gather info
836 sr_sg.len = buf_length;
837 sr_sg.lkey = MSbuf_list[send_id].mr.l_key; // use send MR
839 sr_sg.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) MSbuf_list[send_id].buf_addr;
// BUG FIX: the original did memcpy(&sr_sg.addr, buf, buf_length),
// which overwrites the 8-byte descriptor field (and overflows
// sr_sg) instead of filling the registered buffer the HCA will
// read from.  Copy into the buffer that sr_sg.addr points at.
842 memcpy((void *)(MT_virt_addr_t)sr_sg.addr, buf, buf_length);
844 sr_desc.sg_lst_p = &sr_sg;
845 sr_desc.sg_lst_len = 1; // only 1 entry is used
846 sr_desc.fence = TRUE;
847 sr_desc.set_se = FALSE;
849 // call VAPI_post_sr to send out this data
850 vstat = VAPI_post_sr(qp->hca_hndl, qp->qp_hndl, &sr_desc);
852 if (vstat != VAPI_OK) {
853 CERROR("VAPI_post_sr failed (%s).\n",VAPI_strerror(vstat));
856 CDEBUG(D_NET, "VAPI_post_sr success.\n");
// RTS (Ready-To-Send) half of the large-message handshake: build an
// RDMA_Info_Exchange record announcing buf_length, copy it into a
// pre-registered send buffer, and post it with a VAPI_SEND work
// request tagged RDMA_RTS_ID so the peer replies with a CTS.
// NOTE(review): fragmentary listing -- vstat/qp declarations, braces
// and the return are not visible here.
866 RTS_handshaking_protocol(int buf_length)
870 VAPI_sr_desc_t sr_desc;
871 VAPI_sg_lst_entry_t sr_sg;
872 VAPI_wr_id_t send_id;
874 RDMA_Info_Exchange rdma_info;
876 rdma_info.opcode = Ready_To_send;
877 rdma_info.buf_length = buf_length;
878 rdma_info.raddr = (VAPI_virt_addr_t) 0;
879 rdma_info.rkey = (VAPI_rkey_t) 0 ;
883 CDEBUG(D_NET, "RTS_handshaking_protocol\n");
885 // find a suitable/registered send_buf from MSbuf_list
886 send_id = search_send_buf(sizeof(RDMA_Info_Exchange));
888 qp = &QP_list[(int) send_id];
890 CDEBUG(D_NET, "RTS_CTS: current send id %d \n", send_id);
891 sr_desc.opcode = VAPI_SEND;
892 sr_desc.comp_type = VAPI_SIGNALED;
893 sr_desc.id = send_id + RDMA_RTS_ID;// this RTS mesage ID
895 // scatter and gather info
896 sr_sg.len = sizeof(RDMA_Info_Exchange);
897 sr_sg.lkey = MSbuf_list[send_id].mr.l_key; // use send MR
898 sr_sg.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) MSbuf_list[send_id].buf_addr;
// BUG FIX: the original did memcpy(&sr_sg.addr, &rdma_info, ...),
// clobbering the descriptor's address field instead of filling the
// registered buffer.  Copy into the buffer sr_sg.addr points at.
901 memcpy((void *)(MT_virt_addr_t)sr_sg.addr, &rdma_info, sizeof(RDMA_Info_Exchange));
903 sr_desc.sg_lst_p = &sr_sg;
904 sr_desc.sg_lst_len = 1; // only 1 entry is used
905 sr_desc.fence = TRUE;
906 sr_desc.set_se = FALSE;
908 // call VAPI_post_sr to send out this RTS message data
909 vstat = VAPI_post_sr(qp->hca_hndl, qp->qp_hndl, &sr_desc);
911 if (vstat != VAPI_OK) {
912 CERROR("RTS: VAPI_post_sr failed (%s).\n",VAPI_strerror_sym(vstat));
921 // create local receiving Memory Region for a HCA
// Allocate a kernel buffer of buf_length bytes and register it with
// the HCA as an RDMA-capable memory region (local + remote write),
// returning the MR handle/properties through the out parameters.
// The region is reused until explicitly deregistered.
// NOTE(review): fragmentary listing -- some parameters, the
// VAPI_register_mr argument list and the return are not visible.
923 createMemRegion_RDMA(VAPI_hca_hndl_t hca_hndl,
924 VAPI_pd_hndl_t pd_hndl,
927 VAPI_mr_hndl_t *rep_mr_hndl,
933 CDEBUG(D_NET, "createMemRegion_RDMA\n");
935 // memory region address and size of memory region
936 // allocate a block of memory for this HCA
941 // need to allcate a local buffer to receive data from a
942 // remore VAPI_RDMA_WRITE_IMM
943 PORTAL_ALLOC(bufptr, buf_length);
947 CDEBUG(D_MALLOC, "Failed to malloc a block of RDMA receiving memory, size %d\n",
952 /* Register RDAM data Memory region */
953 CDEBUG(D_NET, "Register a RDMA data memory region\n");
956 mrw.pd_hndl= pd_hndl;
957 mrw.start = (VAPI_virt_addr_t )(MT_virt_addr_t )bufptr;
958 mrw.size = buf_length;
959 mrw.acl = VAPI_EN_LOCAL_WRITE |
960 VAPI_EN_REMOTE_WRITE |
963 // register send memory region
964 vstat = VAPI_register_mr(hca_hndl,
969 // this memory region is going to be reused until deregister is called
970 if (vstat != VAPI_OK) {
971 CERROR("Failed registering a mem region Addr=%p, Len=%d. %s\n",
972 bufptr, buf_length, VAPI_strerror(vstat));
// Local side's RDMA info-exchange record (opcode, MR, raddr/rkey)
// sent to the peer during the RTS/CTS handshake.
981 RDMA_Info_Exchange Local_rdma_info;
// Claim the first unregistered slot in MRbuf_list (indices
// NUM_ENTRY..NUM_MBUF) for a newly registered RDMA receive buffer of
// size buf_length, and return its index (NUM_MBUF-range index means
// failure to the caller, which checks recv_id >= NUM_ENTRY).
// NOTE(review): fragmentary listing -- loop exit and return not
// visible here.
983 int insert_MRbuf_list(int buf_length)
985 int recv_id = NUM_ENTRY;
987 CDEBUG(D_NET, "insert_MRbuf_list\n");
989 for(recv_id= NUM_ENTRY; recv_id < NUM_MBUF; recv_id++){
990 if(BUF_UNREGISTERED == MRbuf_list[recv_id].status) {
// BUG FIX: the original re-assigned BUF_UNREGISTERED here -- a dead
// store that never claimed the slot, so repeated calls would hand
// out the same entry.  Mark it registered: the caller
// (CTS_handshaking_protocol) has just created the MR for it.
991 MRbuf_list[recv_id].status = BUF_REGISTERED;
992 MRbuf_list[recv_id].buf_size = buf_length;
// CTS (Clear-To-Send) half of the large-message handshake: register
// a local RDMA receive region sized per the peer's RTS, record it in
// MRbuf_list and Local_rdma_info (raddr/rkey for the peer's
// RDMA-write), then post the CTS reply with a VAPI_SEND work request
// tagged RDMA_CTS_ID.
// NOTE(review): fragmentary listing -- vstat/qp/rep_mr declarations,
// several braces and the return are not visible here.
1002 CTS_handshaking_protocol(RDMA_Info_Exchange *rdma_info)
1006 VAPI_sr_desc_t sr_desc;
1007 VAPI_sg_lst_entry_t sr_sg;
1009 VAPI_wr_id_t send_id;
1010 VAPI_mr_hndl_t rep_mr_hndl;
1013 char *bufptr = NULL;
1015 // search MRbuf_list for an available entry that
1016 // has a registered data buffer with size equal to rdma_info->buf_length
1018 CDEBUG(D_NET, "CTS_handshaking_protocol\n");
1020 // register memory buffer for RDMA operation
1022 vstat = createMemRegion_RDMA(Hca_hndl,
1025 rdma_info->buf_length,
1030 Local_rdma_info.opcode = Clear_To_send;
1031 Local_rdma_info.recv_rdma_mr = rep_mr;
1032 Local_rdma_info.recv_rdma_mr_hndl = rep_mr_hndl;
1034 if (vstat != VAPI_OK) {
1035 CERROR("CST_handshaking_protocol: Failed registering a mem region"
1036 "Len=%d. %s\n", rdma_info->buf_length, VAPI_strerror(vstat));
1037 Local_rdma_info.flag = RDMA_BUFFER_UNAVAILABLE;
1040 // successfully allocated reserved RDMA data buffer
1041 recv_id = insert_MRbuf_list(rdma_info->buf_length);
1043 if(recv_id >= NUM_ENTRY) {
1044 MRbuf_list[recv_id].buf_addr = rep_mr.start;
1045 MRbuf_list[recv_id].mr = rep_mr;
1046 MRbuf_list[recv_id].mr_hndl = rep_mr_hndl;
1047 MRbuf_list[recv_id].ref_count = 0;
1048 Local_rdma_info.flag = RDMA_BUFFER_RESERVED;
1049 Local_rdma_info.buf_length = rdma_info->buf_length;
1050 Local_rdma_info.raddr = rep_mr.start;
1051 Local_rdma_info.rkey = rep_mr.r_key;
1054 CERROR("Can not find an entry in MRbuf_list - how could this happen\n");
1058 // find a suitable/registered send_buf from MSbuf_list
1059 send_id = search_send_buf(sizeof(RDMA_Info_Exchange));
1060 CDEBUG(D_NET, "CTS: current send id %d \n", send_id);
1061 sr_desc.opcode = VAPI_SEND;
1062 sr_desc.comp_type = VAPI_SIGNALED;
1063 sr_desc.id = send_id + RDMA_CTS_ID; // this CST message ID
1065 // scatter and gather info
1066 sr_sg.len = sizeof(RDMA_Info_Exchange);
1067 sr_sg.lkey = MSbuf_list[send_id].mr.l_key; // use send MR
1068 sr_sg.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) MSbuf_list[send_id].buf_addr;
// BUG FIX: the original did memcpy(&sr_sg.addr, &Local_rdma_info, ...),
// clobbering the descriptor's address field instead of filling the
// registered buffer.  Copy into the buffer sr_sg.addr points at.
1071 memcpy((void *)(MT_virt_addr_t)sr_sg.addr, &Local_rdma_info, sizeof(RDMA_Info_Exchange));
1073 sr_desc.sg_lst_p = &sr_sg;
1074 sr_desc.sg_lst_len = 1; // only 1 entry is used
1075 sr_desc.fence = TRUE;
1076 sr_desc.set_se = FALSE;
1078 // call VAPI_post_sr to send out this CTS message data
1079 vstat = VAPI_post_sr(qp->hca_hndl, qp->qp_hndl, &sr_desc);
1081 if (vstat != VAPI_OK) {
1082 CERROR("CTS: VAPI_post_sr failed (%s).\n",VAPI_strerror(vstat));
// Send a large (> SMALL_MSG_SIZE) message via RDMA write: register
// the caller's buffer as a local MR (no copy), run the RTS/CTS
// handshake to obtain the peer's raddr/rkey, then post a signaled
// VAPI_RDMA_WRITE_WITH_IMM work request against the peer's buffer.
// NOTE(review): fragmentary listing -- vstat/qp/rep_mr declarations,
// the CTS wait loop and the return are not visible here.
1090 VAPI_ret_t Send_Large_Msg(char *buf, int buf_length)
1093 VAPI_sr_desc_t sr_desc;
1094 VAPI_sg_lst_entry_t sr_sg;
1097 VAPI_mr_hndl_t rep_mr_hndl;
1099 VAPI_imm_data_t imm_data = 0XAAAA5555;
1102 CDEBUG(D_NET, "Send_Large_Msg: Enter\n");
1104 // register this large buf
1105 // don't need to copy this buf to send buffer
1106 vstat = createMemRegion_RDMA(Hca_hndl,
1113 if (vstat != VAPI_OK) {
// BUG FIX: error string contained an invalid escape and typos
// ("Send_Large_M\sg: createMemRegion_RDMAi()").
1114 CERROR("Send_Large_Msg: createMemRegion_RDMA() failed (%s).\n",
1115 VAPI_strerror(vstat));
1119 Local_rdma_info.send_rdma_mr = rep_mr;
1120 Local_rdma_info.send_rdma_mr_hndl = rep_mr_hndl;
1123 // Prepare descriptor for send queue
1126 // ask for a remote rdma buffer with size buf_length
1127 send_id = RTS_handshaking_protocol(buf_length);
1129 qp = &QP_list[send_id];
1131 // wait for CTS message receiving from remote node
// BUG FIX: "Cts_Message_arrived" is not declared anywhere; the
// global flag set on CTS arrival is Cts_Msg_Arrived (see top of
// file).
1133 if(YES == Cts_Msg_Arrived) {
1134 // receive CST message from remote node
1135 // Rdma_info is available for use
1138 schedule_timeout(RTS_CTS_TIMEOUT);
1141 sr_desc.id = send_id + RDMA_OP_ID;
1142 sr_desc.opcode = VAPI_RDMA_WRITE_WITH_IMM;
1143 sr_desc.comp_type = VAPI_SIGNALED;
1145 // scatter and gather info
1146 sr_sg.len = buf_length;
1149 sr_sg.lkey = rep_mr.l_key;
1150 sr_sg.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) rep_mr.start;
1151 sr_desc.sg_lst_p = &sr_sg;
1152 sr_desc.sg_lst_len = 1; // only 1 entry is used
1154 // immediate data - not used here
1155 sr_desc.imm_data = imm_data;
1156 sr_desc.fence = TRUE;
1157 sr_desc.set_se = FALSE;
1159 // RDMA operation only
1160 // raddr and rkey were received from the remote node
1161 sr_desc.remote_addr = Rdma_info.raddr;
1162 sr_desc.r_key = Rdma_info.rkey;
1164 // call VAPI_post_sr to send out this data
1165 vstat = VAPI_post_sr(qp->hca_hndl, qp->qp_hndl, &sr_desc);
1167 if (vstat != VAPI_OK) {
1168 CERROR("VAPI_post_sr failed (%s).\n",VAPI_strerror_sym(vstat));
1180 // post a used recv buffer back to recv WQE list
1181 // wrq_id is used to indicate the starting position of recv-buffer
// Return a consumed receive buffer to the QP's receive work-queue:
// rebuild a single-SGE receive request over MRbuf_list[wrq_id] and
// post it with VAPI_post_rr.  wrq_id doubles as the index into the
// buffer array.
// NOTE(review): fragmentary listing -- rr/ret declarations and the
// return are not visible here.
1184 repost_recv_buf(QP_info *qp,
1185 VAPI_wr_id_t wrq_id)
1188 VAPI_sg_lst_entry_t sg_entry;
1191 CDEBUG(D_NET, "repost_recv_buf\n");
1193 sg_entry.lkey = MRbuf_list[wrq_id].mr.l_key;
1194 sg_entry.len = MRbuf_list[wrq_id].buf_size;
1195 sg_entry.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) MRbuf_list[wrq_id].buf_addr;
1196 rr.opcode = VAPI_RECEIVE;
1197 rr.comp_type = VAPI_SIGNALED; /* All with CQE (IB compliant) */
1198 rr.sg_lst_len = 1; /* single buffers */
1199 rr.sg_lst_p = &sg_entry;
1200 rr.id = wrq_id; /* WQE id used is the index to buffers ptr array */
1202 ret= VAPI_post_rr(qp->hca_hndl,qp->qp_hndl,&rr);
1204 if (ret != VAPI_OK){
1205 CERROR("failed reposting RQ WQE (%s) buffer \n",VAPI_strerror_sym(ret));
1209 CDEBUG(D_NET, "Successfully reposting an RQ WQE %d recv bufer \n", wrq_id);
1216 // post "num_o_bufs" for receiving data
1217 // each receiving buf (buffer starting address, size of buffer)
1218 // each buffer is associated with an id
// Post NUM_ENTRY receive buffers, one per QP: for each index i,
// build a single-SGE receive request over MRbuf_list[i] with WQE id
// start_id+i and post it on QP_list[i].  Returns the number of
// buffers successfully posted.
// NOTE(review): fragmentary listing -- rr/ret/i declarations and
// braces are not visible here.
1221 post_recv_bufs(VAPI_wr_id_t start_id)
1225 VAPI_sg_lst_entry_t sg_entry;
1228 CDEBUG(D_NET, "post_recv_bufs\n");
1230 for(i=0; i< NUM_ENTRY; i++) {
1231 sg_entry.lkey = MRbuf_list[i].mr.l_key;
1232 sg_entry.len = MRbuf_list[i].buf_size;
1233 sg_entry.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) MRbuf_list[i].buf_addr;
1234 rr.opcode = VAPI_RECEIVE;
1235 rr.comp_type = VAPI_SIGNALED; /* All with CQE (IB compliant) */
1236 rr.sg_lst_len = 1; /* single buffers */
1237 rr.sg_lst_p = &sg_entry;
1238 rr.id = start_id+i; /* WQE id used is the index to buffers ptr array */
1240 ret= VAPI_post_rr(QP_list[i].hca_hndl,QP_list[i].qp_hndl, &rr);
1241 if (ret != VAPI_OK) {
1242 CERROR("failed posting RQ WQE (%s)\n",VAPI_strerror_sym(ret));
1247 return i; /* num of buffers posted */
// Post a set of RDMA buffers on the given QP.  Body is not visible
// in this listing beyond the trace message; signature kept for the
// callers' sake.
1251 post_RDMA_bufs(QP_info *qp,
1253 unsigned int num_bufs,
1254 unsigned int buf_size,
1255 VAPI_wr_id_t start_id)
1258 CDEBUG(D_NET, "post_RDMA_bufs \n");
1266 // assign function pointers to their corresponding entries
// The NAL callback table handed to the Portals library: wires the
// kibnal_* callbacks defined above into the nal_cb_t interface.
// NOTE(review): cb_send_pages/cb_recv_pages/cb_map*/cb_callback are
// NULL even though stub functions exist above -- presumably
// deliberate until those stubs are implemented; confirm.
1269 nal_cb_t kibnal_lib = {
1270 nal_data: &kibnal_data, /* NAL private data */
1271 cb_send: kibnal_send,
1272 cb_send_pages: NULL, // not implemented
1273 cb_recv: kibnal_recv,
1274 cb_recv_pages: NULL, // not implemented
1275 cb_read: kibnal_read,
1276 cb_write: kibnal_write,
1277 cb_callback: NULL, // not implemented
1278 cb_malloc: kibnal_malloc,
1279 cb_free: kibnal_free,
1280 cb_map: NULL, // not implemented
1281 cb_unmap: NULL, // not implemented
1282 cb_map_pages: NULL, // not implemented
1283 cb_unmap_pages: NULL, // not implemented
1284 cb_printf: kibnal_printf,
1287 cb_dist: kibnal_dist // not used at this moment