1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2002, 2003 Cluster File Systems, Inc.
6 * This file is part of Lustre, http://www.lustre.org.
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 #define DEBUG_SUBSYSTEM S_RPC
25 #include <liblustre.h>
27 #include <linux/obd_class.h>
28 #include <linux/obd_support.h>
29 #include <linux/lustre_net.h>
30 #include <linux/lustre_lib.h>
31 #include <linux/obd.h>
32 #include <linux/lustre_sec.h>
33 #include "ptlrpc_internal.h"
/* Bind a memory descriptor over [base, base+len) and PtlPut it to 'portal'
 * on conn's peer, using 'xid' as the match bits.  'cbid' is stashed in the
 * MD's user_ptr so the event handler can find its callback.  On a PtlPut
 * failure the MD is unlinked here, which generates an UNLINK event that
 * completes the send as a failure — so this path still reports success.
 * NOTE(review): the numbered listing skips lines (38-40, 50-51, 65-66, ...);
 * the ptl_md_t 'md' declaration, md.start/md.length setup, rc/rc2 locals
 * and the error-check conditionals are elided — confirm against full source. */
35 static int ptl_send_buf (ptl_handle_md_t *mdh, void *base, int len,
36 ptl_ack_req_t ack, struct ptlrpc_cb_id *cbid,
37 struct ptlrpc_connection *conn, int portal, __u64 xid)
41 char str[PTL_NALFMT_SIZE];
44 LASSERT (portal != 0);
45 LASSERT (conn != NULL);
46 CDEBUG (D_INFO, "conn=%p ni %s id %s on %s\n",
47 conn, conn->c_peer.peer_ni->pni_name,
48 ptlrpc_id2str(&conn->c_peer, str),
49 conn->c_peer.peer_ni->pni_name);
/* threshold 2 when an ACK is requested: one event for SENT, one for ACK */
52 md.threshold = (ack == PTL_ACK_REQ) ? 2 : 1;
53 md.options = PTLRPC_MD_OPTIONS;
55 md.eq_handle = conn->c_peer.peer_ni->pni_eq_h;
57 if (ack == PTL_ACK_REQ &&
58 OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_ACK | OBD_FAIL_ONCE)) {
59 /* don't ask for the ack to simulate failing client */
61 obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
64 rc = PtlMDBind (conn->c_peer.peer_ni->pni_ni_h, md,
/* only expected failure mode for PtlMDBind is resource exhaustion */
67 CERROR ("PtlMDBind failed: %d\n", rc);
68 LASSERT (rc == PTL_NO_SPACE);
72 CDEBUG(D_NET, "Sending %d bytes to portal %d, xid "LPD64"\n",
75 rc = PtlPut (*mdh, ack, conn->c_peer.peer_id, portal, 0, xid, 0, 0);
78 /* We're going to get an UNLINK event when I unlink below,
79 * which will complete just like any other failed send, so
80 * I fall through and return success here! */
81 CERROR("PtlPut(%s, %d, "LPD64") failed: %d\n",
82 ptlrpc_id2str(&conn->c_peer, str),
84 rc2 = PtlMDUnlink(*mdh);
85 LASSERTF(rc2 == PTL_OK, "rc2 = %d\n", rc2);
/* Server side: start the bulk data movement for 'desc' — PtlPut for
 * BULK_PUT_SOURCE (server pushes data), PtlGet for BULK_GET_SINK (server
 * pulls data).  Matchbits are the client request's xid.  On send failure
 * the bound MD is unlinked, whose UNLINK event signals completion-with-
 * failure, so the function still returns success in that case.
 * NOTE(review): listing gaps elide the ptl_md_t/rc/rc2/xid declarations,
 * the fail-check early return and the error-branch conditionals. */
91 int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc)
95 struct ptlrpc_peer *peer;
98 char str[PTL_NALFMT_SIZE];
101 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_PTLRPC_BULK_PUT_NET))
104 /* NB no locking required until desc is on the network */
105 LASSERT (!desc->bd_network_rw);
106 LASSERT (desc->bd_type == BULK_PUT_SOURCE ||
107 desc->bd_type == BULK_GET_SINK);
108 desc->bd_success = 0;
109 peer = &desc->bd_export->exp_connection->c_peer;
111 md.user_ptr = &desc->bd_cbid;
112 md.eq_handle = peer->peer_ni->pni_eq_h;
113 md.threshold = 2; /* SENT and ACK/REPLY */
114 md.options = PTLRPC_MD_OPTIONS;
115 ptlrpc_fill_bulk_md(&md, desc);
117 LASSERT (desc->bd_cbid.cbid_fn == server_bulk_callback);
118 LASSERT (desc->bd_cbid.cbid_arg == desc);
120 /* NB total length may be 0 for a read past EOF, so we send a 0
121 * length bulk, since the client expects a bulk event. */
123 rc = PtlMDBind(peer->peer_ni->pni_ni_h, md,
124 PTL_UNLINK, &desc->bd_md_h);
126 CERROR("PtlMDBind failed: %d\n", rc);
127 LASSERT (rc == PTL_NO_SPACE);
131 /* Client's bulk and reply matchbits are the same */
132 xid = desc->bd_req->rq_xid;
133 CDEBUG(D_NET, "Transferring %u pages %u bytes via portal %d on %s "
134 "nid %s pid %d xid "LPX64"\n", desc->bd_iov_count,
135 desc->bd_nob, desc->bd_portal, peer->peer_ni->pni_name,
136 ptlrpc_id2str(peer, str), peer->peer_id.pid, xid);
138 /* Network is about to get at the memory */
139 desc->bd_network_rw = 1;
141 if (desc->bd_type == BULK_PUT_SOURCE)
142 rc = PtlPut (desc->bd_md_h, PTL_ACK_REQ, peer->peer_id,
143 desc->bd_portal, 0, xid, 0, 0);
145 rc = PtlGet (desc->bd_md_h, peer->peer_id,
146 desc->bd_portal, 0, xid, 0);
149 /* Can't send, so we unlink the MD bound above. The UNLINK
150 * event this creates will signal completion with failure,
151 * so we return SUCCESS here! */
152 CERROR("Transfer(%s, %d, "LPX64") failed: %d\n",
153 ptlrpc_id2str(peer, str),
154 desc->bd_portal, xid, rc);
155 rc2 = PtlMDUnlink(desc->bd_md_h);
156 LASSERT (rc2 == PTL_OK);
/* Server-side bulk abort: unlink the bulk MD to force the completion
 * callback ASAP, then wait (bounded) for the network to let go of the
 * buffers.  Returns having guaranteed desc is no longer on the network. */
162 void ptlrpc_abort_bulk (struct ptlrpc_bulk_desc *desc)
164 /* Server side bulk abort. Idempotent. Not thread-safe (i.e. only
165 * serialises with completion callback) */
166 struct l_wait_info lwi;
169 LASSERT (!in_interrupt ()); /* might sleep */
171 if (!ptlrpc_bulk_active(desc)) /* completed or */
172 return; /* never started */
174 /* The unlink ensures the callback happens ASAP and is the last
175 * one. If it fails, it must be because completion just happened,
176 * but we must still l_wait_event() in this case, to give liblustre
177 * a chance to run server_bulk_callback()*/
179 PtlMDUnlink (desc->bd_md_h);
182 /* Network access will complete in finite time but the HUGE
183 * timeout lets us CWARN for visibility of sluggish NALs */
184 lwi = LWI_TIMEOUT (300 * HZ, NULL, NULL);
185 rc = l_wait_event(desc->bd_waitq,
186 !ptlrpc_bulk_active(desc), &lwi);
/* NOTE(review): listing gap — presumably a retry loop surrounds the wait;
 * only the timed-out branch is visible here. Confirm against full source. */
190 LASSERT(rc == -ETIMEDOUT);
191 CWARN("Unexpectedly long timeout: desc %p\n", desc);
/* Client side: attach an ME/MD pair so the peer can PUT into (put-sink)
 * or GET from (get-source) this request's bulk buffers, matched on
 * req->rq_xid.  On MD-attach failure the ME is unlinked again and the
 * network_rw flag is cleared.
 * NOTE(review): listing gaps elide the ptl_md_t/rc/rc2 declarations, the
 * fail-check early return, error-branch conditionals and the RETURNs. */
195 int ptlrpc_register_bulk (struct ptlrpc_request *req)
197 struct ptlrpc_bulk_desc *desc = req->rq_bulk;
198 struct ptlrpc_peer *peer;
201 ptl_handle_me_t me_h;
205 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_PTLRPC_BULK_GET_NET))
208 /* NB no locking required until desc is on the network */
209 LASSERT (desc->bd_nob > 0);
210 LASSERT (!desc->bd_network_rw);
211 LASSERT (desc->bd_iov_count <= PTLRPC_MAX_BRW_PAGES);
212 LASSERT (desc->bd_req != NULL);
213 LASSERT (desc->bd_type == BULK_PUT_SINK ||
214 desc->bd_type == BULK_GET_SOURCE);
216 desc->bd_success = 0;
218 peer = &desc->bd_import->imp_connection->c_peer;
220 md.user_ptr = &desc->bd_cbid;
221 md.eq_handle = peer->peer_ni->pni_eq_h;
222 md.threshold = 1; /* PUT or GET */
/* the MD only accepts the one operation matching the bulk direction */
223 md.options = PTLRPC_MD_OPTIONS |
224 ((desc->bd_type == BULK_GET_SOURCE) ?
225 PTL_MD_OP_GET : PTL_MD_OP_PUT);
226 ptlrpc_fill_bulk_md(&md, desc);
228 LASSERT (desc->bd_cbid.cbid_fn == client_bulk_callback);
229 LASSERT (desc->bd_cbid.cbid_arg == desc);
231 /* XXX Registering the same xid on retried bulk makes my head
232 * explode trying to understand how the original request's bulk
233 * might interfere with the retried request -eeb */
234 LASSERT (!desc->bd_registered || req->rq_xid != desc->bd_last_xid);
235 desc->bd_registered = 1;
236 desc->bd_last_xid = req->rq_xid;
238 rc = PtlMEAttach(peer->peer_ni->pni_ni_h, desc->bd_portal,
239 desc->bd_import->imp_connection->c_peer.peer_id,
240 req->rq_xid, 0, PTL_UNLINK, PTL_INS_AFTER, &me_h);
242 CERROR("PtlMEAttach failed: %d\n", rc);
243 LASSERT (rc == PTL_NO_SPACE);
247 /* About to let the network at it... */
248 desc->bd_network_rw = 1;
249 rc = PtlMDAttach(me_h, md, PTL_UNLINK, &desc->bd_md_h);
/* MD attach failed: undo the ME and network_rw before bailing */
251 CERROR("PtlMDAttach failed: %d\n", rc);
252 LASSERT (rc == PTL_NO_SPACE);
253 desc->bd_network_rw = 0;
254 rc2 = PtlMEUnlink (me_h);
255 LASSERT (rc2 == PTL_OK);
259 CDEBUG(D_NET, "Setup bulk %s buffers: %u pages %u bytes, xid "LPX64", "
261 desc->bd_type == BULK_GET_SOURCE ? "get-source" : "put-sink",
262 desc->bd_iov_count, desc->bd_nob,
263 req->rq_xid, desc->bd_portal, peer->peer_ni->pni_name);
/* Client side: disconnect req's bulk descriptor from the network.
 * Unlinks the MD to force the last callback, then waits on the request
 * set's waitq (or the per-request reply waitq) until the network is done. */
