Whamcloud - gitweb
LU-11359 mdt: fix mdt_dom_discard_data() timeouts 71/34071/21
authorMikhail Pershin <mpershin@whamcloud.com>
Wed, 31 Oct 2018 13:28:29 +0000 (16:28 +0300)
committerOleg Drokin <green@whamcloud.com>
Sat, 1 Jun 2019 03:54:42 +0000 (03:54 +0000)
The mdt_dom_discard_data() issues new lock to cause data
discard for all conflicting client locks. This was done in
context of unlink RPC processing and may cause it to be stuck
waiting for client to cancel their locks leading to cascading
timeouts for any other locks waiting on the same resource and
parent directory.

Patch skips discard lock waiting in the current context by
using own CP callback for that which doesn't wait for blocking
locks. They will be finished later by LDLM and cleaned up in
that completion callback. So current thread just makes sure
discard locks are taken and BL ASTs are sent but doesn't wait
for lock granting and that fixes the original problem.

At the same time that opens window for race with data being
flushed on client, so it is possible that new IO from client
will happen on just unlinked object causing error message and
it is not possible to distinguish that case from other
possibly critical situations. To solve that the unlinked object
is pinned in memory until the discard lock is granted.
Therefore, such objects can be easily distinguished as stale ones
and any IO against it can be just silently ignored.

Older clients are not fully compatible with async DoM discard so
patch adds also new connection flag ASYNC_DISCARD to distinguish
old clients and use old blocking discard for them.

Test-Parameters: testlist=racer,racer,racer
Signed-off-by: Mikhail Pershin <mpershin@whamcloud.com>
Change-Id: I419677af43c33e365a246fe12205b506209deace
Reviewed-on: https://review.whamcloud.com/34071
Tested-by: Jenkins
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Patrick Farrell <pfarrell@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
19 files changed:
lustre/include/lustre_dlm.h
lustre/include/uapi/linux/lustre/lustre_idl.h
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_request.c
lustre/llite/llite_lib.c
lustre/llite/namei.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_io.c
lustre/mdt/mdt_open.c
lustre/mdt/mdt_reint.c
lustre/obdclass/lprocfs_status.c
lustre/osc/osc_cache.c
lustre/ptlrpc/service.c
lustre/ptlrpc/wirehdr.c
lustre/ptlrpc/wiretest.c
lustre/tests/sanity.sh
lustre/utils/wirecheck.c
lustre/utils/wirehdr.c
lustre/utils/wiretest.c

index 0a6c998..6f9bb1e 100644 (file)
@@ -1431,6 +1431,8 @@ static inline int ldlm_res_lvbo_update(struct ldlm_resource *res,
        return ldlm_lvbo_update(res, NULL, req, increase);
 }
 
+int is_granted_or_cancelled_nolock(struct ldlm_lock *lock);
+
 int ldlm_error2errno(enum ldlm_error error);
 enum ldlm_error ldlm_errno2error(int err_no); /* don't call it `errno': this
                                               * confuses user-space. */
