/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#define DEBUG_SUBSYSTEM S_RPC
#ifndef __KERNEL__
#include <liblustre.h>
#include <portals/lib-types.h>
#endif
#include <linux/obd_support.h>
#include <linux/lustre_net.h>
#include <linux/lustre_lib.h>
#include <linux/obd.h>
#include "ptlrpc_internal.h"

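/* Bind an MD over the outgoing request or reply buffer and PtlPut() it
 * to the peer on the given portal.  On failure the MD is unlinked again,
 * so the caller never sees callbacks for a send that did not happen. */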
static int ptl_send_buf(struct ptlrpc_request *request,
                        struct ptlrpc_connection *conn, int portal)
{
        int rc;
        int rc2;
        ptl_process_id_t remote_id;
        ptl_handle_md_t md_h;
        ptl_ack_req_t ack_req;
        char str[PTL_NALFMT_SIZE];

        LASSERT (portal != 0);
        LASSERT (conn != NULL);
        CDEBUG (D_INFO, "conn=%p ni %s nid "LPX64" (%s) on %s\n",
                conn, conn->c_peer.peer_ni->pni_name,
                conn->c_peer.peer_nid,
                portals_nid2str(conn->c_peer.peer_ni->pni_number,
                                conn->c_peer.peer_nid, str),
                conn->c_peer.peer_ni->pni_name);

        switch (request->rq_type) {
        case PTL_RPC_MSG_REQUEST:
                request->rq_reqmsg->type = request->rq_type;
                request->rq_req_md.start = request->rq_reqmsg;
                request->rq_req_md.length = request->rq_reqlen;
                request->rq_req_md.eventq =
                        conn->c_peer.peer_ni->pni_request_out_eq_h;
                LASSERT (!request->rq_want_ack);
                break;
        case PTL_RPC_MSG_ERR:
        case PTL_RPC_MSG_REPLY:
                request->rq_repmsg->type = request->rq_type;
                request->rq_req_md.start = request->rq_repmsg;
                request->rq_req_md.length = request->rq_replen;
                request->rq_req_md.eventq =
                        conn->c_peer.peer_ni->pni_reply_out_eq_h;
                break;
        default:
                LBUG();
                return -1; /* notreached */
        }
        if (request->rq_want_ack) {
                request->rq_req_md.threshold = 2; /* SENT and ACK */
                ack_req = PTL_ACK_REQ;
        } else {
                request->rq_req_md.threshold = 1;
                ack_req = PTL_NOACK_REQ;
        }
        request->rq_req_md.options = PTL_MD_OP_PUT;
        request->rq_req_md.user_ptr = request;

        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_ACK | OBD_FAIL_ONCE)) {
                request->rq_req_md.options |= PTL_MD_ACK_DISABLE;
                obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
        }

        /* NB if the send fails, we back out of the send and return
         * failure; it's down to the caller to handle missing callbacks */

        rc = PtlMDBind(conn->c_peer.peer_ni->pni_ni_h, request->rq_req_md,
                       &md_h);
        if (rc != PTL_OK) {
                CERROR("PtlMDBind failed: %d\n", rc);
                LASSERT (rc == PTL_NOSPACE);
                RETURN (-ENOMEM);
        }
        if (request->rq_type != PTL_RPC_MSG_REQUEST)
                memcpy(&request->rq_reply_md_h, &md_h, sizeof(md_h));

        remote_id.nid = conn->c_peer.peer_nid;
        remote_id.pid = 0;

        CDEBUG(D_NET, "Sending %d bytes to portal %d, xid "LPD64"\n",
               request->rq_req_md.length, portal, request->rq_xid);

        rc = PtlPut(md_h, ack_req, remote_id, portal, 0, request->rq_xid, 0, 0);
        if (rc != PTL_OK) {
                CERROR("PtlPut("LPU64", %d, "LPD64") failed: %d\n",
                       remote_id.nid, portal, request->rq_xid, rc);
                rc2 = PtlMDUnlink(md_h);
                LASSERT (rc2 == PTL_OK);
                RETURN ((rc == PTL_NOSPACE) ? -ENOMEM : -ECOMM);
        }

        return 0;
}

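/* Helpers to obtain/release the kiov array describing a bulk's pages:
 * the descriptor's embedded bd_iov array is used when it is large
 * enough, otherwise a temporary array is allocated. */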
static inline ptl_kiov_t *
ptlrpc_get_bulk_iov (struct ptlrpc_bulk_desc *desc)
{
        ptl_kiov_t *iov;

        if (desc->bd_page_count <= sizeof (desc->bd_iov)/sizeof (*iov))
                return (desc->bd_iov);

        OBD_ALLOC (iov, desc->bd_page_count * sizeof (*iov));
        if (iov == NULL)
                LBUG();

        return (iov);
}

static inline void
ptlrpc_put_bulk_iov (struct ptlrpc_bulk_desc *desc, ptl_kiov_t *iov)
{
        if (desc->bd_page_count <= sizeof (desc->bd_iov)/sizeof (*iov))
                return;

        OBD_FREE (iov, desc->bd_page_count * sizeof (*iov));
}

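/* Server side: push the pages of a BULK_PUT_SOURCE descriptor to the
 * client with PtlPut(), matching on the client's xid. */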
int ptlrpc_bulk_put(struct ptlrpc_bulk_desc *desc)
{
        int rc;
        int rc2;
        struct ptlrpc_peer *peer;
        struct list_head *tmp, *next;
        ptl_process_id_t remote_id;
        ptl_kiov_t *iov;
        __u64 xid;
        ENTRY;

        /* NB no locking required until desc is on the network */
        LASSERT (!desc->bd_network_rw);
        LASSERT (desc->bd_type == BULK_PUT_SOURCE);
        desc->bd_complete = 0;

        iov = ptlrpc_get_bulk_iov (desc);
        if (iov == NULL)
                RETURN (-ENOMEM);

        peer = &desc->bd_export->exp_connection->c_peer;

        desc->bd_md.start = iov;
        desc->bd_md.niov = 0;
        desc->bd_md.length = 0;
        desc->bd_md.eventq = peer->peer_ni->pni_bulk_put_source_eq_h;
        desc->bd_md.threshold = 2; /* SENT and ACK */
        desc->bd_md.options = PTL_MD_OP_PUT | PTL_MD_KIOV;
        desc->bd_md.user_ptr = desc;

        desc->bd_callback_count = 2;

        list_for_each_safe(tmp, next, &desc->bd_page_list) {
                struct ptlrpc_bulk_page *bulk;
                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);

                LASSERT(desc->bd_md.niov < desc->bd_page_count);

                iov[desc->bd_md.niov].kiov_page = bulk->bp_page;
                iov[desc->bd_md.niov].kiov_offset = bulk->bp_pageoffset;
                iov[desc->bd_md.niov].kiov_len = bulk->bp_buflen;

                LASSERT (iov[desc->bd_md.niov].kiov_offset +
                         iov[desc->bd_md.niov].kiov_len <= PAGE_SIZE);
                desc->bd_md.niov++;
                desc->bd_md.length += bulk->bp_buflen;
        }

        /* NB total length may be 0 for a read past EOF, so we send a 0
         * length bulk, since the client expects a bulk event. */
        LASSERT(desc->bd_md.niov == desc->bd_page_count);

        rc = PtlMDBind(peer->peer_ni->pni_ni_h, desc->bd_md,
                       &desc->bd_md_h);

        ptlrpc_put_bulk_iov (desc, iov); /* moved down to reduce latency to send */

        if (rc != PTL_OK) {
                CERROR("PtlMDBind failed: %d\n", rc);
                LASSERT (rc == PTL_NOSPACE);
                RETURN(-ENOMEM);
        }

        /* Client's bulk and reply matchbits are the same */
        xid = desc->bd_req->rq_xid;
        remote_id.nid = peer->peer_nid;
        remote_id.pid = 0;

        CDEBUG(D_NET, "Sending %u pages %u bytes to portal %d on %s "
               "nid "LPX64" pid %d xid "LPX64"\n",
               desc->bd_md.niov, desc->bd_md.length,
               desc->bd_portal, peer->peer_ni->pni_name,
               remote_id.nid, remote_id.pid, xid);

        desc->bd_network_rw = 1;
        rc = PtlPut(desc->bd_md_h, PTL_ACK_REQ, remote_id,
                    desc->bd_portal, 0, xid, 0, 0);
        if (rc != PTL_OK) {
                desc->bd_network_rw = 0;
                CERROR("PtlPut("LPU64", %d, "LPX64") failed: %d\n",
                       remote_id.nid, desc->bd_portal, xid, rc);
                rc2 = PtlMDUnlink(desc->bd_md_h);
                LASSERT (rc2 == PTL_OK);
                RETURN((rc == PTL_NOSPACE) ? -ENOMEM : -ECOMM);
        }

        RETURN(0);
}

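/* Server side: fetch the pages of a BULK_GET_SINK descriptor from the
 * client with PtlGet(), matching on the client's xid. */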
int ptlrpc_bulk_get(struct ptlrpc_bulk_desc *desc)
{
        int rc;
        int rc2;
        struct ptlrpc_peer *peer;
        struct list_head *tmp, *next;
        ptl_process_id_t remote_id;
        ptl_kiov_t *iov;
        __u64 xid;
        ENTRY;

        /* NB no locking required until desc is on the network */
        LASSERT (!desc->bd_network_rw);
        LASSERT (desc->bd_type == BULK_GET_SINK);
        desc->bd_complete = 0;

        iov = ptlrpc_get_bulk_iov (desc);
        if (iov == NULL)
                RETURN(-ENOMEM);

        peer = &desc->bd_export->exp_connection->c_peer;

        desc->bd_md.start = iov;
        desc->bd_md.niov = 0;
        desc->bd_md.length = 0;
        desc->bd_md.eventq = peer->peer_ni->pni_bulk_get_sink_eq_h;
        desc->bd_md.threshold = 2; /* SENT and REPLY */
        desc->bd_md.options = PTL_MD_OP_GET | PTL_MD_KIOV;
        desc->bd_md.user_ptr = desc;

        desc->bd_callback_count = 2;

        list_for_each_safe(tmp, next, &desc->bd_page_list) {
                struct ptlrpc_bulk_page *bulk;
                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);

                LASSERT(desc->bd_md.niov < desc->bd_page_count);

                iov[desc->bd_md.niov].kiov_page = bulk->bp_page;
                iov[desc->bd_md.niov].kiov_len = bulk->bp_buflen;
                iov[desc->bd_md.niov].kiov_offset = bulk->bp_pageoffset;

                LASSERT (iov[desc->bd_md.niov].kiov_offset +
                         iov[desc->bd_md.niov].kiov_len <= PAGE_SIZE);
                desc->bd_md.niov++;
                desc->bd_md.length += bulk->bp_buflen;
        }

        LASSERT(desc->bd_md.niov == desc->bd_page_count);
        LASSERT(desc->bd_md.niov != 0);

        rc = PtlMDBind(peer->peer_ni->pni_ni_h, desc->bd_md, &desc->bd_md_h);

        ptlrpc_put_bulk_iov(desc, iov); /* moved down to reduce latency to send */

        if (rc != PTL_OK) {
                CERROR("PtlMDBind failed: %d\n", rc);
                LASSERT (rc == PTL_NOSPACE);
                RETURN(-ENOMEM);
        }

        /* Client's bulk and reply matchbits are the same */
        xid = desc->bd_req->rq_xid;
        remote_id.nid = desc->bd_export->exp_connection->c_peer.peer_nid;
        remote_id.pid = 0;

        CDEBUG(D_NET, "Fetching %u pages %u bytes from portal %d on %s "
               "nid "LPX64" pid %d xid "LPX64"\n",
               desc->bd_md.niov, desc->bd_md.length, desc->bd_portal,
               peer->peer_ni->pni_name, remote_id.nid, remote_id.pid,
               xid);

        desc->bd_network_rw = 1;
        rc = PtlGet(desc->bd_md_h, remote_id, desc->bd_portal, 0,
                    xid, 0);
        if (rc != PTL_OK) {
                desc->bd_network_rw = 0;
                CERROR("PtlGet("LPU64", %d, "LPX64") failed: %d\n",
                       remote_id.nid, desc->bd_portal, xid, rc);
                rc2 = PtlMDUnlink(desc->bd_md_h);
                LASSERT (rc2 == PTL_OK);
                RETURN((rc == PTL_NOSPACE) ? -ENOMEM : -ECOMM);
        }

        RETURN(0);
}

void ptlrpc_abort_bulk (struct ptlrpc_bulk_desc *desc)
{
        /* Server side bulk abort. Idempotent. Not thread-safe (i.e. only
         * serialises with completion callback) */
        unsigned long      flags;
        struct l_wait_info lwi;
        int                callback_count;
        int                rc;

        LASSERT (!in_interrupt ());             /* might sleep */

        /* NB. server-side bulk gets 2 events, so we have to keep trying to
         * unlink the MD until all callbacks have happened, or
         * PtlMDUnlink() returns OK or INVALID */
 again:
        spin_lock_irqsave (&desc->bd_lock, flags);
        if (!desc->bd_network_rw) {
                /* completed or never even registered. NB holding bd_lock
                 * guarantees callback has completed if it ran. */
                spin_unlock_irqrestore (&desc->bd_lock, flags);
                return;
        }

        /* sample callback count while we have the lock */
        callback_count = desc->bd_callback_count;
        spin_unlock_irqrestore (&desc->bd_lock, flags);

        rc = PtlMDUnlink (desc->bd_md_h);
        switch (rc) {
        default:
                CERROR("PtlMDUnlink returned %d\n", rc);
                LBUG ();
        case PTL_OK:                    /* Won the race with the network */
                LASSERT (!desc->bd_complete); /* Not all callbacks ran */
                desc->bd_network_rw = 0;
                return;

        case PTL_MD_INUSE:              /* MD is being accessed right now */
                for (;;) {
                        /* Network access will complete in finite time but the
                         * timeout lets us CERROR for visibility */
                        lwi = LWI_TIMEOUT (10 * HZ, NULL, NULL);
                        rc = l_wait_event(desc->bd_waitq,
                                          desc->bd_callback_count !=
                                          callback_count, &lwi);
                        if (rc == -ETIMEDOUT) {
                                CERROR("Unexpectedly long timeout: desc %p\n",
                                       desc);
                                continue;
                        }
                        LASSERT (rc == 0);
                        break;
                }
                /* go back and try again... */
                goto again;

        case PTL_INV_MD:            /* Lost the race with completion */
                LASSERT (desc->bd_complete);    /* Callbacks all ran */
                LASSERT (!desc->bd_network_rw);
                return;
        }
}

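/* Client side: attach an ME/MD over the bulk pages so the server can
 * GET from them (BULK_GET_SOURCE) or PUT into them (BULK_PUT_SINK). */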
int ptlrpc_register_bulk (struct ptlrpc_request *req)
{
        struct ptlrpc_bulk_desc *desc = req->rq_bulk;
        struct ptlrpc_peer *peer;
        struct list_head *tmp, *next;
        int rc;
        int rc2;
        ptl_kiov_t *iov;
        ptl_process_id_t source_id;
        ENTRY;

        /* NB no locking required until desc is on the network */
        LASSERT (!desc->bd_network_rw);
        LASSERT (desc->bd_page_count <= PTL_MD_MAX_PAGES);
        LASSERT (desc->bd_req != NULL);
        LASSERT (desc->bd_type == BULK_PUT_SINK ||
                 desc->bd_type == BULK_GET_SOURCE);

        desc->bd_complete = 0;

        iov = ptlrpc_get_bulk_iov (desc);
        if (iov == NULL)
                return (-ENOMEM);

        peer = &desc->bd_import->imp_connection->c_peer;

        desc->bd_md.start = iov;
        desc->bd_md.niov = 0;
        desc->bd_md.length = 0;
        desc->bd_md.threshold = 1;
        desc->bd_md.user_ptr = desc;

        if (desc->bd_type == BULK_GET_SOURCE) {
                desc->bd_md.options = PTL_MD_OP_GET | PTL_MD_KIOV;
                desc->bd_md.eventq = peer->peer_ni->pni_bulk_get_source_eq_h;
        } else {
                desc->bd_md.options = PTL_MD_OP_PUT | PTL_MD_KIOV;
                desc->bd_md.eventq = peer->peer_ni->pni_bulk_put_sink_eq_h;
        }

        list_for_each_safe(tmp, next, &desc->bd_page_list) {
                struct ptlrpc_bulk_page *bulk;
                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);

                LASSERT(desc->bd_md.niov < desc->bd_page_count);

                iov[desc->bd_md.niov].kiov_page = bulk->bp_page;
                iov[desc->bd_md.niov].kiov_len = bulk->bp_buflen;
                iov[desc->bd_md.niov].kiov_offset = bulk->bp_pageoffset;

                LASSERT (bulk->bp_pageoffset + bulk->bp_buflen <= PAGE_SIZE);
                desc->bd_md.niov++;
                desc->bd_md.length += bulk->bp_buflen;
        }

        LASSERT(desc->bd_md.niov == desc->bd_page_count);
        LASSERT(desc->bd_md.niov != 0);

        /* XXX Registering the same xid on retried bulk makes my head
         * explode trying to understand how the original request's bulk
         * might interfere with the retried request -eeb */
        LASSERT (!desc->bd_registered || req->rq_xid != desc->bd_last_xid);
        desc->bd_registered = 1;
        desc->bd_last_xid = req->rq_xid;

        source_id.nid = desc->bd_import->imp_connection->c_peer.peer_nid;
        source_id.pid = PTL_PID_ANY;

        rc = PtlMEAttach(peer->peer_ni->pni_ni_h,
                         desc->bd_portal, source_id, req->rq_xid, 0,
                         PTL_UNLINK, PTL_INS_AFTER, &desc->bd_me_h);

        if (rc != PTL_OK) {
                CERROR("PtlMEAttach failed: %d\n", rc);
                LASSERT (rc == PTL_NOSPACE);
                GOTO(out, rc = -ENOMEM);
        }

        /* About to let the network at it... */
        desc->bd_network_rw = 1;
        rc = PtlMDAttach(desc->bd_me_h, desc->bd_md, PTL_UNLINK,
                         &desc->bd_md_h);
        if (rc != PTL_OK) {
                CERROR("PtlMDAttach failed: %d\n", rc);
                LASSERT (rc == PTL_NOSPACE);
                desc->bd_network_rw = 0;
                rc2 = PtlMEUnlink (desc->bd_me_h);
                LASSERT (rc2 == PTL_OK);
                GOTO(out, rc = -ENOMEM);
        }
        rc = 0;

        CDEBUG(D_NET, "Setup bulk %s buffers: %u pages %u bytes, xid "LPX64", "
               "portal %u on %s\n",
               desc->bd_type == BULK_GET_SOURCE ? "get-source" : "put-sink",
               desc->bd_md.niov, desc->bd_md.length,
               req->rq_xid, desc->bd_portal, peer->peer_ni->pni_name);

 out:
        ptlrpc_put_bulk_iov (desc, iov);
        RETURN(rc);
}

void ptlrpc_unregister_bulk (struct ptlrpc_request *req)
{
        /* Disconnect a bulk desc from the network. Idempotent. Not
         * thread-safe (i.e. only interlocks with completion callback). */
        struct ptlrpc_bulk_desc *desc = req->rq_bulk;
        wait_queue_head_t       *wq;
        unsigned long            flags;
        struct l_wait_info       lwi;
        int                      rc;

        LASSERT (!in_interrupt ());             /* might sleep */

        spin_lock_irqsave (&desc->bd_lock, flags);
        if (!desc->bd_network_rw) {     /* completed or never even registered */
                spin_unlock_irqrestore (&desc->bd_lock, flags);
                return;
        }
        spin_unlock_irqrestore (&desc->bd_lock, flags);

        LASSERT (desc->bd_req == req);     /* NB bd_req NULL until registered */

        /* NB...
         * 1. If the MD unlink is successful, the ME gets unlinked too.
         * 2. Client-side bulk only gets a single event and has a
         *    threshold of 1, so if the MD was in use at the first unlink
         *    attempt, the callback is due any minute, and the MD/ME will
         *    unlink themselves. */
        rc = PtlMDUnlink (desc->bd_md_h);
        switch (rc) {
        default:
                CERROR("PtlMDUnlink returned %d\n", rc);
                LBUG ();
        case PTL_OK:                          /* Won the race with completion */
                LASSERT (!desc->bd_complete);   /* Callback hasn't happened */
                desc->bd_network_rw = 0;
                return;
        case PTL_MD_INUSE:                  /* MD is being accessed right now */
                for (;;) {
                        /* Network access will complete in finite time but the
                         * timeout lets us CERROR for visibility */
                        if (desc->bd_req->rq_set != NULL)
                                wq = &req->rq_set->set_waitq;
                        else
                                wq = &req->rq_reply_waitq;
                        lwi = LWI_TIMEOUT (10 * HZ, NULL, NULL);
                        rc = l_wait_event(*wq, ptlrpc_bulk_complete(desc), &lwi);
                        LASSERT (rc == 0 || rc == -ETIMEDOUT);
                        if (rc == 0)
                                break;
                        CERROR ("Unexpectedly long timeout: desc %p\n", desc);
                        LBUG();
                }
                /* Fall through */
        case PTL_INV_MD:                     /* Lost the race with completion */
                LASSERT (desc->bd_complete);/* Callback has run to completion */
                LASSERT (!desc->bd_network_rw);
                return;
        }
}

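/* Send the (already packed) reply for an incoming request back to its
 * sender on the service's reply portal. */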
int ptlrpc_reply(struct ptlrpc_request *req)
{
        unsigned long flags;
        int rc;

        /* We must already have a reply buffer (only ptlrpc_error() may be
         * called without one).  We must also have a request buffer which
         * is either the actual (swabbed) incoming request, or a saved copy
         * if this is a req saved in target_queue_final_reply(). */
        LASSERT (req->rq_repmsg != NULL);
        LASSERT (req->rq_reqmsg != NULL);

        /* FIXME: we need to increment the count of handled events */
        if (req->rq_type != PTL_RPC_MSG_ERR)
                req->rq_type = PTL_RPC_MSG_REPLY;

        req->rq_repmsg->status = req->rq_status;
        req->rq_repmsg->opc = req->rq_reqmsg->opc;

        init_waitqueue_head(&req->rq_reply_waitq);
        rc = ptl_send_buf(req, req->rq_connection, req->rq_svc->srv_rep_portal);
        if (rc != 0) {
                /* Do what the callback handler would have done */
                OBD_FREE (req->rq_repmsg, req->rq_replen);

                spin_lock_irqsave (&req->rq_lock, flags);
                req->rq_want_ack = 0;
                spin_unlock_irqrestore (&req->rq_lock, flags);
        }
        return rc;
}

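/* Send an error reply (PTL_RPC_MSG_ERR), packing an empty reply buffer
 * first if the handler didn't pack one. */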
int ptlrpc_error(struct ptlrpc_request *req)
{
        int rc;
        ENTRY;

        if (!req->rq_repmsg) {
                rc = lustre_pack_reply(req, 0, NULL, NULL);
                if (rc)
                        RETURN(rc);
        }

        req->rq_type = PTL_RPC_MSG_ERR;

        rc = ptlrpc_reply(req);
        RETURN(rc);
}

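/* Client side: post the ME/MD that will receive the reply, then send
 * the request itself; on failure back everything out again. */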
int ptl_send_rpc(struct ptlrpc_request *request)
{
        int rc;
        int rc2;
        unsigned long flags;
        ptl_process_id_t source_id;
        ptl_handle_me_t  reply_me_h;
        ENTRY;

        LASSERT (request->rq_type == PTL_RPC_MSG_REQUEST);

        /* If this is a re-transmit, we're required to have disengaged
         * cleanly from the previous attempt */
        LASSERT (!request->rq_receiving_reply);

        if (request->rq_bulk != NULL) {
                rc = ptlrpc_register_bulk (request);
                if (rc != 0)
                        RETURN(rc);
        }

        request->rq_reqmsg->handle = request->rq_import->imp_remote_handle;
        request->rq_reqmsg->conn_cnt = request->rq_import->imp_conn_cnt;

        source_id.nid = request->rq_connection->c_peer.peer_nid;
        source_id.pid = PTL_PID_ANY;

        LASSERT (request->rq_replen != 0);
        if (request->rq_repmsg == NULL)
                OBD_ALLOC(request->rq_repmsg, request->rq_replen);
        if (request->rq_repmsg == NULL) {
                LBUG();
                RETURN(-ENOMEM);
        }

        rc = PtlMEAttach(request->rq_connection->c_peer.peer_ni->pni_ni_h,
                         request->rq_reply_portal, /* XXX FIXME bug 249 */
                         source_id, request->rq_xid, 0, PTL_UNLINK,
                         PTL_INS_AFTER, &reply_me_h);
        if (rc != PTL_OK) {
                CERROR("PtlMEAttach failed: %d\n", rc);
                LASSERT (rc == PTL_NOSPACE);
                LBUG();
                GOTO(cleanup, rc = -ENOMEM);
        }

        request->rq_reply_md.start = request->rq_repmsg;
        request->rq_reply_md.length = request->rq_replen;
        request->rq_reply_md.threshold = 1;
        request->rq_reply_md.options = PTL_MD_OP_PUT;
        request->rq_reply_md.user_ptr = request;
        request->rq_reply_md.eventq =
                request->rq_connection->c_peer.peer_ni->pni_reply_in_eq_h;

        rc = PtlMDAttach(reply_me_h, request->rq_reply_md,
                         PTL_UNLINK, &request->rq_reply_md_h);
        if (rc != PTL_OK) {
                CERROR("PtlMDAttach failed: %d\n", rc);
                LASSERT (rc == PTL_NOSPACE);
                LBUG();
                GOTO(cleanup2, rc = -ENOMEM);
        }

        CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid "LPU64
               ", portal %u on %s\n",
               request->rq_replen, request->rq_xid,
               request->rq_reply_portal,
               request->rq_connection->c_peer.peer_ni->pni_name);

        ptlrpc_request_addref(request);        /* 1 ref for the SENT callback */

        spin_lock_irqsave (&request->rq_lock, flags);
        request->rq_receiving_reply = 1;
        /* Clear any flags that may be present from previous sends. */
        request->rq_replied = 0;
        request->rq_err = 0;
        request->rq_timedout = 0;
        request->rq_resend = 0;
        request->rq_restart = 0;
        spin_unlock_irqrestore (&request->rq_lock, flags);

        request->rq_sent = LTIME_S(CURRENT_TIME);
        ptlrpc_pinger_sending_on_import(request->rq_import);
        rc = ptl_send_buf(request, request->rq_connection,
                          request->rq_request_portal);
        if (rc == 0) {
                ptlrpc_lprocfs_rpc_sent(request);
                RETURN(rc);
        }

        spin_lock_irqsave (&request->rq_lock, flags);
        request->rq_receiving_reply = 0;
        spin_unlock_irqrestore (&request->rq_lock, flags);
        ptlrpc_req_finished (request);          /* drop callback ref */
 cleanup2:
        /* MEUnlink is safe; the PUT didn't even get off the ground, and
         * nobody apart from the PUT's target has the right nid+XID to
         * access the reply buffer. */
        rc2 = PtlMEUnlink(reply_me_h);
        LASSERT (rc2 == PTL_OK);
 cleanup:
        OBD_FREE(request->rq_repmsg, request->rq_replen);
        request->rq_repmsg = NULL;
        return rc;
}

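/* Server side: post a request buffer descriptor on the service's
 * request portal so incoming RPCs can land in it. */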
void ptlrpc_link_svc_me(struct ptlrpc_request_buffer_desc *rqbd)
{
        struct ptlrpc_srv_ni *srv_ni = rqbd->rqbd_srv_ni;
        struct ptlrpc_service *service = srv_ni->sni_service;
        static ptl_process_id_t match_id = {PTL_NID_ANY, PTL_PID_ANY};
        int rc;
        ptl_md_t dummy;
        ptl_handle_md_t md_h;

        LASSERT(atomic_read(&rqbd->rqbd_refcount) == 0);

        CDEBUG(D_NET, "PtlMEAttach: portal %d on %s h %lx."LPX64"\n",
               service->srv_req_portal, srv_ni->sni_ni->pni_name,
               srv_ni->sni_ni->pni_ni_h.nal_idx,
               srv_ni->sni_ni->pni_ni_h.cookie);

        /* Attach the leading ME on which we build the ring */
        rc = PtlMEAttach(srv_ni->sni_ni->pni_ni_h, service->srv_req_portal,
                         match_id, 0, ~0,
                         PTL_UNLINK, PTL_INS_AFTER, &rqbd->rqbd_me_h);
        if (rc != PTL_OK) {
                CERROR("PtlMEAttach failed: %d\n", rc);
                /* BUG 1191 */
                LBUG();
        }

        dummy.start      = rqbd->rqbd_buffer;
        dummy.length     = service->srv_buf_size;
        dummy.max_size   = service->srv_max_req_size;
        dummy.threshold  = PTL_MD_THRESH_INF;
        dummy.options    = PTL_MD_OP_PUT | PTL_MD_MAX_SIZE | PTL_MD_AUTO_UNLINK;
        dummy.user_ptr   = rqbd;
        dummy.eventq     = srv_ni->sni_eq_h;

        atomic_inc(&srv_ni->sni_nrqbds_receiving);
        atomic_set(&rqbd->rqbd_refcount, 1);   /* 1 ref for portals */

        rc = PtlMDAttach(rqbd->rqbd_me_h, dummy, PTL_UNLINK, &md_h);
        if (rc != PTL_OK) {
                CERROR("PtlMDAttach failed: %d\n", rc);
                LASSERT (rc == PTL_NOSPACE);
                LBUG();
                /* BUG 1191 */
                PtlMEUnlink (rqbd->rqbd_me_h);
                atomic_set(&rqbd->rqbd_refcount, 0);
                atomic_dec(&srv_ni->sni_nrqbds_receiving);
        }
}