267 void ptlrpc_unregister_bulk (struct ptlrpc_request *req)
269 /* Disconnect a bulk desc from the network. Idempotent. Not
270 * thread-safe (i.e. only interlocks with completion callback). */
271 struct ptlrpc_bulk_desc *desc = req->rq_bulk;
272 wait_queue_head_t *wq;
273 struct l_wait_info lwi;
276 LASSERT (!in_interrupt ()); /* might sleep */
278 if (!ptlrpc_bulk_active(desc)) /* completed or */
279 return; /* never registered */
281 LASSERT (desc->bd_req == req); /* bd_req NULL until registered */
283 /* the unlink ensures the callback happens ASAP and is the last
284 * one. If it fails, it must be because completion just happened,
285 * but we must still l_wait_event() in this case to give liblustre
286 * a chance to run client_bulk_callback() */
288 PtlMDUnlink (desc->bd_md_h);
/* set-based requests are woken via the set waitq, solo ones directly */
290 if (req->rq_set != NULL)
291 wq = &req->rq_set->set_waitq;
293 wq = &req->rq_reply_waitq;
296 /* Network access will complete in finite time but the HUGE
297 * timeout lets us CWARN for visibility of sluggish NALs */
298 lwi = LWI_TIMEOUT (300 * HZ, NULL, NULL);
299 rc = l_wait_event(*wq, !ptlrpc_bulk_active(desc), &lwi);
/* NOTE(review): listing gap — a loop around the wait is elided here */
303 LASSERT (rc == -ETIMEDOUT);
304 CWARN("Unexpectedly long timeout: desc %p\n", desc);
/* Send the reply for 'req' out of its prepared reply state.  Fills in the
 * reply message header (type/status/opc), takes a connection ref, runs the
 * reply through svcsec_authorize() for security wrapping, then sends the
 * wrapped buffer via ptl_send_buf().  'difficult' replies request an ACK;
 * on a comms error for a non-difficult reply, the reply state is freed
 * here so most callers need no cleanup. */