index bb9dd61..6d7f66c 100644 (file)
@@ -838,6 +838,7 @@ struct ptlrpc_body_v2 {
 #define OBD_CONNECT2_LSOM              0x800ULL /* LSOM support */
 #define OBD_CONNECT2_PCC               0x1000ULL /* Persistent Client Cache */
 #define OBD_CONNECT2_PLAIN_LAYOUT      0x2000ULL /* Plain Directory Layout */
+#define OBD_CONNECT2_ASYNC_DISCARD     0x4000ULL /* support async DoM data discard */
 
 /* XXX README XXX:
  * Please DO NOT add flag values here before first ensuring that this same
@@ -895,7 +896,8 @@ struct ptlrpc_body_v2 {
                                OBD_CONNECT2_LOCK_CONVERT | \
                                OBD_CONNECT2_ARCHIVE_ID_ARRAY | \
                                OBD_CONNECT2_SELINUX_POLICY | \
-                               OBD_CONNECT2_LSOM)
+                               OBD_CONNECT2_LSOM | \
+                               OBD_CONNECT2_ASYNC_DISCARD)
 
 #define OST_CONNECT_SUPPORTED  (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
                                OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \
index d2235c0..136d9f5 100644 (file)
@@ -354,16 +354,13 @@ ldlm_add_var(struct lprocfs_vars *vars, struct dentry *debugfs_entry,
 
 static inline int is_granted_or_cancelled(struct ldlm_lock *lock)
 {
-        int ret = 0;
+       int ret = 0;
 
-        lock_res_and_lock(lock);
-       if (ldlm_is_granted(lock) && !ldlm_is_cp_reqd(lock))
-               ret = 1;
-       else if (ldlm_is_failed(lock) || ldlm_is_cancel(lock))
-                ret = 1;
-        unlock_res_and_lock(lock);
+       lock_res_and_lock(lock);
+       ret = is_granted_or_cancelled_nolock(lock);
+       unlock_res_and_lock(lock);
 
-        return ret;
+       return ret;
 }
 
 static inline bool is_bl_done(struct ldlm_lock *lock)
index bfbf599..ac63e69 100644 (file)
@@ -150,6 +150,19 @@ int ldlm_expired_completion_wait(void *data)
        RETURN(0);
 }
 
+int is_granted_or_cancelled_nolock(struct ldlm_lock *lock)
+{
+       int ret = 0;
+
+       check_res_locked(lock->l_resource);
+       if (ldlm_is_granted(lock) && !ldlm_is_cp_reqd(lock))
+               ret = 1;
+       else if (ldlm_is_failed(lock) || ldlm_is_cancel(lock))
+               ret = 1;
+       return ret;
+}
+EXPORT_SYMBOL(is_granted_or_cancelled_nolock);
+
 /**
  * Calculate the Completion timeout (covering enqueue, BL AST, data flush,
  * lock cancel, and their replies). Used for lock completion timeout on the
index ff64d19..69bbc79 100644 (file)
@@ -226,7 +226,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
                                   OBD_CONNECT2_FLR |
                                   OBD_CONNECT2_LOCK_CONVERT |
                                   OBD_CONNECT2_ARCHIVE_ID_ARRAY |
-                                  OBD_CONNECT2_LSOM;
+                                  OBD_CONNECT2_LSOM |
+                                  OBD_CONNECT2_ASYNC_DISCARD;
 
 #ifdef HAVE_LRU_RESIZE_SUPPORT
         if (sbi->ll_flags & LL_SBI_LRU_RESIZE)
@@ -2137,11 +2138,17 @@ void ll_delete_inode(struct inode *inode)
        unsigned long nrpages;
        ENTRY;
 
-       if (S_ISREG(inode->i_mode) && lli->lli_clob != NULL)
+       if (S_ISREG(inode->i_mode) && lli->lli_clob != NULL) {
                /* It is last chance to write out dirty pages,
-                * otherwise we may lose data while umount */
-               cl_sync_file_range(inode, 0, OBD_OBJECT_EOF, CL_FSYNC_LOCAL, 1);
-
+                * otherwise we may lose data while umount.
+                *
+                * If i_nlink is 0 then just discard data. This is safe because
+                * local inode gets i_nlink 0 from server only for the last
+                * unlink, so that file is not opened somewhere else
+                */
+               cl_sync_file_range(inode, 0, OBD_OBJECT_EOF, inode->i_nlink ?
+                                  CL_FSYNC_LOCAL : CL_FSYNC_DISCARD, 1);
+       }
        truncate_inode_pages_final(mapping);
 
        /* Workaround for LU-118: Note nrpages may not be totally updated when
index 39778ea..f21f961 100644 (file)
@@ -233,8 +233,20 @@ void ll_lock_cancel_bits(struct ldlm_lock *lock, __u64 to_cancel)
        __u64 bits = to_cancel;
        int rc;
 
-       if (inode == NULL)
-               return;
+       ENTRY;
+
+       if (!inode) {
+               /* That means the inode is evicted most likely and may cause
+                * the skipping of lock cleanups below, so print the message
+                * about that in log.
+                */
+               if (lock->l_resource->lr_lvb_inode)
+                       LDLM_DEBUG(lock,
+                                  "can't take inode for the lock (%sevicted)\n",
+                                  lock->l_resource->lr_lvb_inode->i_state &
+                                  I_FREEING ? "" : "not ");
+               RETURN_EXIT;
+       }
 
        if (!fid_res_name_eq(ll_inode2fid(inode),
                             &lock->l_resource->lr_name)) {
@@ -372,6 +384,7 @@ void ll_lock_cancel_bits(struct ldlm_lock *lock, __u64 to_cancel)
                ll_invalidate_aliases(inode);
 
        iput(inode);
+       RETURN_EXIT;
 }
 
 /* Check if the given lock may be downgraded instead of canceling and
index 7982041..546ad8a 100644 (file)
@@ -1293,8 +1293,7 @@ int mdt_brw_enqueue(struct mdt_thread_info *info, struct ldlm_namespace *ns,
                    struct ldlm_lock **lockp, __u64 flags);
 int mdt_dom_read_on_open(struct mdt_thread_info *mti, struct mdt_device *mdt,
                         struct lustre_handle *lh);
-void mdt_dom_discard_data(struct mdt_thread_info *info,
-                         const struct lu_fid *fid);
+void mdt_dom_discard_data(struct mdt_thread_info *info, struct mdt_object *mo);
 int mdt_dom_disk_lvbo_update(const struct lu_env *env, struct mdt_object *mo,
                             struct ldlm_resource *res, bool increase_only);
 void mdt_dom_obj_lvb_update(const struct lu_env *env, struct mdt_object *mo,
index 1a8f019..3dcc8a8 100644 (file)
@@ -383,16 +383,42 @@ static int mdt_preprw_read(const struct lu_env *env, struct obd_export *exp,
 {
        struct dt_object *dob;
        int i, j, rc, tot_bytes = 0;
+       int level;
 
        ENTRY;
 
        mdt_dom_read_lock(mo);
-       if (!mdt_object_exists(mo))
-               GOTO(unlock, rc = -ENOENT);
+       *nr_local = 0;
+       /* the only valid case when READ can find object is missing or stale
+        * when export is just evicted and open files are closed forcefully
+        * on server while client's READ can be in progress.
+        * This should not happen on healthy export, object can't be missing
+        * or dying because both states means it was finally destroyed.
+        */
+       level = exp->exp_failed ? D_INFO : D_ERROR;
+       if (!mdt_object_exists(mo)) {
+               CDEBUG_LIMIT(level,
+                            "%s: READ IO to missing obj "DFID": rc = %d\n",
+                            exp->exp_obd->obd_name, PFID(mdt_object_fid(mo)),
+                            -ENOENT);
+               /* return 0 and continue with empty commit to skip such READ
+                * without more BRW errors.
+                */
+               RETURN(0);
+       }
+       if (lu_object_is_dying(&mo->mot_header)) {
+               CDEBUG_LIMIT(level,
+                            "%s: READ IO to stale obj "DFID": rc = %d\n",
+                            exp->exp_obd->obd_name, PFID(mdt_object_fid(mo)),
+                            -ESTALE);
+               /* return 0 and continue with empty commit to skip such READ
+                * without more BRW errors.
+                */
+               RETURN(0);
+       }
 
        dob = mdt_obj2dt(mo);
        /* parse remote buffers to local buffers and prepare the latter */
