1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
6 * This file is part of Lustre, http://www.lustre.org/
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24 * This file implements the nal cb functions
/*
 * gmnal_cb_recv: Portals lib_nal receive callback, iovec (kernel-virtual)
 * variant.  For a GMNAL_SMALL_MESSAGE the payload already sits in the
 * pre-posted receive buffer (srxd->buffer) just past the gmnal and portals
 * headers, so it is copied straight into the caller's iovec with gm_bcopy;
 * a GMNAL_LARGE_MESSAGE_INIT is handed off to gmnal_large_rx.
 *
 * NOTE(review): this extract is missing physical lines (the embedded line
 * numbers skip) — the switch head on srxd->type, some else arms, closing
 * braces and the final return are not visible here.  Comments describe
 * only what the visible code shows.
 */
30 ptl_err_t gmnal_cb_recv(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
31 unsigned int niov, struct iovec *iov, size_t offset,
32 size_t mlen, size_t rlen)
/* private is the receive descriptor set up by the eager-receive path */
35 gmnal_srxd_t *srxd = (gmnal_srxd_t*)private;
38 CDEBUG(D_TRACE, "gmnal_cb_recv libnal [%p], private[%p], cookie[%p], "
39 "niov[%d], iov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
40 libnal, private, cookie, niov, iov, offset, mlen, rlen);
/* dispatch on srxd->type (switch statement not visible in this extract) */
43 case(GMNAL_SMALL_MESSAGE):
44 CDEBUG(D_INFO, "gmnal_cb_recv got small message\n");
45 /* HP SFS 1380: Proactively change receives to avoid a receive
46 * side occurrence of filling pkmap_count[].
/* payload starts after the gmnal message header and the portals header */
48 buffer = srxd->buffer;
49 buffer += sizeof(gmnal_msghdr_t);
50 buffer += sizeof(ptl_hdr_t);
/* walk the iovec (loop head not visible): skip whole entries while
 * offset spans them, copy a partial first entry, then full entries */
