1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
6 * This file is part of Lustre, http://www.lustre.org/
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24 * This file implements the nal cb functions
/*
 * gmnal_cb_recv() - lib_nal receive callback, iovec (virtual-address) variant.
 *
 * NOTE(review): this excerpt is gappy — the embedded source line numbers
 * jump (e.g. 40 -> 43), so the opening brace, the `switch (srxd->type)`
 * header, the iovec-walk loop header, `break`s and closing braces are not
 * visible here. Comments below describe only what the visible lines show.
 *
 * private: per-receive descriptor (cast to gmnal_srxd_t below) whose
 *          pre-posted GM buffer holds the incoming message.
 * cookie : lib_msg_t handle the rx helpers pass back to lib_finalize —
 *          presumably; finalize call not visible here, TODO confirm.
 */
30 ptl_err_t gmnal_cb_recv(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
31 unsigned int niov, struct iovec *iov, size_t offset,
32 size_t mlen, size_t rlen)
35 gmnal_srxd_t *srxd = (gmnal_srxd_t*)private;
38 CDEBUG(D_TRACE, "gmnal_cb_recv libnal [%p], private[%p], cookie[%p], "
39 "niov[%d], iov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
40 libnal, private, cookie, niov, iov, offset, mlen, rlen);
/* Small-message arm of a dispatch on the receive type (switch statement
 * header is in the missing lines). */
43 case(GMNAL_SMALL_MESSAGE):
44 CDEBUG(D_INFO, "gmnal_cb_recv got small message\n");
45 /* HP SFS 1380: Proactively change receives to avoid a receive
46 * side occurrence of filling pkmap_count[].
/* Skip the GM NAL wire header and the Portals header to reach the
 * payload inside the pre-registered receive buffer. */
48 buffer = srxd->buffer;
49 buffer += sizeof(gmnal_msghdr_t);
50 buffer += sizeof(ptl_hdr_t);
/* Walk the iovec (loop header not visible): first consume any leading
 * `offset` bytes spanning whole entries ... */