-       *nr_local = 0;
        for (i = 0, j = 0; i < niocount; i++) {
                rc = dt_bufs_get(env, dob, rnb + i, lnb + j, 0);
                if (unlikely(rc < 0))
@@ -415,7 +441,6 @@ static int mdt_preprw_read(const struct lu_env *env, struct obd_export *exp,
        RETURN(0);
 buf_put:
        dt_bufs_put(env, dob, lnb, *nr_local);
-unlock:
        mdt_dom_read_unlock(mo);
        return rc;
 }
@@ -437,15 +462,34 @@ static int mdt_preprw_write(const struct lu_env *env, struct obd_export *exp,
        tgt_grant_prepare_write(env, exp, oa, rnb, obj->ioo_bufcnt);
 
        mdt_dom_read_lock(mo);
+       *nr_local = 0;
+       /* don't report error in cases with failed export */
        if (!mdt_object_exists(mo)) {
-               CDEBUG(D_ERROR, "%s: BRW to missing obj "DFID"\n",
-                      exp->exp_obd->obd_name, PFID(mdt_object_fid(mo)));
-               GOTO(unlock, rc = -ENOENT);
+               int level = exp->exp_failed ? D_INFO : D_ERROR;
+
+               rc = -ENOENT;
+               CDEBUG_LIMIT(level,
+                            "%s: WRITE IO to missing obj "DFID": rc = %d\n",
+                            exp->exp_obd->obd_name, PFID(mdt_object_fid(mo)),
+                            rc);
+               /* exit with no data written, note nr_local = 0 above */
+               GOTO(unlock, rc);
+       }
+       if (lu_object_is_dying(&mo->mot_header)) {
+               /* This is possible race between object destroy followed by
+                * discard BL AST and client cache flushing. Object is
+                * referenced until discard finish.
+                */
+               CDEBUG(D_INODE, "WRITE IO to stale object "DFID"\n",
+                      PFID(mdt_object_fid(mo)));
+               /* Note: continue with no error here to don't cause BRW errors
+                * but skip transaction in commitrw silently so no data is
+                * written.
+                */
        }
 
        dob = mdt_obj2dt(mo);
        /* parse remote buffers to local buffers and prepare the latter */
-       *nr_local = 0;
        for (i = 0, j = 0; i < obj->ioo_bufcnt; i++) {
                rc = dt_bufs_get(env, dob, rnb + i, lnb + j, 1);
                if (unlikely(rc < 0))
@@ -546,11 +590,10 @@ static int mdt_commitrw_read(const struct lu_env *env, struct mdt_device *mdt,
 
        ENTRY;
 
-       LASSERT(niocount > 0);
-
        dob = mdt_obj2dt(mo);
 
-       dt_bufs_put(env, dob, lnb, niocount);
+       if (niocount)
+               dt_bufs_put(env, dob, lnb, niocount);
 
        mdt_dom_read_unlock(mo);
        RETURN(rc);
@@ -580,6 +623,20 @@ static int mdt_commitrw_write(const struct lu_env *env, struct obd_export *exp,
 retry:
        if (!dt_object_exists(dob))
                GOTO(out, rc = -ENOENT);
+       if (lu_object_is_dying(&mo->mot_header)) {
+               /* Commit to stale object can be just skipped silently. */
+               CDEBUG(D_INODE, "skip commit to stale object "DFID"\n",
+                       PFID(mdt_object_fid(mo)));
+               GOTO(out, rc = 0);
+       }
+
+       if (niocount == 0) {
+               rc = -EPROTO;
+               DEBUG_REQ(D_WARNING, tgt_ses_req(tgt_ses_info(env)),
+                         "%s: commit with no pages for "DFID": rc = %d\n",
+                         exp->exp_obd->obd_name, PFID(mdt_object_fid(mo)), rc);
+               GOTO(out, rc);
+       }
 
        th = dt_trans_create(env, dt);
        if (IS_ERR(th))
@@ -685,12 +742,6 @@ int mdt_obd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
        __u64 valid;
        int rc = 0;
 
-       if (npages == 0) {
-               CERROR("%s: no pages to commit\n",
-                      exp->exp_obd->obd_name);
-               rc = -EPROTO;
-       }
-
        LASSERT(mo);
 
        if (cmd == OBD_BRW_WRITE) {
@@ -757,7 +808,6 @@ int mdt_obd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
        } else {
                rc = -EPROTO;
        }
-       /* this put is pair to object_get in ofd_preprw_write */
        mdt_thread_info_fini(info);
        RETURN(rc);
 }
@@ -1224,32 +1274,6 @@ out:
        RETURN(rc);
 }
 
-void mdt_dom_discard_data(struct mdt_thread_info *info,
-                         const struct lu_fid *fid)
-{
-       struct mdt_device *mdt = info->mti_mdt;
-       union ldlm_policy_data *policy = &info->mti_policy;
-       struct ldlm_res_id *res_id = &info->mti_res_id;
-       struct lustre_handle dom_lh;
-       __u64 flags = LDLM_FL_AST_DISCARD_DATA;
-       int rc = 0;
-
-       policy->l_inodebits.bits = MDS_INODELOCK_DOM;
-       policy->l_inodebits.try_bits = 0;
-       fid_build_reg_res_name(fid, res_id);
-
-       /* Tell the clients that the object is gone now and that they should
-        * throw away any cached pages. */
-       rc = ldlm_cli_enqueue_local(info->mti_env, mdt->mdt_namespace, res_id,
-                                   LDLM_IBITS, policy, LCK_PW, &flags,
-                                   ldlm_blocking_ast, ldlm_completion_ast,
-                                   NULL, NULL, 0, LVB_T_NONE, NULL, &dom_lh);
-
-       /* We only care about the side-effects, just drop the lock. */
-       if (rc == ELDLM_OK)
-               ldlm_lock_decref_and_cancel(&dom_lh, LCK_PW);
-}
-
 /* check if client has already DoM lock for given resource */
 bool mdt_dom_client_has_lock(struct mdt_thread_info *info,
                             const struct lu_fid *fid)
@@ -1573,3 +1597,105 @@ out:
        RETURN(0);
 }
 
+/**
+ * Completion AST for DOM discard locks:
+ *
+ * CP AST on a DOM discard lock is always called right after enqueue or from
+ * reprocess if lock was blocked; in the latter case l_ast_data is set to
+ * the mdt_object which is kept while there are pending locks on it.
+ */
+int ldlm_dom_discard_cp_ast(struct ldlm_lock *lock, __u64 flags, void *data)
+{
+       struct mdt_object *mo;
+       struct lustre_handle dom_lh;
+       struct lu_env *env;
+
+       ENTRY;
+
+       /* l_ast_data is set when lock was not granted immediately
+        * in mdt_dom_discard_data() below but put into waiting list,
+        * so this CP callback means we are finished and corresponding
+        * MDT object should be released finally as well as lock itself.
+        */
+       lock_res_and_lock(lock);
+       if (!lock->l_ast_data) {
+               unlock_res_and_lock(lock);
+               RETURN(0);
+       }
+
+       mo = lock->l_ast_data;
+       lock->l_ast_data = NULL;
+       unlock_res_and_lock(lock);
+
+       ldlm_lock2handle(lock, &dom_lh);
+       ldlm_lock_decref(&dom_lh, LCK_PW);
+
+       env = lu_env_find();
+       LASSERT(env);
+       mdt_object_put(env, mo);
+
+       RETURN(0);
+}
+
+void mdt_dom_discard_data(struct mdt_thread_info *info,
+                         struct mdt_object *mo)
+{
+       struct ptlrpc_request *req = mdt_info_req(info);
+       struct mdt_device *mdt = mdt_dev(mo->mot_obj.lo_dev);
+       union ldlm_policy_data policy;
+       struct ldlm_res_id res_id;
+       struct lustre_handle dom_lh;
+       struct ldlm_lock *lock;
+       __u64 flags = LDLM_FL_AST_DISCARD_DATA;
+       int rc = 0;
+       bool old_client;
+
+       ENTRY;
+
+       if (req && req_is_replay(req))
+               RETURN_EXIT;
+
+       policy.l_inodebits.bits = MDS_INODELOCK_DOM;
+       policy.l_inodebits.try_bits = 0;
+       fid_build_reg_res_name(mdt_object_fid(mo), &res_id);
+
+       /* Keep blocking version of discard for an old client to avoid
+        * crashes on non-patched clients. LU-11359.
+        */
+       old_client = req && !(exp_connect_flags2(req->rq_export) &
+                             OBD_CONNECT2_ASYNC_DISCARD);
+
+       /* Tell the clients that the object is gone now and that they should
+        * throw away any cached pages. */
+       rc = ldlm_cli_enqueue_local(info->mti_env, mdt->mdt_namespace, &res_id,
+                                   LDLM_IBITS, &policy, LCK_PW, &flags,
+                                   ldlm_blocking_ast, old_client ?
+                                   ldlm_completion_ast :
+                                   ldlm_dom_discard_cp_ast,
+                                   NULL, NULL, 0, LVB_T_NONE, NULL, &dom_lh);
+       if (rc != ELDLM_OK) {
+               CDEBUG(D_DLMTRACE,
+                      "Failed to issue discard lock, rc = %d\n", rc);
+               RETURN_EXIT;
+       }
+
+       lock = ldlm_handle2lock(&dom_lh);
+       lock_res_and_lock(lock);
+       /* if lock is not granted then there are BL ASTs in progress and
+        * lock will be granted in result of reprocessing with CP callback
+        * notifying about that. The mdt object has to be kept until that and
+        * it is saved in l_ast_data of the lock. Lock reference is kept too
+        * until that to prevent it from canceling.
+        */
+       if (!is_granted_or_cancelled_nolock(lock)) {
+               mdt_object_get(info->mti_env, mo);
+               lock->l_ast_data = mo;
+               unlock_res_and_lock(lock);
+       } else {
+               unlock_res_and_lock(lock);
+               ldlm_lock_decref_and_cancel(&dom_lh, LCK_PW);
+       }
+       LDLM_LOCK_PUT(lock);
+
+       RETURN_EXIT;
+}
index f01571b..9211ee2 100644 (file)
@@ -2242,7 +2242,6 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
        int rc = 0;
        u64 open_flags;
        u64 intent;
-       bool discard = false;
 
        ENTRY;
 
@@ -2326,7 +2325,8 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
 
        if (!MFD_CLOSED(open_flags)) {
                rc = mo_close(info->mti_env, next, ma, open_flags);
-               discard = mdt_dom_check_for_discard(info, o);
+               if (mdt_dom_check_for_discard(info, o))
+                       mdt_dom_discard_data(info, o);
        }
 
        /* adjust open and lease count */
@@ -2338,9 +2338,6 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
        mdt_mfd_free(mfd);
        mdt_object_put(info->mti_env, o);
 
-       if (discard)
-               mdt_dom_discard_data(info, ofid);
-
        RETURN(rc);
 }
 
index c97e718..fdb9798 100644 (file)
@@ -844,7 +844,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
        struct mdt_lock_handle *child_lh;
        struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
        __u64 lock_ibits;
-       bool cos_incompat = false, discard = false;
+       bool cos_incompat = false;
        int no_name = 0;
        int rc;
 
@@ -1025,8 +1025,8 @@ relock:
                rc = mdt_attr_get_complex(info, mc, ma);
                if (rc)
                        GOTO(out_stat, rc);
-       } else {
-               discard = mdt_dom_check_for_discard(info, mc);
+       } else if (mdt_dom_check_for_discard(info, mc)) {
+               mdt_dom_discard_data(info, mc);
        }
        mdt_handle_last_unlink(info, mc, ma);
 
