Whamcloud - gitweb
Landing b_recovery
[fs/lustre-release.git] / lustre / ptlrpc / niobuf.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifndef __KERNEL__
25 #include <liblustre.h>
26 #include <portals/lib-types.h>
27 #endif
28 #include <linux/obd_support.h>
29 #include <linux/lustre_net.h>
30 #include <linux/lustre_lib.h>
31 #include <linux/obd.h>
32 #include "ptlrpc_internal.h"
33
34 static int ptl_send_buf(struct ptlrpc_request *request,
35                         struct ptlrpc_connection *conn, int portal)
36 {
37         int rc;
38         int rc2;
39         ptl_process_id_t remote_id;
40         ptl_handle_md_t md_h;
41         ptl_ack_req_t ack_req;
42         char str[PTL_NALFMT_SIZE];
43
44         LASSERT (portal != 0);
45         LASSERT (conn != NULL);
46         CDEBUG (D_INFO, "conn=%p ni %s nid "LPX64" (%s) on %s\n",
47                 conn, conn->c_peer.peer_ni->pni_name,
48                 conn->c_peer.peer_nid,
49                 portals_nid2str(conn->c_peer.peer_ni->pni_number,
50                                 conn->c_peer.peer_nid, str),
51                 conn->c_peer.peer_ni->pni_name);
52
53         request->rq_req_md.user_ptr = request;
54
55         switch (request->rq_type) {
56         case PTL_RPC_MSG_REQUEST:
57                 request->rq_reqmsg->type = request->rq_type;
58                 request->rq_req_md.start = request->rq_reqmsg;
59                 request->rq_req_md.length = request->rq_reqlen;
60                 request->rq_req_md.eventq =
61                         conn->c_peer.peer_ni->pni_request_out_eq_h;
62                 LASSERT (!request->rq_want_ack);
63                 break;
64         case PTL_RPC_MSG_ERR:
65         case PTL_RPC_MSG_REPLY:
66                 request->rq_repmsg->type = request->rq_type;
67                 request->rq_req_md.start = request->rq_repmsg;
68                 request->rq_req_md.length = request->rq_replen;
69                 request->rq_req_md.eventq =
70                         conn->c_peer.peer_ni->pni_reply_out_eq_h;
71                 break;
72         default:
73                 LBUG();
74                 return -1; /* notreached */
75         }
76         if (request->rq_want_ack) {
77                 request->rq_req_md.threshold = 2; /* SENT and ACK */
78                 ack_req = PTL_ACK_REQ;
79         } else {
80                 request->rq_req_md.threshold = 1;
81                 ack_req = PTL_NOACK_REQ;
82         }
83         request->rq_req_md.options = PTL_MD_OP_PUT;
84         request->rq_req_md.user_ptr = request;
85
86         if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_ACK | OBD_FAIL_ONCE)) {
87                 request->rq_req_md.options |= PTL_MD_ACK_DISABLE;
88                 obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
89         }
90
91         /* NB if the send fails, we back out of the send and return
92          * failure; it's down to the caller to handle missing callbacks */
93
94         rc = PtlMDBind(conn->c_peer.peer_ni->pni_ni_h, request->rq_req_md,
95                        &md_h);
96         if (rc != PTL_OK) {
97                 CERROR("PtlMDBind failed: %d\n", rc);
98                 LASSERT (rc == PTL_NOSPACE);
99                 RETURN (-ENOMEM);
100         }
101         if (request->rq_type != PTL_RPC_MSG_REQUEST)
102                 memcpy(&request->rq_reply_md_h, &md_h, sizeof(md_h));
103
104         remote_id.nid = conn->c_peer.peer_nid;
105         remote_id.pid = 0;
106
107         CDEBUG(D_NET, "Sending %d bytes to portal %d, xid "LPD64"\n",
108                request->rq_req_md.length, portal, request->rq_xid);
109
110         rc = PtlPut(md_h, ack_req, remote_id, portal, 0, request->rq_xid, 0, 0);
111         if (rc != PTL_OK) {
112                 CERROR("PtlPut("LPU64", %d, "LPD64") failed: %d\n",
113                        remote_id.nid, portal, request->rq_xid, rc);
114                 rc2 = PtlMDUnlink(md_h);
115                 LASSERT (rc2 == PTL_OK);
116                 RETURN ((rc == PTL_NOSPACE) ? -ENOMEM : -ECOMM);
117         }
118
119         return 0;
120 }
121
122 static inline ptl_kiov_t *
123 ptlrpc_get_bulk_iov (struct ptlrpc_bulk_desc *desc)
124 {
125         ptl_kiov_t *iov;
126
127         if (desc->bd_page_count <= sizeof (desc->bd_iov)/sizeof (*iov))
128                 return (desc->bd_iov);
129
130         OBD_ALLOC (iov, desc->bd_page_count * sizeof (*iov));
131         if (iov == NULL)
132                 LBUG();
133
134         return (iov);
135 }
136
137 static inline void
138 ptlrpc_put_bulk_iov (struct ptlrpc_bulk_desc *desc, ptl_kiov_t *iov)
139 {
140         if (desc->bd_page_count <= sizeof (desc->bd_iov)/sizeof (*iov))
141                 return;
142
143         OBD_FREE (iov, desc->bd_page_count * sizeof (*iov));
144 }
145
/* Server-side bulk PUT: bind desc's pages as a kiov MD and PUT them to
 * the client, matching the client's registered bulk sink on
 * desc->bd_portal with match bits == the request xid.
 *
 * Returns 0 on success; -ENOMEM if the MD could not be bound; -ECOMM if
 * the PUT failed (MD unlinked again, bd_network_rw cleared).  Once this
 * returns 0 the network owns desc until both callbacks have run. */
