Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / ptlrpc / sec.c
index 59fedf7..977ac89 100644 (file)
@@ -66,7 +66,7 @@
  * policy registers                            *
  ***********************************************/
 
-static rwlock_t policy_lock = RW_LOCK_UNLOCKED;
+static rwlock_t policy_lock;
 static struct ptlrpc_sec_policy *policies[SPTLRPC_POLICY_MAX] = {
         NULL,
 };
@@ -933,7 +933,8 @@ int sptlrpc_cli_unwrap_reply(struct ptlrpc_request *req)
         LASSERT(req->rq_repmsg == NULL);
         LASSERT(req->rq_reply_off + req->rq_nob_received <= req->rq_repbuf_len);
 
-        if (req->rq_reply_off == 0) {
+        if (req->rq_reply_off == 0 &&
+            (lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT)) {
                 CERROR("real reply with offset 0\n");
                 return -EPROTO;
         }
@@ -950,114 +951,108 @@ int sptlrpc_cli_unwrap_reply(struct ptlrpc_request *req)
         return do_cli_unwrap_reply(req);
 }
 
-/*
+/**
  * Upon called, the receive buffer might be still posted, so the reply data
  * might be changed at any time, no matter we're holding rq_lock or not. we
  * expect the rq_reply_off be 0, rq_nob_received is the early reply size.
  *
- * we allocate a separate buffer to hold early reply data, pointed by
- * rq_repdata, rq_repdata_len is the early reply size, and round up to power2
- * is the actual buffer size.
- *
- * caller _must_ call sptlrpc_cli_finish_early_reply() after this, before
- * process another early reply or real reply, to restore ptlrpc_request
- * to normal status.
+ * we allocate separate ptlrpc_request and reply buffer for early reply
+ * processing, return 0 and @req_ret is a duplicated ptlrpc_request. caller
+ * must call sptlrpc_cli_finish_early_reply() on the returned request to
+ * release it. if anything goes wrong @req_ret will not be set.
  */
