Whamcloud - gitweb
LU-6655 ptlrpc: skip delayed replay requests
[fs/lustre-release.git] / lustre / target / tgt_handler.c
index b81882c..24e6936 100644 (file)
@@ -21,7 +21,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2013, 2016, Intel Corporation.
+ * Copyright (c) 2013, 2017, Intel Corporation.
  */
 /*
  * lustre/target/tgt_handler.c
@@ -661,6 +661,19 @@ static int process_req_last_xid(struct ptlrpc_request *req)
                        RETURN(-EPROTO);
        }
 
+       /* The "last_xid" is the minimum xid among unreplied requests.
+        * If the request is from a previous connection, its xid can
+        * still be larger than "exp_last_xid", so the xid check above
+        * is not enough to determine whether the request is delayed.
+        *
+        * For example, if a replay request was delayed, causing a
+        * timeout on the client and a restart of the replay, the delayed
+        * replay request will have a larger xid than "exp_last_xid".
+        */
+       if (req->rq_export->exp_conn_cnt >
+           lustre_msg_get_conn_cnt(req->rq_reqmsg))
+               RETURN(-ESTALE);
+
        /* try to release in-memory reply data */
        if (tgt_is_multimodrpcs_client(req->rq_export)) {
                tgt_handle_received_xid(req->rq_export,
@@ -687,6 +700,19 @@ int tgt_request_handle(struct ptlrpc_request *req)
        bool                     is_connect = false;
        ENTRY;
 
+       if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_TGT_RECOVERY_REQ_RACE))) {
+               if (cfs_fail_val == 0 &&
+                   lustre_msg_get_opc(msg) != OBD_PING &&
+                   lustre_msg_get_flags(msg) & MSG_REQ_REPLAY_DONE) {
+                       struct l_wait_info lwi =  { 0 };
+
+                       cfs_fail_val = 1;
+                       cfs_race_state = 0;
+                       l_wait_event(cfs_race_waitq, (cfs_race_state == 1),
+                                    &lwi);
+               }
+       }
+
        /* Refill the context, to make sure all thread keys are allocated */
        lu_env_refill(req->rq_svc_thread->t_env);
 
@@ -1591,7 +1617,9 @@ EXPORT_SYMBOL(tgt_io_thread_done);
 int tgt_mdt_data_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
                      struct lustre_handle *lh, int mode, __u64 *flags)
 {
-       union ldlm_policy_data policy;
+       union ldlm_policy_data policy = {
+               .l_inodebits.bits = MDS_INODELOCK_DOM,
+       };
        int rc;
 
        ENTRY;
@@ -1600,9 +1628,6 @@ int tgt_mdt_data_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
        LASSERT(ns != NULL);
        LASSERT(!lustre_handle_is_used(lh));
 
-       policy.l_inodebits.bits = MDS_INODELOCK_DOM | MDS_INODELOCK_UPDATE;
-       policy.l_inodebits.try_bits = 0;
-
        rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, &policy, mode,
                                    flags, ldlm_blocking_ast,
                                    ldlm_completion_ast, ldlm_glimpse_ast,
@@ -1612,6 +1637,13 @@ int tgt_mdt_data_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
 }
 EXPORT_SYMBOL(tgt_mdt_data_lock);
 
+void tgt_mdt_data_unlock(struct lustre_handle *lh, enum ldlm_mode mode)
+{
+       LASSERT(lustre_handle_is_used(lh)); /* caller must pass a used handle */
+       ldlm_lock_decref(lh, mode); /* presumably drops the @mode ref - confirm */
+}
+EXPORT_SYMBOL(tgt_mdt_data_unlock); /* exported for use outside this file */
+
 /**
  * Helper function for getting server side [start, start+count] DLM lock
  * if asked by client.
@@ -1706,15 +1738,15 @@ static void tgt_brw_unlock(struct obd_ioobj *obj, struct niobuf_remote *niob,
                tgt_extent_unlock(lh, mode);
        EXIT;
 }
-static __u32 tgt_checksum_niobuf(struct lu_target *tgt,
+static int tgt_checksum_niobuf(struct lu_target *tgt,
                                 struct niobuf_local *local_nb, int npages,
-                                int opc, enum cksum_types cksum_type)
+                                int opc, enum cksum_types cksum_type,
+                                __u32 *cksum)
 {
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        int                             i, err;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
-       __u32                           cksum;
 
        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
@@ -1789,10 +1821,10 @@ static __u32 tgt_checksum_niobuf(struct lu_target *tgt,
                }
        }
 
-       bufsize = sizeof(cksum);
-       err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
+       bufsize = sizeof(*cksum);
+       err = cfs_crypto_hash_final(hdesc, (unsigned char *)cksum, &bufsize);
 
-       return cksum;
+       return 0;
 }
 
 char dbgcksum_file_name[PATH_MAX];
@@ -1805,7 +1837,6 @@ static void dump_all_bulk_pages(struct obdo *oa, int count,
        int rc, i;
        unsigned int len;
        char *buf;
-       mm_segment_t oldfs;
 
        /* will only keep dump of pages on first error for the same range in
         * file/fid, not during the resends/retries. */