@@ -1060,13 +1060,6 @@ unlock_parent:
        mdt_object_unlock(info, mp, parent_lh, rc);
 put_parent:
        mdt_object_put(info->mti_env, mp);
-
-       /* discard is just a PW DOM lock to drop the data on a client
-        * no need to keep objects being get and locked, do that after all.
-        */
-       if (discard)
-               mdt_dom_discard_data(info, child_fid);
-
         return rc;
 }
 
@@ -2295,8 +2288,8 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
        struct lu_fid *old_fid = &info->mti_tmp_fid1;
        struct lu_fid *new_fid = &info->mti_tmp_fid2;
        __u64 lock_ibits;
-       bool reverse = false;
-       bool cos_incompat, discard = false;
+       bool reverse = false, discard = false;
+       bool cos_incompat;
        int rc;
        ENTRY;
 
@@ -2619,7 +2612,6 @@ relock:
                        mdt_handle_last_unlink(info, mnew, ma);
                        discard = mdt_dom_check_for_discard(info, mnew);
                }
-
                mdt_rename_counter_tally(info, info->mti_mdt, req,
                                         msrcdir, mtgtdir);
        }
@@ -2630,7 +2622,7 @@ relock:
 out_unlock_old:
        mdt_object_unlock(info, mold, lh_oldp, rc);
 out_put_new:
-       if (mnew != NULL)
+       if (mnew && !discard)
                mdt_object_put(info->mti_env, mnew);
 out_put_old:
        mdt_object_put(info->mti_env, mold);
@@ -2645,13 +2637,15 @@ out_put_tgtdir:
 out_put_srcdir:
        mdt_object_put(info->mti_env, msrcdir);
 
-       /* If 'discard' is set then new_fid must exits.
-        * DOM data discard need neither object nor lock,
-        * so do this at the end.
+       /* The DoM discard can be done right in the place above where it is
+        * assigned, meanwhile it is done here after rename unlock due to
+        * compatibility with old clients, for them the discard blocks
+        * the main thread until completion. Check LU-11359 for details.
         */
-       if (discard)
-               mdt_dom_discard_data(info, new_fid);
-
+       if (discard) {
+               mdt_dom_discard_data(info, mnew);
+               mdt_object_put(info->mti_env, mnew);
+       }
        return rc;
 }
 
index 2e28262..87d2af8 100644 (file)
@@ -787,6 +787,7 @@ static const char *obd_connect_names[] = {
        "lsom",                 /* 0x800 */
        "pcc",                  /* 0x1000 */
        "plain_layout",         /* 0x2000 */
+       "async_discard",        /* 0x4000 */
        NULL
 };
 
