Whamcloud - gitweb
b=16098
[fs/lustre-release.git] / lnet / klnds / gmlnd / gmlnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  */
38
39 /*
40  *      This file implements the nal cb functions
41  */
42
43
44 #include "gmlnd.h"
45
46 int
47 gmnal_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
48            int delayed, unsigned int niov, 
49            struct iovec *iov, lnet_kiov_t *kiov,
50            unsigned int offset, unsigned int mlen, unsigned int rlen)
51 {
52         gmnal_ni_t      *gmni = ni->ni_data;
53         gmnal_rx_t      *rx = (gmnal_rx_t*)private;
54         gmnal_msg_t     *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
55         int              npages = rx->rx_islarge ? gmni->gmni_large_pages : 1;
56         int              payload_offset = offsetof(gmnal_msg_t, 
57                                               gmm_u.immediate.gmim_payload[0]);
58         int              nob = payload_offset + mlen;
59
60         LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
61         LASSERT (iov == NULL || kiov == NULL);
62
63         if (rx->rx_recv_nob < nob) {
64                 CERROR("Short message from nid %s: got %d, need %d\n",
65                        libcfs_nid2str(msg->gmm_srcnid), rx->rx_recv_nob, nob);
66                 gmnal_post_rx(gmni, rx);
67                 return -EIO;
68         }
69
70         if (kiov != NULL)
71                 lnet_copy_kiov2kiov(niov, kiov, offset,
72                                     npages, rx->rx_buf.nb_kiov, payload_offset, 
73                                     mlen);
74         else
75                 lnet_copy_kiov2iov(niov, iov, offset,
76                                    npages, rx->rx_buf.nb_kiov, payload_offset,
77                                    mlen);
78
79         lnet_finalize(ni, lntmsg, 0);
80         gmnal_post_rx(gmni, rx);
81         return 0;
82 }
83
84 int
85 gmnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
86 {
87         lnet_hdr_t       *hdr= &lntmsg->msg_hdr;
88         int               type = lntmsg->msg_type;
89         lnet_process_id_t target = lntmsg->msg_target;
90         unsigned int      niov = lntmsg->msg_niov;
91         struct iovec     *iov = lntmsg->msg_iov;
92         lnet_kiov_t      *kiov = lntmsg->msg_kiov;
93         unsigned int      offset = lntmsg->msg_offset;
94         unsigned int      len = lntmsg->msg_len;
95         gmnal_ni_t       *gmni = ni->ni_data;
96         gm_status_t       gmrc;
97         gmnal_tx_t       *tx;
98
99         LASSERT (iov == NULL || kiov == NULL);
100
101         /* I may not block for a tx if I'm responding to an incoming message */
102         tx = gmnal_get_tx(gmni);
103         if (tx == NULL) {
104                 if (!gmni->gmni_shutdown)
105                         CERROR ("Can't get tx for msg type %d for %s\n",
106                                 type, libcfs_nid2str(target.nid));
107                 return -EIO;
108         }
109
110         tx->tx_nid = target.nid;
111
112         gmrc = gm_global_id_to_node_id(gmni->gmni_port, LNET_NIDADDR(target.nid),
113                                        &tx->tx_gmlid);
114         if (gmrc != GM_SUCCESS) {
115                 CERROR("Can't map Nid %s to a GM local ID: %d\n", 
116                        libcfs_nid2str(target.nid), gmrc);
117                 /* NB tx_lntmsg not set => doesn't finalize */
118                 gmnal_tx_done(tx, -EIO);
119                 return -EIO;
120         }
121
122         gmnal_pack_msg(gmni, GMNAL_NETBUF_MSG(&tx->tx_buf), 
123                        target.nid, GMNAL_MSG_IMMEDIATE);
124         GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_hdr = *hdr;
125         tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0]);
126
127         if (the_lnet.ln_testprotocompat != 0) {
128                 /* single-shot proto test */
129                 LNET_LOCK();
130                 if ((the_lnet.ln_testprotocompat & 1) != 0) {
131                         GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_version++;
132                         the_lnet.ln_testprotocompat &= ~1;
133                 }
134                 if ((the_lnet.ln_testprotocompat & 2) != 0) {
135                         GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_magic =
136                                 LNET_PROTO_MAGIC;
137                         the_lnet.ln_testprotocompat &= ~2;
138                 }
139                 LNET_UNLOCK();
140         }
141
142         if (tx->tx_msgnob + len <= gmni->gmni_small_msgsize) {
143                 /* whole message fits in tx_buf */
144                 char *buffer = &(GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_payload[0]);
145
146                 if (iov != NULL)
147                         lnet_copy_iov2flat(len, buffer, 0,
148                                            niov, iov, offset, len);
149                 else
150                         lnet_copy_kiov2flat(len, buffer, 0,
151                                             niov, kiov, offset, len);
152
153                 tx->tx_msgnob += len;
154                 tx->tx_large_nob = 0;
155         } else {
156                 /* stash payload pts to copy later */
157                 tx->tx_large_nob = len;
158                 tx->tx_large_iskiov = (kiov != NULL);
159                 tx->tx_large_niov = niov;
160                 if (tx->tx_large_iskiov)
161                         tx->tx_large_frags.kiov = kiov;
162                 else
163                         tx->tx_large_frags.iov = iov;
164         }
165
166         LASSERT(tx->tx_lntmsg == NULL);
167         tx->tx_lntmsg = lntmsg;
168
169         spin_lock(&gmni->gmni_tx_lock);
170
171         list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq);
172         gmnal_check_txqueues_locked(gmni);
173
174         spin_unlock(&gmni->gmni_tx_lock);
175
176         return 0;
177 }