308 int ptlrpc_send_reply (struct ptlrpc_request *req, int may_be_difficult)
310 struct ptlrpc_service *svc = req->rq_rqbd->rqbd_srv_ni->sni_service;
311 struct ptlrpc_reply_state *rs = req->rq_reply_state;
312 struct ptlrpc_connection *conn;
315 /* We must already have a reply buffer (only ptlrpc_error() may be
316 * called without one). We usually also have a request buffer which
317 * is either the actual (swabbed) incoming request, or a saved copy
318 * if this is a req saved in target_queue_final_reply(). but this
319 * will not be true since some security handling may skip the reqmsg
320 * setting and prepare reply under normal ptlrpc layer */
321 LASSERT (rs != NULL);
322 LASSERT (req->rq_repmsg != NULL);
323 LASSERT (may_be_difficult || !rs->rs_difficult);
324 LASSERT (req->rq_repmsg == rs->rs_msg);
325 LASSERT (rs->rs_cb_id.cbid_fn == reply_out_callback);
326 LASSERT (rs->rs_cb_id.cbid_arg == rs);
328 LASSERT (req->rq_repmsg != NULL);
329 if (req->rq_type != PTL_RPC_MSG_ERR)
330 req->rq_type = PTL_RPC_MSG_REPLY;
332 req->rq_repmsg->type = req->rq_type;
333 req->rq_repmsg->status = req->rq_status;
334 req->rq_repmsg->opc = req->rq_reqmsg ? req->rq_reqmsg->opc : 0;
/* export-less requests (e.g. pre-connect) look up the connection by peer */
336 if (req->rq_export == NULL)
337 conn = ptlrpc_get_connection(&req->rq_peer, NULL);
339 conn = ptlrpc_connection_addref(req->rq_export->exp_connection);
341 atomic_inc (&svc->srv_outstanding_replies);
343 rc = svcsec_authorize(req);
345 CDEBUG(D_ERROR, "Error wrap reply message "LPU64"\n", req->rq_xid);
/* send the security-wrapped reply buffer, not the raw repmsg */
349 rc = ptl_send_buf (&rs->rs_md_h, rs->rs_repbuf, rs->rs_repdata_len,
350 rs->rs_difficult ? PTL_ACK_REQ : PTL_NOACK_REQ,
352 svc->srv_rep_portal, req->rq_xid);
355 atomic_dec (&svc->srv_outstanding_replies);
357 if (!rs->rs_difficult) {
358 /* Callers other than target_send_reply() expect me
359 * to clean up on a comms error */
360 lustre_free_reply_state (rs);
361 req->rq_reply_state = NULL;
362 req->rq_repmsg = NULL;
365 ptlrpc_put_connection(conn);
/* Send a normal (non-difficult) reply: thin wrapper over
 * ptlrpc_send_reply() with may_be_difficult == 0. */
369 int ptlrpc_reply (struct ptlrpc_request *req)
371 return (ptlrpc_send_reply (req, 0));
/* Send an error reply for 'req'.  Packs a minimal reply buffer if none
 * exists yet, marks the message type PTL_RPC_MSG_ERR, then sends it. */
374 int ptlrpc_error(struct ptlrpc_request *req)
379 if (!req->rq_repmsg) {
380 rc = lustre_pack_reply(req, 0, NULL, NULL);
385 req->rq_type = PTL_RPC_MSG_ERR;
387 rc = ptlrpc_send_reply (req, 0);
/* Client side: send 'request' over the wire.  Registers bulk (if any),
 * fills the request header, security-wraps the message, attaches the
 * reply ME/MD matched on rq_xid, then sends the request buffer via
 * ptl_send_buf().  Error paths tear down in reverse order via the
 * cleanup_me / cleanup_repmsg / cleanup_bulk labels.
 * NOTE(review): listing gaps elide some declarations (rc, rc2, flags,
 * reply_md) and error-branch conditionals — confirm against full source. */
391 int ptl_send_rpc(struct ptlrpc_request *request)
395 struct ptlrpc_connection *connection;
397 ptl_handle_me_t reply_me_h;
401 LASSERT (request->rq_type == PTL_RPC_MSG_REQUEST);
403 /* If this is a re-transmit, we're required to have disengaged
404 * cleanly from the previous attempt */
405 LASSERT (!request->rq_receiving_reply);
407 connection = request->rq_import->imp_connection;
409 if (request->rq_bulk != NULL) {
410 rc = ptlrpc_register_bulk (request);
415 request->rq_reqmsg->handle = request->rq_import->imp_remote_handle;
416 request->rq_reqmsg->type = PTL_RPC_MSG_REQUEST;
417 request->rq_reqmsg->conn_cnt = request->rq_import->imp_conn_cnt;
419 /* wrap_request might need to refresh gss cred, if this is called
420 * in ptlrpcd then the whole daemon thread will be waiting on
421 * gss negotiate rpc. FIXME
423 rc = ptlrpcs_cli_wrap_request(request);
425 GOTO(cleanup_bulk, rc);
427 LASSERT (request->rq_replen != 0);
428 if (request->rq_repbuf == NULL) {
429 rc = ptlrpcs_cli_alloc_repbuf(request, request->rq_replen);
431 GOTO(cleanup_bulk, rc);
434 rc = PtlMEAttach(connection->c_peer.peer_ni->pni_ni_h,
435 request->rq_reply_portal, /* XXX FIXME bug 249 */
436 connection->c_peer.peer_id, request->rq_xid, 0,
437 PTL_UNLINK, PTL_INS_AFTER, &reply_me_h);
439 CERROR("PtlMEAttach failed: %d\n", rc);
440 LASSERT (rc == PTL_NO_SPACE);
441 GOTO(cleanup_repmsg, rc = -ENOMEM);
444 spin_lock_irqsave (&request->rq_lock, flags);
445 /* If the MD attach succeeds, there _will_ be a reply_in callback */
446 request->rq_receiving_reply = 1;
447 /* Clear any flags that may be present from previous sends. */
448 request->rq_replied = 0;
450 request->rq_timedout = 0;
451 request->rq_net_err = 0;
452 request->rq_resend = 0;
453 request->rq_restart = 0;
454 request->rq_ptlrpcs_restart = 0;
455 request->rq_ptlrpcs_err = 0;
456 spin_unlock_irqrestore (&request->rq_lock, flags);
458 reply_md.start = request->rq_repbuf;
459 reply_md.length = request->rq_repbuf_len;
460 reply_md.threshold = 1;
461 reply_md.options = PTLRPC_MD_OPTIONS | PTL_MD_OP_PUT;
462 reply_md.user_ptr = &request->rq_reply_cbid;
463 reply_md.eq_handle = connection->c_peer.peer_ni->pni_eq_h;
465 rc = PtlMDAttach(reply_me_h, reply_md, PTL_UNLINK,
466 &request->rq_reply_md_h);
468 CERROR("PtlMDAttach failed: %d\n", rc);
469 LASSERT (rc == PTL_NO_SPACE);
/* FIX: was "GOTO(cleanup_me, rc -ENOMEM)" — missing '=' meant rc was
 * never set to -ENOMEM (the subtraction result was discarded), so the
 * caller could see a stale/unset rc on this error path.  Matches the
 * sibling error paths above/below that use "rc = -ENOMEM". */
470 GOTO(cleanup_me, rc = -ENOMEM);
473 CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid "LPU64
474 ", portal %u on %s\n",
475 request->rq_replen, request->rq_xid,
476 request->rq_reply_portal,
477 connection->c_peer.peer_ni->pni_name);
479 ptlrpc_request_addref(request); /* +1 ref for the SENT callback */
481 request->rq_sent = LTIME_S(CURRENT_TIME);
482 rc = ptl_send_buf(&request->rq_req_md_h,
483 request->rq_reqbuf, request->rq_reqdata_len,
484 PTL_NOACK_REQ, &request->rq_req_cbid,
486 request->rq_request_portal,
489 ptlrpc_lprocfs_rpc_sent(request);
493 ptlrpc_req_finished (request); /* drop callback ref */
496 /* MEUnlink is safe; the PUT didn't even get off the ground, and
497 * nobody apart from the PUT's target has the right nid+XID to
498 * access the reply buffer. */
499 rc2 = PtlMEUnlink(reply_me_h);
500 LASSERT (rc2 == PTL_OK);
501 /* UNLINKED callback called synchronously */
502 LASSERT (!request->rq_receiving_reply);
505 ptlrpcs_cli_free_repbuf(request);
508 if (request->rq_bulk != NULL)
509 ptlrpc_unregister_bulk(request);
/* Server side: post a request buffer descriptor so any peer (wildcard
 * NID/PID match, ignore-bits ~0) can deliver incoming requests into it.
 * The MD is persistent (infinite threshold, max_size-limited PUTs).
 * On MD-attach failure the ME is unlinked and the refcount reset. */
514 int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd)
516 struct ptlrpc_srv_ni *srv_ni = rqbd->rqbd_srv_ni;
517 struct ptlrpc_service *service = srv_ni->sni_service;
518 static ptl_process_id_t match_id = {PTL_NID_ANY, PTL_PID_ANY};
521 ptl_handle_me_t me_h;
523 CDEBUG(D_NET, "PtlMEAttach: portal %d on %s\n",
524 service->srv_req_portal, srv_ni->sni_ni->pni_name);
526 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_PTLRPC_RQBD))
529 rc = PtlMEAttach(srv_ni->sni_ni->pni_ni_h, service->srv_req_portal,
530 match_id, 0, ~0, PTL_UNLINK, PTL_INS_AFTER, &me_h);
532 CERROR("PtlMEAttach failed: %d\n", rc);
536 LASSERT(rqbd->rqbd_refcount == 0);
537 rqbd->rqbd_refcount = 1;
539 md.start = rqbd->rqbd_buffer;
540 md.length = service->srv_buf_size;
/* PTL_MD_MAX_SIZE caps each incoming PUT at srv_max_req_size while the
 * buffer as a whole accepts many requests (infinite threshold) */