index 3e179c1..ea721ba 100644 (file)
@@ -3032,7 +3032,7 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
                                /* the only discarder is lock cancelling, so
                                 * [start, end] must contain this extent */
                                EASSERT(ext->oe_start >= start &&
-                                       ext->oe_max_end <= end, ext);
+                                       ext->oe_end <= end, ext);
                                osc_extent_state_set(ext, OES_LOCKING);
                                ext->oe_owner = current;
                                list_move_tail(&ext->oe_link,
index ca01179..be4cd31 100644 (file)
@@ -2871,8 +2871,13 @@ static int ptlrpc_hr_main(void *arg)
        struct ptlrpc_hr_thread *hrt = (struct ptlrpc_hr_thread *)arg;
        struct ptlrpc_hr_partition *hrp = hrt->hrt_partition;
        struct list_head replies;
+       struct lu_env *env;
        int rc;
 
+       OBD_ALLOC_PTR(env);
+       if (env == NULL)
+               RETURN(-ENOMEM);
+
        INIT_LIST_HEAD(&replies);
        unshare_fs_struct();
 
@@ -2886,6 +2891,15 @@ static int ptlrpc_hr_main(void *arg)
                      threadname, hrp->hrp_cpt, ptlrpc_hr.hr_cpt_table, rc);
        }
 
