Whamcloud - gitweb
LU-1876 hsm: layout lock implementation on server side
[fs/lustre-release.git] / lustre / llite / file.c
index 2fbeb61..b540065 100644 (file)
@@ -411,7 +411,7 @@ static int ll_intent_file_open(struct file *file, void *lmm,
                 GOTO(out, rc);
         }
 
-        rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
+        rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL, itp);
         if (!rc && itp->d.lustre.it_lock_mode)
                 ll_set_lock_data(sbi->ll_md_exp, file->f_dentry->d_inode,
                                  itp, NULL);
@@ -1486,7 +1486,11 @@ static int ll_lov_setstripe(struct inode *inode, struct file *file,
         rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
         if (rc == 0) {
                struct lov_stripe_md *lsm;
+               __u32 gen;
+
                put_user(0, &lumv1p->lmm_stripe_count);
+
+               ll_layout_refresh(inode, &gen);
                lsm = ccc_inode_lsm_get(inode);
                rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
                                   0, lsm, (void *)arg);
@@ -2501,7 +2505,7 @@ int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
                         RETURN(rc);
                 }
 
-                rc = ll_prep_inode(&inode, req, NULL);
+                rc = ll_prep_inode(&inode, req, NULL, NULL);
         }
 out:
         ptlrpc_req_finished(req);
@@ -2909,10 +2913,126 @@ int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf)
 
        result = cl_conf_set(env, lli->lli_clob, conf);
        cl_env_nested_put(&nest, env);