541 md.max_size = service->srv_max_req_size;
542 md.threshold = PTL_MD_THRESH_INF;
543 md.options = PTLRPC_MD_OPTIONS | PTL_MD_OP_PUT | PTL_MD_MAX_SIZE;
544 md.user_ptr = &rqbd->rqbd_cbid;
545 md.eq_handle = srv_ni->sni_ni->pni_eq_h;
547 rc = PtlMDAttach(me_h, md, PTL_UNLINK, &rqbd->rqbd_md_h);
551 CERROR("PtlMDAttach failed: %d; \n", rc);
552 LASSERT (rc == PTL_NO_SPACE);
553 rc = PtlMEUnlink (me_h);
554 LASSERT (rc == PTL_OK);
555 rqbd->rqbd_refcount = 0;
561 /********************************************
562 * rawrpc stuff, currently only used by gss *
563 ********************************************/
/* l_wait_event timeout callback for raw RPCs: mark the request timed out
 * unless a reply already arrived.  'data' is the ptlrpc_request. */
565 static int rawrpc_timedout(void *data)
567 struct ptlrpc_request *req = (struct ptlrpc_request *) data;
570 spin_lock_irqsave(&req->rq_lock, flags);
571 if (!req->rq_replied)
572 req->rq_timedout = 1;
573 spin_unlock_irqrestore(&req->rq_lock, flags);
/* Timeout callback used for the no-timeout wait path (see the 100*HZ
 * LWI_TIMEOUT below).  NOTE(review): body elided from this listing —
 * presumably a trivial stub; confirm against full source. */
