if (number >= SPTLRPC_POLICY_MAX)
return NULL;
-again:
- read_lock(&policy_lock);
- policy = policies[number];
- if (policy && !try_module_get(policy->sp_owner))
- policy = NULL;
- if (policy == NULL)
- flag = atomic_read(&loaded);
- read_unlock(&policy_lock);
-
- /* if failure, try to load gss module, once */
- if (unlikely(policy == NULL) && flag == 0 &&
- number == SPTLRPC_POLICY_GSS) {
+ while (1) {
+ read_lock(&policy_lock);
+ policy = policies[number];
+ if (policy && !try_module_get(policy->sp_owner))
+ policy = NULL;
+ if (policy == NULL)
+ flag = atomic_read(&loaded);
+ read_unlock(&policy_lock);
+
+ if (policy != NULL || flag != 0 ||
+ number != SPTLRPC_POLICY_GSS)
+ break;
+
+ /* try to load gss module, once */
mutex_down(&load_mutex);
if (atomic_read(&loaded) == 0) {
- if (request_module("ptlrpc_gss") != 0)
- CERROR("Unable to load module ptlrpc_gss\n");
- else
+ if (request_module("ptlrpc_gss") == 0)
CWARN("module ptlrpc_gss loaded on demand\n");
+ else
+ CERROR("Unable to load module ptlrpc_gss\n");
atomic_set(&loaded, 1);
}
mutex_up(&load_mutex);
-
- goto again;
}
return policy;
return SPTLRPC_FLVR_PLAIN;
if (!strcmp(name, "krb5n"))
return SPTLRPC_FLVR_KRB5N;
+ if (!strcmp(name, "krb5a"))
+ return SPTLRPC_FLVR_KRB5A;
if (!strcmp(name, "krb5i"))
return SPTLRPC_FLVR_KRB5I;
if (!strcmp(name, "krb5p"))
RETURN(rc);
}
-/*
- * rq_nob_received is the actual received data length
- */
-int sptlrpc_cli_unwrap_reply(struct ptlrpc_request *req)
+static int do_cli_unwrap_reply(struct ptlrpc_request *req)
{
struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
int rc;
LASSERT(ctx);
LASSERT(ctx->cc_sec);
- LASSERT(ctx->cc_ops);
LASSERT(req->rq_repbuf);
+ LASSERT(req->rq_repdata);
+ LASSERT(req->rq_repmsg == NULL);
- req->rq_repdata_len = req->rq_nob_received;
-
- if (req->rq_nob_received < sizeof(struct lustre_msg)) {
+ if (req->rq_repdata_len < sizeof(struct lustre_msg)) {
CERROR("replied data length %d too small\n",
- req->rq_nob_received);
+ req->rq_repdata_len);
RETURN(-EPROTO);
}
+ /* v2 message, check request/reply policy match */
+ rpc_flvr = WIRE_FLVR_RPC(req->rq_repdata->lm_secflvr);
- /*
- * v2 message, check request/reply policy match
- */
- rpc_flvr = WIRE_FLVR_RPC(req->rq_repbuf->lm_secflvr);
-
- if (req->rq_repbuf->lm_magic == LUSTRE_MSG_MAGIC_V2_SWABBED)
+ if (req->rq_repdata->lm_magic == LUSTRE_MSG_MAGIC_V2_SWABBED)
__swab16s(&rpc_flvr);
if (RPC_FLVR_POLICY(rpc_flvr) !=
- RPC_FLVR_POLICY(req->rq_flvr.sf_rpc)) {
+ RPC_FLVR_POLICY(req->rq_flvr.sf_rpc)) {
CERROR("request policy was %u while reply with %u\n",
- RPC_FLVR_POLICY(req->rq_flvr.sf_rpc),
- RPC_FLVR_POLICY(rpc_flvr));
+ RPC_FLVR_POLICY(req->rq_flvr.sf_rpc),
+ RPC_FLVR_POLICY(rpc_flvr));
RETURN(-EPROTO);
}
/* do nothing if it's null policy; otherwise unpack the
- * wrapper message
- */
+ * wrapper message */
if (RPC_FLVR_POLICY(rpc_flvr) != SPTLRPC_POLICY_NULL &&
- lustre_unpack_msg(req->rq_repbuf, req->rq_nob_received))
+ lustre_unpack_msg(req->rq_repdata, req->rq_repdata_len))
RETURN(-EPROTO);
switch (RPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
RETURN(rc);
}
+/*
+ * upon this be called, the reply buffer should have been un-posted,
+ * so nothing is going to change.
+ */
+int sptlrpc_cli_unwrap_reply(struct ptlrpc_request *req)
+{
+ LASSERT(req->rq_repbuf);
+ LASSERT(req->rq_repdata == NULL);
+ LASSERT(req->rq_repmsg == NULL);
+ LASSERT(req->rq_reply_off + req->rq_nob_received <= req->rq_repbuf_len);
+
+ if (req->rq_reply_off == 0) {
+ CERROR("real reply with offset 0\n");
+ return -EPROTO;
+ }
+
+ if (req->rq_reply_off % 8 != 0) {
+ CERROR("reply at odd offset %u\n", req->rq_reply_off);
+ return -EPROTO;
+ }
+
+ req->rq_repdata = (struct lustre_msg *)
+ (req->rq_repbuf + req->rq_reply_off);
+ req->rq_repdata_len = req->rq_nob_received;
+
+ return do_cli_unwrap_reply(req);
+}
+
+/*
+ * Upon called, the receive buffer might be still posted, so the reply data
+ * might be changed at any time, no matter we're holding rq_lock or not. we
+ * expect the rq_reply_off be 0, rq_nob_received is the early reply size.
+ *
+ * we allocate a separate buffer to hold early reply data, pointed by
+ * rq_repdata, rq_repdata_len is the early reply size, and round up to power2
+ * is the actual buffer size.
+ *
+ * caller _must_ call sptlrpc_cli_finish_early_reply() after this, before
+ * process another early reply or real reply, to restore ptlrpc_request
+ * to normal status.
+ */
+int sptlrpc_cli_unwrap_early_reply(struct ptlrpc_request *req)
+{
+ struct lustre_msg *early_buf;
+ int early_bufsz, early_size;
+ int rc;
+ ENTRY;
+
+ LASSERT(req->rq_repbuf);
+ LASSERT(req->rq_repdata == NULL);
+ LASSERT(req->rq_repmsg == NULL);
+
+ early_size = req->rq_nob_received;
+ if (early_size < sizeof(struct lustre_msg)) {
+ CERROR("early reply length %d too small\n", early_size);
+ RETURN(-EPROTO);
+ }
+
+ early_bufsz = size_roundup_power2(early_size);
+ OBD_ALLOC(early_buf, early_bufsz);
+ if (early_buf == NULL)
+ RETURN(-ENOMEM);
+
+ /* copy data out, do it inside spinlock */
+ spin_lock(&req->rq_lock);
+
+ if (req->rq_replied) {
+ spin_unlock(&req->rq_lock);
+ GOTO(err_free, rc = -EALREADY);
+ }
+
+ if (req->rq_reply_off != 0) {
+ CERROR("early reply with offset %u\n", req->rq_reply_off);
+ GOTO(err_free, rc = -EPROTO);
+ }
+
+ if (req->rq_nob_received != early_size) {
+ /* even another early arrived the size should be the same */
+ CWARN("data size has changed from %u to %u\n",
+ early_size, req->rq_nob_received);
+ spin_unlock(&req->rq_lock);
+ GOTO(err_free, rc = -EINVAL);
+ }
+
+ if (req->rq_nob_received < sizeof(struct lustre_msg)) {
+ CERROR("early reply length %d too small\n",
+ req->rq_nob_received);
+ spin_unlock(&req->rq_lock);
+ GOTO(err_free, rc = -EALREADY);
+ }
+
+ memcpy(early_buf, req->rq_repbuf, early_size);
+ spin_unlock(&req->rq_lock);
+
+ req->rq_repdata = early_buf;
+ req->rq_repdata_len = early_size;
+
+ rc = do_cli_unwrap_reply(req);
+
+ /* treate resend as an error case. in fact server should never ask
+ * resend via early reply. */
+ if (req->rq_resend) {
+ req->rq_resend = 0;
+ rc = -EPROTO;
+ }
+
+ if (rc) {
+ LASSERT(req->rq_repmsg == NULL);
+ req->rq_repdata = NULL;
+ req->rq_repdata_len = 0;
+ GOTO(err_free, rc);
+ }
+
+ LASSERT(req->rq_repmsg);
+ RETURN(0);
+
+err_free:
+ OBD_FREE(early_buf, early_bufsz);
+ RETURN(rc);
+}
+
+int sptlrpc_cli_finish_early_reply(struct ptlrpc_request *req)
+{
+ int early_bufsz;
+
+ LASSERT(req->rq_repdata);
+ LASSERT(req->rq_repdata_len);
+ LASSERT(req->rq_repmsg);
+
+ early_bufsz = size_roundup_power2(req->rq_repdata_len);
+ OBD_FREE(req->rq_repdata, early_bufsz);
+
+ req->rq_repdata = NULL;
+ req->rq_repdata_len = 0;
+ req->rq_repmsg = NULL;
+ return 0;
+}
+
/**************************************************
* sec ID *
**************************************************/