+
+       if (conf->coc_opc == OBJECT_CONF_SET) {
+               struct ldlm_lock *lock = conf->coc_lock;
+
+               LASSERT(lock != NULL);
+               LASSERT(ldlm_has_layout(lock));
+               if (result == 0) {
+                       /* it can only be allowed to match after layout is
+                        * applied to inode otherwise false layout would be
+                        * seen. Applying layout shoud happen before dropping
+                        * the intent lock. */
+                       ldlm_lock_allow_match(lock);
+               }
+       }
        RETURN(result);
 }
 
 /**
+ * Apply the layout to the inode. Layout lock is held and will be released
+ * in this function.
+ */
+static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
+                               struct inode *inode, __u32 *gen, bool reconf)
+{
+       struct ll_inode_info *lli = ll_i2info(inode);
+       struct ll_sb_info    *sbi = ll_i2sbi(inode);
+       struct ldlm_lock *lock;
+       struct lustre_md md = { NULL };
+       struct cl_object_conf conf;
+       int rc = 0;
+       bool lvb_ready;
+       ENTRY;
+
+       LASSERT(lustre_handle_is_used(lockh));
+
+       lock = ldlm_handle2lock(lockh);
+       LASSERT(lock != NULL);
+       LASSERT(ldlm_has_layout(lock));
+
+       LDLM_DEBUG(lock, "File %p/"DFID" being reconfigured: %d.\n",
+               inode, PFID(&lli->lli_fid), reconf);
+
+       lock_res_and_lock(lock);
+       lvb_ready = !!(lock->l_flags & LDLM_FL_LVB_READY);
+       unlock_res_and_lock(lock);
+       /* checking lvb_ready is racy but this is okay. The worst case is
+        * that multi processes may configure the file on the same time. */
+       if (lvb_ready || !reconf) {
+               LDLM_LOCK_PUT(lock);
+
+               rc = -ENODATA;
+               if (lvb_ready) {
+                       /* layout_gen must be valid if layout lock is not
+                        * cancelled and stripe has already set */
+                       *gen = lli->lli_layout_gen;
+                       rc = 0;
+               }
+               ldlm_lock_decref(lockh, mode);
+               RETURN(rc);
+       }
+
+       /* for layout lock, lmm is returned in lock's lvb.
+        * lvb_data is immutable if the lock is held so it's safe to access it
+        * without res lock. See the description in ldlm_lock_decref_internal()
+        * for the condition to free lvb_data of layout lock */
+       if (lock->l_lvb_data != NULL) {
+               rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
+                                 lock->l_lvb_data, lock->l_lvb_len);
+               if (rc >= 0) {
+                       if (md.lsm != NULL)
+                               *gen = md.lsm->lsm_layout_gen;
+                       rc = 0;
+               } else {
+                       CERROR("%s: file "DFID" unpackmd error: %d\n",
+                               ll_get_fsname(inode->i_sb, NULL, 0),
+                               PFID(&lli->lli_fid), rc);
+               }
+       }
+       if (rc < 0) {
+               LDLM_LOCK_PUT(lock);
+               ldlm_lock_decref(lockh, mode);
+               RETURN(rc);
+       }
+
+       /* set layout to file. Unlikely this will fail as old layout was
+        * surely eliminated */
+       memset(&conf, 0, sizeof conf);
+       conf.coc_opc = OBJECT_CONF_SET;
+       conf.coc_inode = inode;
+       conf.coc_lock = lock;
+       conf.u.coc_md = &md;
+       rc = ll_layout_conf(inode, &conf);
+       LDLM_LOCK_PUT(lock);
+
+       ldlm_lock_decref(lockh, mode);
+
+       if (md.lsm != NULL)
+               obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
+
+       /* wait for IO to complete if it's still being used. */
+       if (rc == -EBUSY) {
+               CDEBUG(D_INODE, "%s: %p/"DFID" wait for layout reconf.\n",
+                       ll_get_fsname(inode->i_sb, NULL, 0),
+                       inode, PFID(&lli->lli_fid));
+
+               memset(&conf, 0, sizeof conf);
+               conf.coc_opc = OBJECT_CONF_WAIT;
+               conf.coc_inode = inode;
+               rc = ll_layout_conf(inode, &conf);
+               if (rc == 0)
+                       rc = -EAGAIN;
+
+               CDEBUG(D_INODE, "file: "DFID" waiting layout return: %d.\n",
+                       PFID(&lli->lli_fid), rc);
+       }
+
+       RETURN(rc);
+}
+
+/**
  * This function checks if there exists a LAYOUT lock on the client side,
  * or enqueues it if it doesn't have one in cache.
  *
@@ -2929,9 +3049,9 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen)
 {
        struct ll_inode_info  *lli = ll_i2info(inode);
        struct ll_sb_info     *sbi = ll_i2sbi(inode);
-       struct md_op_data     *op_data = NULL;
-       struct lookup_intent   it = { .it_op = IT_LAYOUT };
-       struct lustre_handle   lockh = { 0 };
+       struct md_op_data     *op_data;
+       struct lookup_intent   it;
+       struct lustre_handle   lockh;
        ldlm_mode_t            mode;
        struct ldlm_enqueue_info einfo = { .ei_type = LDLM_IBITS,
                                           .ei_mode = LCK_CR,
@@ -2941,7 +3061,7 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen)
        int rc;
        ENTRY;
 
-       *gen = 0;
+       *gen = LL_LAYOUT_GEN_ZERO;
        if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK))
                RETURN(0);
 
@@ -2951,92 +3071,67 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen)
 
        /* mostly layout lock is caching on the local side, so try to match
         * it before grabbing layout lock mutex. */
-       mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh,
-                               LDLM_FL_LVB_READY);
+       mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0);
        if (mode != 0) { /* hit cached lock */
-               /* lsm_layout_gen is started from 0, plus 1 here to distinguish
-                * the cases of no layout and first layout. */
-               *gen = lli->lli_layout_gen + 1;
+               rc = ll_layout_lock_set(&lockh, mode, inode, gen, false);
+               if (rc == 0)
+                       RETURN(0);
 
-               ldlm_lock_decref(&lockh, mode);
-               RETURN(0);
+               /* better hold lli_layout_mutex to try again otherwise
+                * it will have starvation problem. */
        }
 
