#define DEBUG_SUBSYSTEM S_RPC
#ifndef __KERNEL__
#include <liblustre.h>
-#include <portals/lib-types.h>
#endif
+#include <linux/obd_class.h>
#include <linux/obd_support.h>
#include <linux/lustre_net.h>
#include <linux/lustre_lib.h>
#include <linux/obd.h>
+#include <linux/lustre_sec.h>
#include "ptlrpc_internal.h"
static int ptl_send_buf (ptl_handle_md_t *mdh, void *base, int len,
ptl_ack_req_t ack, struct ptlrpc_cb_id *cbid,
struct ptlrpc_connection *conn, int portal, __u64 xid)
{
- ptl_process_id_t remote_id;
int rc;
- int rc2;
ptl_md_t md;
char str[PTL_NALFMT_SIZE];
ENTRY;
LASSERT (portal != 0);
LASSERT (conn != NULL);
- CDEBUG (D_INFO, "conn=%p ni %s nid "LPX64" (%s) on %s\n",
+ CDEBUG (D_INFO, "conn=%p ni %s id %s on %s\n",
conn, conn->c_peer.peer_ni->pni_name,
- conn->c_peer.peer_nid,
- portals_nid2str(conn->c_peer.peer_ni->pni_number,
- conn->c_peer.peer_nid, str),
+ ptlrpc_id2str(&conn->c_peer, str),
conn->c_peer.peer_ni->pni_name);
-
- remote_id.nid = conn->c_peer.peer_nid,
- remote_id.pid = 0;
-
md.start = base;
md.length = len;
md.threshold = (ack == PTL_ACK_REQ) ? 2 : 1;
md.options = PTLRPC_MD_OPTIONS;
md.user_ptr = cbid;
- md.eventq = conn->c_peer.peer_ni->pni_eq_h;
+ md.eq_handle = conn->c_peer.peer_ni->pni_eq_h;
if (ack == PTL_ACK_REQ &&
OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_ACK | OBD_FAIL_ONCE)) {
CDEBUG(D_NET, "Sending %d bytes to portal %d, xid "LPD64"\n",
len, portal, xid);
- rc2 = PtlPut (*mdh, ack, remote_id, portal, 0, xid, 0, 0);
+ rc = PtlPut (*mdh, ack, conn->c_peer.peer_id, portal, 0, xid, 0, 0);
if (rc != PTL_OK) {
+ int rc2;
/* We're going to get an UNLINK event when I unlink below,
* which will complete just like any other failed send, so
* I fall through and return success here! */
- CERROR("PtlPut("LPU64", %d, "LPD64") failed: %d\n",
- remote_id.nid, portal, xid, rc);
+ CERROR("PtlPut(%s, %d, "LPD64") failed: %d\n",
+ ptlrpc_id2str(&conn->c_peer, str),
+ portal, xid, rc);
rc2 = PtlMDUnlink(*mdh);
- LASSERT (rc2 == PTL_OK);
+ LASSERTF(rc2 == PTL_OK, "rc2 = %d\n", rc2);
}
RETURN (0);
}
-static void ptlrpc_fill_md(ptl_md_t *md, struct ptlrpc_bulk_desc *desc)
-{
- LASSERT(ptl_md_max_iovs() == 0 ||
- (desc->bd_iov_count <= ptl_md_max_iovs()));
-
- if (ptl_requires_iov() || desc->bd_iov_count > 0) {
- md->options |= PTLRPC_PTL_MD_IOV;
- md->start = &desc->bd_iov[0];
- md->niov = desc->bd_iov_count;
- } else {
- md->start = ptl_iov_base(&desc->bd_iov[0]);
- }
-}
-
int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc)
{
int rc;
int rc2;
struct ptlrpc_peer *peer;
- ptl_process_id_t remote_id;
ptl_md_t md;
__u64 xid;
+ char str[PTL_NALFMT_SIZE];
ENTRY;
if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_PTLRPC_BULK_PUT_NET))
desc->bd_success = 0;
peer = &desc->bd_export->exp_connection->c_peer;
- md.length = desc->bd_nob;
- md.eventq = peer->peer_ni->pni_eq_h;
+ md.user_ptr = &desc->bd_cbid;
+ md.eq_handle = peer->peer_ni->pni_eq_h;
md.threshold = 2; /* SENT and ACK/REPLY */
md.options = PTLRPC_MD_OPTIONS;
+ ptlrpc_fill_bulk_md(&md, desc);
- ptlrpc_fill_md(&md, desc);
- md.user_ptr = &desc->bd_cbid;
LASSERT (desc->bd_cbid.cbid_fn == server_bulk_callback);
LASSERT (desc->bd_cbid.cbid_arg == desc);
/* Client's bulk and reply matchbits are the same */
xid = desc->bd_req->rq_xid;
- remote_id.nid = peer->peer_nid;
- remote_id.pid = 0;
-
CDEBUG(D_NET, "Transferring %u pages %u bytes via portal %d on %s "
- "nid "LPX64" pid %d xid "LPX64"\n",
- md.niov, md.length, desc->bd_portal, peer->peer_ni->pni_name,
- remote_id.nid, remote_id.pid, xid);
+ "nid %s pid %d xid "LPX64"\n", desc->bd_iov_count,
+ desc->bd_nob, desc->bd_portal, peer->peer_ni->pni_name,
+ ptlrpc_id2str(peer, str), peer->peer_id.pid, xid);
/* Network is about to get at the memory */
desc->bd_network_rw = 1;
if (desc->bd_type == BULK_PUT_SOURCE)
- rc = PtlPut (desc->bd_md_h, PTL_ACK_REQ, remote_id,
+ rc = PtlPut (desc->bd_md_h, PTL_ACK_REQ, peer->peer_id,
desc->bd_portal, 0, xid, 0, 0);
else
- rc = PtlGet (desc->bd_md_h, remote_id,
+ rc = PtlGet (desc->bd_md_h, peer->peer_id,
desc->bd_portal, 0, xid, 0);
-
+
if (rc != PTL_OK) {
/* Can't send, so we unlink the MD bound above. The UNLINK
* event this creates will signal completion with failure,
* so we return SUCCESS here! */
- CERROR("Transfer("LPU64", %d, "LPX64") failed: %d\n",
- remote_id.nid, desc->bd_portal, xid, rc);
+ CERROR("Transfer(%s, %d, "LPX64") failed: %d\n",
+ ptlrpc_id2str(peer, str),
+ desc->bd_portal, xid, rc);
rc2 = PtlMDUnlink(desc->bd_md_h);
LASSERT (rc2 == PTL_OK);
}
return; /* never started */
/* The unlink ensures the callback happens ASAP and is the last
- * one. If it fails, it must be because completion just
- * happened. */
+ * one. If it fails, it must be because completion just happened,
+ * but we must still l_wait_event() in this case, to give liblustre
+ * a chance to run server_bulk_callback(). */
- rc = PtlMDUnlink (desc->bd_md_h);
- if (rc == PTL_MD_INVALID) {
- LASSERT(!ptlrpc_bulk_active(desc));
- return;
- }
-
- LASSERT (rc == PTL_OK);
+ PtlMDUnlink (desc->bd_md_h);
for (;;) {
/* Network access will complete in finite time but the HUGE
struct ptlrpc_peer *peer;
int rc;
int rc2;
- ptl_process_id_t source_id;
ptl_handle_me_t me_h;
ptl_md_t md;
ENTRY;
peer = &desc->bd_import->imp_connection->c_peer;
- md.length = desc->bd_nob;
- md.eventq = peer->peer_ni->pni_eq_h;
+ md.user_ptr = &desc->bd_cbid;
+ md.eq_handle = peer->peer_ni->pni_eq_h;
md.threshold = 1; /* PUT or GET */
md.options = PTLRPC_MD_OPTIONS |
((desc->bd_type == BULK_GET_SOURCE) ?
PTL_MD_OP_GET : PTL_MD_OP_PUT);
- ptlrpc_fill_md(&md, desc);
- md.user_ptr = &desc->bd_cbid;
+ ptlrpc_fill_bulk_md(&md, desc);
+
LASSERT (desc->bd_cbid.cbid_fn == client_bulk_callback);
LASSERT (desc->bd_cbid.cbid_arg == desc);
desc->bd_registered = 1;
desc->bd_last_xid = req->rq_xid;
- source_id.nid = desc->bd_import->imp_connection->c_peer.peer_nid;
- source_id.pid = PTL_PID_ANY;
-
- rc = PtlMEAttach(peer->peer_ni->pni_ni_h,
- desc->bd_portal, source_id, req->rq_xid, 0,
- PTL_UNLINK, PTL_INS_AFTER, &me_h);
+ rc = PtlMEAttach(peer->peer_ni->pni_ni_h, desc->bd_portal,
+ desc->bd_import->imp_connection->c_peer.peer_id,
+ req->rq_xid, 0, PTL_UNLINK, PTL_INS_AFTER, &me_h);
if (rc != PTL_OK) {
CERROR("PtlMEAttach failed: %d\n", rc);
LASSERT (rc == PTL_NO_SPACE);
CDEBUG(D_NET, "Setup bulk %s buffers: %u pages %u bytes, xid "LPX64", "
"portal %u on %s\n",
desc->bd_type == BULK_GET_SOURCE ? "get-source" : "put-sink",
- md.niov, md.length,
+ desc->bd_iov_count, desc->bd_nob,
req->rq_xid, desc->bd_portal, peer->peer_ni->pni_name);
RETURN(0);
}
LASSERT (desc->bd_req == req); /* bd_req NULL until registered */
/* the unlink ensures the callback happens ASAP and is the last
- * one. If it fails, it must be because completion just
- * happened. */
+ * one. If it fails, it must be because completion just happened,
+ * but we must still l_wait_event() in this case to give liblustre
+ * a chance to run client_bulk_callback() */
- rc = PtlMDUnlink (desc->bd_md_h);
- if (rc == PTL_MD_INVALID) {
- LASSERT(!ptlrpc_bulk_active(desc));
- return;
- }
-
- LASSERT (rc == PTL_OK);
-
- if (desc->bd_req->rq_set != NULL)
+ PtlMDUnlink (desc->bd_md_h);
+
+ if (req->rq_set != NULL)
wq = &req->rq_set->set_waitq;
else
wq = &req->rq_reply_waitq;
int rc;
/* We must already have a reply buffer (only ptlrpc_error() may be
- * called without one). We must also have a request buffer which
+ * called without one). We usually also have a request buffer which
* is either the actual (swabbed) incoming request, or a saved copy
- * if this is a req saved in target_queue_final_reply(). */
- LASSERT (req->rq_reqmsg != NULL);
+ * if this is a req saved in target_queue_final_reply(). But this
+ * may not be true, since some security handling may skip setting
+ * the reqmsg and prepare the reply under the normal ptlrpc layer */
LASSERT (rs != NULL);
LASSERT (req->rq_repmsg != NULL);
LASSERT (may_be_difficult || !rs->rs_difficult);
- LASSERT (req->rq_repmsg == &rs->rs_msg);
+ LASSERT (req->rq_repmsg == rs->rs_msg);
LASSERT (rs->rs_cb_id.cbid_fn == reply_out_callback);
LASSERT (rs->rs_cb_id.cbid_arg == rs);
req->rq_repmsg->type = req->rq_type;
req->rq_repmsg->status = req->rq_status;
- req->rq_repmsg->opc = req->rq_reqmsg->opc;
+ req->rq_repmsg->opc = req->rq_reqmsg ? req->rq_reqmsg->opc : 0;
if (req->rq_export == NULL)
conn = ptlrpc_get_connection(&req->rq_peer, NULL);
atomic_inc (&svc->srv_outstanding_replies);
- rc = ptl_send_buf (&rs->rs_md_h, req->rq_repmsg, req->rq_replen,
+ rc = svcsec_authorize(req);
+ if (rc) {
+ CERROR("Error wrap reply message "LPX64"\n", req->rq_xid);
+ goto out;
+ }
+
+ rc = ptl_send_buf (&rs->rs_md_h, rs->rs_repbuf, rs->rs_repdata_len,
rs->rs_difficult ? PTL_ACK_REQ : PTL_NOACK_REQ,
&rs->rs_cb_id, conn,
svc->srv_rep_portal, req->rq_xid);
+out:
if (rc != 0) {
atomic_dec (&svc->srv_outstanding_replies);
int rc2;
struct ptlrpc_connection *connection;
unsigned long flags;
- ptl_process_id_t source_id;
ptl_handle_me_t reply_me_h;
ptl_md_t reply_md;
ENTRY;
request->rq_reqmsg->type = PTL_RPC_MSG_REQUEST;
request->rq_reqmsg->conn_cnt = request->rq_import->imp_conn_cnt;
- source_id.nid = connection->c_peer.peer_nid;
- source_id.pid = PTL_PID_ANY;
+ /* wrap_request might need to refresh the gss cred; if this is called
+ * from ptlrpcd, the whole daemon thread will block waiting on the
+ * gss negotiation rpc. FIXME
+ */
+ rc = ptlrpcs_cli_wrap_request(request);
+ if (rc)
+ GOTO(cleanup_bulk, rc);
LASSERT (request->rq_replen != 0);
- if (request->rq_repmsg == NULL)
- OBD_ALLOC(request->rq_repmsg, request->rq_replen);
- if (request->rq_repmsg == NULL)
- GOTO(cleanup_bulk, rc = -ENOMEM);
+ if (request->rq_repbuf == NULL) {
+ rc = ptlrpcs_cli_alloc_repbuf(request, request->rq_replen);
+ if (rc)
+ GOTO(cleanup_bulk, rc);
+ }
rc = PtlMEAttach(connection->c_peer.peer_ni->pni_ni_h,
request->rq_reply_portal, /* XXX FIXME bug 249 */
- source_id, request->rq_xid, 0, PTL_UNLINK,
- PTL_INS_AFTER, &reply_me_h);
+ connection->c_peer.peer_id, request->rq_xid, 0,
+ PTL_UNLINK, PTL_INS_AFTER, &reply_me_h);
if (rc != PTL_OK) {
CERROR("PtlMEAttach failed: %d\n", rc);
LASSERT (rc == PTL_NO_SPACE);
request->rq_replied = 0;
request->rq_err = 0;
request->rq_timedout = 0;
+ request->rq_net_err = 0;
request->rq_resend = 0;
request->rq_restart = 0;
+ request->rq_ptlrpcs_restart = 0;
+ request->rq_ptlrpcs_err = 0;
spin_unlock_irqrestore (&request->rq_lock, flags);
- reply_md.start = request->rq_repmsg;
- reply_md.length = request->rq_replen;
+ reply_md.start = request->rq_repbuf;
+ reply_md.length = request->rq_repbuf_len;
reply_md.threshold = 1;
reply_md.options = PTLRPC_MD_OPTIONS | PTL_MD_OP_PUT;
reply_md.user_ptr = &request->rq_reply_cbid;
- reply_md.eventq = connection->c_peer.peer_ni->pni_eq_h;
+ reply_md.eq_handle = connection->c_peer.peer_ni->pni_eq_h;
rc = PtlMDAttach(reply_me_h, reply_md, PTL_UNLINK,
&request->rq_reply_md_h);
ptlrpc_request_addref(request); /* +1 ref for the SENT callback */
request->rq_sent = LTIME_S(CURRENT_TIME);
- ptlrpc_pinger_sending_on_import(request->rq_import);
rc = ptl_send_buf(&request->rq_req_md_h,
- request->rq_reqmsg, request->rq_reqlen,
+ request->rq_reqbuf, request->rq_reqdata_len,
PTL_NOACK_REQ, &request->rq_req_cbid,
connection,
request->rq_request_portal,
LASSERT (!request->rq_receiving_reply);
cleanup_repmsg:
- OBD_FREE(request->rq_repmsg, request->rq_replen);
- request->rq_repmsg = NULL;
+ ptlrpcs_cli_free_repbuf(request);
cleanup_bulk:
if (request->rq_bulk != NULL)
md.threshold = PTL_MD_THRESH_INF;
md.options = PTLRPC_MD_OPTIONS | PTL_MD_OP_PUT | PTL_MD_MAX_SIZE;
md.user_ptr = &rqbd->rqbd_cbid;
- md.eventq = srv_ni->sni_ni->pni_eq_h;
+ md.eq_handle = srv_ni->sni_ni->pni_eq_h;
rc = PtlMDAttach(me_h, md, PTL_UNLINK, &rqbd->rqbd_md_h);
if (rc == PTL_OK)
return (-ENOMEM);
}
+
+/* Timeout callback handed to LWI_TIMEOUT() by ptlrpc_do_rawrpc() below:
+ * if no reply has arrived yet, mark the request timed out (under
+ * rq_lock).  Always returns 1, which ends the l_wait_event() wait. */
+static int rawrpc_timedout(void *data)
+{
+ struct ptlrpc_request *req = (struct ptlrpc_request *) data;
+ unsigned long flags;
+
+ spin_lock_irqsave(&req->rq_lock, flags);
+ if (!req->rq_replied)
+ req->rq_timedout = 1;
+ spin_unlock_irqrestore(&req->rq_lock, flags);
+
+ return 1;
+}
+
+/* Wakeup condition for the raw-rpc wait, kept as simple as possible:
+ * true once the request has been replied to, hit a network or generic
+ * error, or was asked to resend/restart.  Flags are sampled under
+ * rq_lock so we see a consistent snapshot. */
+static int rawrpc_check_reply(struct ptlrpc_request *req)
+{
+ unsigned long flags;
+ int rc;
+
+ spin_lock_irqsave (&req->rq_lock, flags);
+ rc = req->rq_replied || req->rq_net_err || req->rq_err ||
+ req->rq_resend || req->rq_restart;
+ spin_unlock_irqrestore(&req->rq_lock, flags);
+ return rc;
+}
+
+/*
+ * Send a raw, already-formatted request buffer and wait for the raw
+ * reply.  A fake ptlrpc_request is constructed on the stack in order to
+ * use the existing callback/wakeup facilities.
+ *
+ * imp:     import to send through (a reference is held for the call)
+ * reqbuf:  raw request bytes to send, reqlen bytes long
+ * repbuf:  buffer to receive the raw reply
+ * replenp: in: capacity of repbuf; out: bytes actually received
+ * timeout: wait time in seconds before rawrpc_timedout() fires
+ *
+ * Returns 0 on success; -ENOMEM if portals ME/MD setup fails, the
+ * PtlPut() return code if the send fails, -ETIMEDOUT on timeout, or
+ * -EINVAL on any other rpc failure.
+ */
+int ptlrpc_do_rawrpc(struct obd_import *imp,
+ char *reqbuf, int reqlen,
+ char *repbuf, int *replenp,
+ int timeout)
+{
+ struct ptlrpc_connection *conn;
+ struct ptlrpc_request request; /* just a fake one */
+ ptl_handle_me_t reply_me_h;
+ ptl_md_t reply_md, req_md;
+ struct l_wait_info lwi;
+ unsigned long irq_flags;
+ int rc;
+ ENTRY;
+
+ LASSERT(imp);
+ class_import_get(imp);
+ /* deliberately proceed even on a closed import; just log it */
+ if (imp->imp_state == LUSTRE_IMP_CLOSED) {
+ CDEBUG(D_SEC, "raw rpc on closed imp(=>%s)? send anyway\n",
+ imp->imp_target_uuid.uuid);
+ }
+
+ conn = imp->imp_connection;
+
+ /* initialize request: only the fields the callbacks and the wait
+ * below actually touch are set up */
+ memset(&request, 0, sizeof(request));
+ request.rq_req_cbid.cbid_fn = request_out_callback;
+ request.rq_req_cbid.cbid_arg = &request;
+ request.rq_reply_cbid.cbid_fn = reply_in_callback;
+ request.rq_reply_cbid.cbid_arg = &request;
+ request.rq_reqbuf = reqbuf;
+ request.rq_reqbuf_len = reqlen;
+ request.rq_repbuf = repbuf;
+ request.rq_repbuf_len = *replenp;
+ request.rq_set = NULL;
+ spin_lock_init(&request.rq_lock);
+ init_waitqueue_head(&request.rq_reply_waitq);
+ /* huge refcount: never dropped, since the request lives on our stack */
+ atomic_set(&request.rq_refcount, 1000000);
+ request.rq_xid = ptlrpc_next_xid();
+
+ /* add into sending list */
+ spin_lock_irqsave(&imp->imp_lock, irq_flags);
+ list_add_tail(&request.rq_list, &imp->imp_rawrpc_list);
+ spin_unlock_irqrestore(&imp->imp_lock, irq_flags);
+
+ /* prepare reply buffer: match entry keyed on our xid, then attach
+ * the reply MD to it */
+ rc = PtlMEAttach(conn->c_peer.peer_ni->pni_ni_h,
+ imp->imp_client->cli_reply_portal,
+ conn->c_peer.peer_id, request.rq_xid, 0, PTL_UNLINK,
+ PTL_INS_AFTER, &reply_me_h);
+ if (rc != PTL_OK) {
+ CERROR("PtlMEAttach failed: %d\n", rc);
+ LASSERT (rc == PTL_NO_SPACE);
+ GOTO(cleanup_imp, rc = -ENOMEM);
+ }
+
+ spin_lock_irqsave(&request.rq_lock, irq_flags);
+ request.rq_receiving_reply = 1;
+ spin_unlock_irqrestore(&request.rq_lock, irq_flags);
+
+ reply_md.start = repbuf;
+ reply_md.length = *replenp;
+ reply_md.threshold = 1;
+ reply_md.options = PTLRPC_MD_OPTIONS | PTL_MD_OP_PUT;
+ reply_md.user_ptr = &request.rq_reply_cbid;
+ reply_md.eq_handle = conn->c_peer.peer_ni->pni_eq_h;
+
+ rc = PtlMDAttach(reply_me_h, reply_md, PTL_UNLINK,
+ &request.rq_reply_md_h);
+ if (rc != PTL_OK) {
+ CERROR("PtlMDAttach failed: %d\n", rc);
+ LASSERT (rc == PTL_NO_SPACE);
+ GOTO(cleanup_me, rc = -ENOMEM);
+ }
+
+ /* prepare request buffer */
+ req_md.start = reqbuf;
+ req_md.length = reqlen;
+ req_md.threshold = 1;
+ req_md.options = PTLRPC_MD_OPTIONS;
+ req_md.user_ptr = &request.rq_req_cbid;
+ req_md.eq_handle = conn->c_peer.peer_ni->pni_eq_h;
+
+ rc = PtlMDBind(conn->c_peer.peer_ni->pni_ni_h,
+ req_md, PTL_UNLINK, &request.rq_req_md_h);
+ if (rc != PTL_OK) {
+ CERROR("PtlMDBind failed %d\n", rc);
+ LASSERT (rc == PTL_NO_SPACE);
+ GOTO(cleanup_me, rc = -ENOMEM);
+ }
+
+ rc = PtlPut(request.rq_req_md_h, PTL_NOACK_REQ, conn->c_peer.peer_id,
+ imp->imp_client->cli_request_portal,
+ 0, request.rq_xid, 0, 0);
+ if (rc != PTL_OK) {
+ CERROR("PtlPut failed %d\n", rc);
+ GOTO(cleanup_md, rc);
+ }
+
+ /* wait until replied/error/resend/restart, or until
+ * rawrpc_timedout() fires after 'timeout' seconds */
+ lwi = LWI_TIMEOUT(timeout * HZ, rawrpc_timedout, &request);
+ l_wait_event(request.rq_reply_waitq,
+ rawrpc_check_reply(&request), &lwi);
+
+ ptlrpc_unregister_reply(&request);
+
+ if (request.rq_err || request.rq_resend || request.rq_intr ||
+ request.rq_timedout || !request.rq_replied) {
+ CERROR("secinit rpc error: err %d, resend %d, "
+ "intr %d, timedout %d, replied %d\n",
+ request.rq_err, request.rq_resend, request.rq_intr,
+ request.rq_timedout, request.rq_replied);
+ if (request.rq_timedout)
+ rc = -ETIMEDOUT;
+ else
+ rc = -EINVAL;
+ } else {
+ /* success: report how many reply bytes actually landed */
+ *replenp = request.rq_nob_received;
+ rc = 0;
+ }
+ GOTO(cleanup_imp, rc);
+
+/* error unwinding, in reverse order of setup */
+cleanup_md:
+ PtlMDUnlink(request.rq_req_md_h);
+cleanup_me:
+ PtlMEUnlink(reply_me_h);
+cleanup_imp:
+ spin_lock_irqsave(&imp->imp_lock, irq_flags);
+ list_del_init(&request.rq_list);
+ spin_unlock_irqrestore(&imp->imp_lock, irq_flags);
+
+ class_import_put(imp);
+ RETURN(rc);
+}