Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / ptlrpc / sec.c
index a92a5c4..977ac89 100644 (file)
@@ -1,23 +1,41 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- * Copyright (C) 2004-2007 Cluster File Systems, Inc.
- *   Author: Eric Mei <ericm@clusterfs.com>
+ * GPL HEADER START
  *
- *   This file is part of Lustre, http://www.lustre.org.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- *   Lustre is free software; you can redistribute it and/or
- *   modify it under the terms of version 2 of the GNU General Public
- *   License as published by the Free Software Foundation.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- *   Lustre is distributed in the hope that it will be useful,
- *   but WITHOUT ANY WARRANTY; without even the implied warranty of
- *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   GNU General Public License for more details.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- *   You should have received a copy of the GNU General Public License
- *   along with Lustre; if not, write to the Free Software
- *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/ptlrpc/sec.c
+ *
+ * Author: Eric Mei <ericm@clusterfs.com>
  */
 
 #ifndef EXPORT_SYMTAB
@@ -48,7 +66,7 @@
  * policy registers                            *
  ***********************************************/
 
-static rwlock_t policy_lock = RW_LOCK_UNLOCKED;
+static rwlock_t policy_lock;
 static struct ptlrpc_sec_policy *policies[SPTLRPC_POLICY_MAX] = {
         NULL,
 };
@@ -465,7 +483,7 @@ int sptlrpc_req_replace_dead_ctx(struct ptlrpc_request *req)
                 CWARN("ctx (%p, fl %lx) doesn't switch, relax a little bit\n",
                       newctx, newctx->cc_flags);
 
