Whamcloud - gitweb
LU-7702 ldlm: skip lock if export failed 20/18120/9
authorAlexander Boyko <alexander.boyko@seagate.com>
Wed, 15 Jun 2016 06:12:14 +0000 (14:12 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Mon, 27 Jun 2016 18:55:30 +0000 (18:55 +0000)
This patch resolves IO vs eviction race.
After eviction failed export stayed at stale list,
a client had IO processing and reconnected during it.
A client sent brw rpc with last lock cookie and new connection.
The lock with failed export was found and assert was happened.
(ost_handler.c:1812:ost_prolong_lock_one())
ASSERTION( lock->l_export == opd->opd_exp ) failed:

1. Skip the lock at ldlm_handle2lock if lock export failed.
2. Validation of lock for IO was added at hpreq_check(). The lock
   searching is based on granted interval tree. If server doesn`t
   have a valid lock, it reply to client with ESTALE.
3. A lock prolong was moved at ldlm layer.

Signed-off-by: Alexander Boyko <alexander.boyko@seagate.com>
Seagate-bug-id: MRP-2787
Change-Id: I2177736564c3b8164f1ad3e4cc02dca1704a0e6e
Reviewed-on: http://review.whamcloud.com/18120
Reviewed-by: Fan Yong <fan.yong@intel.com>
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Vitaly Fertman <vitaly.fertman@seagate.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lustre_dlm.h
lustre/include/obd_support.h
lustre/ldlm/ldlm_extent.c
lustre/ldlm/ldlm_lock.c
lustre/ofd/ofd_dev.c
lustre/osc/osc_request.c
lustre/ptlrpc/service.c
lustre/tests/recovery-small.sh

index 0afbad7..4594387 100644 (file)
@@ -1204,6 +1204,19 @@ int ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data);
 /* ldlm_extent.c */
 __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms);
 
 /* ldlm_extent.c */
 __u64 ldlm_extent_shift_kms(struct ldlm_lock *lock, __u64 old_kms);
 
+struct ldlm_prolong_args {
+       struct obd_export       *lpa_export;
+       struct ldlm_res_id      lpa_resid;
+       struct ldlm_extent      lpa_extent;
+       enum ldlm_mode          lpa_mode;
+       int                     lpa_timeout;
+       int                     lpa_locks_cnt;
+       int                     lpa_blocks_cnt;
+};
+void ldlm_lock_prolong_one(struct ldlm_lock *lock,
+                          struct ldlm_prolong_args *arg);
+void ldlm_resource_prolong(struct ldlm_prolong_args *arg);
+
 struct ldlm_callback_suite {
         ldlm_completion_callback lcs_completion;
         ldlm_blocking_callback   lcs_blocking;
 struct ldlm_callback_suite {
         ldlm_completion_callback lcs_completion;
         ldlm_blocking_callback   lcs_blocking;
index 87dcb30..c1eedb5 100644 (file)
@@ -392,6 +392,7 @@ extern char obd_jobid_var[];
 #define OBD_FAIL_OSC_NO_GRANT            0x411
 #define OBD_FAIL_OSC_DELAY_SETTIME      0x412
 #define OBD_FAIL_OSC_CONNECT_GRANT_PARAM 0x413
 #define OBD_FAIL_OSC_NO_GRANT            0x411
 #define OBD_FAIL_OSC_DELAY_SETTIME      0x412
 #define OBD_FAIL_OSC_CONNECT_GRANT_PARAM 0x413
+#define OBD_FAIL_OSC_DELAY_IO            0x414
 
 #define OBD_FAIL_PTLRPC                  0x500
 #define OBD_FAIL_PTLRPC_ACK              0x501
 
 #define OBD_FAIL_PTLRPC                  0x500
 #define OBD_FAIL_PTLRPC_ACK              0x501
index 34bfa76..ace668a 100644 (file)
@@ -632,6 +632,112 @@ destroylock:
 }
 
 /**
 }
 
 /**
+ * This function refresh eviction timer for cancelled lock.
+ * \param[in] lock             ldlm lock for refresh
+ * \param[in] arg              ldlm prolong arguments, timeout, export, extent
+ *                             and counter are used
+ */
+void ldlm_lock_prolong_one(struct ldlm_lock *lock,
+                          struct ldlm_prolong_args *arg)
+{
+       int timeout;
+
+       if (arg->lpa_export != lock->l_export ||
+           lock->l_flags & LDLM_FL_DESTROYED)
+               /* ignore unrelated locks */
+               return;
+
+       arg->lpa_locks_cnt++;
+
+       if (!(lock->l_flags & LDLM_FL_AST_SENT))
+               /* ignore locks not being cancelled */
+               return;
+
+       /* We are in the middle of the process - BL AST is sent, CANCEL
+        * is ahead. Take half of BL AT + IO AT process time.
+        */
+       timeout = arg->lpa_timeout + (ldlm_bl_timeout(lock) >> 1);
+
+       LDLM_DEBUG(lock, "refreshed to %ds.\n", timeout);
+
+       arg->lpa_blocks_cnt++;
+
+       /* OK. this is a possible lock the user holds doing I/O
+        * let's refresh eviction timer for it.
+        */
+       ldlm_refresh_waiting_lock(lock, timeout);
+}
+EXPORT_SYMBOL(ldlm_lock_prolong_one);
+
+static enum interval_iter ldlm_resource_prolong_cb(struct interval_node *n,
+                                                  void *data)
+{
+       struct ldlm_prolong_args *arg = data;
+       struct ldlm_interval *node = to_ldlm_interval(n);
+       struct ldlm_lock *lock;
+
+       ENTRY;
+
+       LASSERT(!list_empty(&node->li_group));
+
+       list_for_each_entry(lock, &node->li_group, l_sl_policy) {
+               ldlm_lock_prolong_one(lock, arg);
+       }
+
+       RETURN(INTERVAL_ITER_CONT);
+}
+
+/**
+ * Walk through granted tree and prolong locks if they overlaps extent.
+ *
+ * \param[in] arg              prolong args
+ */
+void ldlm_resource_prolong(struct ldlm_prolong_args *arg)
+{
+       struct ldlm_interval_tree *tree;
+       struct ldlm_resource *res;
+       struct interval_node_extent ex = { .start = arg->lpa_extent.start,
+                                          .end = arg->lpa_extent.end };
+       int idx;
+
+       ENTRY;
+
+       res = ldlm_resource_get(arg->lpa_export->exp_obd->obd_namespace, NULL,
+                               &arg->lpa_resid, LDLM_EXTENT, 0);
+       if (IS_ERR(res)) {
+               CDEBUG(D_DLMTRACE, "Failed to get resource for resid "LPU64"/"
+                      LPU64"\n", arg->lpa_resid.name[0],
+                      arg->lpa_resid.name[1]);
+               RETURN_EXIT;
+       }
+
+       lock_res(res);
+       for (idx = 0; idx < LCK_MODE_NUM; idx++) {
+               tree = &res->lr_itree[idx];
+               if (tree->lit_root == NULL) /* empty tree, skipped */
+                       continue;
+
+               /* There is no possibility to check for the groupID
+                * so all the group locks are considered as valid
+                * here, especially because the client is supposed
+                * to check it has such a lock before sending an RPC.
+                */
+               if (!(tree->lit_mode & arg->lpa_mode))
+                       continue;
+
+               interval_search(tree->lit_root, &ex,
+                               ldlm_resource_prolong_cb, arg);
+       }
+
+       unlock_res(res);
+       ldlm_resource_putref(res);
+
+       EXIT;
+}
+EXPORT_SYMBOL(ldlm_resource_prolong);
+
+
+/**
  * Discard all AST work items from list.
  *
  * If for whatever reason we do not want to send ASTs to conflicting locks
  * Discard all AST work items from list.
  *
  * If for whatever reason we do not want to send ASTs to conflicting locks
index 10f8e8f..3e9cab4 100644 (file)
@@ -603,6 +603,13 @@ struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
        if (lock == NULL)
                RETURN(NULL);
 
        if (lock == NULL)
                RETURN(NULL);
 
+       if (lock->l_export != NULL && lock->l_export->exp_failed) {
+               CDEBUG(D_INFO, "lock export failed: lock %p, exp %p\n",
+                      lock, lock->l_export);
+               LDLM_LOCK_PUT(lock);
+               RETURN(NULL);
+       }
+
        /* It's unlikely but possible that someone marked the lock as
         * destroyed after we did handle2object on it */
        if ((flags == 0) && !ldlm_is_destroyed(lock)) {
        /* It's unlikely but possible that someone marked the lock as
         * destroyed after we did handle2object on it */
        if ((flags == 0) && !ldlm_is_destroyed(lock)) {
index 71ec9f7..05365a9 100644 (file)
@@ -2266,59 +2266,14 @@ static int ofd_quotactl(struct tgt_session_info *tsi)
  *
  * \retval             amount of time to extend the timeout with
  */
  *
  * \retval             amount of time to extend the timeout with
  */
-static inline int prolong_timeout(struct ptlrpc_request *req,
-                                 struct ldlm_lock *lock)
+static inline int prolong_timeout(struct ptlrpc_request *req)
 {
        struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
 
        if (AT_OFF)
                return obd_timeout / 2;
 
 {
        struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
 
        if (AT_OFF)
                return obd_timeout / 2;
 
-       /* We are in the middle of the process - BL AST is sent, CANCEL
-         is ahead. Take half of AT + IO process time. */
-       return at_est2timeout(at_get(&svcpt->scp_at_estimate)) +
-               (ldlm_bl_timeout(lock) >> 1);
-}
-
-/**
- * Prolong single lock timeout.
- *
- * This is supplemental function to the ofd_prolong_locks(). It prolongs
- * a single lock.
- *
- * \param[in] tsi      target session environment for this request
- * \param[in] lock     LDLM lock to prolong
- * \param[in] extent   related extent
- * \param[in] timeout  timeout value to add
- *
- * \retval             0 if lock is not suitable for prolongation
- * \retval             1 if lock was prolonged successfully
- */
-static int ofd_prolong_one_lock(struct tgt_session_info *tsi,
-                               struct ldlm_lock *lock,
-                               struct ldlm_extent *extent)
-{
-       int timeout = prolong_timeout(tgt_ses_req(tsi), lock);
-
-       if (lock->l_flags & LDLM_FL_DESTROYED) /* lock already cancelled */
-               return 0;
-
-       /* XXX: never try to grab resource lock here because we're inside
-        * exp_bl_list_lock; in ldlm_lockd.c to handle waiting list we take
-        * res lock and then exp_bl_list_lock. */
-
-       if (!(lock->l_flags & LDLM_FL_AST_SENT))
-               /* ignore locks not being cancelled */
-               return 0;
-
-       LDLM_DEBUG(lock, "refreshed for req x"LPU64" ext("LPU64"->"LPU64") "
-                        "to %ds.\n", tgt_ses_req(tsi)->rq_xid, extent->start,
-                        extent->end, timeout);
-
-       /* OK. this is a possible lock the user holds doing I/O
-        * let's refresh eviction timer for it */
-       ldlm_refresh_waiting_lock(lock, timeout);
-       return 1;
+       return at_est2timeout(at_get(&svcpt->scp_at_estimate));
 }
 
 /**
 }
 
 /**
@@ -2341,25 +2296,26 @@ static int ofd_prolong_one_lock(struct tgt_session_info *tsi,
  * request may cover multiple locks.
  *
  * \param[in] tsi      target session environment for this request
  * request may cover multiple locks.
  *
  * \param[in] tsi      target session environment for this request
- * \param[in] start    start of extent
- * \param[in] end      end of extent
+ * \param[in] data     struct of data to prolong locks
  *
  *
- * \retval             number of prolonged locks
  */
  */
-static int ofd_prolong_extent_locks(struct tgt_session_info *tsi,
-                                   __u64 start, __u64 end)
+static void ofd_prolong_extent_locks(struct tgt_session_info *tsi,
+                                   struct ldlm_prolong_args *data)
 {
 {
-       struct obd_export       *exp = tsi->tsi_exp;
        struct obdo             *oa  = &tsi->tsi_ost_body->oa;
        struct obdo             *oa  = &tsi->tsi_ost_body->oa;
-       struct ldlm_extent       extent = {
-               .start = start,
-               .end = end
-       };
        struct ldlm_lock        *lock;
        struct ldlm_lock        *lock;
-       int                      lock_count = 0;
 
        ENTRY;
 
 
        ENTRY;
 
+       data->lpa_timeout = prolong_timeout(tgt_ses_req(tsi));
+       data->lpa_export = tsi->tsi_exp;
+       data->lpa_resid = tsi->tsi_resid;
+
+       CDEBUG(D_RPCTRACE, "Prolong locks for req %p with x"LPU64
+              " ext("LPU64"->"LPU64")\n", tgt_ses_req(tsi),
+              tgt_ses_req(tsi)->rq_xid, data->lpa_extent.start,
+              data->lpa_extent.end);
+
        if (oa->o_valid & OBD_MD_FLHANDLE) {
                /* mostly a request should be covered by only one lock, try
                 * fast path. */
        if (oa->o_valid & OBD_MD_FLHANDLE) {
                /* mostly a request should be covered by only one lock, try
                 * fast path. */
@@ -2367,42 +2323,21 @@ static int ofd_prolong_extent_locks(struct tgt_session_info *tsi,
                if (lock != NULL) {
                        /* Fast path to check if the lock covers the whole IO
                         * region exclusively. */
                if (lock != NULL) {
                        /* Fast path to check if the lock covers the whole IO
                         * region exclusively. */
-                       if (lock->l_granted_mode == LCK_PW &&
-                           ldlm_extent_contain(&lock->l_policy_data.l_extent,
-                                               &extent)) {
+                       if (ldlm_extent_contain(&lock->l_policy_data.l_extent,
+                                               &data->lpa_extent)) {
                                /* bingo */
                                /* bingo */
-                               LASSERT(lock->l_export == exp);
-                               lock_count = ofd_prolong_one_lock(tsi, lock,
-                                                                 &extent);
+                               LASSERT(lock->l_export == data->lpa_export);
+                               ldlm_lock_prolong_one(lock, data);
                                LDLM_LOCK_PUT(lock);
                                LDLM_LOCK_PUT(lock);
-                               RETURN(lock_count);
+                               RETURN_EXIT;
                        }
                        lock->l_last_used = cfs_time_current();
                        LDLM_LOCK_PUT(lock);
                }
        }
 
                        }
                        lock->l_last_used = cfs_time_current();
                        LDLM_LOCK_PUT(lock);
                }
        }
 
-       spin_lock_bh(&exp->exp_bl_list_lock);
-       list_for_each_entry(lock, &exp->exp_bl_list, l_exp_list) {
-               LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
-               LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
-
-               /* ignore waiting locks, no more granted locks in the list */
-               if (lock->l_granted_mode != lock->l_req_mode)
-                       break;
-
-               if (!ldlm_res_eq(&tsi->tsi_resid, &lock->l_resource->lr_name))
-                       continue;
-
-               if (!ldlm_extent_overlap(&lock->l_policy_data.l_extent,
-                                        &extent))
-                       continue;
-
-               lock_count += ofd_prolong_one_lock(tsi, lock, &extent);
-       }
-       spin_unlock_bh(&exp->exp_bl_list_lock);
-
-       RETURN(lock_count);
+       ldlm_resource_prolong(data);
+       EXIT;
 }
 
 /**
 }
 
 /**
@@ -2449,8 +2384,10 @@ static int ofd_rw_hpreq_lock_match(struct ptlrpc_request *req,
        if (!ostid_res_name_eq(&ioo->ioo_oid, &lock->l_resource->lr_name))
                RETURN(0);
 
        if (!ostid_res_name_eq(&ioo->ioo_oid, &lock->l_resource->lr_name))
                RETURN(0);
 
-       /* a bulk write can only hold a reference on a PW extent lock */
-       mode = LCK_PW;
+       /* a bulk write can only hold a reference on a PW extent lock
+        * or GROUP lock.
+        */
+       mode = LCK_PW | LCK_GROUP;
        if (opc == OST_READ)
                /* whereas a bulk read can be protected by either a PR or PW
                 * extent lock */
        if (opc == OST_READ)
                /* whereas a bulk read can be protected by either a PR or PW
                 * extent lock */
@@ -2466,20 +2403,22 @@ static int ofd_rw_hpreq_lock_match(struct ptlrpc_request *req,
  * Implementation of ptlrpc_hpreq_ops::hpreq_lock_check for OFD RW requests.
  *
  * Check for whether the given PTLRPC request (\a req) is blocking
  * Implementation of ptlrpc_hpreq_ops::hpreq_lock_check for OFD RW requests.
  *
  * Check for whether the given PTLRPC request (\a req) is blocking
- * an LDLM lock cancel.
+ * an LDLM lock cancel. Also checks whether the request is covered by an LDLM
+ * lock.
  *
  * \param[in] req      the incoming request
  *
  * \retval             1 if \a req is blocking an LDLM lock cancel
  * \retval             0 if it is not
  *
  * \param[in] req      the incoming request
  *
  * \retval             1 if \a req is blocking an LDLM lock cancel
  * \retval             0 if it is not
+ * \retval             -ESTALE if lock is not found
  */
 static int ofd_rw_hpreq_check(struct ptlrpc_request *req)
 {
        struct tgt_session_info *tsi;
        struct obd_ioobj        *ioo;
        struct niobuf_remote    *rnb;
  */
 static int ofd_rw_hpreq_check(struct ptlrpc_request *req)
 {
        struct tgt_session_info *tsi;
        struct obd_ioobj        *ioo;
        struct niobuf_remote    *rnb;
-       __u64                    start, end;
-       int                      lock_count;
+       int opc;
+       struct ldlm_prolong_args pa = { 0 };
 
        ENTRY;
 
 
        ENTRY;
 
@@ -2491,6 +2430,9 @@ static int ofd_rw_hpreq_check(struct ptlrpc_request *req)
         * Use LASSERT below because malformed RPCs should have
         * been filtered out in tgt_hpreq_handler().
         */
         * Use LASSERT below because malformed RPCs should have
         * been filtered out in tgt_hpreq_handler().
         */
+       opc = lustre_msg_get_opc(req->rq_reqmsg);
+       LASSERT(opc == OST_READ || opc == OST_WRITE);
+
        ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
        LASSERT(ioo != NULL);
 
        ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
        LASSERT(ioo != NULL);
 
@@ -2498,21 +2440,28 @@ static int ofd_rw_hpreq_check(struct ptlrpc_request *req)
        LASSERT(rnb != NULL);
        LASSERT(!(rnb->rnb_flags & OBD_BRW_SRVLOCK));
 
        LASSERT(rnb != NULL);
        LASSERT(!(rnb->rnb_flags & OBD_BRW_SRVLOCK));
 
-       start = rnb->rnb_offset;
+       pa.lpa_mode = LCK_PW | LCK_GROUP;
+       if (opc == OST_READ)
+               pa.lpa_mode |= LCK_PR;
+
+       pa.lpa_extent.start = rnb->rnb_offset;
        rnb += ioo->ioo_bufcnt - 1;
        rnb += ioo->ioo_bufcnt - 1;
-       end = rnb->rnb_offset + rnb->rnb_len - 1;
+       pa.lpa_extent.end = rnb->rnb_offset + rnb->rnb_len - 1;
 
        DEBUG_REQ(D_RPCTRACE, req, "%s %s: refresh rw locks: "DFID
 
        DEBUG_REQ(D_RPCTRACE, req, "%s %s: refresh rw locks: "DFID
-                                  " ("LPU64"->"LPU64")\n",
-                 tgt_name(tsi->tsi_tgt), current->comm,
-                 PFID(&tsi->tsi_fid), start, end);
+                 " ("LPU64"->"LPU64")\n", tgt_name(tsi->tsi_tgt),
+                 current->comm, PFID(&tsi->tsi_fid), pa.lpa_extent.start,
+                 pa.lpa_extent.end);
 
 
-       lock_count = ofd_prolong_extent_locks(tsi, start, end);
+       ofd_prolong_extent_locks(tsi, &pa);
 
        CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
 
        CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
-              tgt_name(tsi->tsi_tgt), lock_count, req);
+              tgt_name(tsi->tsi_tgt), pa.lpa_blocks_cnt, req);
 
 
-       RETURN(lock_count > 0);
+       if (pa.lpa_blocks_cnt > 0)
+               RETURN(1);
+
+       RETURN(pa.lpa_locks_cnt > 0 ? 0 : -ESTALE);
 }
 
 /**
 }
 
 /**
@@ -2582,18 +2531,22 @@ static int ofd_punch_hpreq_lock_match(struct ptlrpc_request *req,
  * Implementation of ptlrpc_hpreq_ops::hpreq_lock_check for OST_PUNCH request.
  *
  * High-priority queue request check for whether the given punch request
  * Implementation of ptlrpc_hpreq_ops::hpreq_lock_check for OST_PUNCH request.
  *
  * High-priority queue request check for whether the given punch request
- * (\a req) is blocking an LDLM lock cancel.
+ * (\a req) is blocking an LDLM lock cancel. Also checks whether the request is
+ * covered by an LDLM lock.
+ *
+
  *
  * \param[in] req      the incoming request
  *
  * \retval             1 if \a req is blocking an LDLM lock cancel
  * \retval             0 if it is not
  *
  * \param[in] req      the incoming request
  *
  * \retval             1 if \a req is blocking an LDLM lock cancel
  * \retval             0 if it is not
+ * \retval             -ESTALE if lock is not found
  */
 static int ofd_punch_hpreq_check(struct ptlrpc_request *req)
 {
        struct tgt_session_info *tsi;
        struct obdo             *oa;
  */
 static int ofd_punch_hpreq_check(struct ptlrpc_request *req)
 {
        struct tgt_session_info *tsi;
        struct obdo             *oa;
-       int                      lock_count;
+       struct ldlm_prolong_args pa = { 0 };
 
        ENTRY;
 
 
        ENTRY;
 
@@ -2606,17 +2559,24 @@ static int ofd_punch_hpreq_check(struct ptlrpc_request *req)
        LASSERT(!(oa->o_valid & OBD_MD_FLFLAGS &&
                  oa->o_flags & OBD_FL_SRVLOCK));
 
        LASSERT(!(oa->o_valid & OBD_MD_FLFLAGS &&
                  oa->o_flags & OBD_FL_SRVLOCK));
 
+       pa.lpa_mode = LCK_PW | LCK_GROUP;
+       pa.lpa_extent.start = oa->o_size;
+       pa.lpa_extent.end   = oa->o_blocks;
+
        CDEBUG(D_DLMTRACE,
               "%s: refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
               tgt_name(tsi->tsi_tgt), tsi->tsi_resid.name[0],
        CDEBUG(D_DLMTRACE,
               "%s: refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
               tgt_name(tsi->tsi_tgt), tsi->tsi_resid.name[0],
-              tsi->tsi_resid.name[1], oa->o_size, oa->o_blocks);
+              tsi->tsi_resid.name[1], pa.lpa_extent.start, pa.lpa_extent.end);
 
 
-       lock_count = ofd_prolong_extent_locks(tsi, oa->o_size, oa->o_blocks);
+       ofd_prolong_extent_locks(tsi, &pa);
 
        CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
 
        CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
-              tgt_name(tsi->tsi_tgt), lock_count, req);
+              tgt_name(tsi->tsi_tgt), pa.lpa_blocks_cnt, req);
+
+       if (pa.lpa_blocks_cnt > 0)
+               RETURN(1);
 
 
-       RETURN(lock_count > 0);
+       RETURN(pa.lpa_locks_cnt > 0 ? 0 : -ESTALE);
 }
 
 /**
 }
 
 /**
@@ -2667,7 +2627,9 @@ static void ofd_hp_brw(struct tgt_session_info *tsi)
                LASSERT(rnb != NULL); /* must exist after request preprocessing */
 
                /* no high priority if server lock is needed */
                LASSERT(rnb != NULL); /* must exist after request preprocessing */
 
                /* no high priority if server lock is needed */
-               if (rnb->rnb_flags & OBD_BRW_SRVLOCK)
+               if (rnb->rnb_flags & OBD_BRW_SRVLOCK ||
+                   (lustre_msg_get_flags(tgt_ses_req(tsi)->rq_reqmsg)
+                    & MSG_REPLAY))
                        return;
        }
        tgt_ses_req(tsi)->rq_ops = &ofd_hpreq_rw;
                        return;
        }
        tgt_ses_req(tsi)->rq_ops = &ofd_hpreq_rw;
@@ -2686,8 +2648,10 @@ static void ofd_hp_punch(struct tgt_session_info *tsi)
 {
        LASSERT(tsi->tsi_ost_body != NULL); /* must exists if we are here */
        /* no high-priority if server lock is needed */
 {
        LASSERT(tsi->tsi_ost_body != NULL); /* must exists if we are here */
        /* no high-priority if server lock is needed */
-       if (tsi->tsi_ost_body->oa.o_valid & OBD_MD_FLFLAGS &&
-           tsi->tsi_ost_body->oa.o_flags & OBD_FL_SRVLOCK)
+       if ((tsi->tsi_ost_body->oa.o_valid & OBD_MD_FLFLAGS &&
+            tsi->tsi_ost_body->oa.o_flags & OBD_FL_SRVLOCK) ||
+           tgt_conn_flags(tsi) & OBD_CONNECT_MDS ||
+           lustre_msg_get_flags(tgt_ses_req(tsi)->rq_reqmsg) & MSG_REPLAY)
                return;
        tgt_ses_req(tsi)->rq_ops = &ofd_hpreq_punch;
 }
                return;
        tgt_ses_req(tsi)->rq_ops = &ofd_hpreq_punch;
 }
index 5161287..4e9cf07 100644 (file)
@@ -1904,6 +1904,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
                  page_count, aa, cli->cl_r_in_flight,
                  cli->cl_w_in_flight);
        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
                  page_count, aa, cli->cl_r_in_flight,
                  cli->cl_w_in_flight);
+       OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, 4);
 
        ptlrpcd_add_req(req);
        rc = 0;
 
        ptlrpcd_add_req(req);
        rc = 0;
index 40077f0..e86c382 100644 (file)
@@ -1583,7 +1583,14 @@ static int ptlrpc_server_hpreq_init(struct ptlrpc_service_part *svcpt,
                 * list otherwise it may hit swab race at LU-1044. */
                if (req->rq_ops->hpreq_check != NULL) {
                        rc = req->rq_ops->hpreq_check(req);
                 * list otherwise it may hit swab race at LU-1044. */
                if (req->rq_ops->hpreq_check != NULL) {
                        rc = req->rq_ops->hpreq_check(req);
-                       LASSERT(rc <= 1); /* can only return error, 0, or 1 */
+                       if (rc == -ESTALE) {
+                               req->rq_status = rc;
+                               ptlrpc_error(req);
+                       }
+                       /** can only return error,
+                        * 0 for normal request,
+                        *  or 1 for high priority request */
+                       LASSERT(rc <= 1);
                }
        }
 
                }
        }
 
index 5d6a40d..0d2cfa7 100755 (executable)
@@ -2496,6 +2496,31 @@ test_132() {
 }
 run_test 132 "long punch"
 
 }
 run_test 132 "long punch"
 
+test_131() {
+       remote_ost_nodsh && skip "remote OST with nodsh" && return 0
+
+       rm -f $DIR/$tfile
+       # get a lock on client so that export would reach the stale list
+       $SETSTRIPE -i 0 $DIR/$tfile || error "setstripe failed"
+       dd if=/dev/zero of=$DIR/$tfile count=1 || error "dd failed"
+
+       # another IO under the same lock
+       #define OBD_FAIL_OSC_DELAY_IO            0x414
+       $LCTL set_param fail_loc=0x80000414
+       dd if=/dev/zero of=$DIR/$tfile count=1 conv=notrunc oflag=dsync &
+       local pid=$!
+       sleep 1
+
+       #define OBD_FAIL_LDLM_BL_EVICT           0x31e
+       set_nodes_failloc "$(osts_nodes)" 0x8000031e
+       ost_evict_client
+       client_reconnect
+
+       wait $pid && error "dd succeeded"
+       return 0
+}
+run_test 131 "IO vs evict results to IO under staled lock"
+
 complete $SECONDS
 check_and_cleanup_lustre
 exit_status
 complete $SECONDS
 check_and_cleanup_lustre
 exit_status