lustre/ptlrpc/niobuf.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#define DEBUG_SUBSYSTEM S_RPC
#ifndef __KERNEL__
#include <liblustre.h>
#include <portals/lib-types.h>
#endif
#include <linux/obd_support.h>
#include <linux/lustre_net.h>
#include <linux/lustre_lib.h>
#include <linux/obd.h>
#include "ptlrpc_internal.h"

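/* Send a request or reply message to 'portal' on the peer.  The MD
 * threshold is 2 (SENT + ACK) when an ack is wanted, 1 (SENT only)
 * otherwise. */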
static int ptl_send_buf(struct ptlrpc_request *request,
                        struct ptlrpc_connection *conn, int portal)
{
        int rc;
        int rc2;
        ptl_process_id_t remote_id;
        ptl_handle_md_t md_h;
        ptl_ack_req_t ack_req;

        LASSERT (portal != 0);
        LASSERT (conn != NULL);
        CDEBUG (D_INFO, "conn=%p nid "LPX64" on %s\n",
                conn, conn->c_peer.peer_nid, conn->c_peer.peer_ni->pni_name);

        switch (request->rq_type) {
        case PTL_RPC_MSG_REQUEST:
                request->rq_reqmsg->type = request->rq_type;
                request->rq_req_md.start = request->rq_reqmsg;
                request->rq_req_md.length = request->rq_reqlen;
                request->rq_req_md.eventq =
                        conn->c_peer.peer_ni->pni_request_out_eq_h;
                LASSERT (!request->rq_want_ack);
                break;
        case PTL_RPC_MSG_ERR:
        case PTL_RPC_MSG_REPLY:
                request->rq_repmsg->type = request->rq_type;
                request->rq_req_md.start = request->rq_repmsg;
                request->rq_req_md.length = request->rq_replen;
                request->rq_req_md.eventq =
                        conn->c_peer.peer_ni->pni_reply_out_eq_h;
                break;
        default:
                LBUG();
                return -1; /* notreached */
        }
        if (request->rq_want_ack) {
                request->rq_req_md.threshold = 2; /* SENT and ACK */
                ack_req = PTL_ACK_REQ;
        } else {
                request->rq_req_md.threshold = 1;
                ack_req = PTL_NOACK_REQ;
        }
        request->rq_req_md.options = PTL_MD_OP_PUT;
        request->rq_req_md.user_ptr = request;

        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_ACK | OBD_FAIL_ONCE)) {
                request->rq_req_md.options |= PTL_MD_ACK_DISABLE;
                obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
        }

        /* NB if the send fails, we back out of the send and return
         * failure; it's down to the caller to handle missing callbacks */

        rc = PtlMDBind(conn->c_peer.peer_ni->pni_ni_h, request->rq_req_md,
                       &md_h);
        if (rc != PTL_OK) {
                CERROR("PtlMDBind failed: %d\n", rc);
                LASSERT (rc == PTL_NOSPACE);
                RETURN (-ENOMEM);
        }
        if (request->rq_type != PTL_RPC_MSG_REQUEST)
                memcpy(&request->rq_reply_md_h, &md_h, sizeof(md_h));

        remote_id.nid = conn->c_peer.peer_nid;
        remote_id.pid = 0;

        CDEBUG(D_NET, "Sending %d bytes to portal %d, xid "LPD64"\n",
               request->rq_req_md.length, portal, request->rq_xid);

        rc = PtlPut(md_h, ack_req, remote_id, portal, 0, request->rq_xid, 0, 0);
        if (rc != PTL_OK) {
                CERROR("PtlPut("LPU64", %d, "LPD64") failed: %d\n",
                       remote_id.nid, portal, request->rq_xid, rc);
                rc2 = PtlMDUnlink(md_h);
                LASSERT (rc2 == PTL_OK);
                RETURN ((rc == PTL_NOSPACE) ? -ENOMEM : -ECOMM);
        }

        return 0;
}

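/* Get/put the kiov array describing the bulk pages: use the array
 * embedded in the descriptor when it is big enough, otherwise
 * allocate one of the required size. */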
static inline ptl_kiov_t *
ptlrpc_get_bulk_iov (struct ptlrpc_bulk_desc *desc)
{
        ptl_kiov_t *iov;

        if (desc->bd_page_count <= sizeof (desc->bd_iov)/sizeof (*iov))
                return (desc->bd_iov);

        OBD_ALLOC (iov, desc->bd_page_count * sizeof (*iov));
        if (iov == NULL)
                LBUG();

        return (iov);
}

static inline void
ptlrpc_put_bulk_iov (struct ptlrpc_bulk_desc *desc, ptl_kiov_t *iov)
{
        if (desc->bd_page_count <= sizeof (desc->bd_iov)/sizeof (*iov))
                return;

        OBD_FREE (iov, desc->bd_page_count * sizeof (*iov));
}

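/* Server side: source a bulk PUT of all pages in the descriptor to
 * the peer.  Threshold is 2 because both SENT and ACK events must
 * arrive before the transfer is complete. */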
int ptlrpc_bulk_put(struct ptlrpc_bulk_desc *desc)
{
        int rc;
        int rc2;
        struct ptlrpc_peer *peer;
        struct list_head *tmp, *next;
        ptl_process_id_t remote_id;
        ptl_kiov_t *iov;
        __u64 xid;
        ENTRY;

        /* NB no locking required until desc is on the network */
        LASSERT (!desc->bd_network_rw);
        LASSERT (desc->bd_type == BULK_PUT_SOURCE);
        desc->bd_complete = 0;

        iov = ptlrpc_get_bulk_iov (desc);
        if (iov == NULL)
                RETURN (-ENOMEM);

        peer = &desc->bd_export->exp_connection->c_peer;

        desc->bd_md.start = iov;
        desc->bd_md.niov = 0;
        desc->bd_md.length = 0;
        desc->bd_md.eventq = peer->peer_ni->pni_bulk_put_source_eq_h;
        desc->bd_md.threshold = 2; /* SENT and ACK */
        desc->bd_md.options = PTL_MD_OP_PUT | PTL_MD_KIOV;
        desc->bd_md.user_ptr = desc;

        desc->bd_callback_count = 2;

        list_for_each_safe(tmp, next, &desc->bd_page_list) {
                struct ptlrpc_bulk_page *bulk;
                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);

                LASSERT(desc->bd_md.niov < desc->bd_page_count);

                iov[desc->bd_md.niov].kiov_page = bulk->bp_page;
                iov[desc->bd_md.niov].kiov_offset = bulk->bp_pageoffset;
                iov[desc->bd_md.niov].kiov_len = bulk->bp_buflen;

                LASSERT (iov[desc->bd_md.niov].kiov_offset +
                         iov[desc->bd_md.niov].kiov_len <= PAGE_SIZE);
                desc->bd_md.niov++;
                desc->bd_md.length += bulk->bp_buflen;
        }

        /* NB total length may be 0 for a read past EOF, so we send a 0
         * length bulk, since the client expects a bulk event. */
        LASSERT(desc->bd_md.niov == desc->bd_page_count);

        rc = PtlMDBind(peer->peer_ni->pni_ni_h, desc->bd_md,
                       &desc->bd_md_h);

        ptlrpc_put_bulk_iov (desc, iov); /* moved down to reduce latency to send */

        if (rc != PTL_OK) {
                CERROR("PtlMDBind failed: %d\n", rc);
                LASSERT (rc == PTL_NOSPACE);
                RETURN(-ENOMEM);
        }

        /* Client's bulk and reply matchbits are the same */
        xid = desc->bd_req->rq_xid;
        remote_id.nid = peer->peer_nid;
        remote_id.pid = 0;

        CDEBUG(D_NET, "Sending %u pages %u bytes to portal %d on %s "
               "nid "LPX64" pid %d xid "LPX64"\n",
               desc->bd_md.niov, desc->bd_md.length,
               desc->bd_portal, peer->peer_ni->pni_name,
               remote_id.nid, remote_id.pid, xid);

        desc->bd_network_rw = 1;
        rc = PtlPut(desc->bd_md_h, PTL_ACK_REQ, remote_id,
                    desc->bd_portal, 0, xid, 0, 0);
        if (rc != PTL_OK) {
                desc->bd_network_rw = 0;
                CERROR("PtlPut("LPU64", %d, "LPX64") failed: %d\n",
                       remote_id.nid, desc->bd_portal, xid, rc);
                rc2 = PtlMDUnlink(desc->bd_md_h);
                LASSERT (rc2 == PTL_OK);
                RETURN((rc == PTL_NOSPACE) ? -ENOMEM : -ECOMM);
        }

        RETURN(0);
}

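/* Server side: sink a bulk GET of all pages in the descriptor from
 * the peer.  Threshold is 2 again: a SENT event for the GET and a
 * REPLY event carrying the data. */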
int ptlrpc_bulk_get(struct ptlrpc_bulk_desc *desc)
{
        int rc;
        int rc2;
        struct ptlrpc_peer *peer;
        struct list_head *tmp, *next;
        ptl_process_id_t remote_id;
        ptl_kiov_t *iov;
        __u64 xid;
        ENTRY;

        /* NB no locking required until desc is on the network */
        LASSERT (!desc->bd_network_rw);
        LASSERT (desc->bd_type == BULK_GET_SINK);
        desc->bd_complete = 0;

        iov = ptlrpc_get_bulk_iov (desc);
        if (iov == NULL)
                RETURN(-ENOMEM);

        peer = &desc->bd_export->exp_connection->c_peer;

        desc->bd_md.start = iov;
        desc->bd_md.niov = 0;
        desc->bd_md.length = 0;
        desc->bd_md.eventq = peer->peer_ni->pni_bulk_get_sink_eq_h;
        desc->bd_md.threshold = 2; /* SENT and REPLY */
        desc->bd_md.options = PTL_MD_OP_GET | PTL_MD_KIOV;
        desc->bd_md.user_ptr = desc;

        desc->bd_callback_count = 2;

        list_for_each_safe(tmp, next, &desc->bd_page_list) {
                struct ptlrpc_bulk_page *bulk;
                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);

                LASSERT(desc->bd_md.niov < desc->bd_page_count);

                iov[desc->bd_md.niov].kiov_page = bulk->bp_page;
                iov[desc->bd_md.niov].kiov_len = bulk->bp_buflen;
                iov[desc->bd_md.niov].kiov_offset = bulk->bp_pageoffset;

                LASSERT (iov[desc->bd_md.niov].kiov_offset +
                         iov[desc->bd_md.niov].kiov_len <= PAGE_SIZE);
                desc->bd_md.niov++;
                desc->bd_md.length += bulk->bp_buflen;
        }

        LASSERT(desc->bd_md.niov == desc->bd_page_count);
        LASSERT(desc->bd_md.niov != 0);

        rc = PtlMDBind(peer->peer_ni->pni_ni_h, desc->bd_md, &desc->bd_md_h);

        ptlrpc_put_bulk_iov(desc, iov); /* moved down to reduce latency to send */

        if (rc != PTL_OK) {
                CERROR("PtlMDBind failed: %d\n", rc);
                LASSERT (rc == PTL_NOSPACE);
                RETURN(-ENOMEM);
        }

        /* Client's bulk and reply matchbits are the same */
        xid = desc->bd_req->rq_xid;
        remote_id.nid = desc->bd_export->exp_connection->c_peer.peer_nid;
        remote_id.pid = 0;

        CDEBUG(D_NET, "Fetching %u pages %u bytes from portal %d on %s "
               "nid "LPX64" pid %d xid "LPX64"\n",
               desc->bd_md.niov, desc->bd_md.length, desc->bd_portal,
               peer->peer_ni->pni_name, remote_id.nid, remote_id.pid,
               xid);

        desc->bd_network_rw = 1;
        rc = PtlGet(desc->bd_md_h, remote_id, desc->bd_portal, 0,
                    xid, 0);
        if (rc != PTL_OK) {
                desc->bd_network_rw = 0;
                CERROR("PtlGet("LPU64", %d, "LPX64") failed: %d\n",
                       remote_id.nid, desc->bd_portal, xid, rc);
                rc2 = PtlMDUnlink(desc->bd_md_h);
                LASSERT (rc2 == PTL_OK);
                RETURN((rc == PTL_NOSPACE) ? -ENOMEM : -ECOMM);
        }

        RETURN(0);
}

void ptlrpc_abort_bulk (struct ptlrpc_bulk_desc *desc)
{
        /* Server side bulk abort. Idempotent. Not thread-safe (i.e. only
         * serialises with completion callback) */
        unsigned long      flags;
        struct l_wait_info lwi;
        int                callback_count;
        int                rc;

        LASSERT (!in_interrupt ());             /* might sleep */

        /* NB. server-side bulk gets 2 events, so we have to keep trying to
         * unlink the MD until all callbacks have happened, or
         * PtlMDUnlink() returns OK or INVALID */
 again:
        spin_lock_irqsave (&desc->bd_lock, flags);
        if (!desc->bd_network_rw) {
                /* completed or never even registered. NB holding bd_lock
                 * guarantees callback has completed if it ran. */
                spin_unlock_irqrestore (&desc->bd_lock, flags);
                return;
        }

        /* sample callback count while we have the lock */
        callback_count = desc->bd_callback_count;
        spin_unlock_irqrestore (&desc->bd_lock, flags);

        rc = PtlMDUnlink (desc->bd_md_h);
        switch (rc) {
        default:
                CERROR("PtlMDUnlink returned %d\n", rc);
                LBUG ();
        case PTL_OK:                    /* Won the race with the network */
                LASSERT (!desc->bd_complete); /* Not all callbacks ran */
                desc->bd_network_rw = 0;
                return;

        case PTL_MD_INUSE:              /* MD is being accessed right now */
                for (;;) {
                        /* Network access will complete in finite time but the
                         * timeout lets us CERROR for visibility */
                        lwi = LWI_TIMEOUT (10 * HZ, NULL, NULL);
                        rc = l_wait_event(desc->bd_waitq,
                                          desc->bd_callback_count !=
                                          callback_count, &lwi);
                        if (rc == -ETIMEDOUT) {
                                CERROR("Unexpectedly long timeout: desc %p\n",
                                       desc);
                                continue;
                        }
                        LASSERT (rc == 0);
                        break;
                }
                /* go back and try again... */
                goto again;

        case PTL_INV_MD:            /* Lost the race with completion */
                LASSERT (desc->bd_complete);    /* Callbacks all ran */
                LASSERT (!desc->bd_network_rw);
                return;
        }
}

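/* Client side: attach an ME/MD pair matching on the request xid so
 * the server can GET from (or PUT into) the pages in this
 * descriptor. */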
int ptlrpc_register_bulk (struct ptlrpc_request *req)
{
        struct ptlrpc_bulk_desc *desc = req->rq_bulk;
        struct ptlrpc_peer *peer;
        struct list_head *tmp, *next;
        int rc;
        int rc2;
        ptl_kiov_t *iov;
        ptl_process_id_t source_id;
        ENTRY;

        /* NB no locking required until desc is on the network */
        LASSERT (!desc->bd_network_rw);
        LASSERT (desc->bd_page_count <= PTL_MD_MAX_IOV);
        LASSERT (desc->bd_req != NULL);
        LASSERT (desc->bd_type == BULK_PUT_SINK ||
                 desc->bd_type == BULK_GET_SOURCE);

        desc->bd_complete = 0;

        iov = ptlrpc_get_bulk_iov (desc);
        if (iov == NULL)
                return (-ENOMEM);

        peer = &desc->bd_import->imp_connection->c_peer;

        desc->bd_md.start = iov;
        desc->bd_md.niov = 0;
        desc->bd_md.length = 0;
        desc->bd_md.threshold = 1;
        desc->bd_md.user_ptr = desc;

        if (desc->bd_type == BULK_GET_SOURCE) {
                desc->bd_md.options = PTL_MD_OP_GET | PTL_MD_KIOV;
                desc->bd_md.eventq = peer->peer_ni->pni_bulk_get_source_eq_h;
        } else {
                desc->bd_md.options = PTL_MD_OP_PUT | PTL_MD_KIOV;
                desc->bd_md.eventq = peer->peer_ni->pni_bulk_put_sink_eq_h;
        }

        list_for_each_safe(tmp, next, &desc->bd_page_list) {
                struct ptlrpc_bulk_page *bulk;
                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);

                LASSERT(desc->bd_md.niov < desc->bd_page_count);

                iov[desc->bd_md.niov].kiov_page = bulk->bp_page;
                iov[desc->bd_md.niov].kiov_len = bulk->bp_buflen;
                iov[desc->bd_md.niov].kiov_offset = bulk->bp_pageoffset;

                LASSERT (bulk->bp_pageoffset + bulk->bp_buflen <= PAGE_SIZE);
                desc->bd_md.niov++;
                desc->bd_md.length += bulk->bp_buflen;
        }

        LASSERT(desc->bd_md.niov == desc->bd_page_count);
        LASSERT(desc->bd_md.niov != 0);

        /* XXX Registering the same xid on retried bulk makes my head
         * explode trying to understand how the original request's bulk
         * might interfere with the retried request -eeb */
        LASSERT (!desc->bd_registered || req->rq_xid != desc->bd_last_xid);
        desc->bd_registered = 1;
        desc->bd_last_xid = req->rq_xid;

        source_id.nid = desc->bd_import->imp_connection->c_peer.peer_nid;
        source_id.pid = PTL_PID_ANY;

        rc = PtlMEAttach(peer->peer_ni->pni_ni_h,
                         desc->bd_portal, source_id, req->rq_xid, 0,
                         PTL_UNLINK, PTL_INS_AFTER, &desc->bd_me_h);

        if (rc != PTL_OK) {
                CERROR("PtlMEAttach failed: %d\n", rc);
                LASSERT (rc == PTL_NOSPACE);
                GOTO(out, rc = -ENOMEM);
        }

        /* About to let the network at it... */
        desc->bd_network_rw = 1;
        rc = PtlMDAttach(desc->bd_me_h, desc->bd_md, PTL_UNLINK,
                         &desc->bd_md_h);
        if (rc != PTL_OK) {
                CERROR("PtlMDAttach failed: %d\n", rc);
                LASSERT (rc == PTL_NOSPACE);
                desc->bd_network_rw = 0;
                rc2 = PtlMEUnlink (desc->bd_me_h);
                LASSERT (rc2 == PTL_OK);
                GOTO(out, rc = -ENOMEM);
        }
        rc = 0;

        CDEBUG(D_NET, "Setup bulk %s buffers: %u pages %u bytes, xid "LPX64", "
               "portal %u on %s\n",
               desc->bd_type == BULK_GET_SOURCE ? "get-source" : "put-sink",
               desc->bd_md.niov, desc->bd_md.length,
               req->rq_xid, desc->bd_portal, peer->peer_ni->pni_name);

 out:
        ptlrpc_put_bulk_iov (desc, iov);
        RETURN(rc);
}

void ptlrpc_unregister_bulk (struct ptlrpc_request *req)
{
        /* Disconnect a bulk desc from the network. Idempotent. Not
         * thread-safe (i.e. only interlocks with completion callback). */
        struct ptlrpc_bulk_desc *desc = req->rq_bulk;
        wait_queue_head_t       *wq;
        unsigned long            flags;
        struct l_wait_info       lwi;
        int                      rc;

        LASSERT (!in_interrupt ());             /* might sleep */

        spin_lock_irqsave (&desc->bd_lock, flags);
        if (!desc->bd_network_rw) {     /* completed or never even registered */
                spin_unlock_irqrestore (&desc->bd_lock, flags);
                return;
        }
        spin_unlock_irqrestore (&desc->bd_lock, flags);

        LASSERT (desc->bd_req == req);     /* NB bd_req NULL until registered */

        /* NB...
         * 1. If the MD unlink is successful, the ME gets unlinked too.
         * 2. Client-side bulk only gets a single event and has a threshold
         *    of 1, so if the MD was in use at the first unlink attempt,
         *    the callback is due any minute and the MD/ME will unlink
         *    themselves.
         */
        rc = PtlMDUnlink (desc->bd_md_h);
        switch (rc) {
        default:
                CERROR("PtlMDUnlink returned %d\n", rc);
                LBUG ();
        case PTL_OK:                          /* Won the race with completion */
                LASSERT (!desc->bd_complete);   /* Callback hasn't happened */
                desc->bd_network_rw = 0;
                return;
        case PTL_MD_INUSE:                  /* MD is being accessed right now */
                for (;;) {
                        /* Network access will complete in finite time but the
                         * timeout lets us CERROR for visibility */
                        if (req->rq_set != NULL)
                                wq = &req->rq_set->set_waitq;
                        else
                                wq = &req->rq_wait_for_rep;
                        lwi = LWI_TIMEOUT (10 * HZ, NULL, NULL);
                        rc = l_wait_event(*wq, ptlrpc_bulk_complete(desc), &lwi);
                        LASSERT (rc == 0 || rc == -ETIMEDOUT);
                        if (rc == 0)
                                break;
                        CERROR ("Unexpectedly long timeout: desc %p\n", desc);
                        LBUG();
                }
                /* Fall through */
        case PTL_INV_MD:                     /* Lost the race with completion */
                LASSERT (desc->bd_complete);/* Callback has run to completion */
                LASSERT (!desc->bd_network_rw);
                return;
        }
}

int ptlrpc_reply(struct ptlrpc_request *req)
{
        unsigned long flags;
        int rc;

        /* We must already have a reply buffer (only ptlrpc_error() may be
         * called without one).  We must also have a request buffer which
         * is either the actual (swabbed) incoming request, or a saved copy
         * if this is a req saved in target_queue_final_reply(). */
        LASSERT (req->rq_repmsg != NULL);
        LASSERT (req->rq_reqmsg != NULL);

        /* FIXME: we need to increment the count of handled events */
        if (req->rq_type != PTL_RPC_MSG_ERR)
                req->rq_type = PTL_RPC_MSG_REPLY;

        req->rq_repmsg->status = req->rq_status;
        req->rq_repmsg->opc = req->rq_reqmsg->opc;

        init_waitqueue_head(&req->rq_wait_for_rep);
        rc = ptl_send_buf(req, req->rq_connection, req->rq_svc->srv_rep_portal);
        if (rc != 0) {
                /* Do what the callback handler would have done */
                OBD_FREE (req->rq_repmsg, req->rq_replen);

                spin_lock_irqsave (&req->rq_lock, flags);
                req->rq_want_ack = 0;
                spin_unlock_irqrestore (&req->rq_lock, flags);
        }
        return rc;
}

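/* Send an error reply: pack a minimal reply message if none has been
 * allocated yet, mark the request PTL_RPC_MSG_ERR and send it via
 * ptlrpc_reply(). */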
int ptlrpc_error(struct ptlrpc_request *req)
{
        int rc;
        ENTRY;

        if (!req->rq_repmsg) {
                rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
                                     &req->rq_repmsg);
                if (rc)
                        RETURN(rc);
        }

        req->rq_type = PTL_RPC_MSG_ERR;

        rc = ptlrpc_reply(req);
        RETURN(rc);
}

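/* Client side: register bulk (if any), post the reply ME/MD matching
 * on our xid, then send the request itself.  On failure everything is
 * backed out so the request can be retried cleanly. */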
int ptl_send_rpc(struct ptlrpc_request *request)
{
        int rc;
        int rc2;
        unsigned long flags;
        ptl_process_id_t source_id;
        ptl_handle_me_t  reply_me_h;
        ENTRY;

        LASSERT (request->rq_type == PTL_RPC_MSG_REQUEST);

        /* If this is a re-transmit, we're required to have disengaged
         * cleanly from the previous attempt */
        LASSERT (!request->rq_receiving_reply);

        if (request->rq_bulk != NULL) {
                rc = ptlrpc_register_bulk (request);
                if (rc != 0)
                        RETURN(rc);
        }

        request->rq_reqmsg->handle = request->rq_import->imp_remote_handle;

        source_id.nid = request->rq_connection->c_peer.peer_nid;
        source_id.pid = PTL_PID_ANY;

        LASSERT (request->rq_replen != 0);
        OBD_ALLOC(request->rq_repmsg, request->rq_replen);
        if (request->rq_repmsg == NULL) {
                LBUG();
                RETURN(-ENOMEM);
        }

        rc = PtlMEAttach(request->rq_connection->c_peer.peer_ni->pni_ni_h,
                         request->rq_reply_portal, /* XXX FIXME bug 249 */
                         source_id, request->rq_xid, 0, PTL_UNLINK,
                         PTL_INS_AFTER, &reply_me_h);
        if (rc != PTL_OK) {
                CERROR("PtlMEAttach failed: %d\n", rc);
                LASSERT (rc == PTL_NOSPACE);
                LBUG();
                GOTO(cleanup, rc = -ENOMEM);
        }

        request->rq_reply_md.start = request->rq_repmsg;
        request->rq_reply_md.length = request->rq_replen;
        request->rq_reply_md.threshold = 1;
        request->rq_reply_md.options = PTL_MD_OP_PUT;
        request->rq_reply_md.user_ptr = request;
        request->rq_reply_md.eventq =
                request->rq_connection->c_peer.peer_ni->pni_reply_in_eq_h;

        rc = PtlMDAttach(reply_me_h, request->rq_reply_md,
                         PTL_UNLINK, &request->rq_reply_md_h);
        if (rc != PTL_OK) {
                CERROR("PtlMDAttach failed: %d\n", rc);
                LASSERT (rc == PTL_NOSPACE);
                LBUG();
                GOTO(cleanup2, rc = -ENOMEM);
        }

        CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid "LPU64
               ", portal %u on %s\n",
               request->rq_replen, request->rq_xid,
               request->rq_reply_portal,
               request->rq_connection->c_peer.peer_ni->pni_name);

        ptlrpc_request_addref(request);        /* 1 ref for the SENT callback */

        spin_lock_irqsave (&request->rq_lock, flags);
        request->rq_receiving_reply = 1;
        /* Clear any flags that may be present from previous sends. */
        request->rq_replied = 0;
        request->rq_err = 0;
        request->rq_timedout = 0;
        request->rq_resend = 0;
        request->rq_restart = 0;
        spin_unlock_irqrestore (&request->rq_lock, flags);

        request->rq_sent = LTIME_S(CURRENT_TIME);
        ptlrpc_pinger_sending_on_import(request->rq_import);
        rc = ptl_send_buf(request, request->rq_connection,
                          request->rq_request_portal);
        if (rc == 0)
                RETURN(rc);

        spin_lock_irqsave (&request->rq_lock, flags);
        request->rq_receiving_reply = 0;
        spin_unlock_irqrestore (&request->rq_lock, flags);
        ptlrpc_req_finished (request);          /* drop callback ref */
 cleanup2:
        /* MEUnlink is safe; the PUT didn't even get off the ground, and
         * nobody apart from the PUT's target has the right nid+XID to
         * access the reply buffer. */
        rc2 = PtlMEUnlink(reply_me_h);
        LASSERT (rc2 == PTL_OK);
 cleanup:
        OBD_FREE(request->rq_repmsg, request->rq_replen);
        request->rq_repmsg = NULL;
        return rc;
}

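/* Post a request buffer on the service's request portal: a wildcard
 * ME (any nid/pid, all match bits ignored) fronting an
 * infinite-threshold, auto-unlinking MD that sinks incoming RPC
 * requests. */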
void ptlrpc_link_svc_me(struct ptlrpc_request_buffer_desc *rqbd)
{
        struct ptlrpc_srv_ni *srv_ni = rqbd->rqbd_srv_ni;
        struct ptlrpc_service *service = srv_ni->sni_service;
        static ptl_process_id_t match_id = {PTL_NID_ANY, PTL_PID_ANY};
        int rc;
        ptl_md_t dummy;
        ptl_handle_md_t md_h;

        LASSERT(atomic_read(&rqbd->rqbd_refcount) == 0);

        CDEBUG(D_NET, "PtlMEAttach: portal %d on %s h %lx."LPX64"\n",
               service->srv_req_portal, srv_ni->sni_ni->pni_name,
               srv_ni->sni_ni->pni_ni_h.nal_idx,
               srv_ni->sni_ni->pni_ni_h.cookie);

        /* Attach the leading ME on which we build the ring */
        rc = PtlMEAttach(srv_ni->sni_ni->pni_ni_h, service->srv_req_portal,
                         match_id, 0, ~0,
                         PTL_UNLINK, PTL_INS_AFTER, &rqbd->rqbd_me_h);
        if (rc != PTL_OK) {
                CERROR("PtlMEAttach failed: %d\n", rc);
                /* BUG 1191 */
                LBUG();
        }

        dummy.start      = rqbd->rqbd_buffer;
        dummy.length     = service->srv_buf_size;
        dummy.max_size   = service->srv_max_req_size;
        dummy.threshold  = PTL_MD_THRESH_INF;
        dummy.options    = PTL_MD_OP_PUT | PTL_MD_MAX_SIZE | PTL_MD_AUTO_UNLINK;
        dummy.user_ptr   = rqbd;
        dummy.eventq     = srv_ni->sni_eq_h;

        atomic_inc(&srv_ni->sni_nrqbds_receiving);
        atomic_set(&rqbd->rqbd_refcount, 1);   /* 1 ref for portals */

        rc = PtlMDAttach(rqbd->rqbd_me_h, dummy, PTL_UNLINK, &md_h);
        if (rc != PTL_OK) {
                CERROR("PtlMDAttach failed: %d\n", rc);
                LASSERT (rc == PTL_NOSPACE);
                LBUG();
                /* BUG 1191 */
                PtlMEUnlink (rqbd->rqbd_me_h);
                atomic_set(&rqbd->rqbd_refcount, 0);
                atomic_dec(&srv_ni->sni_nrqbds_receiving);
        }
}