578 static int rawrpc_timedout_wait(void *data)
583 /* to make things as simple as possible */
/* Wait condition for raw RPCs: true once the request has terminated in
 * any way (replied, network error, error, resend or restart requested). */
584 static int rawrpc_check_reply(struct ptlrpc_request *req)
589 spin_lock_irqsave (&req->rq_lock, flags);
590 rc = req->rq_replied || req->rq_net_err || req->rq_err ||
591 req->rq_resend || req->rq_restart;
592 spin_unlock_irqrestore(&req->rq_lock, flags);
/* Drop a reference on a raw-RPC request; on the last ref, unhook it from
 * the import's rawrpc list, release the import, and free the request's
 * req/rep buffers (which it owns — see ptl_do_rawrpc) plus the request. */
596 void rawrpc_req_finished(struct ptlrpc_request *req)
598 struct obd_import *imp;
599 unsigned long irq_flags;
604 if (!atomic_dec_and_test(&req->rq_refcount))
607 LASSERT(req->rq_import);
608 imp = req->rq_import;
610 spin_lock_irqsave(&imp->imp_lock, irq_flags);
611 list_del_init(&req->rq_list);
612 spin_unlock_irqrestore(&imp->imp_lock, irq_flags);
614 class_import_put(imp);
616 if (req->rq_reqbuf) {
617 LASSERT(req->rq_reqbuf_len);
618 OBD_FREE(req->rq_reqbuf, req->rq_reqbuf_len);
620 if (req->rq_repbuf) {
621 LASSERT(req->rq_repbuf_len);
622 OBD_FREE(req->rq_repbuf, req->rq_repbuf_len);
624 OBD_FREE(req, sizeof(*req));
628 * if returned non-NULL, reqbuf and repbuf will be take over by the
629 * request: the caller can't release them directly, instead should
630 * call rawrpc_req_finished().
/* Issue a raw (no ptlrpc-layer packing) RPC over 'imp': hand-builds a
 * ptlrpc_request around caller-supplied req/rep buffers, posts the reply
 * ME/MD, PtlPuts the request, and waits for completion or timeout
 * (timeout == 0 means wait with the long 100*HZ watchdog instead).
 * On success *replenp is set to the bytes received and *res to 0.
 * Ownership of reqbuf/repbuf transfers to the returned request — free
 * only via rawrpc_req_finished().  Currently only used by gss.
 * NOTE(review): listing gaps elide rc/declarations, several error-branch
 * conditionals, the allocation-failure path and the cleanup labels'
 * names (cleanup_md / cleanup_me are implied by the GOTOs). */
632 struct ptlrpc_request *
633 ptl_do_rawrpc(struct obd_import *imp,
634 char *reqbuf, int reqbuf_len, int reqlen,
635 char *repbuf, int repbuf_len, int *replenp,
636 int timeout, int *res)
638 struct ptlrpc_connection *conn;
639 struct ptlrpc_request *request;
640 ptl_handle_me_t reply_me_h;
641 ptl_md_t reply_md, req_md;
642 struct l_wait_info lwi;
643 unsigned long irq_flags;
648 LASSERT(reqbuf && reqbuf_len);
649 LASSERT(repbuf && repbuf_len);
650 LASSERT(reqlen && reqlen <= reqbuf_len);
652 OBD_ALLOC(request, sizeof(*request));
658 imp = request->rq_import = class_import_get(imp);
659 conn = imp->imp_connection;
661 if (imp->imp_state == LUSTRE_IMP_CLOSED) {
662 CDEBUG(D_SEC, "raw rpc on closed imp(=>%s)\n",
663 imp->imp_target_uuid.uuid);
666 /* initialize request */
667 request->rq_req_cbid.cbid_fn = rawrpc_request_out_callback;
668 request->rq_req_cbid.cbid_arg = request;
669 request->rq_reply_cbid.cbid_fn = reply_in_callback;
670 request->rq_reply_cbid.cbid_arg = request;
671 request->rq_reqbuf = reqbuf;
672 request->rq_reqbuf_len = reqbuf_len;
673 request->rq_repbuf = repbuf;
674 request->rq_repbuf_len = repbuf_len;
675 request->rq_set = NULL;
676 spin_lock_init(&request->rq_lock);
677 init_waitqueue_head(&request->rq_reply_waitq);
678 atomic_set(&request->rq_refcount, 1);
679 request->rq_xid = ptlrpc_next_xid();
681 /* add into sending list */
682 spin_lock_irqsave(&imp->imp_lock, irq_flags);
683 list_add_tail(&request->rq_list, &imp->imp_rawrpc_list);
684 spin_unlock_irqrestore(&imp->imp_lock, irq_flags);
686 /* prepare reply buffer */
687 rc = PtlMEAttach(conn->c_peer.peer_ni->pni_ni_h,
688 imp->imp_client->cli_reply_portal,
689 conn->c_peer.peer_id, request->rq_xid, 0, PTL_UNLINK,
690 PTL_INS_AFTER, &reply_me_h);
692 CERROR("PtlMEAttach failed: %d\n", rc);
693 LASSERT (rc == PTL_NO_SPACE);
694 GOTO(out, rc = -ENOMEM);
697 spin_lock_irqsave(&request->rq_lock, irq_flags);
698 request->rq_receiving_reply = 1;
699 spin_unlock_irqrestore(&request->rq_lock, irq_flags);
701 reply_md.start = repbuf;
702 reply_md.length = repbuf_len;
703 reply_md.threshold = 1;
704 reply_md.options = PTLRPC_MD_OPTIONS | PTL_MD_OP_PUT;
705 reply_md.user_ptr = &request->rq_reply_cbid;
706 reply_md.eq_handle = conn->c_peer.peer_ni->pni_eq_h;
708 rc = PtlMDAttach(reply_me_h, reply_md, PTL_UNLINK,
709 &request->rq_reply_md_h);
711 CERROR("PtlMDAttach failed: %d\n", rc);
712 LASSERT (rc == PTL_NO_SPACE);
713 GOTO(cleanup_me, rc = -ENOMEM);
717 * the extra 2 refcount will be balanced by out_callback
719 atomic_set(&request->rq_refcount, 3);
721 /* prepare request buffer */
722 req_md.start = reqbuf;
723 req_md.length = reqlen;
724 req_md.threshold = 1;
725 req_md.options = PTLRPC_MD_OPTIONS;
726 req_md.user_ptr = &request->rq_req_cbid;
727 req_md.eq_handle = conn->c_peer.peer_ni->pni_eq_h;
729 rc = PtlMDBind(conn->c_peer.peer_ni->pni_ni_h,
730 req_md, PTL_UNLINK, &request->rq_req_md_h);
732 CERROR("PtlMDBind failed %d\n", rc);
733 LASSERT (rc == PTL_NO_SPACE);
/* drop back to the single caller ref before unwinding */
734 atomic_set(&request->rq_refcount, 1);
735 GOTO(cleanup_me, rc = -ENOMEM);
738 rc = PtlPut(request->rq_req_md_h, PTL_NOACK_REQ, conn->c_peer.peer_id,
739 imp->imp_client->cli_request_portal, 0,
740 request->rq_xid, 0, 0);
742 CERROR("PtlPut failed %d\n", rc);
743 GOTO(cleanup_md, rc);
/* timeout == 0 waits "forever" with a long watchdog instead */
747 lwi = LWI_TIMEOUT(timeout * HZ, rawrpc_timedout, request);
749 lwi = LWI_TIMEOUT(100 * HZ, rawrpc_timedout_wait, request);
751 l_wait_event(request->rq_reply_waitq,
752 rawrpc_check_reply(request), &lwi);
754 ptlrpc_unregister_reply(request);
756 if (request->rq_replied) {
757 *replenp = request->rq_nob_received;
760 CERROR("rawrpc error: err %d, neterr %d, int %d, timedout %d\n",
761 request->rq_err, request->rq_net_err,
762 request->rq_intr, request->rq_timedout);
764 /* give timedout higher priority */
765 rc = request->rq_timedout ? -ETIMEDOUT : -EIO;
773 PtlMDUnlink(request->rq_req_md_h);
775 PtlMEUnlink(reply_me_h);
780 * caller will take care the reqbuf & repbuf, and only return
781 * when the rpc really finished on wire.
/* Synchronous raw-RPC convenience wrapper: the same buffer serves as both
 * allocation and payload length for the request; the caller keeps ownership
 * of reqbuf/repbuf, so the request's buffer pointers are cleared before
 * rawrpc_req_finished() to prevent it freeing them.
 * NOTE(review): definition appears to continue past the end of this
 * listing (final return elided). */
783 int ptl_do_rawrpc_simple(struct obd_import *imp,
784 char *reqbuf, int reqlen,
785 char *repbuf, int *replenp)
787 struct ptlrpc_request *req;
790 req = ptl_do_rawrpc(imp, reqbuf, reqlen, reqlen,
791 repbuf, *replenp, replenp, 0, &res);
796 req->rq_reqbuf = req->rq_repbuf = NULL;
797 req->rq_reqbuf_len = req->rq_repbuf_len = 0;
799 rawrpc_req_finished(req);