int ptlrpc_bulk_put(struct ptlrpc_bulk_desc *desc)
{
        int rc;
        int rc2;
        struct ptlrpc_peer *peer;
        struct list_head *tmp, *next;
        ptl_process_id_t remote_id;
        ptl_kiov_t *iov;
        __u64 xid;
        ENTRY;

        /* NB no locking required until desc is on the network */
        LASSERT (!desc->bd_network_rw);
        LASSERT (desc->bd_type == BULK_PUT_SOURCE);
        desc->bd_complete = 0;

        iov = ptlrpc_get_bulk_iov (desc);
        if (iov == NULL)
                RETURN (-ENOMEM);

        peer = &desc->bd_export->exp_connection->c_peer;

        desc->bd_md.start = iov;
        desc->bd_md.niov = 0;
        desc->bd_md.length = 0;
        desc->bd_md.eventq = peer->peer_ni->pni_bulk_put_source_eq_h;
        desc->bd_md.threshold = 2; /* SENT and ACK */
        desc->bd_md.options = PTL_MD_OP_PUT | PTL_MD_KIOV;
        desc->bd_md.user_ptr = desc;

        /* both the SENT and ACK callbacks must fire before completion */
        desc->bd_callback_count = 2;

        /* flatten the page list into the kiov, accumulating niov/length */
        list_for_each_safe(tmp, next, &desc->bd_page_list) {
                struct ptlrpc_bulk_page *bulk;
                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);

                LASSERT(desc->bd_md.niov < desc->bd_page_count);

                iov[desc->bd_md.niov].kiov_page = bulk->bp_page;
                iov[desc->bd_md.niov].kiov_offset = bulk->bp_pageoffset;
                iov[desc->bd_md.niov].kiov_len = bulk->bp_buflen;

                /* each fragment must fit within its page */
                LASSERT (iov[desc->bd_md.niov].kiov_offset +
                         iov[desc->bd_md.niov].kiov_len <= PAGE_SIZE);
                desc->bd_md.niov++;
                desc->bd_md.length += bulk->bp_buflen;
        }

        /* NB total length may be 0 for a read past EOF, so we send a 0
         * length bulk, since the client expects a bulk event. */
        LASSERT(desc->bd_md.niov == desc->bd_page_count);

        rc = PtlMDBind(peer->peer_ni->pni_ni_h, desc->bd_md,
                       &desc->bd_md_h);

        /* PtlMDBind copied the iov; release ours before the (possibly
         * slow) error path to keep send latency low */
        ptlrpc_put_bulk_iov (desc, iov); /*move down to reduce latency to send*/

        if (rc != PTL_OK) {
                CERROR("PtlMDBind failed: %d\n", rc);
                LASSERT (rc == PTL_NOSPACE);
                RETURN(-ENOMEM);
        }

        /* Client's bulk and reply matchbits are the same */
        xid = desc->bd_req->rq_xid;
        remote_id.nid = peer->peer_nid;
        remote_id.pid = 0;

        CDEBUG(D_NET, "Sending %u pages %u bytes to portal %d on %s "
               "nid "LPX64" pid %d xid "LPX64"\n",
               desc->bd_md.niov, desc->bd_md.length,
               desc->bd_portal, peer->peer_ni->pni_name,
               remote_id.nid, remote_id.pid, xid);

        /* mark network-owned BEFORE the PUT: the callback may fire (and
         * clear it) before PtlPut even returns */
        desc->bd_network_rw = 1;
        rc = PtlPut(desc->bd_md_h, PTL_ACK_REQ, remote_id,
                    desc->bd_portal, 0, xid, 0, 0);
        if (rc != PTL_OK) {
                desc->bd_network_rw = 0;
                CERROR("PtlPut("LPU64", %d, "LPX64") failed: %d\n",
                       remote_id.nid, desc->bd_portal, xid, rc);
                rc2 = PtlMDUnlink(desc->bd_md_h);
                LASSERT (rc2 == PTL_OK);
                RETURN((rc == PTL_NOSPACE) ? -ENOMEM : -ECOMM);
        }

        RETURN(0);
}
234
235 int ptlrpc_bulk_get(struct ptlrpc_bulk_desc *desc)
236 {
237         int rc;
238         int rc2;
239         struct ptlrpc_peer *peer;
240         struct list_head *tmp, *next;
241         ptl_process_id_t remote_id;
242         ptl_kiov_t *iov;
243         __u64 xid;
244         ENTRY;
245
246         /* NB no locking required until desc is on the network */
247         LASSERT (!desc->bd_network_rw);
248         LASSERT (desc->bd_type == BULK_GET_SINK);
249         desc->bd_complete = 0;
250
251         iov = ptlrpc_get_bulk_iov (desc);
252         if (iov == NULL)
253                 RETURN(-ENOMEM);
254
255         peer = &desc->bd_export->exp_connection->c_peer;
256
257         desc->bd_md.start = iov;
258         desc->bd_md.niov = 0;
259         desc->bd_md.length = 0;
260         desc->bd_md.eventq = peer->peer_ni->pni_bulk_get_sink_eq_h;
261         desc->bd_md.threshold = 2; /* SENT and REPLY */
262         desc->bd_md.options = PTL_MD_OP_GET | PTL_MD_KIOV;
263         desc->bd_md.user_ptr = desc;
264
265         desc->bd_callback_count = 2;
266
267         list_for_each_safe(tmp, next, &desc->bd_page_list) {
268                 struct ptlrpc_bulk_page *bulk;
269                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
270
271                 LASSERT(desc->bd_md.niov < desc->bd_page_count);
272
273                 iov[desc->bd_md.niov].kiov_page = bulk->bp_page;
274                 iov[desc->bd_md.niov].kiov_len = bulk->bp_buflen;
275                 iov[desc->bd_md.niov].kiov_offset = bulk->bp_pageoffset;
276
277                 LASSERT (iov[desc->bd_md.niov].kiov_offset +
278                          iov[desc->bd_md.niov].kiov_len <= PAGE_SIZE);
279                 desc->bd_md.niov++;
280                 desc->bd_md.length += bulk->bp_buflen;
281         }
282
283         LASSERT(desc->bd_md.niov == desc->bd_page_count);
284         LASSERT(desc->bd_md.niov != 0);
285
286         rc = PtlMDBind(peer->peer_ni->pni_ni_h, desc->bd_md, &desc->bd_md_h);
287
288         ptlrpc_put_bulk_iov(desc, iov); /*move down to reduce latency to send*/
289
290         if (rc != PTL_OK) {
291                 CERROR("PtlMDBind failed: %d\n", rc);
292                 LASSERT (rc == PTL_NOSPACE);
293                 RETURN(-ENOMEM);
294         }
295
296         /* Client's bulk and reply matchbits are the same */
297         xid = desc->bd_req->rq_xid;
298         remote_id.nid = desc->bd_export->exp_connection->c_peer.peer_nid;
299         remote_id.pid = 0;
300
301         CDEBUG(D_NET, "Fetching %u pages %u bytes from portal %d on %s "
302                "nid "LPX64" pid %d xid "LPX64"\n",
303                desc->bd_md.niov, desc->bd_md.length, desc->bd_portal,
304                peer->peer_ni->pni_name, remote_id.nid, remote_id.pid,
305                xid);
306
307         desc->bd_network_rw = 1;
308         rc = PtlGet(desc->bd_md_h, remote_id, desc->bd_portal, 0,
309                     xid, 0);
310         if (rc != PTL_OK) {
311                 desc->bd_network_rw = 0;
312                 CERROR("PtlGet("LPU64", %d, "LPX64") failed: %d\n",
313                        remote_id.nid, desc->bd_portal, xid, rc);
314                 rc2 = PtlMDUnlink(desc->bd_md_h);
315                 LASSERT (rc2 == PTL_OK);
316                 RETURN((rc == PTL_NOSPACE) ? -ENOMEM : -ECOMM);
317         }
318
319         RETURN(0);
320 }
321
/* Server side bulk abort: unlink the bulk MD, waiting out any in-flight
 * network access.  Idempotent.  Not thread-safe (i.e. only serialises
 * with the completion callback).  On return the network no longer
 * references desc. */
