1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Based on ksocknal and qswnal
6 * Author: Hsing-bung Chen <hbchen@lanl.gov>
8 * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
10 * Portals is free software; you can redistribute it and/or
11 * modify it under the terms of version 2 of the GNU General Public
12 * License as published by the Free Software Foundation.
14 * Portals is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with Portals; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
// Module-wide RDMA handshake state.
// Fix: this global was declared "Rdma_nfo" but every use site in this
// file (e.g. Send_Large_Msg reading Rdma_info.raddr / Rdma_info.rkey)
// spells it "Rdma_info" — rename the definition to match.
30 RDMA_Info_Exchange Rdma_info;   // remote RDMA buffer info delivered by the CTS reply
31 int Cts_Msg_Arrived = NO;       // flag: a CTS message has arrived; polled by Send_Large_Msg
35 * LIB functions follow
40 // copy a block of data from scr_addr to dst_addr
41 // it all happens in kernel space - dst_addr and src_addr
43 // original definition is to read a block od data from a
44 // specified user address
// kibnal_read: NAL cb_read hook.  Both addresses are kernel-space here,
// so "read from user address" degenerates to a plain memcpy of `len`
// bytes from src_addr to dst_addr.  (Listing is fragmentary: the rest
// of the parameter list, braces and return are on elided lines.)
48 int kibnal_read (nal_cb_t *nal,
54 CDEBUG(D_NET, "kibnal_read: 0x%Lx: reading %ld bytes from %p -> %p\n",
55 nal->ni.nid, (long)len, src_addr, dst_addr );
57 memcpy( dst_addr, src_addr, len );
63 // it seems that read and write are doing the same thing
64 // because they all happen in kernel space
65 // why do we need two functions like read and write
66 // to make PORTALS API compatable
71 // copy a block of data from scr_addr to dst_addr
72 // it all happens in kernel space - dst_addr and src_addr
74 // original definition is to write a block od data to a
75 // specified user address
// kibnal_write: NAL cb_write hook.  Identical in effect to kibnal_read
// (everything is kernel-space): memcpy `len` bytes src_addr -> dst_addr.
// Kept separate only to satisfy the Portals NAL callback table.
79 int kibnal_write(nal_cb_t *nal,
85 CDEBUG(D_NET, "kibnal_write: 0x%Lx: writing %ld bytes from %p -> %p\n",
86 nal->ni.nid, (long)len, src_addr, dst_addr );
89 memcpy( dst_addr, src_addr, len );
97 // either vmalloc or kmalloc is used
98 // dynamically allocate a block of memory based on the size of buffer
// kibnal_malloc: NAL cb_malloc hook — allocate `length` bytes via
// PORTAL_ALLOC (which picks kmalloc/vmalloc by size).
// NOTE(review): the NULL check / return of `buffer` is on elided lines;
// confirm allocation failure is handled there.
102 void * kibnal_malloc(nal_cb_t *nal, size_t length)
106 // PORTAL_ALLOC will do the job
107 // allocate a buffer with size "length"
108 PORTAL_ALLOC(buffer, length);
115 // release a dynamically allocated memory pointed by buffer pointer
// kibnal_free: NAL cb_free hook — release a buffer obtained from
// kibnal_malloc.  `length` must match the original allocation size
// (PORTAL_FREE requires it for accounting).
119 void kibnal_free(nal_cb_t *nal, void *buffer, size_t length)
122 // release allocated buffer to system
124 PORTAL_FREE(buffer, length);
129 // because evernthing is in kernel space (LUSTRE)
130 // there is no need to mark a piece of user memory as no longer in use by
// kibnal_invalidate: no-op beyond logging.  All memory is kernel-space
// (LUSTRE), so there is no user mapping to mark as no-longer-in-use.
135 void kibnal_invalidate(nal_cb_t *nal,
141 CDEBUG(D_NET, "kibnal_invalidate: 0x%Lx: invalidating %p : %d\n",
142 nal->ni.nid, base, extent);
149 // because everything is in kernel space (LUSTRE)
150 // there is no need to mark a piece of user memory in use by
// kibnal_validate: no-op beyond logging, mirror of kibnal_invalidate.
// Kernel-space memory needs no pinning/marking here.
155 int kibnal_validate(nal_cb_t *nal,
161 CDEBUG(D_NET, "kibnal_validate: 0x%Lx: validating %p : %d\n",
162 nal->ni.nid, base, extent);
169 // log messages from kernel space
// kibnal_printf: NAL cb_printf hook — format into a local buffer and
// emit via printk, only when D_NET debugging is enabled.
// NOTE(review): va_start/va_end and the `msg` declaration are on elided
// lines; vsnprintf is correctly bounded by sizeof(msg).
174 void kibnal_printf(nal_cb_t *nal, const char *fmt, ...)
179 if (portal_debug & D_NET) {
181 vsnprintf( msg, sizeof(msg), fmt, ap );
184 printk("CPUId: %d %s",smp_processor_id(), msg);
190 // use spin_lock to lock protected area such as MD, ME...
191 // so a process can enter a protected area and do some works
192 // this won't physicall disable interrup but use a software
193 // spin-lock to control some protected areas
// kibnal_cli: NAL cb_cli hook — enter the library critical section.
// Despite the name, interrupts are not globally disabled; this takes
// the per-NAL dispatch spinlock (irqsave) guarding MD/ME/EQ state.
// Caller passes *flags out to the matching kibnal_sti().
197 void kibnal_cli(nal_cb_t *nal, unsigned long *flags)
199 kibnal_data_t *data= nal->nal_data;
201 CDEBUG(D_NET, "kibnal_cli \n");
203 spin_lock_irqsave(&data->kib_dispatch_lock,*flags);
209 // use spin_lock to unlock protected area such as MD, ME...
210 // this won't physicall enable interrup but use a software
211 // spin-lock to control some protected areas
// kibnal_sti: NAL cb_sti hook — leave the library critical section,
// releasing the dispatch spinlock with the flags saved by kibnal_cli().
215 void kibnal_sti(nal_cb_t *nal, unsigned long *flags)
217 kibnal_data_t *data= nal->nal_data;
219 CDEBUG(D_NET, "kibnal_sti \n");
221 spin_unlock_irqrestore(&data->kib_dispatch_lock,*flags);
225 // A new event has just been created
// kibnal_callback: deliver a new event to the EQ's registered callback,
// called while holding kib_dispatch_lock.
// NOTE(review): a second, stub definition of kibnal_callback appears
// later in this file ("callback not implemented") — one of the two is
// presumably conditionally compiled out on elided lines; verify.
227 void kibnal_callback(nal_cb_t *nal, void *private, lib_eq_t *eq, ptl_event_t *ev)
229 /* holding kib_dispatch_lock */
231 if (eq->event_callback != NULL)
232 eq->event_callback(ev);
234 /* We will wake theads sleeping in yield() here, AFTER the
235 * callback, when we implement blocking yield */
241 // network distance doesn't mean much for this nal
242 // here we only indicate
243 // 0 - operation is happened on the same node
244 // 1 - operation is happened on different nodes
245 // router will handle the data routing
// kibnal_dist: NAL cb_dist hook — report network "distance" to `nid`:
// 0 for the local node, 1 otherwise (set on elided lines).  Routing is
// left to the router; the function itself always returns 0 (success).
249 int kibnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
251 CDEBUG(D_NET, "kibnal_dist \n");
253 if ( nal->ni.nid == nid ) {
260 return 0; // always return 0
265 // This is the cb_send() on IB based interconnect system
266 // prepare a data package and use VAPI_post_sr() to send it
267 // down-link out-going message
// kibnal_send: NAL cb_send hook for the IB/VAPI transport.
// Builds [ptl_hdr_t | payload] in a registered buffer and posts it with
// VAPI_post_sr, choosing Send_Small_Msg or Send_Large_Msg (RDMA write)
// by total size.  Fixes in this revision:
//  * buf_length is `unsigned long` but was printed with %d in two
//    CDEBUG format strings — corrected to %lu (format/arg mismatch
//    is undefined behavior in printk-style formatting);
//  * the comment above the large-message branch wrongly said "small".
// (Listing is fragmentary: case labels, breaks, buffer allocation and
// returns are on elided lines.)
272 kibnal_send(nal_cb_t *nal,
286 unsigned long buf_length = sizeof(ptl_hdr_t) + len;
287 int expected_buf_size = 0;
290 PROF_START(kibnal_send); // time stamp send start
292 CDEBUG(D_NET,"kibnal_send: sending %lu bytes from %p to nid: 0x%Lx pid %d\n",
293 buf_length, iov, nid, HCA_PORT_1);
296 // do I need to check the gateway information
297 // do I have problem to send direct
298 // do I have to forward a data packet to gateway
300 // The current connection is back-to-back
301 // I always know that data will be send from one-side to
306 // check data buffer size
314 if(buf_length <= SMALL_MSG_SIZE) {
315 expected_buf_size = MSG_SIZE_SMALL;
318 if(buf_length > MAX_MSG_SIZE) {
319 CERROR("kibnal_send:request exceeds Transmit data size (%d).\n",
325 expected_buf_size = MSG_SIZE_LARGE; // this is a large data package
329 // prepare data packet for send operation
331 // allocate a data buffer "buf" with size of buf_len(header + payload)
333 // buf | hdr | size = sizeof(ptl_hdr_t)
335 // |payload data | size = len
338 // copy header to buf
339 memcpy(buf, hdr, sizeof(ptl_hdr_t));
341 // copy payload data from iov to buf
342 // use portals library function lib_copy_iov2buf()
345 lib_copy_iov2buf(((char *)buf) + sizeof (ptl_hdr_t),
350 // buf is ready to do a post send
351 // the send method is base on the buf_size
353 CDEBUG(D_NET,"ib_send %lu bytes (size %d) from %p to nid: 0x%Lx "
354 " port %d\n", buf_length, expected_buf_size, iov, nid, HCA_PORT_1);
356 switch(expected_buf_size) {
358 // send small message
359 if((vstat = Send_Small_Msg(buf, buf_length)) != VAPI_OK){
360 CERROR("Send_Small_Msg() is failed\n");
365 // send large message
366 if((vstat = Send_Large_Msg(buf, buf_length)) != VAPI_OK){
367 CERROR("Send_Large_Msg() is failed\n");
372 CERROR("Unknown message size %d\n", expected_buf_size);
376 PROF_FINISH(kibnal_send); // time stapm of send operation
// kibnal_send_pages: NAL cb_send_pages hook — page-based (kiov) send.
// Not supported on this Infiniband NAL; logs and does nothing.
390 int kibnal_send_pages(nal_cb_t * nal,
403 CDEBUG(D_NET, "kibnal_send_pages\n");
405 // do nothing now for Infiniband
// kibnal_fwd_packet: router forwarding hook — not implemented; this
// NAL only supports back-to-back (non-routed) configurations.
421 void kibnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
423 CDEBUG(D_NET, "forwarding not implemented\n");
// NOTE(review): second definition of kibnal_callback (stub).  The real
// implementation appears earlier in this file; a duplicate external
// definition would not link, so one of the two is presumably guarded
// by preprocessor conditionals on elided lines — confirm.
435 void kibnal_callback(nal_cb_t * nal,
440 CDEBUG(D_NET, "callback not implemented\n");
445 /* Process a received portals packet */
447 // conver receiving data in to PORTALS header
/* Process a received portals packet: interpret the receive buffer as a
 * ptl_hdr_t, wrap it in a kibnal_rx_t descriptor (krx) and hand it to
 * lib_parse() when addressed to this NID; forwarding is unimplemented.
 * NOTE(review): buffer_len is u_int32_t but is printed with %ld, and
 * buffer_addr (VAPI_virt_addr_t) with %p — format/argument mismatches
 * to fix alongside the elided lines.  The cast of buffer_addr to a
 * pointer assumes VAPI_virt_addr_t round-trips through a pointer. */
450 void kibnal_rx(kibnal_data_t *kib,
451 VAPI_virt_addr_t buffer_addr,
452 u_int32_t buffer_len,
453 u_int32_t buffer_size,
454 unsigned int priority)
456 ptl_hdr_t *hdr = (ptl_hdr_t *) buffer_addr; // cast to ptl header format
459 CDEBUG(D_NET,"kibnal_rx: buf %p, len %ld\n", buffer_addr, buffer_len);
461 if ( buffer_len < sizeof( ptl_hdr_t ) ) {
462 /* XXX what's this for? */
463 if (kib->kib_shuttingdown)
465 CERROR("kibnal_rx: did not receive complete portal header, "
466 "len= %ld", buffer_len);
472 // char *krx_buffer; // pointer to receiving buffer
473 // unsigned long krx_len; // length of buffer
474 // unsigned int krx_size; //
475 // unsigned int krx_priority; // do we need this
476 // struct list_head krx_item;
479 krx.krx_buffer = hdr;
480 krx.krx_len = buffer_len;
481 krx.krx_size = buffer_size;
482 krx.krx_priority = priority;
484 if ( hdr->dest_nid == kibnal_lib.ni.nid ) {
486 PROF_START(lib_parse);
488 lib_parse(&kibnal_lib, (ptl_hdr_t *)krx.krx_buffer, &krx);
490 PROF_FINISH(lib_parse);
492 /* forward to gateway */
493 // Do we expect this happened ?
495 CERROR("kibnal_rx: forwarding not implemented yet");
// kibnal_recv_pages: NAL cb_recv_pages hook — page-based receive,
// not implemented for this NAL (cb table installs NULL instead).
512 kibnal_recv_pages(nal_cb_t * nal,
521 CDEBUG(D_NET, "recv_pages not implemented\n");
// kibnal_recv: NAL cb_recv hook.  `private` is the kibnal_rx_t built by
// kibnal_rx; copy `mlen` payload bytes (which follow the ptl_hdr_t in
// krx_buffer) into the caller's iov, then lib_finalize() the message.
// Sender must have supplied at least sizeof(ptl_hdr_t)+rlen bytes.
528 kibnal_recv(nal_cb_t *nal,
536 kibnal_rx_t *krx = private;
538 CDEBUG(D_NET,"kibnal_recv: mlen=%d, rlen=%d\n", mlen, rlen);
540 /* What was actually received must be >= what sender claims to
542 LASSERT (mlen <= rlen);
544 if (krx->krx_len < sizeof (ptl_hdr_t) + rlen)
547 PROF_START(kibnal_recv);
551 lib_copy_buf2iov (niov, iov, krx->krx_buffer +
552 sizeof (ptl_hdr_t), mlen);
556 PROF_START(lib_finalize);
558 lib_finalize(nal, private, cookie, PTL_OK);
560 PROF_FINISH(lib_finalize);
561 PROF_FINISH(kibnal_recv);
// kibnal_map: iov mapping hook — not implemented (cb table uses NULL).
571 int kibnal_map(nal_cb_t * nal,
576 CDEBUG(D_NET, "map not implemented\n");
// kibnal_unmap: inverse of kibnal_map — likewise not implemented.
589 void kibnal_unmap(nal_cb_t * nal,
594 CDEBUG(D_NET, "unmap not implemented\n");
604 /* as (un)map, but with a set of page fragments */
/* as (un)map, but with a set of page fragments — not implemented */
605 int kibnal_map_pages(nal_cb_t * nal,
610 CDEBUG(D_NET, "map_pages not implemented\n");
617 // kibnal_unmap_pages
// kibnal_unmap_pages: page-fragment unmap — not implemented.
623 void kibnal_unmap_pages(nal_cb_t * nal,
628 CDEBUG(D_NET, "unmap_pages not implemented\n");
// kibnal_end: NAL shutdown stub — intended to drain sends, tear down
// receive buffers and stop the receive thread (all still TODO here).
633 int kibnal_end(kibnal_data_t *kib)
636 /* wait for sends to finish ? */
637 /* remove receive buffers */
638 /* shutdown receive thread */
640 CDEBUG(D_NET, "kibnal_end\n");
649 // asynchronous event handler: response to some unexpetced operation errors
651 // void async_event_handler(VAPI_hca_hndl_t hca_hndl,
652 // VAPI_event_record_t *event_record_p,
653 // void* private_data)
654 // the HCA drive will prepare evetn_record_p
656 // this handler is registered with VAPI_set_async_event_handler()
657 // VAPI_set_async_event_handler() is issued when an HCA is created
// async_event_handler: HCA asynchronous event callback, registered via
// VAPI_set_async_event_handler() at HCA creation.  Only logs the event;
// no recovery action is taken for any event type.
// NOTE(review): the `break` statements between cases are on elided
// lines — confirm each case breaks, or the printks cascade.
// NOTE(review): eec_hndl is printed with %d while qp/cq handles use
// %lu; verify against the VAPI handle typedefs.
660 void async_event_handler(VAPI_hca_hndl_t hca_hndl,
661 VAPI_event_record_t *event_record_p,
665 // * event_record_p is prepared by the system when an async
667 // * what to do with private_data
668 // * do we expect more async events happened if so what are they
670 // only log ERROR message now
672 switch (event_record_p->type) {
673 case VAPI_PORT_ERROR:
674 printk("Got PORT_ERROR event. port number=%d\n",
675 event_record_p->modifier.port_num);
677 case VAPI_PORT_ACTIVE:
678 printk("Got PORT_ACTIVE event. port number=%d\n",
679 event_record_p->modifier.port_num);
681 case VAPI_QP_PATH_MIGRATED: /*QP*/
682 printk("Got P_PATH_MIGRATED event. qp_hndl=%lu\n",
683 event_record_p->modifier.qp_hndl);
685 case VAPI_EEC_PATH_MIGRATED: /*EEC*/
686 printk("Got EEC_PATH_MIGRATED event. eec_hndl=%d\n",
687 event_record_p->modifier.eec_hndl);
689 case VAPI_QP_COMM_ESTABLISHED: /*QP*/
690 printk("Got QP_COMM_ESTABLISHED event. qp_hndl=%lu\n",
691 event_record_p->modifier.qp_hndl);
693 case VAPI_EEC_COMM_ESTABLISHED: /*EEC*/
694 printk("Got EEC_COMM_ESTABLISHED event. eec_hndl=%d\n",
695 event_record_p->modifier.eec_hndl);
697 case VAPI_SEND_QUEUE_DRAINED: /*QP*/
698 printk("Got SEND_QUEUE_DRAINED event. qp_hndl=%lu\n",
699 event_record_p->modifier.qp_hndl);
701 case VAPI_CQ_ERROR: /*CQ*/
702 printk("Got CQ_ERROR event. cq_hndl=%lu\n",
703 event_record_p->modifier.cq_hndl);
705 case VAPI_LOCAL_WQ_INV_REQUEST_ERROR: /*QP*/
706 printk("Got LOCAL_WQ_INV_REQUEST_ERROR event. qp_hndl=%lu\n",
707 event_record_p->modifier.qp_hndl);
709 case VAPI_LOCAL_WQ_ACCESS_VIOL_ERROR: /*QP*/
710 printk("Got LOCAL_WQ_ACCESS_VIOL_ERROR event. qp_hndl=%lu\n",
711 event_record_p->modifier.qp_hndl);
713 case VAPI_LOCAL_WQ_CATASTROPHIC_ERROR: /*QP*/
714 printk("Got LOCAL_WQ_CATASTROPHIC_ERROR event. qp_hndl=%lu\n",
715 event_record_p->modifier.qp_hndl);
717 case VAPI_PATH_MIG_REQ_ERROR: /*QP*/
718 printk("Got PATH_MIG_REQ_ERROR event. qp_hndl=%lu\n",
719 event_record_p->modifier.qp_hndl);
721 case VAPI_LOCAL_CATASTROPHIC_ERROR: /*none*/
722 printk("Got LOCAL_CATASTROPHIC_ERROR event. \n");
725 printk(":got non-valid event type=%d. IGNORING\n",
726 event_record_p->type);
// search_send_buf: find a registered entry in MSbuf_list[0..NUM_ENTRY),
// atomically flip it REGISTERED -> INUSE under its MSB_mutex, and
// return its index as the work-request id.  Retries up to
// MAX_LOOP_COUNT times, sleeping 200 jiffies between scans, then gives
// up (send_id stays -1 on the elided failure path).
// NOTE(review): schedule_timeout() without setting the task state
// returns immediately; a set_current_state() is presumably on an
// elided line — verify.
735 search_send_buf(int buf_length)
737 VAPI_wr_id_t send_id = -1;
742 CDEBUG(D_NET, "search_send_buf \n");
744 while((flag == NO) && (loop_count < MAX_LOOP_COUNT)) {
745 for(i=0; i < NUM_ENTRY; i++) {
746 // problem about using spinlock
747 spin_lock(&MSB_mutex[i]);
748 if(MSbuf_list[i].status == BUF_REGISTERED) {
749 MSbuf_list[i].status = BUF_INUSE;// make send buf as inuse
751 spin_unlock(&MSB_mutex[i]);
755 spin_unlock(&MSB_mutex[i]);
759 schedule_timeout(200); // wait for a while
763 CDEBUG(D_NET, "search_send_buf: could not locate an entry in MSbuf_list\n");
766 send_id = (VAPI_wr_id_t ) i;
// search_RDMA_recv_buf: find a registered RDMA receive buffer in
// MRbuf_list[NUM_ENTRY..NUM_MBUF) large enough for buf_length, mark it
// in use, and return its index as the work-request id (-1 on the
// elided failure path).  Retries MAX_LOOP_COUNT times with a 200-jiffy
// wait between scans.
// Fix: the match was made on MRbuf_list[i] but the status was then set
// on MSbuf_list[i] (copy-paste from search_send_buf), so the *send*
// buffer table was corrupted and the recv entry never left
// BUF_REGISTERED.  Now marks MRbuf_list[i].
// NOTE(review): locking still uses MSB_mutex[i] for these indices —
// confirm a dedicated MRB mutex is not intended.
774 search_RDMA_recv_buf(int buf_length)
776 VAPI_wr_id_t recv_id = -1;
781 CDEBUG(D_NET, "search_RDMA_recv_buf\n");
783 while((flag == NO) && (loop_count < MAX_LOOP_COUNT)) {
785 for(i=NUM_ENTRY; i < NUM_MBUF; i++) {
787 spin_lock(&MSB_mutex[i]);
789 if((MRbuf_list[i].status == BUF_REGISTERED) &&
790 (MRbuf_list[i].buf_size >= buf_length)) {
791 MRbuf_list[i].status = BUF_INUSE;// mark recv buf as inuse
793 spin_unlock(&MSB_mutex[i]);
797 spin_unlock(&MSB_mutex[i]);
802 schedule_timeout(200); // wait for a while
806 CERROR("search_RDMA_recv_buf: could not locate an entry in MBbuf_list\n");
809 recv_id = (VAPI_wr_id_t ) i;
// Send_Small_Msg: grab a registered send buffer (search_send_buf),
// copy the packet into it, and post a VAPI_SEND work request on the
// matching QP.  Returns the VAPI status (error paths on elided lines).
// Fix: the payload was memcpy'd into &sr_sg.addr — the 8-byte address
// *field* of the scatter/gather entry — overflowing the descriptor and
// leaving the registered buffer empty.  Copy into the registered send
// buffer itself; sr_sg.addr keeps pointing at it.
821 VAPI_ret_t Send_Small_Msg(char *buf, int buf_length)
824 VAPI_sr_desc_t sr_desc;
825 VAPI_sg_lst_entry_t sr_sg;
827 VAPI_wr_id_t send_id;
829 CDEBUG(D_NET, "Send_Small_Msg\n");
831 send_id = search_send_buf(buf_length);
834 CERROR("Send_Small_Msg: Can not find a QP \n");
838 qp = &QP_list[(int) send_id];
840 // find a suitable/registered send_buf from MSbuf_list
841 CDEBUG(D_NET, "Send_Small_Msg: current send id %d \n", send_id);
843 sr_desc.opcode = VAPI_SEND;
844 sr_desc.comp_type = VAPI_SIGNALED;
845 sr_desc.id = send_id;
848 // scatter and gather info
849 sr_sg.len = buf_length;
850 sr_sg.lkey = MSbuf_list[send_id].mr.l_key; // use send MR
852 sr_sg.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) MSbuf_list[send_id].buf_addr;
854 // copy data into the registered send buffer (NOT into the sg entry)
855 memcpy((void *)(MT_virt_addr_t) MSbuf_list[send_id].buf_addr, buf, buf_length);
857 sr_desc.sg_lst_p = &sr_sg;
858 sr_desc.sg_lst_len = 1; // only 1 entry is used
859 sr_desc.fence = TRUE;
860 sr_desc.set_se = FALSE;
862 // call VAPI_post_sr to send out this data
863 vstat = VAPI_post_sr(qp->hca_hndl, qp->qp_hndl, &sr_desc);
865 if (vstat != VAPI_OK) {
866 CERROR("VAPI_post_sr failed (%s).\n",VAPI_strerror(vstat));
869 CDEBUG(D_NET, "VAPI_post_sr success.\n");
// RTS_handshaking_protocol: send a Ready-To-Send control message asking
// the peer to reserve an RDMA receive buffer of buf_length bytes.  The
// RDMA_Info_Exchange payload goes out as an ordinary VAPI_SEND tagged
// with RDMA_RTS_ID so the peer's completion handler can recognize it.
// Fix: as in Send_Small_Msg, the payload was memcpy'd into &sr_sg.addr
// (the descriptor's 8-byte address field) instead of the registered
// send buffer — corrected to copy into the buffer itself.
// NOTE(review): search_send_buf()'s failure value (-1) is not checked
// before indexing MSbuf_list/QP_list — guard presumably on elided lines.
879 RTS_handshaking_protocol(int buf_length)
883 VAPI_sr_desc_t sr_desc;
884 VAPI_sg_lst_entry_t sr_sg;
885 VAPI_wr_id_t send_id;
887 RDMA_Info_Exchange rdma_info;
889 rdma_info.opcode = Ready_To_send;
890 rdma_info.buf_length = buf_length;
891 rdma_info.raddr = (VAPI_virt_addr_t) 0;
892 rdma_info.rkey = (VAPI_rkey_t) 0 ;
896 CDEBUG(D_NET, "RTS_handshaking_protocol\n");
898 // find a suitable/registered send_buf from MSbuf_list
899 send_id = search_send_buf(sizeof(RDMA_Info_Exchange));
901 qp = &QP_list[(int) send_id];
903 CDEBUG(D_NET, "RTS_CTS: current send id %d \n", send_id);
904 sr_desc.opcode = VAPI_SEND;
905 sr_desc.comp_type = VAPI_SIGNALED;
906 sr_desc.id = send_id + RDMA_RTS_ID;// this RTS mesage ID
908 // scatter and gather info
909 sr_sg.len = sizeof(RDMA_Info_Exchange);
910 sr_sg.lkey = MSbuf_list[send_id].mr.l_key; // use send MR
911 sr_sg.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) MSbuf_list[send_id].buf_addr;
913 // copy the RTS payload into the registered send buffer
914 memcpy((void *)(MT_virt_addr_t) MSbuf_list[send_id].buf_addr, &rdma_info, sizeof(RDMA_Info_Exchange));
916 sr_desc.sg_lst_p = &sr_sg;
917 sr_desc.sg_lst_len = 1; // only 1 entry is used
918 sr_desc.fence = TRUE;
919 sr_desc.set_se = FALSE;
921 // call VAPI_post_sr to send out this RTS message data
922 vstat = VAPI_post_sr(qp->hca_hndl, qp->qp_hndl, &sr_desc);
924 if (vstat != VAPI_OK) {
925 CERROR("RTS: VAPI_post_sr failed (%s).\n",VAPI_strerror_sym(vstat));
934 // create local receiving Memory Region for a HCA
// createMemRegion_RDMA: allocate a buf_length-byte kernel buffer and
// register it with the HCA (local + remote write access) so a remote
// VAPI_RDMA_WRITE_IMM can land in it.  On success the MR handle/props
// are returned through rep_mr_hndl / the out-params on elided lines.
// NOTE(review): the PORTAL_ALLOC-failure return path and the vstat
// returned on success are on elided lines — confirm bufptr is freed
// when VAPI_register_mr fails.
936 createMemRegion_RDMA(VAPI_hca_hndl_t hca_hndl,
937 VAPI_pd_hndl_t pd_hndl,
940 VAPI_mr_hndl_t *rep_mr_hndl,
946 CDEBUG(D_NET, "createMemRegion_RDMA\n");
948 // memory region address and size of memory region
949 // allocate a block of memory for this HCA
954 // need to allcate a local buffer to receive data from a
955 // remore VAPI_RDMA_WRITE_IMM
956 PORTAL_ALLOC(bufptr, buf_length);
960 CDEBUG(D_MALLOC, "Failed to malloc a block of RDMA receiving memory, size %d\n",
965 /* Register RDAM data Memory region */
966 CDEBUG(D_NET, "Register a RDMA data memory region\n");
969 mrw.pd_hndl= pd_hndl;
970 mrw.start = (VAPI_virt_addr_t )(MT_virt_addr_t )bufptr;
971 mrw.size = buf_length;
972 mrw.acl = VAPI_EN_LOCAL_WRITE |
973 VAPI_EN_REMOTE_WRITE |
976 // register send memory region
977 vstat = VAPI_register_mr(hca_hndl,
982 // this memory region is going to be reused until deregister is called
983 if (vstat != VAPI_OK) {
984 CERROR("Failed registering a mem region Addr=%p, Len=%d. %s\n",
985 bufptr, buf_length, VAPI_strerror(vstat));
994 RDMA_Info_Exchange Local_rdma_info;  // CTS reply payload built by CTS_handshaking_protocol
// insert_MRbuf_list: claim the first UNREGISTERED slot in
// MRbuf_list[NUM_ENTRY..NUM_MBUF) for a freshly registered RDMA buffer
// of `buf_lenght` bytes and return its index (>= NUM_MBUF means full;
// break/return are on elided lines).
// Fix: the slot's status was re-set to BUF_UNREGISTERED — a no-op that
// let the same slot be handed out repeatedly.  The buffer has just
// been registered by the caller (createMemRegion_RDMA runs first in
// CTS_handshaking_protocol), so advance it to BUF_REGISTERED.
996 int insert_MRbuf_list(int buf_lenght)
998 int recv_id = NUM_ENTRY;
1000 CDEBUG(D_NET, "insert_MRbuf_list\n");
1002 for(recv_id= NUM_ENTRY; recv_id < NUM_MBUF; recv_id++){
1003 if(BUF_UNREGISTERED == MRbuf_list[recv_id].status) {
1004 MRbuf_list[recv_id].status = BUF_REGISTERED;
1005 MRbuf_list[recv_id].buf_size = buf_lenght;
// CTS_handshaking_protocol: answer a peer's RTS.  Register a local
// RDMA landing buffer of the requested size, record it in MRbuf_list,
// and send back a Clear-To-Send message carrying the buffer's raddr
// and rkey (or RDMA_BUFFER_UNAVAILABLE on registration failure),
// tagged with RDMA_CTS_ID.
// Fix: as elsewhere, the CTS payload was memcpy'd into &sr_sg.addr
// (8-byte descriptor field) instead of the registered send buffer —
// corrected below.
// NOTE(review): `qp` is used for VAPI_post_sr but its assignment from
// QP_list[send_id] is not visible — confirm it happens on an elided
// line after search_send_buf().
1015 CTS_handshaking_protocol(RDMA_Info_Exchange *rdma_info)
1019 VAPI_sr_desc_t sr_desc;
1020 VAPI_sg_lst_entry_t sr_sg;
1022 VAPI_wr_id_t send_id;
1023 VAPI_mr_hndl_t rep_mr_hndl;
1026 char *bufptr = NULL;
1028 // search MRbuf_list for an available entry that
1029 // has registered data buffer with size equal to rdma_info->buf_lenght
1031 CDEBUG(D_NET, "CTS_handshaking_protocol\n");
1033 // register memory buffer for RDAM operation
1035 vstat = createMemRegion_RDMA(Hca_hndl,
1038 rdma_info->buf_length,
1043 Local_rdma_info.opcode = Clear_To_send;
1044 Local_rdma_info.recv_rdma_mr = rep_mr;
1045 Local_rdma_info.recv_rdma_mr_hndl = rep_mr_hndl;
1047 if (vstat != VAPI_OK) {
1048 CERROR("CST_handshaking_protocol: Failed registering a mem region"
1049 "Len=%d. %s\n", rdma_info->buf_length, VAPI_strerror(vstat));
1050 Local_rdma_info.flag = RDMA_BUFFER_UNAVAILABLE;
1053 // successfully allcate reserved RDAM data buffer
1054 recv_id = insert_MRbuf_list(rdma_info->buf_length);
1056 if(recv_id >= NUM_ENTRY) {
1057 MRbuf_list[recv_id].buf_addr = rep_mr.start;
1058 MRbuf_list[recv_id].mr = rep_mr;
1059 MRbuf_list[recv_id].mr_hndl = rep_mr_hndl;
1060 MRbuf_list[recv_id].ref_count = 0;
1061 Local_rdma_info.flag = RDMA_BUFFER_RESERVED;
1062 Local_rdma_info.buf_length = rdma_info->buf_length;
1063 Local_rdma_info.raddr = rep_mr.start;
1064 Local_rdma_info.rkey = rep_mr.r_key;
1067 CERROR("Can not find an entry in MRbuf_list - how could this happen\n");
1071 // find a suitable/registered send_buf from MSbuf_list
1072 send_id = search_send_buf(sizeof(RDMA_Info_Exchange));
1073 CDEBUG(D_NET, "CTS: current send id %d \n", send_id);
1074 sr_desc.opcode = VAPI_SEND;
1075 sr_desc.comp_type = VAPI_SIGNALED;
1076 sr_desc.id = send_id + RDMA_CTS_ID; // this CST message ID
1078 // scatter and gather info
1079 sr_sg.len = sizeof(RDMA_Info_Exchange);
1080 sr_sg.lkey = MSbuf_list[send_id].mr.l_key; // use send MR
1081 sr_sg.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) MSbuf_list[send_id].buf_addr;
1083 // copy the CTS payload into the registered send buffer
1084 memcpy((void *)(MT_virt_addr_t) MSbuf_list[send_id].buf_addr, &Local_rdma_info, sizeof(RDMA_Info_Exchange));
1086 sr_desc.sg_lst_p = &sr_sg;
1087 sr_desc.sg_lst_len = 1; // only 1 entry is used
1088 sr_desc.fence = TRUE;
1089 sr_desc.set_se = FALSE;
1091 // call VAPI_post_sr to send out this CTS message data
1092 vstat = VAPI_post_sr(qp->hca_hndl, qp->qp_hndl, &sr_desc);
1094 if (vstat != VAPI_OK) {
1095 CERROR("CTS: VAPI_post_sr failed (%s).\n",VAPI_strerror(vstat));
// Send_Large_Msg: send a message above SMALL_MSG_SIZE via RDMA write.
// Registers `buf` as a memory region, runs the RTS/CTS handshake to
// obtain the peer's raddr/rkey (delivered into the global Rdma_info by
// the CTS completion path), then posts VAPI_RDMA_WRITE_WITH_IMM.
// Fixes:
//  * the CERROR format string contained a stray backslash
//    ("Send_Large_M\sg") — an invalid escape sequence — and a typo in
//    the callee name; corrected;
//  * the CTS-arrival flag was spelled "Cts_Message_arrived" but the
//    global defined in this file is "Cts_Msg_Arrived" — corrected.
// NOTE(review): polling a plain int flag with schedule_timeout has no
// memory-barrier/locking discipline — flagged for the wait-queue
// rework.
1103 VAPI_ret_t Send_Large_Msg(char *buf, int buf_length)
1106 VAPI_sr_desc_t sr_desc;
1107 VAPI_sg_lst_entry_t sr_sg;
1110 VAPI_mr_hndl_t rep_mr_hndl;
1112 VAPI_imm_data_t imm_data = 0XAAAA5555;
1115 CDEBUG(D_NET, "Send_Large_Msg: Enter\n");
1117 // register this large buf
1118 // don't need to copy this buf to send buffer
1119 vstat = createMemRegion_RDMA(Hca_hndl,
1126 if (vstat != VAPI_OK) {
1127 CERROR("Send_Large_Msg: createMemRegion_RDMA() failed (%s).\n",
1128 VAPI_strerror(vstat));
1132 Local_rdma_info.send_rdma_mr = rep_mr;
1133 Local_rdma_info.send_rdma_mr_hndl = rep_mr_hndl;
1136 // Prepare descriptor for send queue
1139 // ask for a remote rdma buffer with size buf_lenght
1140 send_id = RTS_handshaking_protocol(buf_length);
1142 qp = &QP_list[send_id];
1144 // wait for CTS message receiving from remote node
1146 if(YES == Cts_Msg_Arrived) {
1147 // receive CST message from remote node
1148 // Rdma_info is available for use
1151 schedule_timeout(RTS_CTS_TIMEOUT);
1154 sr_desc.id = send_id + RDMA_OP_ID;
1155 sr_desc.opcode = VAPI_RDMA_WRITE_WITH_IMM;
1156 sr_desc.comp_type = VAPI_SIGNALED;
1158 // scatter and gather info
1159 sr_sg.len = buf_length;
1162 sr_sg.lkey = rep_mr.l_key;
1163 sr_sg.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) rep_mr.start;
1164 sr_desc.sg_lst_p = &sr_sg;
1165 sr_desc.sg_lst_len = 1; // only 1 entry is used
1167 // immediate data - not used here
1168 sr_desc.imm_data = imm_data;
1169 sr_desc.fence = TRUE;
1170 sr_desc.set_se = FALSE;
1172 // RDAM operation only
1173 // raddr and rkey is receiving from remote node
1174 sr_desc.remote_addr = Rdma_info.raddr;
1175 sr_desc.r_key = Rdma_info.rkey;
1177 // call VAPI_post_sr to send out this data
1178 vstat = VAPI_post_sr(qp->hca_hndl, qp->qp_hndl, &sr_desc);
1180 if (vstat != VAPI_OK) {
1181 CERROR("VAPI_post_sr failed (%s).\n",VAPI_strerror_sym(vstat));
1193 // post a used recv buffer back to recv WQE list
1194 // wrq_id is used to indicate the starting position of recv-buffer
// repost_recv_buf: return one consumed receive buffer to the QP's
// receive work queue.  wrq_id doubles as the index into MRbuf_list,
// so the reposted WQE reuses the same registered buffer and lkey.
1197 repost_recv_buf(QP_info *qp,
1198 VAPI_wr_id_t wrq_id)
1201 VAPI_sg_lst_entry_t sg_entry;
1204 CDEBUG(D_NET, "repost_recv_buf\n");
1206 sg_entry.lkey = MRbuf_list[wrq_id].mr.l_key;
1207 sg_entry.len = MRbuf_list[wrq_id].buf_size;
1208 sg_entry.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) MRbuf_list[wrq_id].buf_addr;
1209 rr.opcode = VAPI_RECEIVE;
1210 rr.comp_type = VAPI_SIGNALED; /* All with CQE (IB compliant) */
1211 rr.sg_lst_len = 1; /* single buffers */
1212 rr.sg_lst_p = &sg_entry;
1213 rr.id = wrq_id; /* WQE id used is the index to buffers ptr array */
1215 ret= VAPI_post_rr(qp->hca_hndl,qp->qp_hndl,&rr);
1217 if (ret != VAPI_OK){
1218 CERROR("failed reposting RQ WQE (%s) buffer \n",VAPI_strerror_sym(ret));
1222 CDEBUG(D_NET, "Successfully reposting an RQ WQE %d recv bufer \n", wrq_id);
1229 // post "num_o_bufs" for receiving data
1230 // each receiving buf (buffer starting address, size of buffer)
1231 // each buffer is associated with an id
// post_recv_bufs: post one receive WQE per MRbuf_list[0..NUM_ENTRY)
// entry, WQE id = start_id + buffer index, one QP per buffer.
// Returns the number of buffers successfully posted (the error break
// is on an elided line, so a failure stops the loop early).
1234 post_recv_bufs(VAPI_wr_id_t start_id)
1238 VAPI_sg_lst_entry_t sg_entry;
1241 CDEBUG(D_NET, "post_recv_bufs\n");
1243 for(i=0; i< NUM_ENTRY; i++) {
1244 sg_entry.lkey = MRbuf_list[i].mr.l_key;
1245 sg_entry.len = MRbuf_list[i].buf_size;
1246 sg_entry.addr = (VAPI_virt_addr_t)(MT_virt_addr_t) MRbuf_list[i].buf_addr;
1247 rr.opcode = VAPI_RECEIVE;
1248 rr.comp_type = VAPI_SIGNALED; /* All with CQE (IB compliant) */
1249 rr.sg_lst_len = 1; /* single buffers */
1250 rr.sg_lst_p = &sg_entry;
1251 rr.id = start_id+i; /* WQE id used is the index to buffers ptr array */
1253 ret= VAPI_post_rr(QP_list[i].hca_hndl,QP_list[i].qp_hndl, &rr);
1254 if (ret != VAPI_OK) {
1255 CERROR("failed posting RQ WQE (%s)\n",VAPI_strerror_sym(ret));
1260 return i; /* num of buffers posted */
// post_RDMA_bufs: intended to pre-post RDMA buffers for a QP; only the
// debug trace is visible here — the body (if any) is on elided lines.
1264 post_RDMA_bufs(QP_info *qp,
1266 unsigned int num_bufs,
1267 unsigned int buf_size,
1268 VAPI_wr_id_t start_id)
1271 CDEBUG(D_NET, "post_RDMA_bufs \n");
1279 // assign function pointers to theirs corresponding entries
// kibnal_lib: the Portals NAL callback table wiring the cb_* hooks to
// the functions above (GNU "field: value" designated-initializer
// syntax).  Page-based and map hooks are intentionally NULL.
// NOTE(review): cb_callback is initialized twice (NULL here, then
// kibnal_callback below) — with GNU semantics the later one wins, but
// elided lines between them may hold preprocessor conditionals;
// confirm before deleting either initializer.
1282 nal_cb_t kibnal_lib = {
1283 nal_data: &kibnal_data, /* NAL private data */
1284 cb_send: kibnal_send,
1285 cb_send_pages: NULL, // not implemented
1286 cb_recv: kibnal_recv,
1287 cb_recv_pages: NULL, // not implemented
1288 cb_read: kibnal_read,
1289 cb_write: kibnal_write,
1290 cb_callback: NULL, // not implemented
1291 cb_malloc: kibnal_malloc,
1292 cb_free: kibnal_free,
1293 cb_map: NULL, // not implemented
1294 cb_unmap: NULL, // not implemented
1295 cb_map_pages: NULL, // not implemented
1296 cb_unmap_pages: NULL, // not implemented
1297 cb_printf: kibnal_printf,
1300 cb_callback: kibnal_callback,
1301 cb_dist: kibnal_dist // no used at this moment