53 if (offset >= iov->iov_len) {
54 offset -= iov->iov_len;
/* ... then copy the tail of the entry the offset lands inside ... */
55 } else if (offset > 0) {
56 CDEBUG(D_INFO, "processing [%p] base [%p] "
57 "len %d, offset %d, len ["LPSZ"]\n", iov,
58 iov->iov_base + offset, iov->iov_len,
59 offset, iov->iov_len - offset);
60 gm_bcopy(buffer, iov->iov_base + offset,
61 iov->iov_len - offset);
62 buffer += iov->iov_len - offset;
/* ... else (branch keyword in missing lines) copy the whole entry. */
65 CDEBUG(D_INFO, "processing [%p] len ["LPSZ"]\n",
67 gm_bcopy(buffer, iov->iov_base, iov->iov_len);
68 buffer += iov->iov_len;
/* Payload copied out; let the small-rx helper finish the receive. */
72 status = gmnal_small_rx(libnal, private, cookie);
/* Large-message arm: no copy here, hand everything to the large-rx path. */
74 case(GMNAL_LARGE_MESSAGE_INIT):
75 CDEBUG(D_INFO, "gmnal_cb_recv got large message init\n");
76 status = gmnal_large_rx(libnal, private, cookie, niov,
77 iov, offset, mlen, rlen);
80 CDEBUG(D_INFO, "gmnal_cb_recv gmnal_return status [%d]\n", status);
/*
 * gmnal_cb_recv_pages() - lib_nal receive callback, kiov (page) variant.
 *
 * NOTE(review): gappy excerpt — the embedded line numbers jump, so the
 * loop header over the kniov entries, the else-branch keywords, the kmap
 * offset continuation (line after "kmap(...)) +") and closing braces are
 * not visible. Comments describe only the visible lines.
 *
 * Small messages are copied page-by-page out of the pre-registered
 * srxd->buffer: each destination page is kmap'ed, filled via gm_bcopy,
 * then kunmap'ed immediately (see the HP SFS 1380 note below).
 */
84 ptl_err_t gmnal_cb_recv_pages(lib_nal_t *libnal, void *private,
85 lib_msg_t *cookie, unsigned int kniov,
86 ptl_kiov_t *kiov, size_t offset, size_t mlen,
89 gmnal_srxd_t *srxd = (gmnal_srxd_t*)private;
95 CDEBUG(D_TRACE, "gmnal_cb_recv_pages libnal [%p],private[%p], "
96 "cookie[%p], kniov[%d], kiov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
97 libnal, private, cookie, kniov, kiov, offset, mlen, rlen);
/* Only the small-message type is handled here; large-message handling,
 * if any, is in lines not visible in this excerpt. */
99 if (srxd->type == GMNAL_SMALL_MESSAGE) {
/* Skip the GM NAL wire header and Portals header to the payload. */
100 buffer = srxd->buffer;
101 buffer += sizeof(gmnal_msghdr_t);
102 buffer += sizeof(ptl_hdr_t);
105 * map each page and create an iovec for it
108 /* HP SFS 1380: Proactively change receives to avoid a
109 * receive side occurrence of filling pkmap_count[].
/* Per-kiov-entry body (loop header in missing lines). */
111 CDEBUG(D_INFO, "processing kniov [%d] [%p]\n",
/* Consume leading `offset` bytes spanning whole pages first. */
114 if (offset >= kiov->kiov_len) {
115 offset -= kiov->kiov_len;
117 CDEBUG(D_INFO, "kniov page [%p] len [%d] "
118 "offset[%d]\n", kiov->kiov_page,
119 kiov->kiov_len, kiov->kiov_offset);
120 CDEBUG(D_INFO, "Calling kmap[%p]", kiov->kiov_page);
/* Map the destination page; the "+" continuation (presumably
 * "+ kiov->kiov_offset") is on a line missing from this excerpt —
 * TODO confirm against the full source. */
121 ptr = ((char *)kmap(kiov->kiov_page)) +
/* Partial copy when the offset lands inside this page ... */
125 CDEBUG(D_INFO, "processing [%p] base "
126 "[%p] len %d, offset %d, len ["
127 LPSZ"]\n", ptr, ptr + offset,
128 kiov->kiov_len, offset,
129 kiov->kiov_len - offset);
130 gm_bcopy(buffer, ptr + offset,
131 kiov->kiov_len - offset);
132 buffer += kiov->kiov_len - offset;
/* ... otherwise copy the full page worth of data. */
135 CDEBUG(D_INFO, "processing [%p] len ["
136 LPSZ"]\n", ptr, kiov->kiov_len);
137 gm_bcopy(buffer, ptr, kiov->kiov_len);
138 buffer += kiov->kiov_len;
/* Unmap promptly so pkmap entries are not held across the loop. */
140 kunmap(kiov->kiov_page);
141 CDEBUG(D_INFO, "Stored in [%p]\n", ptr);
/* All pages filled; complete the receive via the small-rx helper. */
145 CDEBUG(D_INFO, "calling gmnal_small_rx\n");
146 status = gmnal_small_rx(libnal, private, cookie);
149 CDEBUG(D_INFO, "gmnal_return status [%d]\n", status);
/*
 * gmnal_cb_send() - lib_nal send callback, iovec (virtual-address) variant.
 *
 * NOTE(review): gappy excerpt — the embedded line numbers jump, so the
 * opening brace, the error-return after the nal_data NULL check, the
 * iovec-walk loop header, the else-branch of the small/large split and
 * any #if guards around the large-tx call are not visible. The visible
 * lines show both a "not supported" CERROR/lib_finalize(PTL_FAIL) AND a
 * gmnal_large_tx() call on the large path — presumably one of them is
 * compiled out in the missing lines; TODO confirm against full source.
 *
 * Small path: grab a pre-registered send descriptor (stxd), copy the
 * iovec payload into its buffer past the header space, then hand off to
 * gmnal_small_tx() which fills in the headers.
 */
154 ptl_err_t gmnal_cb_send(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
155 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
156 unsigned int niov, struct iovec *iov, size_t offset,
160 gmnal_data_t *nal_data;
162 gmnal_stxd_t *stxd = NULL;
165 CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] offset["LPSZ"] len["LPSZ
166 "] nid["LPU64"]\n", niov, offset, len, nid);
167 nal_data = libnal->libnal_data;
/* NULL nal_data is fatal; the return/abort after this CERROR is in
 * lines missing from this excerpt. */
169 CERROR("no nal_data\n");
172 CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
175 if (GMNAL_IS_SMALL_MESSAGE(nal_data, niov, iov, len)) {
176 CDEBUG(D_INFO, "This is a small message send\n");
178 * HP SFS 1380: With the change to gmnal_small_tx, need to get
179 * the stxd and do relevant setup here
181 stxd = gmnal_get_stxd(nal_data, 1);
182 CDEBUG(D_INFO, "stxd [%p]\n", stxd);
183 /* Set the offset of the data to copy into the buffer */
/* Leave room for the GM NAL wire header + Portals header; they are
 * written later by gmnal_small_tx (per the HP SFS note above). */
184 buffer = stxd->buffer +sizeof(gmnal_msghdr_t)+sizeof(ptl_hdr_t);
/* Walk the iovec (loop header in missing lines): skip whole entries
 * covered by `offset`, copy a partial first entry, then full entries. */
186 if (offset >= iov->iov_len) {
187 offset -= iov->iov_len;
188 } else if (offset > 0) {
189 CDEBUG(D_INFO, "processing iov [%p] base [%p] "
190 "len ["LPSZ"] to [%p]\n",
191 iov, iov->iov_base + offset,
192 iov->iov_len - offset, buffer);
193 gm_bcopy(iov->iov_base + offset, buffer,
194 iov->iov_len - offset);
195 buffer+= iov->iov_len - offset;
/* else-branch (keyword in missing lines): copy the whole entry. */
198 CDEBUG(D_INFO, "processing iov [%p] len ["LPSZ
199 "] to [%p]\n", iov, iov->iov_len,buffer);
200 gm_bcopy(iov->iov_base, buffer, iov->iov_len);
201 buffer+= iov->iov_len;
/* Payload staged in stxd->buffer; small-tx adds headers and sends. */
205 gmnal_small_tx(libnal, private, cookie, hdr, type, nid, pid,
/* Large-message path — see NOTE(review) in the header comment. */
208 CERROR("Large message send is not supported\n");
209 lib_finalize(libnal, private, cookie, PTL_FAIL);
211 gmnal_large_tx(libnal, private, cookie, hdr, type, nid, pid,
212 niov, iov, offset, len);
/*
 * gmnal_cb_send_pages() - lib_nal send callback, kiov (page) variant.
 *
 * NOTE(review): gappy excerpt — the embedded line numbers jump, so the
 * loop headers, the PORTAL_ALLOC failure branch, the `iovec_dup`
 * assignment, per-iteration pointer increments and closing braces are
 * not visible. The large path shows both a "not supported yet" CERROR
 * and a gmnal_large_tx() call — presumably one is compiled out in the
 * missing lines; TODO confirm against the full source.
 *
 * Small path: stage the kiov pages into a pre-registered stxd buffer
 * (kmap -> gm_bcopy -> kunmap per page) and hand off to gmnal_small_tx.
 * Large path: build a temporary iovec array over kmap'ed pages, call
 * gmnal_large_tx, then kunmap and free.
 */
217 ptl_err_t gmnal_cb_send_pages(lib_nal_t *libnal, void *private,
218 lib_msg_t *cookie, ptl_hdr_t *hdr, int type,
219 ptl_nid_t nid, ptl_pid_t pid, unsigned int kniov,
220 ptl_kiov_t *kiov, size_t offset, size_t len)
223 gmnal_data_t *nal_data;
226 gmnal_stxd_t *stxd = NULL;
227 ptl_err_t status = PTL_OK;
229 CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] offset["
230 LPSZ"] len["LPSZ"]\n", nid, kniov, offset, len);
231 nal_data = libnal->libnal_data;
/* NULL nal_data is fatal; the return after this CERROR is in lines
 * missing from this excerpt. */
233 CERROR("no nal_data\n");
236 CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
239 /* HP SFS 1380: Need to do the gm_bcopy after the kmap so we can kunmap
240 * more aggressively. This is the fix for a livelock situation under
241 * load on ia32 that occurs when there are no more available entries in
242 * the pkmap_count array. Just fill the buffer and let gmnal_small_tx
243 * put the headers in after we pass it the stxd pointer.
/* Descriptor is acquired before the small/large split (unlike the iovec
 * variant, which acquires it inside the small branch). */
245 stxd = gmnal_get_stxd(nal_data, 1);
246 CDEBUG(D_INFO, "stxd [%p]\n", stxd);
247 /* Set the offset of the data to copy into the buffer */
/* Leave header space; gmnal_small_tx fills it in later (see above). */
248 buffer = stxd->buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t);
250 if (GMNAL_IS_SMALL_MESSAGE(nal_data, 0, NULL, len)) {
251 CDEBUG(D_INFO, "This is a small message send\n");
/* Per-kiov-entry body (loop header in missing lines). */
254 CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", kniov, kiov);
/* Skip whole pages covered by the leading `offset` first. */
255 if (offset >= kiov->kiov_len) {
256 offset -= kiov->kiov_len;
258 CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
259 kiov->kiov_page, kiov->kiov_len,
/* Map the source page; the "+" continuation (presumably
 * "+ kiov->kiov_offset") is on a missing line — TODO confirm. */
262 ptr = ((char *)kmap(kiov->kiov_page)) +
/* Partial copy when `offset` lands inside this page ... */
266 CDEBUG(D_INFO, "processing [%p] base "
267 "[%p] len ["LPSZ"] to [%p]\n",
269 kiov->kiov_len - offset, buffer);
270 gm_bcopy(ptr + offset, buffer,
271 kiov->kiov_len - offset);
272 buffer+= kiov->kiov_len - offset;
/* ... otherwise copy the full page worth of data. */
275 CDEBUG(D_INFO, "processing kmapped [%p]"
276 " len ["LPSZ"] to [%p]\n",
277 ptr, kiov->kiov_len, buffer);
278 gm_bcopy(ptr, buffer, kiov->kiov_len);
280 buffer += kiov->kiov_len;
/* Unmap promptly per the HP SFS 1380 livelock note above. */
282 kunmap(kiov->kiov_page);
/* Payload staged; small-tx adds the headers and sends. */
286 status = gmnal_small_tx(libnal, private, cookie, hdr, type, nid,
/* Large-message path: wrap the pages in a temporary iovec array. */
290 struct iovec *iovec = NULL, *iovec_dup = NULL;
/* kiov_dup remembers the array start for the kunmap loop below. */
291 ptl_kiov_t *kiov_dup = kiov;
293 PORTAL_ALLOC(iovec, kniov*sizeof(struct iovec));
/* Allocation-failure branch (condition in missing lines). */
295 CERROR("Large message send it is not supported yet\n");
296 PORTAL_FREE(iovec, kniov*sizeof(struct iovec));
/* Build one iovec entry per kmap'ed page. */
298 for (i=0; i<kniov; i++) {
299 CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", i, kiov);
300 CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
301 kiov->kiov_page, kiov->kiov_len,
/* Statement continues on a missing line (presumably
 * "+ kiov->kiov_offset;") — TODO confirm. */
304 iovec->iov_base = kmap(kiov->kiov_page)
306 iovec->iov_len = kiov->kiov_len;
310 gmnal_large_tx(libnal, private, cookie, hdr, type, nid,
311 pid, kniov, iovec, offset, len);
/* Unmap every page mapped above, walking from the saved start. */
312 for (i=0; i<kniov; i++) {
313 kunmap(kiov_dup->kiov_page);
/* iovec_dup presumably saved the array start before iovec was
 * advanced (assignment on a missing line) — TODO confirm. */
316 PORTAL_FREE(iovec_dup, kniov*sizeof(struct iovec));
321 int gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist)
323 CDEBUG(D_TRACE, "gmnal_cb_dist\n");