void ptlrpc_abort_bulk (struct ptlrpc_bulk_desc *desc)
{
        /* Server side bulk abort. Idempotent. Not thread-safe (i.e. only
         * serialises with completion callback) */
        unsigned long      flags;
        struct l_wait_info lwi;
        int                callback_count;
        int                rc;

        LASSERT (!in_interrupt ());             /* might sleep */

        /* NB. server-side bulk gets 2 events, so we have to keep trying to
         * unlink the MD until all callbacks have happened, or
         * PtlMDUnlink() returns OK or INVALID */
 again:
        spin_lock_irqsave (&desc->bd_lock, flags);
        if (!desc->bd_network_rw) {
                /* completed or never even registered. NB holding bd_lock
                 * guarantees callback has completed if it ran. */
                spin_unlock_irqrestore (&desc->bd_lock, flags);
                return;
        }

        /* sample callback count while we have the lock */
        callback_count = desc->bd_callback_count;
        spin_unlock_irqrestore (&desc->bd_lock, flags);

        rc = PtlMDUnlink (desc->bd_md_h);
        switch (rc) {
        default:
                /* unreachable fall-through: LBUG() panics */
                CERROR("PtlMDUnlink returned %d\n", rc);
                LBUG ();
        case PTL_OK:                    /* Won the race with the network */
                LASSERT (!desc->bd_complete); /* Not all callbacks ran */
                desc->bd_network_rw = 0;
                return;

        case PTL_MD_INUSE:              /* MD is being accessed right now */
                /* wait until at least one more callback runs (the sampled
                 * count changes), then retry the unlink */
                for (;;) {
                        /* Network access will complete in finite time but the
                         * timeout lets us CERROR for visibility */
                        lwi = LWI_TIMEOUT (10 * HZ, NULL, NULL);
                        rc = l_wait_event(desc->bd_waitq,
                                          desc->bd_callback_count !=
                                          callback_count, &lwi);
                        if (rc == -ETIMEDOUT) {
                                CERROR("Unexpectedly long timeout: desc %p\n",
                                       desc);
                                continue;
                        }
                        LASSERT (rc == 0);
                        break;
                }
                /* go back and try again... */
                goto again;

        case PTL_INV_MD:            /* Lost the race with completion */
                LASSERT (desc->bd_complete);    /* Callbacks all ran */
                LASSERT (!desc->bd_network_rw);
                return;
        }
}
384
385 int ptlrpc_register_bulk (struct ptlrpc_request *req)
386 {
387         struct ptlrpc_bulk_desc *desc = req->rq_bulk;
388         struct ptlrpc_peer *peer;
389         struct list_head *tmp, *next;
390         int rc;
391         int rc2;
392         ptl_kiov_t *iov;
393         ptl_process_id_t source_id;
394         ENTRY;
395
396         /* NB no locking required until desc is on the network */
397         LASSERT (!desc->bd_network_rw);
398         LASSERT (desc->bd_page_count <= PTL_MD_MAX_PAGES);
399         LASSERT (desc->bd_req != NULL);
400         LASSERT (desc->bd_type == BULK_PUT_SINK ||
401                  desc->bd_type == BULK_GET_SOURCE);
402
403         desc->bd_complete = 0;
404
405         iov = ptlrpc_get_bulk_iov (desc);
406         if (iov == NULL)
407                 return (-ENOMEM);
408
409         peer = &desc->bd_import->imp_connection->c_peer;
410
411         desc->bd_md.start = iov;
412         desc->bd_md.niov = 0;
413         desc->bd_md.length = 0;
414         desc->bd_md.threshold = 1;
415         desc->bd_md.user_ptr = desc;
416
417         if (desc->bd_type == BULK_GET_SOURCE) {
418                 desc->bd_md.options = PTL_MD_OP_GET | PTL_MD_KIOV;
419                 desc->bd_md.eventq = peer->peer_ni->pni_bulk_get_source_eq_h;
420         } else {
421                 desc->bd_md.options = PTL_MD_OP_PUT | PTL_MD_KIOV;
422                 desc->bd_md.eventq = peer->peer_ni->pni_bulk_put_sink_eq_h;
423         }
424
425         list_for_each_safe(tmp, next, &desc->bd_page_list) {
426                 struct ptlrpc_bulk_page *bulk;
427                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
428
429                 LASSERT(desc->bd_md.niov < desc->bd_page_count);
430
431                 iov[desc->bd_md.niov].kiov_page = bulk->bp_page;
432                 iov[desc->bd_md.niov].kiov_len = bulk->bp_buflen;
433                 iov[desc->bd_md.niov].kiov_offset = bulk->bp_pageoffset;
434
435                 LASSERT (bulk->bp_pageoffset + bulk->bp_buflen <= PAGE_SIZE);
436                 desc->bd_md.niov++;
437                 desc->bd_md.length += bulk->bp_buflen;
438         }
439
440         LASSERT(desc->bd_md.niov == desc->bd_page_count);
441         LASSERT(desc->bd_md.niov != 0);
442
443         /* XXX Registering the same xid on retried bulk makes my head
444          * explode trying to understand how the original request's bulk
445          * might interfere with the retried request -eeb */
446         LASSERT (!desc->bd_registered || req->rq_xid != desc->bd_last_xid);
447         desc->bd_registered = 1;
448         desc->bd_last_xid = desc->bd_last_xid;
449
450         source_id.nid = desc->bd_import->imp_connection->c_peer.peer_nid;
451         source_id.pid = PTL_PID_ANY;
452
453         rc = PtlMEAttach(peer->peer_ni->pni_ni_h,
454                          desc->bd_portal, source_id, req->rq_xid, 0,
455                          PTL_UNLINK, PTL_INS_AFTER, &desc->bd_me_h);
456
457         if (rc != PTL_OK) {
458                 CERROR("PtlMEAttach failed: %d\n", rc);
459                 LASSERT (rc == PTL_NOSPACE);
460                 GOTO(out, rc = -ENOMEM);
461         }
462
463         /* About to let the network at it... */
464         desc->bd_network_rw = 1;
465         rc = PtlMDAttach(desc->bd_me_h, desc->bd_md, PTL_UNLINK,
466                          &desc->bd_md_h);
467         if (rc != PTL_OK) {
468                 CERROR("PtlMDAttach failed: %d\n", rc);
469                 LASSERT (rc == PTL_NOSPACE);
470                 desc->bd_network_rw = 0;
471                 rc2 = PtlMEUnlink (desc->bd_me_h);
472                 LASSERT (rc2 == PTL_OK);
473                 GOTO(out, rc = -ENOMEM);
474         }
475         rc = 0;
476
477         CDEBUG(D_NET, "Setup bulk %s buffers: %u pages %u bytes, xid "LPX64", "
478                "portal %u on %s\n",
479                desc->bd_type == BULK_GET_SOURCE ? "get-source" : "put-sink",
480                desc->bd_md.niov, desc->bd_md.length,
481                req->rq_xid, desc->bd_portal, peer->peer_ni->pni_name);
482
483  out:
484         ptlrpc_put_bulk_iov (desc, iov);
485         RETURN(rc);
486 }
487
/* Client-side counterpart of ptlrpc_register_bulk(): disconnect a bulk
 * desc from the network, waiting out any in-flight access.  Idempotent.
 * Not thread-safe (only interlocks with the completion callback).  On
 * return the network no longer references desc. */
