/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#define DEBUG_SUBSYSTEM S_RPC
#ifndef __KERNEL__
#include <liblustre.h>
#include <portals/lib-types.h>
#endif
#include <linux/obd_support.h>
#include <linux/lustre_net.h>
#include <linux/lustre_lib.h>
#include <linux/obd.h>

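/* Send the message described by 'request' (a request or a reply,
 * according to rq_type) to 'conn' on the given portal: bind an MD over
 * the outgoing buffer and PtlPut it, requesting an ACK only when the
 * caller set rq_want_ack.  On failure the MD is unlinked again before
 * returning, so the caller sees no stray callbacks. */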
static int ptl_send_buf(struct ptlrpc_request *request,
                        struct ptlrpc_connection *conn, int portal)
{
        int rc;
        int rc2;
        ptl_process_id_t remote_id;
        ptl_handle_md_t md_h;
        ptl_ack_req_t ack_req;

        LASSERT (portal != 0);
        LASSERT (conn != NULL);
        CDEBUG (D_INFO, "conn=%p nid "LPX64" on %s\n",
                conn, conn->c_peer.peer_nid, conn->c_peer.peer_ni->pni_name);

        switch (request->rq_type) {
        case PTL_RPC_MSG_REQUEST:
                request->rq_reqmsg->type = request->rq_type;
                request->rq_req_md.start = request->rq_reqmsg;
                request->rq_req_md.length = request->rq_reqlen;
                request->rq_req_md.eventq =
                        conn->c_peer.peer_ni->pni_request_out_eq_h;
                LASSERT (!request->rq_want_ack);
                break;
        case PTL_RPC_MSG_ERR:
        case PTL_RPC_MSG_REPLY:
                request->rq_repmsg->type = request->rq_type;
                request->rq_req_md.start = request->rq_repmsg;
                request->rq_req_md.length = request->rq_replen;
                request->rq_req_md.eventq =
                        conn->c_peer.peer_ni->pni_reply_out_eq_h;
                break;
        default:
                LBUG();
                return -1; /* notreached */
        }
        if (request->rq_want_ack) {
                request->rq_req_md.threshold = 2; /* SENT and ACK */
                ack_req = PTL_ACK_REQ;
        } else {
                request->rq_req_md.threshold = 1;
                ack_req = PTL_NOACK_REQ;
        }
        request->rq_req_md.options = PTL_MD_OP_PUT;
        request->rq_req_md.user_ptr = request;

        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_ACK | OBD_FAIL_ONCE)) {
                request->rq_req_md.options |= PTL_MD_ACK_DISABLE;
                obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
        }

        /* NB if the send fails, we back out of the send and return
         * failure; it's down to the caller to handle missing callbacks */

        rc = PtlMDBind(conn->c_peer.peer_ni->pni_ni_h, request->rq_req_md,
                       &md_h);
        if (rc != PTL_OK) {
                CERROR("PtlMDBind failed: %d\n", rc);
                LASSERT (rc == PTL_NOSPACE);
                RETURN (-ENOMEM);
        }
        if (request->rq_type != PTL_RPC_MSG_REQUEST)
                memcpy(&request->rq_reply_md_h, &md_h, sizeof(md_h));

        remote_id.nid = conn->c_peer.peer_nid;
        remote_id.pid = 0;

        CDEBUG(D_NET, "Sending %d bytes to portal %d, xid "LPD64"\n",
               request->rq_req_md.length, portal, request->rq_xid);

        rc = PtlPut(md_h, ack_req, remote_id, portal, 0, request->rq_xid, 0, 0);
        if (rc != PTL_OK) {
                CERROR("PtlPut("LPU64", %d, "LPD64") failed: %d\n",
                       remote_id.nid, portal, request->rq_xid, rc);
                rc2 = PtlMDUnlink(md_h);
                LASSERT (rc2 == PTL_OK);
                RETURN ((rc == PTL_NOSPACE) ? -ENOMEM : -ECOMM);
        }

        return 0;
}

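/* The descriptor embeds a small iov array (bd_iov); use that when the
 * page count fits, otherwise allocate a temporary array.
 * ptlrpc_put_bulk_iov() frees the array iff ptlrpc_get_bulk_iov()
 * allocated it. */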
static inline ptl_kiov_t *
ptlrpc_get_bulk_iov (struct ptlrpc_bulk_desc *desc)
{
        ptl_kiov_t *iov;

        if (desc->bd_page_count <= sizeof (desc->bd_iov)/sizeof (*iov))
                return (desc->bd_iov);

        OBD_ALLOC (iov, desc->bd_page_count * sizeof (*iov));
        if (iov == NULL)
                LBUG();

        return (iov);
}

static inline void
ptlrpc_put_bulk_iov (struct ptlrpc_bulk_desc *desc, ptl_kiov_t *iov)
{
        if (desc->bd_page_count <= sizeof (desc->bd_iov)/sizeof (*iov))
                return;

        OBD_FREE (iov, desc->bd_page_count * sizeof (*iov));
}

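/* Server-side bulk PUT (source): push the pages listed in the
 * descriptor to the client's matching sink buffer.  The client's bulk
 * and reply matchbits are the same, so the PtlPut matches on rq_xid. */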
int ptlrpc_bulk_put(struct ptlrpc_bulk_desc *desc)
{
        int rc;
        int rc2;
        struct ptlrpc_peer *peer;
        struct list_head *tmp, *next;
        ptl_process_id_t remote_id;
        ptl_kiov_t *iov;
        __u64 xid;
        ENTRY;

        /* NB no locking required until desc is on the network */
        LASSERT (!desc->bd_network_rw);
        LASSERT (desc->bd_type == BULK_PUT_SOURCE);
        desc->bd_complete = 0;

        iov = ptlrpc_get_bulk_iov (desc);
        if (iov == NULL)
                RETURN (-ENOMEM);

        peer = &desc->bd_export->exp_connection->c_peer;

        desc->bd_md.start = iov;
        desc->bd_md.niov = 0;
        desc->bd_md.length = 0;
        desc->bd_md.eventq = peer->peer_ni->pni_bulk_put_source_eq_h;
        desc->bd_md.threshold = 2; /* SENT and ACK */
        desc->bd_md.options = PTL_MD_OP_PUT | PTL_MD_KIOV;
        desc->bd_md.user_ptr = desc;

        desc->bd_callback_count = 2;

        list_for_each_safe(tmp, next, &desc->bd_page_list) {
                struct ptlrpc_bulk_page *bulk;
                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);

                LASSERT(desc->bd_md.niov < desc->bd_page_count);

                iov[desc->bd_md.niov].kiov_page = bulk->bp_page;
                iov[desc->bd_md.niov].kiov_offset = bulk->bp_pageoffset;
                iov[desc->bd_md.niov].kiov_len = bulk->bp_buflen;

                LASSERT (iov[desc->bd_md.niov].kiov_offset +
                         iov[desc->bd_md.niov].kiov_len <= PAGE_SIZE);
                desc->bd_md.niov++;
                desc->bd_md.length += bulk->bp_buflen;
        }

        /* NB total length may be 0 for a read past EOF, so we send a 0
         * length bulk, since the client expects a bulk event. */
        LASSERT(desc->bd_md.niov == desc->bd_page_count);

        rc = PtlMDBind(peer->peer_ni->pni_ni_h, desc->bd_md,
                       &desc->bd_md_h);

        ptlrpc_put_bulk_iov (desc, iov); /* XXX move below the PtlPut() to
                                          * reduce send latency */

        if (rc != PTL_OK) {
                CERROR("PtlMDBind failed: %d\n", rc);
                LASSERT (rc == PTL_NOSPACE);
                RETURN(-ENOMEM);
        }

        /* Client's bulk and reply matchbits are the same */
        xid = desc->bd_req->rq_xid;
        remote_id.nid = peer->peer_nid;
        remote_id.pid = 0;

        CDEBUG(D_NET, "Sending %u pages %u bytes to portal %d on %s "
               "nid "LPX64" pid %d xid "LPX64"\n",
               desc->bd_md.niov, desc->bd_md.length,
               desc->bd_portal, peer->peer_ni->pni_name,
               remote_id.nid, remote_id.pid, xid);

        desc->bd_network_rw = 1;
        rc = PtlPut(desc->bd_md_h, PTL_ACK_REQ, remote_id,
                    desc->bd_portal, 0, xid, 0, 0);
        if (rc != PTL_OK) {
                desc->bd_network_rw = 0;
                CERROR("PtlPut("LPU64", %d, "LPX64") failed: %d\n",
                       remote_id.nid, desc->bd_portal, xid, rc);
                rc2 = PtlMDUnlink(desc->bd_md_h);
                LASSERT (rc2 == PTL_OK);
                RETURN((rc == PTL_NOSPACE) ? -ENOMEM : -ECOMM);
        }

        RETURN(0);
}

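/* Server-side bulk GET (sink): fetch the client's pages into the
 * descriptor's pages with a PtlGet on the same matchbits (rq_xid). */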
int ptlrpc_bulk_get(struct ptlrpc_bulk_desc *desc)
{
        int rc;
        int rc2;
        struct ptlrpc_peer *peer;
        struct list_head *tmp, *next;
        ptl_process_id_t remote_id;
        ptl_kiov_t *iov;
        __u64 xid;
        ENTRY;

        /* NB no locking required until desc is on the network */
        LASSERT (!desc->bd_network_rw);
        LASSERT (desc->bd_type == BULK_GET_SINK);
        desc->bd_complete = 0;

        iov = ptlrpc_get_bulk_iov (desc);
        if (iov == NULL)
                RETURN(-ENOMEM);

        peer = &desc->bd_export->exp_connection->c_peer;

        desc->bd_md.start = iov;
        desc->bd_md.niov = 0;
        desc->bd_md.length = 0;
        desc->bd_md.eventq = peer->peer_ni->pni_bulk_get_sink_eq_h;
        desc->bd_md.threshold = 2; /* SENT and REPLY */
        desc->bd_md.options = PTL_MD_OP_GET | PTL_MD_KIOV;
        desc->bd_md.user_ptr = desc;

        desc->bd_callback_count = 2;

        list_for_each_safe(tmp, next, &desc->bd_page_list) {
                struct ptlrpc_bulk_page *bulk;
                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);

                LASSERT(desc->bd_md.niov < desc->bd_page_count);

                iov[desc->bd_md.niov].kiov_page = bulk->bp_page;
                iov[desc->bd_md.niov].kiov_len = bulk->bp_buflen;
                iov[desc->bd_md.niov].kiov_offset = bulk->bp_pageoffset;

                LASSERT (iov[desc->bd_md.niov].kiov_offset +
                         iov[desc->bd_md.niov].kiov_len <= PAGE_SIZE);
                desc->bd_md.niov++;
                desc->bd_md.length += bulk->bp_buflen;
        }

        LASSERT(desc->bd_md.niov == desc->bd_page_count);
        LASSERT(desc->bd_md.niov != 0);

        rc = PtlMDBind(peer->peer_ni->pni_ni_h, desc->bd_md, &desc->bd_md_h);

        ptlrpc_put_bulk_iov(desc, iov); /* XXX move below the PtlGet() to
                                         * reduce send latency */

        if (rc != PTL_OK) {
                CERROR("PtlMDBind failed: %d\n", rc);
                LASSERT (rc == PTL_NOSPACE);
                RETURN(-ENOMEM);
        }

        /* Client's bulk and reply matchbits are the same */
        xid = desc->bd_req->rq_xid;
        remote_id.nid = peer->peer_nid;
        remote_id.pid = 0;

        CDEBUG(D_NET, "Fetching %u pages %u bytes from portal %d on %s "
               "nid "LPX64" pid %d xid "LPX64"\n",
               desc->bd_md.niov, desc->bd_md.length, desc->bd_portal,
               peer->peer_ni->pni_name, remote_id.nid, remote_id.pid,
               xid);

        desc->bd_network_rw = 1;
        rc = PtlGet(desc->bd_md_h, remote_id, desc->bd_portal, 0,
                    xid, 0);
        if (rc != PTL_OK) {
                desc->bd_network_rw = 0;
                CERROR("PtlGet("LPU64", %d, "LPX64") failed: %d\n",
                       remote_id.nid, desc->bd_portal, xid, rc);
                rc2 = PtlMDUnlink(desc->bd_md_h);
                LASSERT (rc2 == PTL_OK);
                RETURN((rc == PTL_NOSPACE) ? -ENOMEM : -ECOMM);
        }

        RETURN(0);
}

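/* NB the PtlMDUnlink() below has three outcomes: PTL_OK (we beat the
 * network; no more callbacks will arrive), PTL_INV_MD (completion beat
 * us) and PTL_MD_INUSE (a callback is running right now, so wait for
 * it and retry). */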
void ptlrpc_abort_bulk (struct ptlrpc_bulk_desc *desc)
{
        /* Server side bulk abort. Idempotent. Not thread-safe (i.e. only
         * serialises with completion callback) */
        unsigned long      flags;
        struct l_wait_info lwi;
        int                callback_count;
        int                rc;

        LASSERT (!in_interrupt ());             /* might sleep */

        /* NB. server-side bulk gets 2 events, so we have to keep trying to
         * unlink the MD until all callbacks have happened, or
         * PtlMDUnlink() returns OK or INVALID */
 again:
        spin_lock_irqsave (&desc->bd_lock, flags);
        if (!desc->bd_network_rw) {
                /* completed or never even registered. NB holding bd_lock
                 * guarantees callback has completed if it ran. */
                spin_unlock_irqrestore (&desc->bd_lock, flags);
                return;
        }

        /* sample callback count while we have the lock */
        callback_count = desc->bd_callback_count;
        spin_unlock_irqrestore (&desc->bd_lock, flags);

        rc = PtlMDUnlink (desc->bd_md_h);
        switch (rc) {
        default:
                CERROR("PtlMDUnlink returned %d\n", rc);
                LBUG ();
        case PTL_OK:                    /* Won the race with the network */
                LASSERT (!desc->bd_complete); /* Not all callbacks ran */
                desc->bd_network_rw = 0;
                return;

        case PTL_MD_INUSE:              /* MD is being accessed right now */
                for (;;) {
                        /* Network access will complete in finite time but the
                         * timeout lets us CERROR for visibility */
                        lwi = LWI_TIMEOUT (10 * HZ, NULL, NULL);
                        rc = l_wait_event(desc->bd_waitq,
                                          desc->bd_callback_count !=
                                          callback_count, &lwi);
                        if (rc == -ETIMEDOUT) {
                                CERROR("Unexpectedly long timeout: desc %p\n",
                                       desc);
                                continue;
                        }
                        LASSERT (rc == 0);
                        break;
                }
                /* go back and try again... */
                goto again;

        case PTL_INV_MD:            /* Lost the race with completion */
                LASSERT (desc->bd_complete);    /* Callbacks all ran */
                LASSERT (!desc->bd_network_rw);
                return;
        }
}

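/* Client-side: attach a match entry and passive MD over the bulk pages
 * so the server can GET from (or PUT into) them, using the request's
 * xid as matchbits. */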
int ptlrpc_register_bulk (struct ptlrpc_request *req)
{
        struct ptlrpc_bulk_desc *desc = req->rq_bulk;
        struct ptlrpc_peer *peer;
        struct list_head *tmp, *next;
        int rc;
        int rc2;
        ptl_kiov_t *iov;
        ptl_process_id_t source_id;
        ENTRY;

        /* NB no locking required until desc is on the network */
        LASSERT (!desc->bd_network_rw);
        LASSERT (desc->bd_page_count <= PTL_MD_MAX_IOV);
        LASSERT (desc->bd_req != NULL);
        LASSERT (desc->bd_type == BULK_PUT_SINK ||
                 desc->bd_type == BULK_GET_SOURCE);

        desc->bd_complete = 0;

        iov = ptlrpc_get_bulk_iov (desc);
        if (iov == NULL)
                RETURN(-ENOMEM);

        peer = &desc->bd_import->imp_connection->c_peer;

        desc->bd_md.start = iov;
        desc->bd_md.niov = 0;
        desc->bd_md.length = 0;
        desc->bd_md.threshold = 1;
        desc->bd_md.user_ptr = desc;

        if (desc->bd_type == BULK_GET_SOURCE) {
                desc->bd_md.options = PTL_MD_OP_GET | PTL_MD_KIOV;
                desc->bd_md.eventq = peer->peer_ni->pni_bulk_get_source_eq_h;
        } else {
                desc->bd_md.options = PTL_MD_OP_PUT | PTL_MD_KIOV;
                desc->bd_md.eventq = peer->peer_ni->pni_bulk_put_sink_eq_h;
        }

        list_for_each_safe(tmp, next, &desc->bd_page_list) {
                struct ptlrpc_bulk_page *bulk;
                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);

                LASSERT(desc->bd_md.niov < desc->bd_page_count);

                iov[desc->bd_md.niov].kiov_page = bulk->bp_page;
                iov[desc->bd_md.niov].kiov_len = bulk->bp_buflen;
                iov[desc->bd_md.niov].kiov_offset = bulk->bp_pageoffset;

                LASSERT (bulk->bp_pageoffset + bulk->bp_buflen <= PAGE_SIZE);
                desc->bd_md.niov++;
                desc->bd_md.length += bulk->bp_buflen;
        }

        LASSERT(desc->bd_md.niov == desc->bd_page_count);
        LASSERT(desc->bd_md.niov != 0);

        /* XXX Registering the same xid on retried bulk makes my head
         * explode trying to understand how the original request's bulk
         * might interfere with the retried request -eeb */
        LASSERT (!desc->bd_registered || req->rq_xid != desc->bd_last_xid);
        desc->bd_registered = 1;
        desc->bd_last_xid = req->rq_xid;

        source_id.nid = desc->bd_import->imp_connection->c_peer.peer_nid;
        source_id.pid = PTL_PID_ANY;

        rc = PtlMEAttach(peer->peer_ni->pni_ni_h,
                         desc->bd_portal, source_id, req->rq_xid, 0,
                         PTL_UNLINK, PTL_INS_AFTER, &desc->bd_me_h);

        if (rc != PTL_OK) {
                CERROR("PtlMEAttach failed: %d\n", rc);
                LASSERT (rc == PTL_NOSPACE);
                GOTO(out, rc = -ENOMEM);
        }

        /* About to let the network at it... */
        desc->bd_network_rw = 1;
        rc = PtlMDAttach(desc->bd_me_h, desc->bd_md, PTL_UNLINK,
                         &desc->bd_md_h);
        if (rc != PTL_OK) {
                CERROR("PtlMDAttach failed: %d\n", rc);
                LASSERT (rc == PTL_NOSPACE);
                desc->bd_network_rw = 0;
                rc2 = PtlMEUnlink (desc->bd_me_h);
                LASSERT (rc2 == PTL_OK);
                GOTO(out, rc = -ENOMEM);
        }
        rc = 0;

        CDEBUG(D_NET, "Setup bulk %s buffers: %u pages %u bytes, xid "LPX64", "
               "portal %u on %s\n",
               desc->bd_type == BULK_GET_SOURCE ? "get-source" : "put-sink",
               desc->bd_md.niov, desc->bd_md.length,
               req->rq_xid, desc->bd_portal, peer->peer_ni->pni_name);

 out:
        ptlrpc_put_bulk_iov (desc, iov);
        RETURN(rc);
}

void ptlrpc_unregister_bulk (struct ptlrpc_request *req)
{
        /* Disconnect a bulk desc from the network. Idempotent. Not
         * thread-safe (i.e. only interlocks with completion callback). */
        struct ptlrpc_bulk_desc *desc = req->rq_bulk;
        wait_queue_head_t       *wq;
        unsigned long            flags;
        struct l_wait_info       lwi;
        int                      rc;

        LASSERT (!in_interrupt ());             /* might sleep */

        spin_lock_irqsave (&desc->bd_lock, flags);
        if (!desc->bd_network_rw) {     /* completed or never even registered */
                spin_unlock_irqrestore (&desc->bd_lock, flags);
                return;
        }
        spin_unlock_irqrestore (&desc->bd_lock, flags);

        LASSERT (desc->bd_req == req);     /* NB bd_req NULL until registered */

        /* NB...
         * 1. If the MD unlink is successful, the ME gets unlinked too.
         * 2. Client-side bulk only gets a single event and has a
         *    threshold of 1, so if the MD was in use at the first unlink
         *    attempt, the callback is due any minute and the MD/ME will
         *    unlink themselves.
         */
        rc = PtlMDUnlink (desc->bd_md_h);
        switch (rc) {
        default:
                CERROR("PtlMDUnlink returned %d\n", rc);
                LBUG ();
        case PTL_OK:                          /* Won the race with completion */
                LASSERT (!desc->bd_complete);   /* Callback hasn't happened */
                desc->bd_network_rw = 0;
                return;
        case PTL_MD_INUSE:                  /* MD is being accessed right now */
                for (;;) {
                        /* Network access will complete in finite time but the
                         * timeout lets us CERROR for visibility */
                        if (desc->bd_req->rq_set != NULL)
                                wq = &req->rq_set->set_waitq;
                        else
                                wq = &req->rq_wait_for_rep;
                        lwi = LWI_TIMEOUT (10 * HZ, NULL, NULL);
                        rc = l_wait_event(*wq, ptlrpc_bulk_complete(desc), &lwi);
                        LASSERT (rc == 0 || rc == -ETIMEDOUT);
                        if (rc == 0)
                                break;
                        CERROR ("Unexpectedly long timeout: desc %p\n", desc);
                        LBUG();
                }
                /* Fall through */
        case PTL_INV_MD:                     /* Lost the race with completion */
                LASSERT (desc->bd_complete);/* Callback has run to completion */
                LASSERT (!desc->bd_network_rw);
                return;
        }
}

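/* Send a reply (or, via ptlrpc_error(), an error reply) back down the
 * connection the request arrived on.  On send failure we clean up here
 * as the callback handler would have. */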
int ptlrpc_reply(struct ptlrpc_request *req)
{
        unsigned long flags;
        int rc;

        /* We must already have a reply buffer (only ptlrpc_error() may be
         * called without one).  We must also have a request buffer which
         * is either the actual (swabbed) incoming request, or a saved copy
         * if this is a req saved in target_queue_final_reply(). */
        LASSERT (req->rq_repmsg != NULL);
        LASSERT (req->rq_reqmsg != NULL);

        /* FIXME: we need to increment the count of handled events */
        if (req->rq_type != PTL_RPC_MSG_ERR)
                req->rq_type = PTL_RPC_MSG_REPLY;

        req->rq_repmsg->status = req->rq_status;
        req->rq_repmsg->opc = req->rq_reqmsg->opc;

        init_waitqueue_head(&req->rq_wait_for_rep);
        rc = ptl_send_buf(req, req->rq_connection, req->rq_svc->srv_rep_portal);
        if (rc != 0) {
                /* Do what the callback handler would have done */
                OBD_FREE (req->rq_repmsg, req->rq_replen);

                spin_lock_irqsave (&req->rq_lock, flags);
                req->rq_want_ack = 0;
                spin_unlock_irqrestore (&req->rq_lock, flags);
        }
        return rc;
}

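/* Send an error reply for 'req', packing a minimal reply message first
 * if the handler never got as far as allocating one. */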
int ptlrpc_error(struct ptlrpc_request *req)
{
        int rc;
        ENTRY;

        if (!req->rq_repmsg) {
                rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
                                     &req->rq_repmsg);
                if (rc)
                        RETURN(rc);
        }

        req->rq_type = PTL_RPC_MSG_ERR;

        rc = ptlrpc_reply(req);
        RETURN(rc);
}

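/* Client-side send of a complete RPC: register the bulk (if any), post
 * the reply buffer (ME + MD on the reply portal, matching on rq_xid),
 * then send the request itself.  On failure everything is backed out
 * so the request can be retried cleanly. */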
int ptl_send_rpc(struct ptlrpc_request *request)
{
        int rc;
        int rc2;
        unsigned long flags;
        ptl_process_id_t source_id;
        ptl_handle_me_t  reply_me_h;
        ENTRY;

        LASSERT (request->rq_type == PTL_RPC_MSG_REQUEST);

        /* If this is a re-transmit, we're required to have disengaged
         * cleanly from the previous attempt */
        LASSERT (!request->rq_receiving_reply);

        if (request->rq_bulk != NULL) {
                rc = ptlrpc_register_bulk (request);
                if (rc != 0)
                        RETURN(rc);
        }

        request->rq_reqmsg->handle = request->rq_import->imp_remote_handle;

        source_id.nid = request->rq_connection->c_peer.peer_nid;
        source_id.pid = PTL_PID_ANY;

        LASSERT (request->rq_replen != 0);
        OBD_ALLOC(request->rq_repmsg, request->rq_replen);
        if (request->rq_repmsg == NULL) {
                LBUG();
                RETURN(-ENOMEM);
        }

        rc = PtlMEAttach(request->rq_connection->c_peer.peer_ni->pni_ni_h,
                         request->rq_reply_portal, /* XXX FIXME bug 249 */
                         source_id, request->rq_xid, 0, PTL_UNLINK,
                         PTL_INS_AFTER, &reply_me_h);
        if (rc != PTL_OK) {
                CERROR("PtlMEAttach failed: %d\n", rc);
                LASSERT (rc == PTL_NOSPACE);
                LBUG();
                GOTO(cleanup, rc = -ENOMEM);
        }

        request->rq_reply_md.start = request->rq_repmsg;
        request->rq_reply_md.length = request->rq_replen;
        request->rq_reply_md.threshold = 1;
        request->rq_reply_md.options = PTL_MD_OP_PUT;
        request->rq_reply_md.user_ptr = request;
        request->rq_reply_md.eventq =
                request->rq_connection->c_peer.peer_ni->pni_reply_in_eq_h;

        rc = PtlMDAttach(reply_me_h, request->rq_reply_md,
                         PTL_UNLINK, &request->rq_reply_md_h);
        if (rc != PTL_OK) {
                CERROR("PtlMDAttach failed: %d\n", rc);
                LASSERT (rc == PTL_NOSPACE);
                LBUG();
                GOTO(cleanup2, rc = -ENOMEM);
        }

        CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid "LPU64
               ", portal %u on %s\n",
               request->rq_replen, request->rq_xid,
               request->rq_reply_portal,
               request->rq_connection->c_peer.peer_ni->pni_name);

        ptlrpc_request_addref(request);        /* 1 ref for the SENT callback */

        spin_lock_irqsave (&request->rq_lock, flags);
        request->rq_receiving_reply = 1;
        /* Clear any flags that may be present from previous sends. */
        request->rq_replied = 0;
        request->rq_err = 0;
        request->rq_timedout = 0;
        request->rq_resend = 0;
        request->rq_restart = 0;
        spin_unlock_irqrestore (&request->rq_lock, flags);

        request->rq_sent = LTIME_S(CURRENT_TIME);
        rc = ptl_send_buf(request, request->rq_connection,
                          request->rq_request_portal);
        if (rc == 0)
                RETURN(rc);

        spin_lock_irqsave (&request->rq_lock, flags);
        request->rq_receiving_reply = 0;
        spin_unlock_irqrestore (&request->rq_lock, flags);
        ptlrpc_req_finished (request);          /* drop callback ref */
 cleanup2:
        /* MEUnlink is safe; the PUT didn't even get off the ground, and
         * nobody apart from the PUT's target has the right nid+XID to
         * access the reply buffer. */
        rc2 = PtlMEUnlink(reply_me_h);
        LASSERT (rc2 == PTL_OK);
 cleanup:
        OBD_FREE(request->rq_repmsg, request->rq_replen);
        request->rq_repmsg = NULL;
        return rc;
}

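/* (Re)post a request buffer on a service's incoming-request portal:
 * attach an ME with wildcard matchbits and an auto-unlinking MD that
 * accepts up to srv_max_req_size bytes per incoming request. */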
void ptlrpc_link_svc_me(struct ptlrpc_request_buffer_desc *rqbd)
{
        struct ptlrpc_srv_ni *srv_ni = rqbd->rqbd_srv_ni;
        struct ptlrpc_service *service = srv_ni->sni_service;
        static ptl_process_id_t match_id = {PTL_NID_ANY, PTL_PID_ANY};
        int rc;
        ptl_md_t dummy;
        ptl_handle_md_t md_h;

        LASSERT(atomic_read(&rqbd->rqbd_refcount) == 0);

        CDEBUG(D_NET, "PtlMEAttach: portal %d on %s h %lx."LPX64"\n",
               service->srv_req_portal, srv_ni->sni_ni->pni_name,
               srv_ni->sni_ni->pni_ni_h.nal_idx,
               srv_ni->sni_ni->pni_ni_h.cookie);

        /* Attach the leading ME on which we build the ring */
        rc = PtlMEAttach(srv_ni->sni_ni->pni_ni_h, service->srv_req_portal,
                         match_id, 0, ~0,
                         PTL_UNLINK, PTL_INS_AFTER, &rqbd->rqbd_me_h);
        if (rc != PTL_OK) {
                CERROR("PtlMEAttach failed: %d\n", rc);
                /* BUG 1191 */
                LBUG();
        }

        dummy.start      = rqbd->rqbd_buffer;
        dummy.length     = service->srv_buf_size;
        dummy.max_size   = service->srv_max_req_size;
        dummy.threshold  = PTL_MD_THRESH_INF;
        dummy.options    = PTL_MD_OP_PUT | PTL_MD_MAX_SIZE | PTL_MD_AUTO_UNLINK;
        dummy.user_ptr   = rqbd;
        dummy.eventq     = srv_ni->sni_eq_h;

        atomic_inc(&srv_ni->sni_nrqbds_receiving);
        atomic_set(&rqbd->rqbd_refcount, 1);   /* 1 ref for portals */

        rc = PtlMDAttach(rqbd->rqbd_me_h, dummy, PTL_UNLINK, &md_h);
        if (rc != PTL_OK) {
                CERROR("PtlMDAttach failed: %d\n", rc);
                LASSERT (rc == PTL_NOSPACE);
                LBUG();
                /* BUG 1191 */
                PtlMEUnlink (rqbd->rqbd_me_h);
                atomic_set(&rqbd->rqbd_refcount, 0);
                atomic_dec(&srv_ni->sni_nrqbds_receiving);
        }
}