+       rc = lu_context_init(&env->le_ctx, LCT_MD_THREAD | LCT_DT_THREAD |
+                            LCT_REMEMBER | LCT_NOREF);
+       if (rc)
+               GOTO(out_env, rc);
+
+       rc = lu_env_add(env);
+       if (rc)
+               GOTO(out_ctx_fini, rc);
+
        atomic_inc(&hrp->hrp_nstarted);
        wake_up(&ptlrpc_hr.hr_waitq);
 
@@ -2899,13 +2913,22 @@ static int ptlrpc_hr_main(void *arg)
                                        struct ptlrpc_reply_state,
                                        rs_list);
                        list_del_init(&rs->rs_list);
+                       /* refill keys if needed */
+                       lu_env_refill(env);
+                       lu_context_enter(&env->le_ctx);
                        ptlrpc_handle_rs(rs);
+                       lu_context_exit(&env->le_ctx);
                }
        }
 
        atomic_inc(&hrp->hrp_nstopped);
        wake_up(&ptlrpc_hr.hr_waitq);
 
+       lu_env_remove(env);
+out_ctx_fini:
+       lu_context_fini(&env->le_ctx);
+out_env:
+       OBD_FREE_PTR(env);
        return 0;
 }
 
index 0778636..7f9fb09 100644 (file)
@@ -42,4 +42,5 @@
 #include <lustre_net.h>
 #include <lustre_disk.h>
 #include <uapi/linux/lustre/lustre_lfsck_user.h>
+#include <uapi/linux/lustre/lustre_cfg.h>
 
index deda48f..b0f4d81 100644 (file)
@@ -44,6 +44,7 @@
 #include <uapi/linux/lustre/lustre_lfsck_user.h>
 #include <uapi/linux/lustre/lustre_cfg.h>
 