void ptlrpc_unregister_bulk (struct ptlrpc_request *req)
{
        /* Disconnect a bulk desc from the network. Idempotent. Not
         * thread-safe (i.e. only interlocks with completion callback). */
        struct ptlrpc_bulk_desc *desc = req->rq_bulk;
        wait_queue_head_t       *wq;
        unsigned long            flags;
        struct l_wait_info       lwi;
        int                      rc;

        LASSERT (!in_interrupt ());             /* might sleep */

        spin_lock_irqsave (&desc->bd_lock, flags);
        if (!desc->bd_network_rw) {     /* completed or never even registered */
                spin_unlock_irqrestore (&desc->bd_lock, flags);
                return;
        }
        spin_unlock_irqrestore (&desc->bd_lock, flags);

        LASSERT (desc->bd_req == req);     /* NB bd_req NULL until registered */

        /* NB...
         * 1. If the MD unlink is successful, the ME gets unlinked too.
         * 2. Since client-side bulk only gets a single event and a
         * .. threshold of 1.  If the MD was inuse at the first link
         * .. attempt, the callback is due any minute, and the MD/ME will
         * .. unlink themselves.
         */
        rc = PtlMDUnlink (desc->bd_md_h);
        switch (rc) {
        default:
                /* unreachable fall-through: LBUG() panics */
                CERROR("PtlMDUnlink returned %d\n", rc);
                LBUG ();
        case PTL_OK:                          /* Won the race with completion */
                LASSERT (!desc->bd_complete);   /* Callback hasn't happened */
                desc->bd_network_rw = 0;
                return;
        case PTL_MD_INUSE:                  /* MD is being accessed right now */
                /* wait on whichever waitq the completion callback will
                 * wake: the set's for a set-managed request, otherwise
                 * the request's own reply waitq */
                for (;;) {
                        /* Network access will complete in finite time but the
                         * timeout lets us CERROR for visibility */
                        if (desc->bd_req->rq_set != NULL)
                                wq = &req->rq_set->set_waitq;
                        else
                                wq = &req->rq_reply_waitq;
                        lwi = LWI_TIMEOUT (10 * HZ, NULL, NULL);
                        rc = l_wait_event(*wq, ptlrpc_bulk_complete(desc), &lwi);
                        LASSERT (rc == 0 || rc == -ETIMEDOUT);
                        if (rc == 0)
                                break;
                        CERROR ("Unexpectedly long timeout: desc %p\n", desc);
                        LBUG();
                }
                /* Fall through */
        case PTL_INV_MD:                     /* Lost the race with completion */
                LASSERT (desc->bd_complete);/* Callback has run to completion */
                LASSERT (!desc->bd_network_rw);
                return;
        }
}
548
549 int ptlrpc_reply(struct ptlrpc_request *req)
550 {
551         struct ptlrpc_connection *conn;
552         unsigned long flags;
553         int rc;
554
555         /* We must already have a reply buffer (only ptlrpc_error() may be
556          * called without one).  We must also have a request buffer which
557          * is either the actual (swabbed) incoming request, or a saved copy
558          * if this is a req saved in target_queue_final_reply(). */
559         LASSERT (req->rq_repmsg != NULL);
560         LASSERT (req->rq_reqmsg != NULL);
561
562         /* FIXME: we need to increment the count of handled events */
563         if (req->rq_type != PTL_RPC_MSG_ERR)
564                 req->rq_type = PTL_RPC_MSG_REPLY;
565
566         req->rq_repmsg->status = req->rq_status;
567         req->rq_repmsg->opc = req->rq_reqmsg->opc;
568
569         if (req->rq_export == NULL) 
570                 conn = ptlrpc_get_connection(&req->rq_peer, NULL);
571         else
572                 conn = ptlrpc_connection_addref(req->rq_export->exp_connection);
573
574         init_waitqueue_head(&req->rq_reply_waitq);
575         rc = ptl_send_buf(req, conn, 
576                           req->rq_svc->srv_rep_portal);
577         if (rc != 0) {
578                 /* Do what the callback handler would have done */
579                 OBD_FREE (req->rq_repmsg, req->rq_replen);
580
581                 spin_lock_irqsave (&req->rq_lock, flags);
582                 req->rq_want_ack = 0;
583                 spin_unlock_irqrestore (&req->rq_lock, flags);
584         }
585         ptlrpc_put_connection(conn);
586         return rc;
587 }
588
589 int ptlrpc_error(struct ptlrpc_request *req)
590 {
591         int rc;
592         ENTRY;
593
594         if (!req->rq_repmsg) {
595                 rc = lustre_pack_reply(req, 0, NULL, NULL);
596                 if (rc)
597                         RETURN(rc);
598         }
599
600
601         req->rq_type = PTL_RPC_MSG_ERR;
602
603         rc = ptlrpc_reply(req);
604         RETURN(rc);
605 }
606
607 int ptl_send_rpc(struct ptlrpc_request *request)
608 {
609         int rc;
610         int rc2;
611         struct ptlrpc_connection *connection;
612         unsigned long flags;
613         ptl_process_id_t source_id;
614         ptl_handle_me_t  reply_me_h;
615         ENTRY;
616
617         LASSERT (request->rq_type == PTL_RPC_MSG_REQUEST);
618
619         /* If this is a re-transmit, we're required to have disengaged
620          * cleanly from the previous attempt */
621         LASSERT (!request->rq_receiving_reply);
622
623         connection = request->rq_import->imp_connection;
624
625         if (request->rq_bulk != NULL) {
626                 rc = ptlrpc_register_bulk (request);
627                 if (rc != 0)
628                         RETURN(rc);
629         }
630
631         request->rq_reqmsg->handle = request->rq_import->imp_remote_handle;
632         request->rq_reqmsg->conn_cnt = request->rq_import->imp_conn_cnt;
633
634         source_id.nid = connection->c_peer.peer_nid;
635         source_id.pid = PTL_PID_ANY;
636
637         LASSERT (request->rq_replen != 0);
638         if (request->rq_repmsg == NULL)
639                 OBD_ALLOC(request->rq_repmsg, request->rq_replen);
640         if (request->rq_repmsg == NULL) {
641                 LBUG();
642                 RETURN(-ENOMEM);
643         }
644
645         rc = PtlMEAttach(connection->c_peer.peer_ni->pni_ni_h,
646                          request->rq_reply_portal, /* XXX FIXME bug 249 */
647                          source_id, request->rq_xid, 0, PTL_UNLINK,
648                          PTL_INS_AFTER, &reply_me_h);
649         if (rc != PTL_OK) {
650                 CERROR("PtlMEAttach failed: %d\n", rc);
651                 LASSERT (rc == PTL_NOSPACE);
652                 LBUG();
653                 GOTO(cleanup, rc = -ENOMEM);
654         }
655
656         request->rq_reply_md.start = request->rq_repmsg;
657         request->rq_reply_md.length = request->rq_replen;
658         request->rq_reply_md.threshold = 1;
659         request->rq_reply_md.options = PTL_MD_OP_PUT;
660         request->rq_reply_md.user_ptr = request;
661         request->rq_reply_md.eventq = 
662                 connection->c_peer.peer_ni->pni_reply_in_eq_h;
663
664         rc = PtlMDAttach(reply_me_h, request->rq_reply_md,
665                          PTL_UNLINK, &request->rq_reply_md_h);
666         if (rc != PTL_OK) {
667                 CERROR("PtlMDAttach failed: %d\n", rc);
668                 LASSERT (rc == PTL_NOSPACE);
669                 LBUG();
670                 GOTO(cleanup2, rc -ENOMEM);
671         }
672
673         CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid "LPU64
674                ", portal %u on %s\n",
675                request->rq_replen, request->rq_xid,
676                request->rq_reply_portal,
677                connection->c_peer.peer_ni->pni_name);
678
679         ptlrpc_request_addref(request);        /* 1 ref for the SENT callback */
680
681         spin_lock_irqsave (&request->rq_lock, flags);
682         request->rq_receiving_reply = 1;
683         /* Clear any flags that may be present from previous sends. */
684         request->rq_replied = 0;
685         request->rq_err = 0;
686         request->rq_timedout = 0;
687         request->rq_resend = 0;
688         request->rq_restart = 0;
689         spin_unlock_irqrestore (&request->rq_lock, flags);
690
691         request->rq_sent = LTIME_S(CURRENT_TIME);
692         ptlrpc_pinger_sending_on_import(request->rq_import);
693         rc = ptl_send_buf(request, connection, request->rq_request_portal);
694         if (rc == 0) {
695                 ptlrpc_lprocfs_rpc_sent(request);
696                 RETURN(rc);
697         }
698
699         spin_lock_irqsave (&request->rq_lock, flags);
700         request->rq_receiving_reply = 0;
701         spin_unlock_irqrestore (&request->rq_lock, flags);
702         ptlrpc_req_finished (request);          /* drop callback ref */
703  cleanup2:
704         /* MEUnlink is safe; the PUT didn't even get off the ground, and
705          * nobody apart from the PUT's target has the right nid+XID to
706          * access the reply buffer. */
707         rc2 = PtlMEUnlink(reply_me_h);
708         LASSERT (rc2 == PTL_OK);
709  cleanup:
710         OBD_FREE(request->rq_repmsg, request->rq_replen);
711         request->rq_repmsg = NULL;
712         return rc;
713 }
714
/* Post a service request buffer on the network: attach an ME accepting
 * any sender on the service's request portal, then attach rqbd's buffer
 * as an auto-unlinking, persistent (infinite-threshold) MD under it.
 * Attach failures are fatal (LBUG) -- see bug 1191. */
