Whamcloud - gitweb
branch: HEAD
[fs/lustre-release.git] / lustre / ptlrpc / sec.c
index 315d439..a92a5c4 100644 (file)
@@ -110,30 +110,30 @@ struct ptlrpc_sec_policy * sptlrpc_rpcflavor2policy(__u16 flavor)
         if (number >= SPTLRPC_POLICY_MAX)
                 return NULL;
 
-again:
-        read_lock(&policy_lock);
-        policy = policies[number];
-        if (policy && !try_module_get(policy->sp_owner))
-                policy = NULL;
-        if (policy == NULL)
-                flag = atomic_read(&loaded);
-        read_unlock(&policy_lock);
-
-        /* if failure, try to load gss module, once */
-        if (unlikely(policy == NULL) && flag == 0 &&
-            number == SPTLRPC_POLICY_GSS) {
+        while (1) {
+                read_lock(&policy_lock);
+                policy = policies[number];
+                if (policy && !try_module_get(policy->sp_owner))
+                        policy = NULL;
+                if (policy == NULL)
+                        flag = atomic_read(&loaded);
+                read_unlock(&policy_lock);
+
+                if (policy != NULL || flag != 0 ||
+                    number != SPTLRPC_POLICY_GSS)
+                        break;
+
+                /* try to load gss module, once */
                 mutex_down(&load_mutex);
                 if (atomic_read(&loaded) == 0) {
-                        if (request_module("ptlrpc_gss") != 0)
-                                CERROR("Unable to load module ptlrpc_gss\n");
-                        else
+                        if (request_module("ptlrpc_gss") == 0)
                                 CWARN("module ptlrpc_gss loaded on demand\n");
+                        else
+                                CERROR("Unable to load module ptlrpc_gss\n");
 
                         atomic_set(&loaded, 1);
                 }
                 mutex_up(&load_mutex);
-
-                goto again;
         }
 
         return policy;
@@ -147,6 +147,8 @@ __u16 sptlrpc_name2rpcflavor(const char *name)
                 return SPTLRPC_FLVR_PLAIN;
         if (!strcmp(name, "krb5n"))
                 return SPTLRPC_FLVR_KRB5N;
+        if (!strcmp(name, "krb5a"))
+                return SPTLRPC_FLVR_KRB5A;
         if (!strcmp(name, "krb5i"))
                 return SPTLRPC_FLVR_KRB5I;
         if (!strcmp(name, "krb5p"))
@@ -844,10 +846,7 @@ int sptlrpc_cli_wrap_request(struct ptlrpc_request *req)
         RETURN(rc);
 }
 
-/*
- * rq_nob_received is the actual received data length
- */
-int sptlrpc_cli_unwrap_reply(struct ptlrpc_request *req)
+static int do_cli_unwrap_reply(struct ptlrpc_request *req)
 {
         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
         int                    rc;
@@ -856,39 +855,34 @@ int sptlrpc_cli_unwrap_reply(struct ptlrpc_request *req)
 
         LASSERT(ctx);
         LASSERT(ctx->cc_sec);
-        LASSERT(ctx->cc_ops);
         LASSERT(req->rq_repbuf);
+        LASSERT(req->rq_repdata);
+        LASSERT(req->rq_repmsg == NULL);
 
-        req->rq_repdata_len = req->rq_nob_received;
-
-        if (req->rq_nob_received < sizeof(struct lustre_msg)) {
+        if (req->rq_repdata_len < sizeof(struct lustre_msg)) {
                 CERROR("replied data length %d too small\n",
-                       req->rq_nob_received);
+                       req->rq_repdata_len);
                 RETURN(-EPROTO);
         }
 
+        /* v2 message, check request/reply policy match */
+        rpc_flvr = WIRE_FLVR_RPC(req->rq_repdata->lm_secflvr);
 
-        /*
-         * v2 message, check request/reply policy match
-         */
-        rpc_flvr = WIRE_FLVR_RPC(req->rq_repbuf->lm_secflvr);
-
-        if (req->rq_repbuf->lm_magic == LUSTRE_MSG_MAGIC_V2_SWABBED)
+        if (req->rq_repdata->lm_magic == LUSTRE_MSG_MAGIC_V2_SWABBED)
                 __swab16s(&rpc_flvr);
 
         if (RPC_FLVR_POLICY(rpc_flvr) !=
-                RPC_FLVR_POLICY(req->rq_flvr.sf_rpc)) {
+            RPC_FLVR_POLICY(req->rq_flvr.sf_rpc)) {
                 CERROR("request policy was %u while reply with %u\n",
-                        RPC_FLVR_POLICY(req->rq_flvr.sf_rpc),
-                        RPC_FLVR_POLICY(rpc_flvr));
+                       RPC_FLVR_POLICY(req->rq_flvr.sf_rpc),
+                       RPC_FLVR_POLICY(rpc_flvr));
                 RETURN(-EPROTO);
         }
 
         /* do nothing if it's null policy; otherwise unpack the
-         * wrapper message
-         */
+         * wrapper message */
         if (RPC_FLVR_POLICY(rpc_flvr) != SPTLRPC_POLICY_NULL &&
-            lustre_unpack_msg(req->rq_repbuf, req->rq_nob_received))
+            lustre_unpack_msg(req->rq_repdata, req->rq_repdata_len))
                 RETURN(-EPROTO);
 
         switch (RPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
@@ -910,6 +904,144 @@ int sptlrpc_cli_unwrap_reply(struct ptlrpc_request *req)
         RETURN(rc);
 }
 