-                schedule_timeout(HZ);
+                cfs_schedule_timeout(CFS_TASK_INTERRUPTIBLE, HZ);
         } else {
                 rc = sptlrpc_req_ctx_switch(req, oldctx, newctx);
                 if (rc) {
@@ -791,7 +809,7 @@ int sptlrpc_import_check_ctx(struct obd_import *imp)
         spin_lock_init(&req->rq_lock);
         atomic_set(&req->rq_refcount, 10000);
         CFS_INIT_LIST_HEAD(&req->rq_ctx_chain);
-        init_waitqueue_head(&req->rq_reply_waitq);
+        cfs_waitq_init(&req->rq_reply_waitq);
         req->rq_import = imp;
         req->rq_cli_ctx = ctx;
 
@@ -915,7 +933,8 @@ int sptlrpc_cli_unwrap_reply(struct ptlrpc_request *req)
         LASSERT(req->rq_repmsg == NULL);
         LASSERT(req->rq_reply_off + req->rq_nob_received <= req->rq_repbuf_len);
 
-        if (req->rq_reply_off == 0) {
+        if (req->rq_reply_off == 0 &&
+            (lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT)) {
                 CERROR("real reply with offset 0\n");
                 return -EPROTO;
         }
@@ -932,114 +951,108 @@ int sptlrpc_cli_unwrap_reply(struct ptlrpc_request *req)
         return do_cli_unwrap_reply(req);
 }
 
-/*
+/**
  * Upon called, the receive buffer might be still posted, so the reply data
  * might be changed at any time, no matter we're holding rq_lock or not. we
  * expect the rq_reply_off be 0, rq_nob_received is the early reply size.
  *
- * we allocate a separate buffer to hold early reply data, pointed by
- * rq_repdata, rq_repdata_len is the early reply size, and round up to power2
- * is the actual buffer size.
- *
- * caller _must_ call sptlrpc_cli_finish_early_reply() after this, before
- * process another early reply or real reply, to restore ptlrpc_request
- * to normal status.
+ * we allocate separate ptlrpc_request and reply buffer for early reply
+ * processing, return 0 and @req_ret is a duplicated ptlrpc_request. caller
+ * must call sptlrpc_cli_finish_early_reply() on the returned request to
+ * release it. if anything goes wrong @req_ret will not be set.
  */
-int sptlrpc_cli_unwrap_early_reply(struct ptlrpc_request *req)
+int sptlrpc_cli_unwrap_early_reply(struct ptlrpc_request *req,
+                                   struct ptlrpc_request **req_ret)
 {
-        struct lustre_msg      *early_buf;
+        struct ptlrpc_request  *early_req;
+        char                   *early_buf;
         int                     early_bufsz, early_size;
         int                     rc;
         ENTRY;
 
-        LASSERT(req->rq_repbuf);
-        LASSERT(req->rq_repdata == NULL);
-        LASSERT(req->rq_repmsg == NULL);
+        OBD_ALLOC_PTR(early_req);
+        if (early_req == NULL)
+                RETURN(-ENOMEM);
 
         early_size = req->rq_nob_received;
-        if (early_size < sizeof(struct lustre_msg)) {
-                CERROR("early reply length %d too small\n", early_size);
-                RETURN(-EPROTO);
-        }
-
         early_bufsz = size_roundup_power2(early_size);
         OBD_ALLOC(early_buf, early_bufsz);
         if (early_buf == NULL)
-                RETURN(-ENOMEM);
+                GOTO(err_req, rc = -ENOMEM);
 
-        /* copy data out, do it inside spinlock */
+        /* sanity checkings and copy data out, do it inside spinlock */
         spin_lock(&req->rq_lock);
 
         if (req->rq_replied) {
                 spin_unlock(&req->rq_lock);
-                GOTO(err_free, rc = -EALREADY);
+                GOTO(err_buf, rc = -EALREADY);
         }
 
+        LASSERT(req->rq_repbuf);
+        LASSERT(req->rq_repdata == NULL);
+        LASSERT(req->rq_repmsg == NULL);
+
         if (req->rq_reply_off != 0) {
                 CERROR("early reply with offset %u\n", req->rq_reply_off);
-                GOTO(err_free, rc = -EPROTO);
+                spin_unlock(&req->rq_lock);
+                GOTO(err_buf, rc = -EPROTO);
         }
 
         if (req->rq_nob_received != early_size) {
                 /* even another early arrived the size should be the same */
-                CWARN("data size has changed from %u to %u\n",
-                      early_size, req->rq_nob_received);
+                CERROR("data size has changed from %u to %u\n",
+                       early_size, req->rq_nob_received);
                 spin_unlock(&req->rq_lock);
-                GOTO(err_free, rc = -EINVAL);
+                GOTO(err_buf, rc = -EINVAL);
         }
 
         if (req->rq_nob_received < sizeof(struct lustre_msg)) {
                 CERROR("early reply length %d too small\n",
                        req->rq_nob_received);
                 spin_unlock(&req->rq_lock);
-                GOTO(err_free, rc = -EALREADY);
+                GOTO(err_buf, rc = -EALREADY);
         }
 
         memcpy(early_buf, req->rq_repbuf, early_size);
         spin_unlock(&req->rq_lock);
 
-        req->rq_repdata = early_buf;
-        req->rq_repdata_len = early_size;
-
-        rc = do_cli_unwrap_reply(req);
-
-        /* treate resend as an error case. in fact server should never ask
-         * resend via early reply. */
-        if (req->rq_resend) {
-                req->rq_resend = 0;
-                rc = -EPROTO;
-        }
+        early_req->rq_cli_ctx = sptlrpc_cli_ctx_get(req->rq_cli_ctx);
+        early_req->rq_flvr = req->rq_flvr;
+        early_req->rq_repbuf = early_buf;
+        early_req->rq_repbuf_len = early_bufsz;
+        early_req->rq_repdata = (struct lustre_msg *) early_buf;
+        early_req->rq_repdata_len = early_size;
+        early_req->rq_early = 1;
 
+        rc = do_cli_unwrap_reply(early_req);
         if (rc) {
-                LASSERT(req->rq_repmsg == NULL);
-                req->rq_repdata = NULL;
-                req->rq_repdata_len = 0;
-                GOTO(err_free, rc);
+                DEBUG_REQ(D_ADAPTTO, early_req,
+                          "error %d unwrap early reply", rc);
+                GOTO(err_ctx, rc);
         }
 
-        LASSERT(req->rq_repmsg);
+        LASSERT(early_req->rq_repmsg);
+        *req_ret = early_req;
         RETURN(0);
 
-err_free:
+err_ctx:
+        sptlrpc_cli_ctx_put(early_req->rq_cli_ctx, 1);
+err_buf:
         OBD_FREE(early_buf, early_bufsz);
+err_req:
+        OBD_FREE_PTR(early_req);
         RETURN(rc);
 }
 
-int sptlrpc_cli_finish_early_reply(struct ptlrpc_request *req)
+void sptlrpc_cli_finish_early_reply(struct ptlrpc_request *early_req)
 {
-        int     early_bufsz;
-
-        LASSERT(req->rq_repdata);
-        LASSERT(req->rq_repdata_len);
-        LASSERT(req->rq_repmsg);
+        LASSERT(early_req->rq_repbuf);
+        LASSERT(early_req->rq_repdata);
+        LASSERT(early_req->rq_repmsg);
 
-        early_bufsz = size_roundup_power2(req->rq_repdata_len);
-        OBD_FREE(req->rq_repdata, early_bufsz);
-
-        req->rq_repdata = NULL;
-        req->rq_repdata_len = 0;
-        req->rq_repmsg = NULL;
-        return 0;
+        sptlrpc_cli_ctx_put(early_req->rq_cli_ctx, 1);
+        OBD_FREE(early_req->rq_repbuf, early_req->rq_repbuf_len);
+        OBD_FREE_PTR(early_req);
 }
 
 /**************************************************
@@ -2056,7 +2069,9 @@ void pga_to_bulk_desc(int nob, obd_count pg_count, struct brw_page **pga,
                                            nob : pga[i]->count;
                 desc->bd_iov[i].kiov_offset = pga[i]->off & ~CFS_PAGE_MASK;
 #else
-#warning FIXME for liblustre!
+                /* FIXME currently liblustre doesn't support bulk encryption.
+                 * if we do, check again following may not be right. */
+                LASSERTF(0, "Bulk encryption not implemented for liblustre\n");
                 desc->bd_iov[i].iov_base = pga[i]->pg->addr;
                 desc->bd_iov[i].iov_len = pga[i]->count > nob ?
                                            nob : pga[i]->count;
@@ -2183,7 +2198,7 @@ int sptlrpc_pack_user_desc(struct lustre_msg *msg, int offset)
         pud->pud_gid = cfs_current()->gid;
         pud->pud_fsuid = cfs_current()->fsuid;
         pud->pud_fsgid = cfs_current()->fsgid;
-        pud->pud_cap = cfs_current()->cap_effective;
+        pud->pud_cap = cfs_curproc_cap_pack();
         pud->pud_ngroups = (msg->lm_buflens[offset] - sizeof(*pud)) / 4;
 
 #ifdef __KERNEL__
@@ -2293,6 +2308,8 @@ int __init sptlrpc_init(void)
 {
         int rc;
 
+        rwlock_init(&policy_lock);
+
         rc = sptlrpc_gc_start_thread();
         if (rc)
                 goto out;