Whamcloud - gitweb
Correctly set lsm size for open replay.
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
index de5f409..1da0c78 100644 (file)
@@ -1,25 +1,37 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- * Copyright (C) 2001-2003 Cluster File Systems, Inc.
+ * GPL HEADER START
  *
- *   This file is part of the Lustre file system, http://www.lustre.org
- *   Lustre is a trademark of Cluster File Systems, Inc.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- *   You may have signed or agreed to another license before downloading
- *   this software.  If so, you are bound by the terms and conditions
- *   of that agreement, and the following does not apply to you.  See the
- *   LICENSE file included with this distribution for more information.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- *   If you did not agree to a different license, then this copy of Lustre
- *   is open source software; you can redistribute it and/or modify it
- *   under the terms of version 2 of the GNU General Public License as
- *   published by the Free Software Foundation.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- *   In either case, Lustre is distributed in the hope that it will be
- *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   license text for more details.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
  */
 
 #ifndef EXPORT_SYMTAB
@@ -36,7 +48,7 @@
 # include <liblustre.h>
 #endif
 
-#include <linux/lustre_acl.h>
+#include <lustre_acl.h>
 #include <obd_class.h>
 #include <lustre_dlm.h>
 /* fid_res_name_eq() */
@@ -99,11 +111,15 @@ int it_open_error(int phase, struct lookup_intent *it)
 EXPORT_SYMBOL(it_open_error);
 
 /* this must be called on a lockh that is known to have a referenced lock */
-int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data)
+int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
+                      __u32 *bits)
 {
         struct ldlm_lock *lock;
         ENTRY;
 
+        if(bits)
+                *bits = 0;
+
         if (!*lockh) {
                 EXIT;
                 RETURN(0);
@@ -126,6 +142,9 @@ int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data)
         }
 #endif
         lock->l_ast_data = data;
+        if (bits)
+                *bits = lock->l_policy_data.l_inodebits.bits;
+
         unlock_res_and_lock(lock);
         LDLM_LOCK_PUT(lock);
 
@@ -143,7 +162,7 @@ ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
 
         fid_build_reg_res_name(fid, &res_id);
         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
-                             &res_id, type, policy, mode, lockh);
+                             &res_id, type, policy, mode, lockh, 0);
         RETURN(rc);
 }
 
@@ -229,8 +248,8 @@ static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
         struct ptlrpc_request *req;
         struct obd_device     *obddev = class_exp2obd(exp);
         struct ldlm_intent    *lit;
-        int                    joinfile = !!((it->it_flags & O_JOIN_FILE) && 
-                                              op_data->op_data);
+        int           joinfile = !!((it->it_create_mode & M_JOIN_FILE) &&
+                                    op_data->op_data);
         CFS_LIST_HEAD(cancels);
         int                    count = 0;
         int                    mode;
@@ -298,7 +317,7 @@ static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
         }
 
         spin_lock(&req->rq_lock);
-        req->rq_replay = 1;
+        req->rq_replay = req->rq_import->imp_replayable;
         spin_unlock(&req->rq_lock);
 
         /* pack the intent */