53 if (offset >= iov->iov_len) {
54 offset -= iov->iov_len;
55 } else if (offset > 0) {
56 CDEBUG(D_INFO, "processing [%p] base [%p] len %d, "
57 "offset %d, len ["LPSZ"]\n", iov,
58 iov->iov_base + offset, iov->iov_len, offset,
59 iov->iov_len - offset);
/* first (partially consumed) entry: copy only the tail past offset */
60 gm_bcopy(buffer, iov->iov_base + offset,
61 iov->iov_len - offset);
62 buffer += iov->iov_len - offset;
/* remaining entries: copy the whole fragment (else arm not visible) */
65 CDEBUG(D_INFO, "processing [%p] len ["LPSZ"]\n", iov,
67 gm_bcopy(buffer, iov->iov_base, iov->iov_len);
68 buffer += iov->iov_len;
/* data copied; complete the small-message receive protocol */
72 status = gmnal_small_rx(libnal, private, cookie);
74 case(GMNAL_LARGE_MESSAGE_INIT):
75 CDEBUG(D_INFO, "gmnal_cb_recv got large message init\n");
/* large messages are pulled via the RDMA-style large-rx path */
76 status = gmnal_large_rx(libnal, private, cookie, niov,
77 iov, offset, mlen, rlen);
81 CDEBUG(D_INFO, "gmnal_cb_recv gmnal_return status [%d]\n", status);
/*
 * gmnal_cb_recv_pages: Portals lib_nal receive callback, kiov (page-list)
 * variant.  Small messages are copied out of the pre-posted receive buffer
 * into each destination page, kmap()ing one page at a time and kunmap()ing
 * it immediately after the copy (see the HP SFS 1380 note below — holding
 * many simultaneous kmaps can exhaust pkmap_count[] on ia32).
 *
 * NOTE(review): extract is missing physical lines (embedded numbering
 * skips) — the per-page loop head, some else arms and the return are not
 * visible.  Only large-message handling, if any, is also not shown.
 */
85 ptl_err_t gmnal_cb_recv_pages(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
86 unsigned int kniov, ptl_kiov_t *kiov, size_t offset,
87 size_t mlen, size_t rlen)
89 gmnal_srxd_t *srxd = (gmnal_srxd_t*)private;
95 CDEBUG(D_TRACE, "gmnal_cb_recv_pages libnal [%p],private[%p], "
96 "cookie[%p], kniov[%d], kiov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
97 libnal, private, cookie, kniov, kiov, offset, mlen, rlen);
99 if (srxd->type == GMNAL_SMALL_MESSAGE) {
/* payload begins after the gmnal and portals headers in the rx buffer */
100 buffer = srxd->buffer;
101 buffer += sizeof(gmnal_msghdr_t);
102 buffer += sizeof(ptl_hdr_t);
105 * map each page and create an iovec for it
108 /* HP SFS 1380: Proactively change receives to avoid a receive
109 * side occurrence of filling pkmap_count[].
111 CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", kniov, kiov);
/* skip whole pages while offset still spans them */
113 if (offset >= kiov->kiov_len) {
114 offset -= kiov->kiov_len;
116 CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
117 kiov->kiov_page, kiov->kiov_len,
119 CDEBUG(D_INFO, "Calling kmap[%p]", kiov->kiov_page);
/* map the destination page; honour the fragment's offset in the page */
120 ptr = ((char *)kmap(kiov->kiov_page)) + kiov->kiov_offset;
123 CDEBUG(D_INFO, "processing [%p] base [%p] len %d, "
124 "offset %d, len ["LPSZ"]\n", ptr,
125 ptr + offset, kiov->kiov_len, offset,
126 kiov->kiov_len - offset);
/* partially consumed first page: copy only the tail past offset */
127 gm_bcopy(buffer, ptr + offset,
128 kiov->kiov_len - offset);
129 buffer += kiov->kiov_len - offset;
/* subsequent pages: copy the whole fragment (else arm not visible) */
132 CDEBUG(D_INFO, "processing [%p] len ["LPSZ"]\n", ptr,
134 gm_bcopy(buffer, ptr, kiov->kiov_len);
135 buffer += kiov->kiov_len;
/* unmap immediately so at most one page is kmapped at a time */
137 kunmap(kiov->kiov_page);
138 CDEBUG(D_INFO, "Stored in [%p]\n", ptr);
142 CDEBUG(D_INFO, "calling gmnal_small_rx\n");
/* data copied; complete the small-message receive protocol */
143 status = gmnal_small_rx(libnal, private, cookie);
147 CDEBUG(D_INFO, "gmnal_return status [%d]\n", status);
/*
 * gmnal_cb_send: Portals lib_nal send callback, iovec variant.
 * Small messages are gathered from the iovec into a pre-registered
 * transmit descriptor buffer (stxd->buffer, past the gmnal + portals
 * headers) and sent via gmnal_small_tx, which fills in the headers
 * (see HP SFS 1380 note).  Large iovec sends log an error, finalize
 * with PTL_FAIL, and (per the visible call) fall to gmnal_large_tx —
 * the surrounding control flow is not fully visible in this extract.
 *
 * NOTE(review): extract is missing physical lines (embedded numbering
 * skips) — loop heads, else arms, braces and the return are not shown.
 */
152 ptl_err_t gmnal_cb_send(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
153 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
154 unsigned int niov, struct iovec *iov, size_t offset, size_t len)
157 gmnal_data_t *nal_data;
159 gmnal_stxd_t *stxd = NULL;
162 CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] offset["LPSZ"] len["LPSZ"] nid["LPU64"]\n",
163 niov, offset, len, nid);
164 nal_data = libnal->libnal_data;
/* sanity check on the NAL private data (guard condition not visible) */
166 CDEBUG(D_ERROR, "no nal_data\n");
169 CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
172 if (GMNAL_IS_SMALL_MESSAGE(nal_data, niov, iov, len)) {
173 CDEBUG(D_INFO, "This is a small message send\n");
175 * HP SFS 1380: With the change to gmnal_small_tx, need to get the stxd
176 * and do relevant setup here
/* blocking get (arg 1) of a transmit descriptor */
178 stxd = gmnal_get_stxd(nal_data, 1);
179 CDEBUG(D_INFO, "stxd [%p]\n", stxd);
180 /* Set the offset of the data to copy into the buffer */
181 buffer = stxd->buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t);
/* gather loop over the iovec (loop head not visible): skip entries
 * consumed by offset, copy a partial first entry, then full entries */
183 if (offset >= iov->iov_len) {
184 offset -= iov->iov_len;
185 } else if (offset > 0) {
186 CDEBUG(D_INFO, "processing iov [%p] base [%p] len ["LPSZ"] to [%p]\n",
187 iov, iov->iov_base + offset, iov->iov_len - offset, buffer);
188 gm_bcopy(iov->iov_base + offset, buffer, iov->iov_len - offset);
189 buffer+= iov->iov_len - offset;
192 CDEBUG(D_INFO, "processing iov [%p] len ["LPSZ"] to [%p]\n",
193 iov, iov->iov_len, buffer);
194 gm_bcopy(iov->iov_base, buffer, iov->iov_len);
195 buffer+= iov->iov_len;
/* headers are written into stxd->buffer by gmnal_small_tx itself */
199 gmnal_small_tx(libnal, private, cookie, hdr, type, nid, pid,
/* large-message branch (else head not visible in this extract) */
202 CDEBUG(D_ERROR, "Large message send is not supported\n");
203 lib_finalize(libnal, private, cookie, PTL_FAIL);
205 gmnal_large_tx(libnal, private, cookie, hdr, type, nid, pid,
206 niov, iov, offset, len);
/*
 * gmnal_cb_send_pages: Portals lib_nal send callback, kiov (page-list)
 * variant.  Small messages: each source page is kmap()ed, its fragment
 * gm_bcopy()ed into the pre-registered stxd buffer, and kunmap()ed right
 * away — the HP SFS 1380 fix for an ia32 livelock when pkmap_count[] is
 * exhausted; gmnal_small_tx later writes the headers into the buffer.
 * Large messages: build a temporary iovec over kmap()ed pages, call
 * gmnal_large_tx, then kunmap everything and free the iovec.
 *
 * NOTE(review): extract is missing physical lines (embedded numbering
 * skips) — loop heads, braces and the return are not fully visible, and
 * the statement at original line 292 is visibly truncated (no ';' and
 * presumably a '+ kiov->kiov_offset' term — confirm against the full file).
 */
211 ptl_err_t gmnal_cb_send_pages(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
212 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
213 unsigned int kniov, ptl_kiov_t *kiov, size_t offset, size_t len)
216 gmnal_data_t *nal_data;
219 gmnal_stxd_t *stxd = NULL;
220 ptl_err_t status = PTL_OK;
222 CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] offset["LPSZ"] len["LPSZ"]\n",
223 nid, kniov, offset, len);
224 nal_data = libnal->libnal_data;
/* sanity check on the NAL private data (guard condition not visible) */
226 CDEBUG(D_ERROR, "no nal_data\n");
229 CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
232 /* HP SFS 1380: Need to do the gm_bcopy after the kmap so we can kunmap
233 * more aggressively. This is the fix for a livelock situation under load
234 * on ia32 that occurs when there are no more available entries in the
235 * pkmap_count array. Just fill the buffer and let gmnal_small_tx
236 * put the headers in after we pass it the stxd pointer.
/* blocking get (arg 1) of a transmit descriptor */
238 stxd = gmnal_get_stxd(nal_data, 1);
239 CDEBUG(D_INFO, "stxd [%p]\n", stxd);
240 /* Set the offset of the data to copy into the buffer */
241 buffer = stxd->buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t);
/* NOTE(review): size check passes niov=0/iov=NULL — only len matters here */
243 if (GMNAL_IS_SMALL_MESSAGE(nal_data, 0, NULL, len)) {
244 CDEBUG(D_INFO, "This is a small message send\n");
/* gather loop over the kiov (loop head not visible): skip pages
 * consumed by offset, copy a partial first page, then full pages */
247 CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", kniov, kiov);
248 if (offset >= kiov->kiov_len) {
249 offset -= kiov->kiov_len;
251 CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
252 kiov->kiov_page, kiov->kiov_len,
/* map the source page; honour the fragment's offset within the page */
255 ptr = ((char *)kmap(kiov->kiov_page)) + kiov->kiov_offset;
258 CDEBUG(D_INFO, "processing [%p] base [%p] len ["LPSZ"] to [%p]\n",
259 ptr, ptr + offset, kiov->kiov_len - offset, buffer);
260 gm_bcopy(ptr + offset, buffer, kiov->kiov_len - offset);
261 buffer+= kiov->kiov_len - offset;
264 CDEBUG(D_INFO, "processing kmapped [%p] len ["LPSZ"] to [%p]\n",
265 ptr, kiov->kiov_len, buffer);
266 gm_bcopy(ptr, buffer, kiov->kiov_len);
268 buffer += kiov->kiov_len;
/* unmap immediately — keep at most one page kmapped at a time */
270 kunmap(kiov->kiov_page);
/* buffer filled; gmnal_small_tx adds the headers and fires the send */
274 status = gmnal_small_tx(libnal, private, cookie, hdr, type, nid,
/* large-message path: wrap the pages in a temporary iovec */
278 struct iovec *iovec = NULL, *iovec_dup = NULL;
/* kiov_dup remembers the list head so the kunmap loop can rewalk it */
279 ptl_kiov_t *kiov_dup = kiov;
281 PORTAL_ALLOC(iovec, kniov*sizeof(struct iovec));
/* allocation-failure arm (condition not visible in this extract) */
283 CDEBUG(D_ERROR, "Large message send it is not supported yet\n");
284 PORTAL_FREE(iovec, kniov*sizeof(struct iovec));
/* map every page up front and describe it with an iovec entry */
286 for (i=0; i<kniov; i++) {
287 CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", i, kiov);
288 CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
289 kiov->kiov_page, kiov->kiov_len,
/* NOTE(review): line truncated in extract — missing ';' and likely
 * '+ kiov->kiov_offset'; verify against the complete source */
292 iovec->iov_base = kmap(kiov->kiov_page)
294 iovec->iov_len = kiov->kiov_len;
298 gmnal_large_tx(libnal, private, cookie, hdr, type, nid,
299 pid, kniov, iovec, offset, len);
/* tear down: unmap all the pages mapped above */
300 for (i=0; i<kniov; i++) {
301 kunmap(kiov_dup->kiov_page);
304 PORTAL_FREE(iovec_dup, kniov*sizeof(struct iovec));
309 int gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist)
311 CDEBUG(D_TRACE, "gmnal_cb_dist\n");