+/*
+ * upon this be called, the reply buffer should have been un-posted,
+ * so nothing is going to change.
+ */
+int sptlrpc_cli_unwrap_reply(struct ptlrpc_request *req)
+{
+        LASSERT(req->rq_repbuf);
+        LASSERT(req->rq_repdata == NULL);
+        LASSERT(req->rq_repmsg == NULL);
+        LASSERT(req->rq_reply_off + req->rq_nob_received <= req->rq_repbuf_len);
+
+        if (req->rq_reply_off == 0) {
+                CERROR("real reply with offset 0\n");
+                return -EPROTO;
+        }
+
+        if (req->rq_reply_off % 8 != 0) {
+                CERROR("reply at odd offset %u\n", req->rq_reply_off);
+                return -EPROTO;
+        }
+
+        req->rq_repdata = (struct lustre_msg *)
+                                (req->rq_repbuf + req->rq_reply_off);
+        req->rq_repdata_len = req->rq_nob_received;
+
+        return do_cli_unwrap_reply(req);
+}
+
+/*
+ * Upon called, the receive buffer might be still posted, so the reply data
+ * might be changed at any time, no matter we're holding rq_lock or not. we
+ * expect the rq_reply_off be 0, rq_nob_received is the early reply size.
+ *
+ * we allocate a separate buffer to hold early reply data, pointed by
+ * rq_repdata, rq_repdata_len is the early reply size, and round up to power2
+ * is the actual buffer size.
+ *
+ * caller _must_ call sptlrpc_cli_finish_early_reply() after this, before
+ * process another early reply or real reply, to restore ptlrpc_request
+ * to normal status.
+ */
+int sptlrpc_cli_unwrap_early_reply(struct ptlrpc_request *req)
+{
+        struct lustre_msg      *early_buf;
+        int                     early_bufsz, early_size;
+        int                     rc;
+        ENTRY;
+
+        LASSERT(req->rq_repbuf);
+        LASSERT(req->rq_repdata == NULL);
+        LASSERT(req->rq_repmsg == NULL);
+
+        early_size = req->rq_nob_received;
+        if (early_size < sizeof(struct lustre_msg)) {
+                CERROR("early reply length %d too small\n", early_size);
+                RETURN(-EPROTO);
+        }
+
+        early_bufsz = size_roundup_power2(early_size);
+        OBD_ALLOC(early_buf, early_bufsz);
+        if (early_buf == NULL)
+                RETURN(-ENOMEM);
+
+        /* copy data out, do it inside spinlock */
+        spin_lock(&req->rq_lock);
+
+        if (req->rq_replied) {
+                spin_unlock(&req->rq_lock);
+                GOTO(err_free, rc = -EALREADY);
+        }
+
+        if (req->rq_reply_off != 0) {
+                CERROR("early reply with offset %u\n", req->rq_reply_off);
+                GOTO(err_free, rc = -EPROTO);
+        }
+
+        if (req->rq_nob_received != early_size) {
+                /* even another early arrived the size should be the same */
+                CWARN("data size has changed from %u to %u\n",
+                      early_size, req->rq_nob_received);
+                spin_unlock(&req->rq_lock);
+                GOTO(err_free, rc = -EINVAL);
+        }
+
+        if (req->rq_nob_received < sizeof(struct lustre_msg)) {
+                CERROR("early reply length %d too small\n",
+                       req->rq_nob_received);
+                spin_unlock(&req->rq_lock);
+                GOTO(err_free, rc = -EALREADY);
+        }
+
+        memcpy(early_buf, req->rq_repbuf, early_size);
+        spin_unlock(&req->rq_lock);
+
+        req->rq_repdata = early_buf;
+        req->rq_repdata_len = early_size;
+
+        rc = do_cli_unwrap_reply(req);
+
+        /* treate resend as an error case. in fact server should never ask
+         * resend via early reply. */
+        if (req->rq_resend) {
+                req->rq_resend = 0;
+                rc = -EPROTO;
+        }
+
+        if (rc) {
+                LASSERT(req->rq_repmsg == NULL);
+                req->rq_repdata = NULL;
+                req->rq_repdata_len = 0;
+                GOTO(err_free, rc);
+        }
+
+        LASSERT(req->rq_repmsg);
+        RETURN(0);
+
+err_free:
+        OBD_FREE(early_buf, early_bufsz);
+        RETURN(rc);
+}
+
+int sptlrpc_cli_finish_early_reply(struct ptlrpc_request *req)
+{
+        int     early_bufsz;
+
+        LASSERT(req->rq_repdata);
+        LASSERT(req->rq_repdata_len);
+        LASSERT(req->rq_repmsg);
+
+        early_bufsz = size_roundup_power2(req->rq_repdata_len);
+        OBD_FREE(req->rq_repdata, early_bufsz);
+
+        req->rq_repdata = NULL;
+        req->rq_repdata_len = 0;
+        req->rq_repmsg = NULL;
+        return 0;
+}
+
 /**************************************************
  * sec ID                                         *
  **************************************************/