Whamcloud - gitweb
b=16098
[fs/lustre-release.git] / lnet / klnds / gmlnd / gmlnd_cb.c
1 /*
2  * -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
3  * vim:expandtab:shiftwidth=8:tabstop=8:
4  *
5  * GPL HEADER START
6  *
7  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8  *
9  * This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License version 2 only,
11  * as published by the Free Software Foundation.
12  *
13  * This program is distributed in the hope that it will be useful, but
14  * WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * General Public License version 2 for more details (a copy is included
17  * in the LICENSE file that accompanied this code).
18  *
19  * You should have received a copy of the GNU General Public License
20  * version 2 along with this program; If not, see [sun.com URL with a
21  * copy of GPLv2].
22  *
23  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
24  * CA 95054 USA or visit www.sun.com if you need additional information or
25  * have any questions.
26  *
27  * GPL HEADER END
28  */
29 /*
30  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
31  * Use is subject to license terms.
32  *
33  * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  */
39
40 /*
41  *      This file implements the nal cb functions
42  */
43
44
45 #include "gmlnd.h"
46
47 int
48 gmnal_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
49            int delayed, unsigned int niov, 
50            struct iovec *iov, lnet_kiov_t *kiov,
51            unsigned int offset, unsigned int mlen, unsigned int rlen)
52 {
53         gmnal_ni_t      *gmni = ni->ni_data;
54         gmnal_rx_t      *rx = (gmnal_rx_t*)private;
55         gmnal_msg_t     *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
56         int              npages = rx->rx_islarge ? gmni->gmni_large_pages : 1;
57         int              payload_offset = offsetof(gmnal_msg_t, 
58                                               gmm_u.immediate.gmim_payload[0]);
59         int              nob = payload_offset + mlen;
60
61         LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
62         LASSERT (iov == NULL || kiov == NULL);
63
64         if (rx->rx_recv_nob < nob) {
65                 CERROR("Short message from nid %s: got %d, need %d\n",
66                        libcfs_nid2str(msg->gmm_srcnid), rx->rx_recv_nob, nob);
67                 gmnal_post_rx(gmni, rx);
68                 return -EIO;
69         }
70
71         if (kiov != NULL)
72                 lnet_copy_kiov2kiov(niov, kiov, offset,
73                                     npages, rx->rx_buf.nb_kiov, payload_offset, 
74                                     mlen);
75         else
76                 lnet_copy_kiov2iov(niov, iov, offset,
77                                    npages, rx->rx_buf.nb_kiov, payload_offset,
78                                    mlen);
79
80         lnet_finalize(ni, lntmsg, 0);
81         gmnal_post_rx(gmni, rx);
82         return 0;
83 }
84
85 int
86 gmnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
87 {
88         lnet_hdr_t       *hdr= &lntmsg->msg_hdr;
89         int               type = lntmsg->msg_type;
90         lnet_process_id_t target = lntmsg->msg_target;
91         unsigned int      niov = lntmsg->msg_niov;
92         struct iovec     *iov = lntmsg->msg_iov;
93         lnet_kiov_t      *kiov = lntmsg->msg_kiov;
94         unsigned int      offset = lntmsg->msg_offset;
95         unsigned int      len = lntmsg->msg_len;
96         gmnal_ni_t       *gmni = ni->ni_data;
97         gm_status_t       gmrc;
98         gmnal_tx_t       *tx;
99
100         LASSERT (iov == NULL || kiov == NULL);
101
102         /* I may not block for a tx if I'm responding to an incoming message */
103         tx = gmnal_get_tx(gmni);
104         if (tx == NULL) {
105                 if (!gmni->gmni_shutdown)
106                         CERROR ("Can't get tx for msg type %d for %s\n",
107                                 type, libcfs_nid2str(target.nid));
108                 return -EIO;
109         }
110
111         tx->tx_nid = target.nid;
112
113         gmrc = gm_global_id_to_node_id(gmni->gmni_port, LNET_NIDADDR(target.nid),
114                                        &tx->tx_gmlid);
115         if (gmrc != GM_SUCCESS) {
116                 CERROR("Can't map Nid %s to a GM local ID: %d\n", 
117                        libcfs_nid2str(target.nid), gmrc);
118                 /* NB tx_lntmsg not set => doesn't finalize */
119                 gmnal_tx_done(tx, -EIO);
120                 return -EIO;
121         }
122
123         gmnal_pack_msg(gmni, GMNAL_NETBUF_MSG(&tx->tx_buf), 
124                        target.nid, GMNAL_MSG_IMMEDIATE);
125         GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_hdr = *hdr;
126         tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0]);
127
128         if (the_lnet.ln_testprotocompat != 0) {
129                 /* single-shot proto test */
130                 LNET_LOCK();
131                 if ((the_lnet.ln_testprotocompat & 1) != 0) {
132                         GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_version++;
133                         the_lnet.ln_testprotocompat &= ~1;
134                 }
135                 if ((the_lnet.ln_testprotocompat & 2) != 0) {
136                         GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_magic =
137                                 LNET_PROTO_MAGIC;
138                         the_lnet.ln_testprotocompat &= ~2;
139                 }
140                 LNET_UNLOCK();
141         }
142
143         if (tx->tx_msgnob + len <= gmni->gmni_small_msgsize) {
144                 /* whole message fits in tx_buf */
145                 char *buffer = &(GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_payload[0]);
146
147                 if (iov != NULL)
148                         lnet_copy_iov2flat(len, buffer, 0,
149                                            niov, iov, offset, len);
150                 else
151                         lnet_copy_kiov2flat(len, buffer, 0,
152                                             niov, kiov, offset, len);
153
154                 tx->tx_msgnob += len;
155                 tx->tx_large_nob = 0;
156         } else {
157                 /* stash payload pts to copy later */
158                 tx->tx_large_nob = len;
159                 tx->tx_large_iskiov = (kiov != NULL);
160                 tx->tx_large_niov = niov;
161                 if (tx->tx_large_iskiov)
162                         tx->tx_large_frags.kiov = kiov;
163                 else
164                         tx->tx_large_frags.iov = iov;
165         }
166
167         LASSERT(tx->tx_lntmsg == NULL);
168         tx->tx_lntmsg = lntmsg;
169
170         spin_lock(&gmni->gmni_tx_lock);
171
172         list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq);
173         gmnal_check_txqueues_locked(gmni);
174
175         spin_unlock(&gmni->gmni_tx_lock);
176
177         return 0;
178 }