@@ -467,6 +486,7 @@ static int mdc_finish_enqueue(struct obd_export *exp,
         it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
         it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
         it->d.lustre.it_lock_mode = einfo->ei_mode;
+        it->d.lustre.it_lock_handle = lockh->cookie;
         it->d.lustre.it_data = req;
 
         if (it->d.lustre.it_status < 0 && req->rq_replay)
@@ -519,25 +539,6 @@ static int mdc_finish_enqueue(struct obd_export *exp,
                         if (eadata == NULL)
                                 RETURN(-EPROTO);
 
-                        if (body->valid & OBD_MD_FLMODEASIZE) {
-                                struct obd_device *obddev = class_exp2obd(exp);
-
-                                if (obddev->u.cli.cl_max_mds_easize <
-                                    body->max_mdsize) {
-                                        obddev->u.cli.cl_max_mds_easize =
-                                                body->max_mdsize;
-                                        CDEBUG(D_INFO, "maxeasize become %d\n",
-                                               body->max_mdsize);
-                                }
-                                if (obddev->u.cli.cl_max_mds_cookiesize <
-                                    body->max_cookiesize) {
-                                        obddev->u.cli.cl_max_mds_cookiesize =
-                                                body->max_cookiesize;
-                                        CDEBUG(D_INFO, "cookiesize become %d\n",
-                                               body->max_cookiesize);
-                                }
-                        }
-
                         /*
                          * We save the reply LOV EA in case we have to replay a
                          * create for recovery.  If we didn't allocate a large
@@ -551,12 +552,17 @@ static int mdc_finish_enqueue(struct obd_export *exp,
                                 void *lmm;
                                 if (req_capsule_get_size(pill, &RMF_EADATA,
                                                          RCL_CLIENT) <
-                                    body->eadatasize) {
+                                    body->eadatasize)
                                         mdc_realloc_openmsg(req, body);
-                                        req_capsule_set_size(pill, &RMF_EADATA,
-                                                             RCL_CLIENT,
-                                                             body->eadatasize);
-                                }
+                                else
+                                        req_capsule_shrink(pill, &RMF_EADATA,
+                                                           body->eadatasize,
+                                                           RCL_CLIENT);
+
+                                req_capsule_set_size(pill, &RMF_EADATA,
+                                                     RCL_CLIENT,
+                                                     body->eadatasize);
+
                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
                                 if (lmm)
                                         memcpy(lmm, eadata, body->eadatasize);
@@ -603,26 +609,40 @@ static int mdc_finish_enqueue(struct obd_export *exp,
 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                 struct lookup_intent *it, struct md_op_data *op_data,
                 struct lustre_handle *lockh, void *lmm, int lmmsize,
-                int extra_lock_flags)
+                struct ptlrpc_request **reqp, int extra_lock_flags)
 {
         struct obd_device     *obddev = class_exp2obd(exp);
-        struct ptlrpc_request *req;
+        struct ptlrpc_request *req = NULL;
         struct req_capsule    *pill;
-        int                    flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
+        int                    flags = extra_lock_flags;
         int                    rc;
         struct ldlm_res_id res_id;
         ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
         ENTRY;
 
-        LASSERTF(einfo->ei_type == LDLM_IBITS, "lock type %d\n", einfo->ei_type);
+        LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
+                 einfo->ei_type);
 
         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
 
-        if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
+        if (it)
+                flags |= LDLM_FL_HAS_INTENT;
+        if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
 
-        if (it->it_op & IT_OPEN) {
-                int joinfile = !!((it->it_flags & O_JOIN_FILE) &&
+        if (reqp)
+                req = *reqp;
+
+        if (!it) {
+                /* The only intent-less caller right now is FLOCK; in this case
+                   the flock policy is passed in as lmm, with lmmsize 0 */
+                LASSERT(lmm && lmmsize == 0);
+                LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
+                         einfo->ei_type);
+                policy = *(ldlm_policy_data_t *)lmm;
+                res_id.name[3] = LDLM_FLOCK;
+        } else if (it->it_op & IT_OPEN) {
+                int joinfile = !!((it->it_create_mode & M_JOIN_FILE) &&
                                               op_data->op_data);
 
                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
@@ -632,7 +652,7 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                         einfo->ei_cbdata = NULL;
                         lmm = NULL;
                 } else
-                        it->it_flags &= ~O_JOIN_FILE;
+                        it->it_create_mode &= ~M_JOIN_FILE;
         } else if (it->it_op & IT_UNLINK)
                 req = mdc_intent_unlink_pack(exp, it, op_data);
         else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
@@ -650,13 +670,28 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
 
         /* It is important to obtain rpc_lock first (if applicable), so that
          * threads that are serialised with rpc_lock are not polluting our
-         * rpcs in flight counter */
-        mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
-        mdc_enter_request(&obddev->u.cli);
+         * rpcs in flight counter. We do not do flock request limiting, though*/
+        if (it) {
+                mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
+                mdc_enter_request(&obddev->u.cli);
+        }
         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
                               0, NULL, lockh, 0);