-       op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
-                                    0, 0, LUSTRE_OPC_ANY, NULL);
-       if (IS_ERR(op_data))
-               RETURN(PTR_ERR(op_data));
-
        /* take layout lock mutex to enqueue layout lock exclusively. */
        mutex_lock(&lli->lli_layout_mutex);
 
-       /* try again inside layout mutex */
-       mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh,
-                               LDLM_FL_LVB_READY);
+again:
+       /* try again. Maybe somebody else has done this. */
+       mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0);
        if (mode != 0) { /* hit cached lock */
-               *gen = lli->lli_layout_gen + 1;
+               rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
+               if (rc == -EAGAIN)
+                       goto again;
 
-               ldlm_lock_decref(&lockh, mode);
                mutex_unlock(&lli->lli_layout_mutex);
-               ll_finish_md_op_data(op_data);
-               RETURN(0);
+               RETURN(rc);
+       }
+
+       op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
+                       0, 0, LUSTRE_OPC_ANY, NULL);
+       if (IS_ERR(op_data)) {
+               mutex_unlock(&lli->lli_layout_mutex);
+               RETURN(PTR_ERR(op_data));
        }
 
        /* have to enqueue one */
+       memset(&it, 0, sizeof(it));
+       it.it_op = IT_LAYOUT;
+       lockh.cookie = 0ULL;
+
+       LDLM_DEBUG_NOLOCK("%s: requeue layout lock for file %p/"DFID".\n",
+                       ll_get_fsname(inode->i_sb, NULL, 0), inode,
+                       PFID(&lli->lli_fid));
+
        rc = md_enqueue(sbi->ll_md_exp, &einfo, &it, op_data, &lockh,
                        NULL, 0, NULL, 0);
        if (it.d.lustre.it_data != NULL)
                ptlrpc_req_finished(it.d.lustre.it_data);
        it.d.lustre.it_data = NULL;
 
-       if (rc == 0) {
-               struct ldlm_lock *lock;
-               struct cl_object_conf conf;
-               struct lustre_md md = { NULL };
-               void *lmm;
-               int lmmsize;
+       ll_finish_md_op_data(op_data);
 
-               LASSERT(lustre_handle_is_used(&lockh));
+       mode = it.d.lustre.it_lock_mode;
+       it.d.lustre.it_lock_mode = 0;
+       ll_intent_drop_lock(&it);
 
+       if (rc == 0) {
                /* set lock data in case this is a new lock */
                ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
-
-               lock = ldlm_handle2lock(&lockh);
-               LASSERT(lock != NULL);
-
-               /* for IT_LAYOUT lock, lmm is returned in lock's lvb
-                * data via completion callback */
-               lmm = lock->l_lvb_data;
-               lmmsize = lock->l_lvb_len;
-               if (lmm != NULL) {
-                       rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
-                                       lmm, lmmsize);
-                       if (rc >= 0) {
-                               if (md.lsm != NULL)
-                                       *gen = md.lsm->lsm_layout_gen + 1;
-                               rc = 0;
-                       } else {
-                               CERROR("file: "DFID" unpackmd error: %d\n",
-                                       PFID(&lli->lli_fid), rc);
-                       }
-               }
-               LDLM_LOCK_PUT(lock);
-
-               /* set layout to file. This may cause lock expiration as we
-                * set layout inside layout ibits lock. */
-               memset(&conf, 0, sizeof conf);
-               conf.coc_inode = inode;
-               conf.u.coc_md = &md;
-               ll_layout_conf(inode, &conf);
-               /* is this racy? */
-               lli->lli_has_smd = md.lsm != NULL;
-               if (md.lsm != NULL)
-                       obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
+               rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
+               if (rc == -EAGAIN)
+                       goto again;
        }
-       ll_intent_drop_lock(&it);
-
        mutex_unlock(&lli->lli_layout_mutex);
-       ll_finish_md_op_data(op_data);
 
        RETURN(rc);
 }