void ptlrpc_link_svc_me(struct ptlrpc_request_buffer_desc *rqbd)
{
        struct ptlrpc_srv_ni *srv_ni = rqbd->rqbd_srv_ni;
        struct ptlrpc_service *service = srv_ni->sni_service;
        /* wildcard: accept requests from any nid/pid */
        static ptl_process_id_t match_id = {PTL_NID_ANY, PTL_PID_ANY};
        int rc;
        ptl_md_t dummy;
        ptl_handle_md_t md_h;

        /* buffer must not currently be posted */
        LASSERT(atomic_read(&rqbd->rqbd_refcount) == 0);

        CDEBUG(D_NET, "PtlMEAttach: portal %d on %s h %lx."LPX64"\n",
               service->srv_req_portal, srv_ni->sni_ni->pni_name,
               srv_ni->sni_ni->pni_ni_h.nal_idx,
               srv_ni->sni_ni->pni_ni_h.cookie);

        /* Attach the leading ME on which we build the ring */
        rc = PtlMEAttach(srv_ni->sni_ni->pni_ni_h, service->srv_req_portal,
                         match_id, 0, ~0,
                         PTL_UNLINK, PTL_INS_AFTER, &rqbd->rqbd_me_h);
        if (rc != PTL_OK) {
                CERROR("PtlMEAttach failed: %d\n", rc);
                /* BUG 1191 */
                LBUG();
        }

        dummy.start      = rqbd->rqbd_buffer;
        dummy.length     = service->srv_buf_size;
        dummy.max_size   = service->srv_max_req_size;
        dummy.threshold  = PTL_MD_THRESH_INF;
        dummy.options    = PTL_MD_OP_PUT | PTL_MD_MAX_SIZE | PTL_MD_AUTO_UNLINK;
        dummy.user_ptr   = rqbd;
        dummy.eventq     = srv_ni->sni_eq_h;

        /* take the "posted" ref before attaching, since an incoming
         * request may arrive as soon as the MD is live */
        atomic_inc(&srv_ni->sni_nrqbds_receiving);
        atomic_set(&rqbd->rqbd_refcount, 1);   /* 1 ref for portals */

        rc = PtlMDAttach(rqbd->rqbd_me_h, dummy, PTL_UNLINK, &md_h);
        if (rc != PTL_OK) {
                CERROR("PtlMDAttach failed: %d\n", rc);
                LASSERT (rc == PTL_NOSPACE);
                LBUG();
                /* NOTE(review): everything below is unreachable -- LBUG()
                 * panics.  Left in place pending the BUG 1191 rework. */
                /* BUG 1191 */
                PtlMEUnlink (rqbd->rqbd_me_h);
                atomic_set(&rqbd->rqbd_refcount, 0);
                atomic_dec(&srv_ni->sni_nrqbds_receiving);
        }
}