+
 void lustre_assert_wire_constants(void)
 {
        /* Wire protocol assertions generated by 'wirecheck'
@@ -1348,6 +1349,8 @@ void lustre_assert_wire_constants(void)
                 OBD_CONNECT2_PCC);
        LASSERTF(OBD_CONNECT2_PLAIN_LAYOUT == 0x2000ULL, "found 0x%.16llxULL\n",
                 OBD_CONNECT2_PLAIN_LAYOUT);
+       LASSERTF(OBD_CONNECT2_ASYNC_DISCARD == 0x4000ULL, "found 0x%.16llxULL\n",
+                OBD_CONNECT2_ASYNC_DISCARD);
        LASSERTF(OBD_CKSUM_CRC32 == 0x00000001UL, "found 0x%.8xUL\n",
                (unsigned)OBD_CKSUM_CRC32);
        LASSERTF(OBD_CKSUM_ADLER == 0x00000002UL, "found 0x%.8xUL\n",
index 8e94baa..a82727d 100755 (executable)
@@ -17861,6 +17861,26 @@ test_271f() {
 }
 run_test 271f "DoM: read on open (200K file and read tail)"
 
+test_271g() {
+       [[ $($LCTL get_param mdc.*.import) =~ async_discard ]] ||
+               skip "Skipping due to old client or server version"
+
+       $LFS setstripe -E 1024K -L mdt -E EOF $DIR1/$tfile
+       # to get layout
+       $CHECKSTAT -t file $DIR1/$tfile
+
+       $MULTIOP $DIR1/$tfile Ow40960_w4096c &
+       MULTIOP_PID=$!
+       sleep 1
+       #define OBD_FAIL_LDLM_CANCEL_BL_CB_RACE
+       $LCTL set_param fail_loc=0x80000314
+       rm $DIR1/$tfile || error "Unlink fails"
+       RC=$?
+       kill -USR1 $MULTIOP_PID && wait $MULTIOP_PID || error "multiop failure"
+       [ $RC -eq 0 ] || error "Failed write to stale object"
+}
+run_test 271g "Discard DoM data vs client flush race"
+
 test_272a() {
        [ $MDS1_VERSION -lt $(version_code 2.11.50) ] &&
                skip "Need MDS version at least 2.11.50"
index ae8cf00..349db93 100644 (file)
@@ -611,6 +611,7 @@ check_obd_connect_data(void)
        CHECK_DEFINE_64X(OBD_CONNECT2_LSOM);
        CHECK_DEFINE_64X(OBD_CONNECT2_PCC);
        CHECK_DEFINE_64X(OBD_CONNECT2_PLAIN_LAYOUT);
+       CHECK_DEFINE_64X(OBD_CONNECT2_ASYNC_DISCARD);
 
        CHECK_VALUE_X(OBD_CKSUM_CRC32);
        CHECK_VALUE_X(OBD_CKSUM_ADLER);
@@ -2967,7 +2968,7 @@ main(int argc, char **argv)
 
        check_lustre_cfg();
 
-       printf("}\n\n");
+       printf("}\n");
 
        return 0;
 }
index baa59d4..b2cb77c 100644 (file)
@@ -40,6 +40,7 @@
 #include <linux/lustre/lustre_lfsck_user.h>
 #include <linux/lustre/lustre_disk.h>
 #endif
+#include <linux/lustre/lustre_cfg.h>
 
 #define LASSERT(cond) if (!(cond)) { printf("failed " #cond "\n"); ret = 1; }
 #define LASSERTF(cond, fmt, ...) if (!(cond)) { printf("failed '" #cond "'" fmt, ## __VA_ARGS__);ret = 1;}
index e7edcad..6c75192 100644 (file)
@@ -1370,6 +1370,8 @@ void lustre_assert_wire_constants(void)
                 OBD_CONNECT2_PCC);
        LASSERTF(OBD_CONNECT2_PLAIN_LAYOUT == 0x2000ULL, "found 0x%.16llxULL\n",
                 OBD_CONNECT2_PLAIN_LAYOUT);
+       LASSERTF(OBD_CONNECT2_ASYNC_DISCARD == 0x4000ULL, "found 0x%.16llxULL\n",
+                OBD_CONNECT2_ASYNC_DISCARD);
        LASSERTF(OBD_CKSUM_CRC32 == 0x00000001UL, "found 0x%.8xUL\n",
                (unsigned)OBD_CKSUM_CRC32);
        LASSERTF(OBD_CKSUM_ADLER == 0x00000002UL, "found 0x%.8xUL\n",