-        mdc_exit_request(&obddev->u.cli);
-        mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
+        if (reqp)
+                *reqp = req;
+
+        if (it) {
+                mdc_exit_request(&obddev->u.cli);
+                mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
+        }
+        if (!it) {
+                /* For flock requests we immediately return without further
+                   delay and let the caller deal with the rest, since the
+                   rest of this function's metadata processing makes no sense
+                   for flock requests anyway */
+                RETURN(rc);
+        }
+
         if (rc < 0) {
                 CERROR("ldlm_cli_enqueue: %d\n", rc);
                 mdc_clear_replay_flag(req, rc);
@@ -700,7 +735,7 @@ static int mdc_finish_intent_lock(struct obd_export *exp,
         /* If we were revalidating a fid/name pair, mark the intent in
          * case we fail and get called again from lookup */
         if (fid_is_sane(&op_data->op_fid2) &&
-            it->it_flags & O_CHECK_STALE &&
+            it->it_create_mode & M_CHECK_STALE &&
             it->it_op != IT_GETATTR) {
                 it_set_disposition(it, DISP_ENQ_COMPLETE);
 
@@ -771,7 +806,7 @@ static int mdc_finish_intent_lock(struct obd_export *exp,
 
                 memcpy(&old_lock, lockh, sizeof(*lockh));
                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
-                                    LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
+                                    LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
                         ldlm_lock_decref_and_cancel(lockh,
                                                     it->d.lustre.it_lock_mode);
                         memcpy(lockh, &old_lock, sizeof(old_lock));
@@ -873,7 +908,7 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
                 struct ldlm_enqueue_info einfo =
                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
-                          ldlm_completion_ast, NULL, NULL };
+                          ldlm_completion_ast, NULL, NULL, NULL };
 
                 /* For case if upper layer did not alloc fid, do it now. */
                 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
@@ -884,12 +919,11 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
                         }
                 }
                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
-                                 lmm, lmmsize, extra_lock_flags);
+                                 lmm, lmmsize, NULL, extra_lock_flags);
                 if (rc < 0)
                         RETURN(rc);
-                it->d.lustre.it_lock_handle = lockh.cookie;
         } else if (!fid_is_sane(&op_data->op_fid2) ||
-                   !(it->it_flags & O_CHECK_STALE)) {
+                   !(it->it_create_mode & M_CHECK_STALE)) {
                 /* DISP_ENQ_COMPLETE set means there is extra reference on
                  * request referenced from this intent, saved for subsequent
                  * lookup.  This path is executed when we proceed to this
@@ -901,7 +935,8 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
         RETURN(rc);
 }
 
-static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
+static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
+                                              struct ptlrpc_request *req,
                                               void *unused, int rc)
 {
         struct obd_export        *exp = req->rq_async_args.pointer_arg[0];
@@ -934,8 +969,6 @@ static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
         if (rc)
                 GOTO(out, rc);
 
-        it->d.lustre.it_lock_handle = lockh->cookie;
-
         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
         EXIT;
 
@@ -982,7 +1015,7 @@ int mdc_intent_getattr_async(struct obd_export *exp,
         req->rq_async_args.pointer_arg[1] = minfo;
         req->rq_async_args.pointer_arg[2] = einfo;
         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
-        ptlrpcd_add_req(req);
+        ptlrpcd_add_req(req, PSCOPE_OTHER);
 
         RETURN(0);
 }
@@ -1001,8 +1034,8 @@ int mdc_revalidate_lock(struct obd_export *exp,
         ENTRY;
 
         fid_build_reg_res_name(fid, &res_id);
-        /* As not all attributes are kept under update lock, e.g. 
-           owner/group/acls are under lookup lock, we need both 
+        /* As not all attributes are kept under update lock, e.g.
+           owner/group/acls are under lookup lock, we need both
            ibits for GETATTR. */
         policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
                 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
@@ -1010,7 +1043,7 @@ int mdc_revalidate_lock(struct obd_export *exp,
 
         mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
                                LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
-                               &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
+                               &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
         if (mode) {
                 it->d.lustre.it_lock_handle = lockh.cookie;
                 it->d.lustre.it_lock_mode = mode;