@@ -1834,14 +1865,11 @@ static void dump_all_bulk_pages(struct obdo *oa, int count,
                return;
        }
 
-       oldfs = get_fs();
-       set_fs(KERNEL_DS);
        for (i = 0; i < count; i++) {
                len = local_nb[i].lnb_len;
                buf = kmap(local_nb[i].lnb_page);
                while (len != 0) {
-                       rc = vfs_write(filp, (__force const char __user *)buf,
-                                      len, &filp->f_pos);
+                       rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
                        if (rc < 0) {
                                CERROR("%s: wanted to write %u but got %d "
                                       "error\n", dbgcksum_file_name, len, rc);
@@ -1854,7 +1882,6 @@ static void dump_all_bulk_pages(struct obdo *oa, int count,
                }
                kunmap(local_nb[i].lnb_page);
        }
-       set_fs(oldfs);
 
        rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
        if (rc)
@@ -1865,7 +1892,7 @@ static void dump_all_bulk_pages(struct obdo *oa, int count,
 
 static int check_read_checksum(struct niobuf_local *local_nb, int npages,
                               struct obd_export *exp, struct obdo *oa,
-                              const lnet_process_id_t *peer,
+                              const struct lnet_process_id *peer,
                               __u32 client_cksum, __u32 server_cksum,
                               enum cksum_types server_cksum_type)
 {
@@ -2078,9 +2105,12 @@ int tgt_brw_read(struct tgt_session_info *tsi)
 
                repbody->oa.o_flags = cksum_type_pack(cksum_type);
                repbody->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
-               repbody->oa.o_cksum = tgt_checksum_niobuf(tsi->tsi_tgt,
-                                                        local_nb, npages_read,
-                                                        OST_READ, cksum_type);
+               rc = tgt_checksum_niobuf(tsi->tsi_tgt, local_nb,
+                                        npages_read, OST_READ, cksum_type,
+                                        &repbody->oa.o_cksum);
+               if (rc < 0)
+                       GOTO(out_commitrw, rc);
+
                CDEBUG(D_PAGE, "checksum at read origin: %x\n",
                       repbody->oa.o_cksum);
 
@@ -2420,10 +2450,12 @@ skip_transfer:
                repbody->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                repbody->oa.o_flags &= ~OBD_FL_CKSUM_ALL;
                repbody->oa.o_flags |= cksum_type_pack(cksum_type);
-               repbody->oa.o_cksum = tgt_checksum_niobuf(tsi->tsi_tgt,
-                                                         local_nb, npages,
-                                                         OST_WRITE,
-                                                         cksum_type);
+               rc = tgt_checksum_niobuf(tsi->tsi_tgt, local_nb,
+                                        npages, OST_WRITE, cksum_type,
+                                        &repbody->oa.o_cksum);
+               if (rc < 0)
+                       GOTO(out_commitrw, rc);
+
                cksum_counter++;
 
                if (unlikely(body->oa.o_cksum != repbody->oa.o_cksum)) {
@@ -2442,6 +2474,7 @@ skip_transfer:
                }
        }
 
+out_commitrw:
        /* Must commit after prep above in all cases */
        rc = obd_commitrw(tsi->tsi_env, OBD_BRW_WRITE, exp, &repbody->oa,
                          objcount, ioo, remote_nb, npages, local_nb, rc);