Whamcloud - gitweb
LU-3280 ldlm: suppress useless lock RPC for layout
[fs/lustre-release.git] / lustre / llite / file.c
index fe4f03c..cfb48ca 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Intel Corporation.
+ * Copyright (c) 2011, 2013, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -1257,7 +1257,7 @@ static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
 }
 #endif
 
-static int ll_lov_recreate(struct inode *inode, obd_id id, obd_seq seq,
+static int ll_lov_recreate(struct inode *inode, struct ost_id *oi,
                            obd_count ost_idx)
 {
        struct obd_export *exp = ll_i2dtexp(inode);
@@ -1283,8 +1283,7 @@ static int ll_lov_recreate(struct inode *inode, obd_id id, obd_seq seq,
         if (lsm2 == NULL)
                 GOTO(out, rc = -ENOMEM);
 
-        oa->o_id = id;
-        oa->o_seq = seq;
+       oa->o_oi = *oi;
         oa->o_nlink = ost_idx;
         oa->o_flags |= OBD_FL_RECREATE_OBJS;
         oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
@@ -1307,6 +1306,7 @@ out:
 static int ll_lov_recreate_obj(struct inode *inode, unsigned long arg)
 {
        struct ll_recreate_obj ucreat;
+       struct ost_id           oi;
        ENTRY;
 
        if (!cfs_capable(CFS_CAP_SYS_ADMIN))
@@ -1316,14 +1316,15 @@ static int ll_lov_recreate_obj(struct inode *inode, unsigned long arg)
                           sizeof(ucreat)))
                RETURN(-EFAULT);
 
-       RETURN(ll_lov_recreate(inode, ucreat.lrc_id, 0,
-                              ucreat.lrc_ost_idx));
+       ostid_set_seq_mdt0(&oi);
+       ostid_set_id(&oi, ucreat.lrc_id);
+       RETURN(ll_lov_recreate(inode, &oi, ucreat.lrc_ost_idx));
 }
 
 static int ll_lov_recreate_fid(struct inode *inode, unsigned long arg)
 {
        struct lu_fid   fid;
-       obd_id          id;
+       struct ost_id   oi;
        obd_count       ost_idx;
         ENTRY;
 
@@ -1333,9 +1334,9 @@ static int ll_lov_recreate_fid(struct inode *inode, unsigned long arg)
        if (copy_from_user(&fid, (struct lu_fid *)arg, sizeof(fid)))
                RETURN(-EFAULT);
 
-       id = fid_oid(&fid) | ((fid_seq(&fid) & 0xffff) << 32);
+       fid_to_ostid(&fid, &oi);
        ost_idx = (fid_seq(&fid) >> 16) & 0xffff;
-       RETURN(ll_lov_recreate(inode, id, 0, ost_idx));
+       RETURN(ll_lov_recreate(inode, &oi, ost_idx));
 }
 
 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
@@ -1868,7 +1869,7 @@ static int ll_swap_layouts(struct file *file1, struct file *file2,
        __u32                    gid;
        __u64                    dv;
        struct ll_swap_stack    *llss = NULL;
-       int                      rc, rc1;
+       int                      rc;
 
        OBD_ALLOC_PTR(llss);
        if (llss == NULL)
@@ -1990,12 +1991,22 @@ putgl:
        }
 
        /* update time if requested */
-       rc = rc1 = 0;
-       if (llss->ia2.ia_valid != 0)
+       rc = 0;
+       if (llss->ia2.ia_valid != 0) {
+               mutex_lock(&llss->inode1->i_mutex);
                rc = ll_setattr(file1->f_dentry, &llss->ia2);
+               mutex_unlock(&llss->inode1->i_mutex);
+       }
+
+       if (llss->ia1.ia_valid != 0) {
+               int rc1;
 
-       if (llss->ia1.ia_valid != 0)
+               mutex_lock(&llss->inode2->i_mutex);
                rc1 = ll_setattr(file2->f_dentry, &llss->ia1);
+               mutex_unlock(&llss->inode2->i_mutex);
+               if (rc == 0)
+                       rc = rc1;
+       }
 
 free:
        if (llss != NULL)
@@ -2363,7 +2374,7 @@ int ll_flush(struct file *file, fl_owner_t id)
  * Return how many pages have been written.
  */
 int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end,
-                      enum cl_fsync_mode mode)
+                      enum cl_fsync_mode mode, int ignore_layout)
 {
        struct cl_env_nest nest;
        struct lu_env *env;
@@ -2385,7 +2396,7 @@ int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end,
 
        io = ccc_env_thread_io(env);
        io->ci_obj = cl_i2info(inode)->lli_clob;
-       io->ci_ignore_layout = 1;
+       io->ci_ignore_layout = ignore_layout;
 
        /* initialize parameters for sync */
        fio = &io->u.ci_fsync;
@@ -2473,7 +2484,7 @@ int ll_fsync(struct file *file, struct dentry *dentry, int datasync)
                struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
 
                err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF,
-                               CL_FSYNC_ALL);
+                               CL_FSYNC_ALL, 0);
                if (rc == 0 && err < 0)
                        rc = err;
                if (rc < 0)