-int sptlrpc_cli_unwrap_early_reply(struct ptlrpc_request *req)
+int sptlrpc_cli_unwrap_early_reply(struct ptlrpc_request *req,
+                                   struct ptlrpc_request **req_ret)
 {
-        struct lustre_msg      *early_buf;
+        struct ptlrpc_request  *early_req;
+        char                   *early_buf;
         int                     early_bufsz, early_size;
         int                     rc;
         ENTRY;
 
-        LASSERT(req->rq_repbuf);
-        LASSERT(req->rq_repdata == NULL);
-        LASSERT(req->rq_repmsg == NULL);
+        OBD_ALLOC_PTR(early_req);
+        if (early_req == NULL)
+                RETURN(-ENOMEM);
 
         early_size = req->rq_nob_received;
-        if (early_size < sizeof(struct lustre_msg)) {
-                CERROR("early reply length %d too small\n", early_size);
-                RETURN(-EPROTO);
-        }
-
         early_bufsz = size_roundup_power2(early_size);
         OBD_ALLOC(early_buf, early_bufsz);
         if (early_buf == NULL)
-                RETURN(-ENOMEM);
+                GOTO(err_req, rc = -ENOMEM);
 
-        /* copy data out, do it inside spinlock */
+        /* sanity checkings and copy data out, do it inside spinlock */
         spin_lock(&req->rq_lock);
 
         if (req->rq_replied) {
                 spin_unlock(&req->rq_lock);
-                GOTO(err_free, rc = -EALREADY);
+                GOTO(err_buf, rc = -EALREADY);
         }
 
+        LASSERT(req->rq_repbuf);
+        LASSERT(req->rq_repdata == NULL);
+        LASSERT(req->rq_repmsg == NULL);
+
         if (req->rq_reply_off != 0) {
                 CERROR("early reply with offset %u\n", req->rq_reply_off);
-                GOTO(err_free, rc = -EPROTO);
+                spin_unlock(&req->rq_lock);
+                GOTO(err_buf, rc = -EPROTO);
         }
 
         if (req->rq_nob_received != early_size) {
                 /* even another early arrived the size should be the same */
-                CWARN("data size has changed from %u to %u\n",
-                      early_size, req->rq_nob_received);
+                CERROR("data size has changed from %u to %u\n",
+                       early_size, req->rq_nob_received);
                 spin_unlock(&req->rq_lock);
-                GOTO(err_free, rc = -EINVAL);
+                GOTO(err_buf, rc = -EINVAL);
         }
 
         if (req->rq_nob_received < sizeof(struct lustre_msg)) {
                 CERROR("early reply length %d too small\n",
                        req->rq_nob_received);
                 spin_unlock(&req->rq_lock);
-                GOTO(err_free, rc = -EALREADY);
+                GOTO(err_buf, rc = -EALREADY);
         }
 
         memcpy(early_buf, req->rq_repbuf, early_size);
         spin_unlock(&req->rq_lock);
 
-        req->rq_repdata = early_buf;
-        req->rq_repdata_len = early_size;
-
-        rc = do_cli_unwrap_reply(req);
-
-        /* treate resend as an error case. in fact server should never ask
-         * resend via early reply. */
-        if (req->rq_resend) {
-                req->rq_resend = 0;
-                rc = -EPROTO;
-        }
+        early_req->rq_cli_ctx = sptlrpc_cli_ctx_get(req->rq_cli_ctx);
+        early_req->rq_flvr = req->rq_flvr;
+        early_req->rq_repbuf = early_buf;
+        early_req->rq_repbuf_len = early_bufsz;
+        early_req->rq_repdata = (struct lustre_msg *) early_buf;
+        early_req->rq_repdata_len = early_size;
+        early_req->rq_early = 1;
 
+        rc = do_cli_unwrap_reply(early_req);
         if (rc) {
-                LASSERT(req->rq_repmsg == NULL);
-                req->rq_repdata = NULL;
-                req->rq_repdata_len = 0;
-                GOTO(err_free, rc);
+                DEBUG_REQ(D_ADAPTTO, early_req,
+                          "error %d unwrap early reply", rc);
+                GOTO(err_ctx, rc);
         }
 
-        LASSERT(req->rq_repmsg);
+        LASSERT(early_req->rq_repmsg);
+        *req_ret = early_req;
         RETURN(0);
 
-err_free:
+err_ctx:
+        sptlrpc_cli_ctx_put(early_req->rq_cli_ctx, 1);
+err_buf:
         OBD_FREE(early_buf, early_bufsz);
+err_req:
+        OBD_FREE_PTR(early_req);
         RETURN(rc);
 }
 
-int sptlrpc_cli_finish_early_reply(struct ptlrpc_request *req)
+void sptlrpc_cli_finish_early_reply(struct ptlrpc_request *early_req)
 {
-        int     early_bufsz;
+        LASSERT(early_req->rq_repbuf);
+        LASSERT(early_req->rq_repdata);
+        LASSERT(early_req->rq_repmsg);
 
-        LASSERT(req->rq_repdata);
-        LASSERT(req->rq_repdata_len);
-        LASSERT(req->rq_repmsg);
-
-        early_bufsz = size_roundup_power2(req->rq_repdata_len);
-        OBD_FREE(req->rq_repdata, early_bufsz);
-
-        req->rq_repdata = NULL;
-        req->rq_repdata_len = 0;
-        req->rq_repmsg = NULL;
-        return 0;
+        sptlrpc_cli_ctx_put(early_req->rq_cli_ctx, 1);
+        OBD_FREE(early_req->rq_repbuf, early_req->rq_repbuf_len);
+        OBD_FREE_PTR(early_req);
 }
 
 /**************************************************
@@ -2313,6 +2308,8 @@ int __init sptlrpc_init(void)
 {
         int rc;
 
+        rwlock_init(&policy_lock);
+
         rc = sptlrpc_gc_start_thread();
         if (rc)
                 goto out;