Whamcloud - gitweb
LU-8592 mdt: hold mdt_device::mdt_md_root until service stop
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
index 38e7eb0..9bd929f 100644 (file)
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -27,7 +23,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2014, Intel Corporation.
+ * Copyright (c) 2011, 2015, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -123,7 +119,7 @@ static int mdt_create_data(struct mdt_thread_info *info,
        ma->ma_need = MA_INODE | MA_LOV;
        ma->ma_valid = 0;
        mutex_lock(&o->mot_lov_mutex);
-       if (!(o->mot_flags & MOF_LOV_CREATED)) {
+       if (!o->mot_lov_created) {
                rc = mdo_create_data(info->mti_env,
                                     p ? mdt_object_child(p) : NULL,
                                     mdt_object_child(o), spec, ma);
@@ -131,7 +127,7 @@ static int mdt_create_data(struct mdt_thread_info *info,
                        rc = mdt_attr_get_complex(info, o, ma);
 
                if (rc == 0 && ma->ma_valid & MA_LOV)
-                       o->mot_flags |= MOF_LOV_CREATED;
+                       o->mot_lov_created = 1;
        }
 
        mutex_unlock(&o->mot_lov_mutex);
@@ -224,7 +220,7 @@ static void mdt_empty_transno(struct mdt_thread_info *info, int rc)
                if (info->mti_transno != 0) {
                        struct obd_export *exp = req->rq_export;
 
-                       CERROR("%s: replay trans "LPU64" NID %s: rc = %d\n",
+                       CERROR("%s: replay trans %llu NID %s: rc = %d\n",
                               mdt_obd_name(mdt), info->mti_transno,
                               libcfs_nid2str(exp->exp_connection->c_peer.nid),
                               rc);
@@ -240,7 +236,7 @@ static void mdt_empty_transno(struct mdt_thread_info *info, int rc)
        }
        spin_unlock(&mdt->mdt_lut.lut_translock);
 
-       CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n",
+       CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
               info->mti_transno,
               req->rq_export->exp_obd->obd_last_committed);
 
@@ -259,7 +255,7 @@ static void mdt_empty_transno(struct mdt_thread_info *info, int rc)
                 * otherwise the following resend(after replay) can not
                 * be checked correctly by xid */
                mutex_unlock(&ted->ted_lcd_lock);
-               CDEBUG(D_HA, "%s: transno = "LPU64" < last_transno = "LPU64"\n",
+               CDEBUG(D_HA, "%s: transno = %llu < last_transno = %llu\n",
                       mdt_obd_name(mdt), info->mti_transno,
                       lcd->lcd_last_transno);
                RETURN_EXIT;
@@ -295,7 +291,7 @@ void mdt_mfd_set_mode(struct mdt_file_data *mfd, __u64 mode)
 {
        LASSERT(mfd != NULL);
 
-       CDEBUG(D_HA, DFID " Change mfd mode "LPO64" -> "LPO64".\n",
+       CDEBUG(D_HA, DFID " Change mfd mode %#llo -> %#llo.\n",
               PFID(mdt_object_fid(mfd->mfd_object)), mfd->mfd_mode, mode);
 
        mfd->mfd_mode = mode;
@@ -363,8 +359,8 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
                        mdt_set_disposition(info, rep, DISP_OPEN_STRIPE);
         }
 
-        CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64" lmm_size = %d\n",
-               ma->ma_valid, ma->ma_lmm_size);
+       CDEBUG(D_INODE, "after open, ma_valid bit = %#llx lmm_size = %d\n",
+              ma->ma_valid, ma->ma_lmm_size);
 
         if (ma->ma_valid & MA_LOV) {
                 LASSERT(ma->ma_lmm_size != 0);
@@ -443,7 +439,7 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
                old_mfd = mdt_handle2mfd(med, info->mti_rr.rr_handle, true);
                if (old_mfd != NULL) {
                        CDEBUG(D_HA, "delete orphan mfd = %p, fid = "DFID", "
-                              "cookie = "LPX64"\n", mfd,
+                              "cookie = %#llx\n", mfd,
                               PFID(mdt_object_fid(mfd->mfd_object)),
                               info->mti_rr.rr_handle->cookie);
                        class_handle_unhash(&old_mfd->mfd_handle);
@@ -459,12 +455,12 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
                } else {
                        spin_unlock(&med->med_open_lock);
                        CDEBUG(D_HA, "orphan mfd not found, fid = "DFID", "
-                              "cookie = "LPX64"\n",
+                              "cookie = %#llx\n",
                               PFID(mdt_object_fid(mfd->mfd_object)),
                               info->mti_rr.rr_handle->cookie);
                }
 
-               CDEBUG(D_HA, "Store old cookie "LPX64" in new mfd\n",
+               CDEBUG(D_HA, "Store old cookie %#llx in new mfd\n",
                       info->mti_rr.rr_handle->cookie);
 
                mfd->mfd_old_handle.cookie = info->mti_rr.rr_handle->cookie;
@@ -537,22 +533,17 @@ static int mdt_finish_open(struct mdt_thread_info *info,
                RETURN(-ENOENT);
        }
 
-        if (exp_connect_rmtclient(exp)) {
-                void *buf = req_capsule_server_get(info->mti_pill, &RMF_ACL);
-
-                rc = mdt_pack_remote_perm(info, o, buf);
-                if (rc) {
-                       repbody->mbo_valid &= ~OBD_MD_FLRMTPERM;
-                       repbody->mbo_aclsize = 0;
-               } else {
-                       repbody->mbo_valid |= OBD_MD_FLRMTPERM;
-                       repbody->mbo_aclsize = sizeof(struct mdt_remote_perm);
-                }
-        }
 #ifdef CONFIG_FS_POSIX_ACL
-       else if (exp_connect_flags(exp) & OBD_CONNECT_ACL)
-               rc = mdt_pack_acl2body(info, repbody, o,
-                                      exp->exp_target_data.ted_nodemap);
+       if (exp_connect_flags(exp) & OBD_CONNECT_ACL) {
+               struct lu_nodemap *nodemap = nodemap_get_from_exp(exp);
+               if (IS_ERR(nodemap))
+                       RETURN(PTR_ERR(nodemap));
+
+               rc = mdt_pack_acl2body(info, repbody, o, nodemap);
+               nodemap_putref(nodemap);
+               if (rc)
+                       RETURN(rc);
+       }
 #endif
 
         /*
@@ -648,7 +639,7 @@ void mdt_reconstruct_open(struct mdt_thread_info *info,
        opdata = mdt_req_from_lrd(req, info->mti_reply_data);
        mdt_set_disposition(info, ldlm_rep, opdata);
 
-        CDEBUG(D_INODE, "This is reconstruct open: disp="LPX64", result=%d\n",
+       CDEBUG(D_INODE, "This is reconstruct open: disp=%#llx, result=%d\n",
                ldlm_rep->lock_policy_res1, req->rq_status);
 
         if (mdt_get_disposition(ldlm_rep, DISP_OPEN_CREATE) &&
@@ -779,13 +770,13 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
                                struct mdt_lock_handle *lhc,
                                __u64 *ibits)
 {
-       struct md_attr  *ma = &info->mti_attr;
-       __u64            open_flags = info->mti_spec.sp_cr_flags;
-       ldlm_mode_t      lm = LCK_CR;
-       bool             acq_lease = !!(open_flags & MDS_OPEN_LEASE);
-       bool             try_layout = false;
-       bool             create_layout = false;
-       int              rc = 0;
+       struct md_attr *ma = &info->mti_attr;
+       __u64 open_flags = info->mti_spec.sp_cr_flags;
+       enum ldlm_mode lm = LCK_CR;
+       bool acq_lease = !!(open_flags & MDS_OPEN_LEASE);
+       bool try_layout = false;
+       bool create_layout = false;
+       int rc = 0;
        ENTRY;
 
        *ibits = 0;
@@ -816,9 +807,10 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
 
                /* Lease must be with open lock */
                if (!(open_flags & MDS_OPEN_LOCK)) {
-                       CERROR("Request lease for file:"DFID ", but open lock "
-                               "is missed, open_flags = "LPO64".\n",
-                               PFID(mdt_object_fid(obj)), open_flags);
+                       CERROR("%s: Request lease for file:"DFID ", but open lock "
+                              "is missed, open_flags = %#llo : rc = %d\n",
+                              mdt_obd_name(info->mti_mdt),
+                              PFID(mdt_object_fid(obj)), open_flags, -EPROTO);
                        GOTO(out, rc = -EPROTO);
                }
 
@@ -878,16 +870,17 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
                rc = mdt_object_lock(info, obj, lhc, *ibits);
        }
 
-       CDEBUG(D_INODE, "Requested bits lock:"DFID ", ibits = "LPX64
-               ", open_flags = "LPO64", try_layout = %d, rc = %d\n",
-               PFID(mdt_object_fid(obj)), *ibits, open_flags, try_layout, rc);
+       CDEBUG(D_INODE, "%s: Requested bits lock:"DFID ", ibits = %#llx"
+              ", open_flags = %#llo, try_layout = %d : rc = %d\n",
+              mdt_obd_name(info->mti_mdt), PFID(mdt_object_fid(obj)),
+              *ibits, open_flags, try_layout, rc);
 
        /* will change layout, revoke layout locks by enqueuing EX lock. */
        if (rc == 0 && create_layout) {
                struct mdt_lock_handle *ll = &info->mti_lh[MDT_LH_LAYOUT];
 
                CDEBUG(D_INODE, "Will create layout, get EX layout lock:"DFID
-                       ", open_flags = "LPO64"\n",
+                       ", open_flags = %#llo\n",
                        PFID(mdt_object_fid(obj)), open_flags);
 
                /* We cannot enqueue another lock for the same resource we
@@ -1145,7 +1138,7 @@ static int mdt_cross_open(struct mdt_thread_info *info,
                       mdt_obd_name(info->mti_mdt), PFID(fid), -EFAULT);
                LU_OBJECT_DEBUG(D_WARNING, info->mti_env,
                                &o->mot_obj,
-                               "Object isn't on this server! FLD error?\n");
+                               "Object isn't on this server! FLD error?");
                 rc = -EFAULT;
        } else {
                if (mdt_object_exists(o)) {
@@ -1177,6 +1170,60 @@ out:
         RETURN(rc);
 }
 
+/*
+ * find root object and take its xattr lock if it's on remote MDT, later create
+ * may use fs default striping (which is stored in root xattr).
+ */
+static int mdt_lock_root_xattr(struct mdt_thread_info *info,
+                              struct mdt_device *mdt)
+{
+       struct mdt_object *md_root = mdt->mdt_md_root;
+       struct lustre_handle lhroot;
+       int rc;
+
+       if (md_root == NULL) {
+               lu_root_fid(&info->mti_tmp_fid1);
+               md_root = mdt_object_find(info->mti_env, mdt,
+                                         &info->mti_tmp_fid1);
+               if (IS_ERR(md_root))
+                       return PTR_ERR(md_root);
+
+               spin_lock(&mdt->mdt_lock);
+               if (mdt->mdt_md_root != NULL) {
+                       spin_unlock(&mdt->mdt_lock);
+
+                       LASSERTF(mdt->mdt_md_root == md_root,
+                                "Different root object ("
+                                DFID") instances, %p, %p\n",
+                                PFID(&info->mti_tmp_fid1),
+                                mdt->mdt_md_root, md_root);
+                       LASSERT(atomic_read(
+                               &md_root->mot_obj.lo_header->loh_ref) > 1);
+
+                       mdt_object_put(info->mti_env, md_root);
+               } else {
+                       mdt->mdt_md_root = md_root;
+                       spin_unlock(&mdt->mdt_lock);
+               }
+       }
+
+       if (md_root->mot_cache_attr || !mdt_object_remote(md_root))
+               return 0;
+
+       rc = mdt_remote_object_lock(info, md_root, mdt_object_fid(md_root),
+                                   &lhroot, LCK_PR, MDS_INODELOCK_XATTR,
+                                   false, true);
+       if (rc < 0)
+               return rc;
+
+       md_root->mot_cache_attr = 1;
+
+       /* don't cancel this lock, so that we know the cached xattr is valid. */
+       ldlm_lock_decref(&lhroot, LCK_PR);
+
+       return 0;
+}
+
 int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
 {
         struct mdt_device       *mdt = info->mti_mdt;
@@ -1219,8 +1266,12 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
             info->mti_spec.u.sp_ea.eadata == NULL)
                 GOTO(out, result = err_serious(-EINVAL));
 
+       if (create_flags & FMODE_WRITE &&
+           exp_connect_flags(req->rq_export) & OBD_CONNECT_RDONLY)
+               GOTO(out, result = -EROFS);
+
        CDEBUG(D_INODE, "I am going to open "DFID"/("DNAME"->"DFID") "
-              "cr_flag="LPO64" mode=0%06o msg_flag=0x%x\n",
+              "cr_flag=%#llo mode=0%06o msg_flag=0x%x\n",
               PFID(rr->rr_fid1), PNAME(&rr->rr_name),
               PFID(rr->rr_fid2), create_flags,
               ma->ma_attr.la_mode, msg_flags);
@@ -1264,17 +1315,21 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                GOTO(out, result);
        }
 
-        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK))
-                GOTO(out, result = err_serious(-ENOMEM));
+       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK))
+               GOTO(out, result = err_serious(-ENOMEM));
 
-        mdt_set_disposition(info, ldlm_rep,
-                            (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
+       mdt_set_disposition(info, ldlm_rep,
+                           (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
 
        if (!lu_name_is_valid(&rr->rr_name))
                GOTO(out, result = -EPROTO);
 
+       result = mdt_lock_root_xattr(info, mdt);
+       if (result < 0)
+               GOTO(out, result);
+
 again:
-        lh = &info->mti_lh[MDT_LH_PARENT];
+       lh = &info->mti_lh[MDT_LH_PARENT];
        mdt_lock_pdo_init(lh,
                          (create_flags & MDS_OPEN_CREAT) ? LCK_PW : LCK_PR,
                          &rr->rr_name);
@@ -1417,7 +1472,8 @@ again:
                         if (result != 0)
                                 GOTO(out_child, result);
                 }
-                created = 1;
+               created = 1;
+               mdt_counter_incr(req, LPROC_MDT_MKNOD);
         } else {
                 /*
                  * The object is on remote node, return its FID for remote open.
@@ -1642,7 +1698,7 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
        lease_broken = ldlm_is_cancel(lease);
        unlock_res_and_lock(lease);
 
-       LDLM_DEBUG(lease, DFID " lease broken? %d\n",
+       LDLM_DEBUG(lease, DFID " lease broken? %d",
                   PFID(mdt_object_fid(o)), lease_broken);
 
        /* Cancel server side lease. Client side counterpart should
@@ -1672,8 +1728,8 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
 
        /* Compare on-disk and packed data_version */
        if (data->cd_data_version != ma->ma_hsm.mh_arch_ver) {
-               CDEBUG(D_HSM, DFID" data_version mismatches: packed="LPU64
-                      " and on-disk="LPU64"\n", PFID(mdt_object_fid(o)),
+               CDEBUG(D_HSM, DFID" data_version mismatches: packed=%llu"
+                      " and on-disk=%llu\n", PFID(mdt_object_fid(o)),
                       data->cd_data_version, ma->ma_hsm.mh_arch_ver);
                GOTO(out_unlock, rc = -EPERM);
        }
@@ -1765,7 +1821,7 @@ out_close:
                CERROR("%s: error closing volatile file "DFID": rc = %d\n",
                       mdt_obd_name(info->mti_mdt), PFID(&data->cd_fid), rc2);
        LU_OBJECT_DEBUG(D_HSM, info->mti_env, &orphan->mot_obj,
-                       "object closed\n");
+                       "object closed");
        mdt_object_put(info->mti_env, orphan);
 
 out_unlock:
@@ -1860,7 +1916,7 @@ int mdt_close_swap_layouts(struct mdt_thread_info *info,
        lease_broken = ldlm_is_cancel(lease);
        unlock_res_and_lock(lease);
 
-       LDLM_DEBUG(lease, DFID " lease broken? %d\n",
+       LDLM_DEBUG(lease, DFID " lease broken? %d",
                   PFID(mdt_object_fid(o)), lease_broken);
 
        /* Cancel server side lease. Client side counterpart should
@@ -2019,7 +2075,7 @@ int mdt_close_internal(struct mdt_thread_info *info, struct ptlrpc_request *req,
        if (mdt_mfd_closed(mfd)) {
                spin_unlock(&med->med_open_lock);
                CDEBUG(D_INODE, "no handle for file close: fid = "DFID
-                      ": cookie = "LPX64"\n", PFID(info->mti_rr.rr_fid1),
+                      ": cookie = %#llx\n", PFID(info->mti_rr.rr_fid1),
                       info->mti_close_handle.cookie);
                /** not serious error since bug 3633 */
                rc = -ESTALE;