@@ -3225,6 +3236,72 @@ int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf)
        RETURN(result);
 }
 
+/* Fetch layout from MDT with getxattr request, if it's not ready yet */
+static int ll_layout_fetch(struct inode *inode, struct ldlm_lock *lock)
+
+{
+       struct ll_sb_info *sbi = ll_i2sbi(inode);
+       struct obd_capa *oc;
+       struct ptlrpc_request *req;
+       struct mdt_body *body;
+       void *lvbdata;
+       void *lmm;
+       int lmmsize;
+       int rc;
+       ENTRY;
+
+       if (lock->l_lvb_data != NULL)
+               RETURN(0);
+
+       /* if layout lock was granted right away, the layout is returned
+        * within DLM_LVB of dlm reply; otherwise if the lock was ever
+        * blocked and then granted via completion ast, we have to fetch
+        * layout here. Please note that we can't use the LVB buffer in
+        * completion AST because it doesn't have a large enough buffer */
+       oc = ll_mdscapa_get(inode);
+       rc = ll_get_max_mdsize(sbi, &lmmsize);
+       if (rc == 0)
+               rc = md_getxattr(sbi->ll_md_exp, ll_inode2fid(inode), oc,
+                               OBD_MD_FLXATTR, XATTR_NAME_LOV, NULL, 0,
+                               lmmsize, 0, &req);
+       capa_put(oc);
+       if (rc < 0)
+               RETURN(rc);
+
+       body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+       if (body == NULL || body->eadatasize > lmmsize)
+               GOTO(out, rc = -EPROTO);
+
+       lmmsize = body->eadatasize;
+       if (lmmsize == 0) /* empty layout */
+               GOTO(out, rc = 0);
+
+       lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_EADATA, lmmsize);
+       if (lmm == NULL)
+               GOTO(out, rc = -EFAULT);
+
+       OBD_ALLOC_LARGE(lvbdata, lmmsize);
+       if (lvbdata == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       memcpy(lvbdata, lmm, lmmsize);
+       lock_res_and_lock(lock);
+       if (lock->l_lvb_data == NULL) {
+               lock->l_lvb_data = lvbdata;
+               lock->l_lvb_len = lmmsize;
+               lvbdata = NULL;
+       }
+       unlock_res_and_lock(lock);
+
+       if (lvbdata != NULL)
+               OBD_FREE_LARGE(lvbdata, lmmsize);
+       EXIT;
+
+out:
+       ptlrpc_req_finished(req);
+       return rc;
+}
+
 /**
  * Apply the layout to the inode. Layout lock is held and will be released
  * in this function.
@@ -3239,6 +3316,7 @@ static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
        struct cl_object_conf conf;
        int rc = 0;
        bool lvb_ready;
+       bool wait_layout = false;
        ENTRY;
 
        LASSERT(lustre_handle_is_used(lockh));
@@ -3250,14 +3328,15 @@ static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
        LDLM_DEBUG(lock, "File %p/"DFID" being reconfigured: %d.\n",
                inode, PFID(&lli->lli_fid), reconf);
 
+       /* in case this is a caching lock and reinstate with new inode */
+       md_set_lock_data(sbi->ll_md_exp, &lockh->cookie, inode, NULL);
+
        lock_res_and_lock(lock);
        lvb_ready = !!(lock->l_flags & LDLM_FL_LVB_READY);
        unlock_res_and_lock(lock);
        /* checking lvb_ready is racy but this is okay. The worst case is
         * that multi processes may configure the file on the same time. */
        if (lvb_ready || !reconf) {
-               LDLM_LOCK_PUT(lock);
-
                rc = -ENODATA;
                if (lvb_ready) {
                        /* layout_gen must be valid if layout lock is not
@@ -3265,10 +3344,13 @@ static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
                        *gen = lli->lli_layout_gen;
                        rc = 0;
                }
-               ldlm_lock_decref(lockh, mode);
-               RETURN(rc);
+               GOTO(out, rc);
        }
 
+       rc = ll_layout_fetch(inode, lock);
+       if (rc < 0)
+               GOTO(out, rc);
+
        /* for layout lock, lmm is returned in lock's lvb.
         * lvb_data is immutable if the lock is held so it's safe to access it
         * without res lock. See the description in ldlm_lock_decref_internal()
@@ -3287,11 +3369,8 @@ static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
                                PFID(&lli->lli_fid), rc);
                }
        }
-       if (rc < 0) {
-               LDLM_LOCK_PUT(lock);
-               ldlm_lock_decref(lockh, mode);
-               RETURN(rc);
-       }
+       if (rc < 0)
+               GOTO(out, rc);
 
        /* set layout to file. Unlikely this will fail as old layout was
         * surely eliminated */
@@ -3301,15 +3380,20 @@ static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
        conf.coc_lock = lock;
        conf.u.coc_md = &md;
        rc = ll_layout_conf(inode, &conf);
-       LDLM_LOCK_PUT(lock);
-
-       ldlm_lock_decref(lockh, mode);
 
        if (md.lsm != NULL)
                obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
 
+       /* refresh layout failed, need to wait */
+       wait_layout = rc == -EBUSY;
+       EXIT;
+
+out:
+       LDLM_LOCK_PUT(lock);
+       ldlm_lock_decref(lockh, mode);
+
        /* wait for IO to complete if it's still being used. */
-       if (rc == -EBUSY) {
+       if (wait_layout) {
                CDEBUG(D_INODE, "%s: %p/"DFID" wait for layout reconf.\n",
                        ll_get_fsname(inode->i_sb, NULL, 0),
                        inode, PFID(&lli->lli_fid));
@@ -3324,7 +3408,6 @@ static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
                CDEBUG(D_INODE, "file: "DFID" waiting layout return: %d.\n",
                        PFID(&lli->lli_fid), rc);
        }
-
        RETURN(rc);
 }
 
@@ -3357,7 +3440,7 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen)
        int rc;
        ENTRY;
 
-       *gen = LL_LAYOUT_GEN_NONE;
+       *gen = lli->lli_layout_gen;
        if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK))
                RETURN(0);
 
@@ -3416,8 +3499,6 @@ again:
 
        ll_finish_md_op_data(op_data);
 
-       md_set_lock_data(sbi->ll_md_exp, &it.d.lustre.it_lock_handle, inode, NULL);
-
        mode = it.d.lustre.it_lock_mode;
        it.d.lustre.it_lock_mode = 0;
        ll_intent_drop_lock(&it);