Whamcloud - gitweb
LU-1095 debug: clean up console messages
[fs/lustre-release.git] / lustre / target / tgt_handler.c
index 6889115..5a837a6 100644 (file)
@@ -37,6 +37,7 @@
 #include <obd.h>
 #include <obd_class.h>
 #include <obd_cksum.h>
+#include <md_object.h>
 
 #include "tgt_internal.h"
 
@@ -107,6 +108,9 @@ static int tgt_mdt_body_unpack(struct tgt_session_info *tsi, __u32 flags)
        } else {
                rc = PTR_ERR(obj);
        }
+
+       tsi->tsi_fid = body->fid1;
+
        RETURN(rc);
 }
 
@@ -127,53 +131,115 @@ static int tgt_mdt_body_unpack(struct tgt_session_info *tsi, __u32 flags)
  */
 int tgt_validate_obdo(struct tgt_session_info *tsi, struct obdo *oa)
 {
-       int rc;
-
+       struct ost_id   *oi     = &oa->o_oi;
+       obd_seq          seq    = ostid_seq(oi);
+       obd_id           id     = ostid_id(oi);
+       int              rc;
        ENTRY;
 
        if (unlikely(!(exp_connect_flags(tsi->tsi_exp) & OBD_CONNECT_FID) &&
-                    fid_seq_is_echo(oa->o_oi.oi.oi_seq))) {
+                    fid_seq_is_echo(seq))) {
                /* Sigh 2.[123] client still sends echo req with oi_id = 0
                 * during create, and we will reset this to 1, since this
                 * oi_id is basically useless in the following create process,
                 * but oi_id == 0 will make it difficult to tell whether it is
                 * real FID or ost_id. */
-               oa->o_oi.oi_fid.f_oid = oa->o_oi.oi.oi_id ?: 1;
-               oa->o_oi.oi_fid.f_seq = FID_SEQ_ECHO;
-               oa->o_oi.oi_fid.f_ver = 0;
+               oi->oi_fid.f_seq = FID_SEQ_ECHO;
+               oi->oi_fid.f_oid = id ?: 1;
+               oi->oi_fid.f_ver = 0;
        } else {
-               if (unlikely((oa->o_valid & OBD_MD_FLID &&
-                             ostid_id(&oa->o_oi) == 0)))
+               struct lu_fid *fid = &tsi->tsi_fid2;
+
+               if (unlikely((oa->o_valid & OBD_MD_FLID) && id == 0))
                        GOTO(out, rc = -EPROTO);
 
                /* Note: this check might be forced in 2.5 or 2.6, i.e.
                 * all of the requests are required to setup FLGROUP */
                if (unlikely(!(oa->o_valid & OBD_MD_FLGROUP))) {
-                       ostid_set_seq_mdt0(&oa->o_oi);
+                       ostid_set_seq_mdt0(oi);
                        oa->o_valid |= OBD_MD_FLGROUP;
+                       seq = ostid_seq(oi);
                }
 
-               if (unlikely(!(fid_seq_is_idif(ostid_seq(&oa->o_oi)) ||
-                              fid_seq_is_mdt0(ostid_seq(&oa->o_oi)) ||
-                              fid_seq_is_norm(ostid_seq(&oa->o_oi)) ||
-                              fid_seq_is_echo(ostid_seq(&oa->o_oi)))))
+               if (unlikely(!(fid_seq_is_idif(seq) || fid_seq_is_mdt0(seq) ||
+                              fid_seq_is_norm(seq) || fid_seq_is_echo(seq))))
                        GOTO(out, rc = -EPROTO);
+
+               rc = ostid_to_fid(fid, oi, tsi->tsi_tgt->lut_lsd.lsd_osd_index);
+               if (unlikely(rc != 0))
+                       GOTO(out, rc);
+
+               oi->oi_fid = *fid;
        }
+
        RETURN(0);
+
 out:
        CERROR("%s: client %s sent bad object "DOSTID": rc = %d\n",
               tgt_name(tsi->tsi_tgt), obd_export_nid2str(tsi->tsi_exp),
-              ostid_seq(&oa->o_oi), ostid_id(&oa->o_oi), rc);
+              seq, id, rc);
        return rc;
 }
 EXPORT_SYMBOL(tgt_validate_obdo);
 
+static int tgt_io_data_unpack(struct tgt_session_info *tsi, struct ost_id *oi)
+{
+       unsigned                 max_brw;
+       struct niobuf_remote    *rnb;
+       struct obd_ioobj        *ioo;
+       int                      obj_count;
+
+       ENTRY;
+
+       ioo = req_capsule_client_get(tsi->tsi_pill, &RMF_OBD_IOOBJ);
+       if (ioo == NULL)
+               RETURN(-EPROTO);
+
+       rnb = req_capsule_client_get(tsi->tsi_pill, &RMF_NIOBUF_REMOTE);
+       if (rnb == NULL)
+               RETURN(-EPROTO);
+
+       max_brw = ioobj_max_brw_get(ioo);
+       if (unlikely((max_brw & (max_brw - 1)) != 0)) {
+               CERROR("%s: client %s sent bad ioobj max %u for "DOSTID
+                      ": rc = %d\n", tgt_name(tsi->tsi_tgt),
+                      obd_export_nid2str(tsi->tsi_exp), max_brw,
+                      POSTID(oi), -EPROTO);
+               RETURN(-EPROTO);
+       }
+       ioo->ioo_oid = *oi;
+
+       obj_count = req_capsule_get_size(tsi->tsi_pill, &RMF_OBD_IOOBJ,
+                                       RCL_CLIENT) / sizeof(*ioo);
+       if (obj_count == 0) {
+               CERROR("%s: short ioobj\n", tgt_name(tsi->tsi_tgt));
+               RETURN(-EPROTO);
+       } else if (obj_count > 1) {
+               CERROR("%s: too many ioobjs (%d)\n", tgt_name(tsi->tsi_tgt),
+                      obj_count);
+               RETURN(-EPROTO);
+       }
+
+       if (ioo->ioo_bufcnt == 0) {
+               CERROR("%s: ioo has zero bufcnt\n", tgt_name(tsi->tsi_tgt));
+               RETURN(-EPROTO);
+       }
+
+       if (ioo->ioo_bufcnt > PTLRPC_MAX_BRW_PAGES) {
+               DEBUG_REQ(D_RPCTRACE, tgt_ses_req(tsi),
+                         "bulk has too many pages (%d)",
+                         ioo->ioo_bufcnt);
+               RETURN(-EPROTO);
+       }
+
+       RETURN(0);
+}
+
 static int tgt_ost_body_unpack(struct tgt_session_info *tsi, __u32 flags)
 {
        struct ost_body         *body;
        struct req_capsule      *pill = tsi->tsi_pill;
        struct lustre_capa      *capa;
-       struct obd_ioobj        *ioo;
        int                      rc;
 
        ENTRY;
@@ -187,7 +253,7 @@ static int tgt_ost_body_unpack(struct tgt_session_info *tsi, __u32 flags)
                RETURN(rc);
 
        if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
-               capa = req_capsule_client_get(tsi->tsi_pill, &RMF_CAPA1);
+               capa = req_capsule_client_get(pill, &RMF_CAPA1);
                if (capa == NULL) {
                        CERROR("%s: OSSCAPA flag is set without capability\n",
                               tgt_name(tsi->tsi_tgt));
@@ -196,28 +262,12 @@ static int tgt_ost_body_unpack(struct tgt_session_info *tsi, __u32 flags)
        }
 
        tsi->tsi_ost_body = body;
+       tsi->tsi_fid = body->oa.o_oi.oi_fid;
 
        if (req_capsule_has_field(pill, &RMF_OBD_IOOBJ, RCL_CLIENT)) {
-               unsigned                 max_brw;
-               struct niobuf_remote    *rnb;
-
-               ioo = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
-               if (ioo == NULL)
-                       RETURN(-EPROTO);
-
-               rnb = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
-               if (rnb == NULL)
-                       RETURN(-EPROTO);
-
-               max_brw = ioobj_max_brw_get(ioo);
-               if (unlikely((max_brw & (max_brw - 1)) != 0)) {
-                       CERROR("%s: client %s sent bad ioobj max %u for "DOSTID
-                              ": rc = %d\n", tgt_name(tsi->tsi_tgt),
-                              obd_export_nid2str(tsi->tsi_exp), max_brw,
-                              POSTID(&body->oa.o_oi), -EPROTO);
-                       RETURN(-EPROTO);
-               }
-               ioo->ioo_oid = body->oa.o_oi;
+               rc = tgt_io_data_unpack(tsi, &body->oa.o_oi);
+               if (rc < 0)
+                       RETURN(rc);
        }
 
        if (!(body->oa.o_valid & OBD_MD_FLID)) {
@@ -231,16 +281,6 @@ static int tgt_ost_body_unpack(struct tgt_session_info *tsi, __u32 flags)
                }
        }
 
-       rc = ostid_to_fid(&tsi->tsi_fid, &body->oa.o_oi, 0);
-       if (rc != 0)
-               RETURN(rc);
-
-       if (!fid_is_sane(&tsi->tsi_fid)) {
-               CERROR("%s: invalid FID: "DFID"\n", tgt_name(tsi->tsi_tgt),
-                      PFID(&tsi->tsi_fid));
-               RETURN(-EINVAL);
-       }
-
        ost_fid_build_resid(&tsi->tsi_fid, &tsi->tsi_resid);
 
        /*
@@ -251,32 +291,72 @@ static int tgt_ost_body_unpack(struct tgt_session_info *tsi, __u32 flags)
        RETURN(rc);
 }
 
-static int tgt_unpack_req_pack_rep(struct tgt_session_info *tsi, __u32 flags)
+/*
+ * Do necessary preprocessing according to handler ->th_flags.
+ */
+static int tgt_request_preprocess(struct tgt_session_info *tsi,
+                                 struct tgt_handler *h,
+                                 struct ptlrpc_request *req)
 {
        struct req_capsule      *pill = tsi->tsi_pill;
-       int                      rc;
+       __u32                    flags = h->th_flags;
+       int                      rc = 0;
 
        ENTRY;
 
-       if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) {
-               rc = tgt_mdt_body_unpack(tsi, flags);
-       } else if (req_capsule_has_field(pill, &RMF_OST_BODY, RCL_CLIENT)) {
-               rc = tgt_ost_body_unpack(tsi, flags);
-       } else {
-               rc = 0;
+       if (tsi->tsi_preprocessed)
+               RETURN(0);
+
+       LASSERT(h->th_act != NULL);
+       LASSERT(h->th_opc == lustre_msg_get_opc(req->rq_reqmsg));
+       LASSERT(current->journal_info == NULL);
+
+       LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO),
+                    h->th_fmt != NULL));
+       if (h->th_fmt != NULL) {
+               req_capsule_set(pill, h->th_fmt);
+               if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) {
+                       rc = tgt_mdt_body_unpack(tsi, flags);
+                       if (rc < 0)
+                               RETURN(rc);
+               } else if (req_capsule_has_field(pill, &RMF_OST_BODY,
+                                                RCL_CLIENT)) {
+                       rc = tgt_ost_body_unpack(tsi, flags);
+                       if (rc < 0)
+                               RETURN(rc);
+               }
        }
 
-       if (rc == 0 && flags & HABEO_REFERO) {
-               /* Pack reply */
-               if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
-                       req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
-                                            tsi->tsi_mdt_body->eadatasize);
-               if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
-                       req_capsule_set_size(pill, &RMF_LOGCOOKIES,
-                                            RCL_SERVER, 0);
+       if (flags & MUTABOR && tgt_conn_flags(tsi) & OBD_CONNECT_RDONLY)
+               RETURN(-EROFS);
 
-               rc = req_capsule_server_pack(pill);
+       if (flags & HABEO_CLAVIS) {
+               struct ldlm_request *dlm_req;
+
+               LASSERT(h->th_fmt != NULL);
+
+               dlm_req = req_capsule_client_get(pill, &RMF_DLM_REQ);
+               if (dlm_req != NULL) {
+                       if (unlikely(dlm_req->lock_desc.l_resource.lr_type ==
+                                    LDLM_IBITS &&
+                                    dlm_req->lock_desc.l_policy_data.\
+                                    l_inodebits.bits == 0)) {
+                               /*
+                                * Lock without inodebits makes no sense and
+                                * will oops later in ldlm. If client miss to
+                                * set such bits, do not trigger ASSERTION.
+                                *
+                                * For liblustre flock case, it maybe zero.
+                                */
+                               rc = -EPROTO;
+                       } else {
+                               tsi->tsi_dlm_req = dlm_req;
+                       }
+               } else {
+                       rc = -EFAULT;
+               }
        }
+       tsi->tsi_preprocessed = 1;
        RETURN(rc);
 }
 
@@ -291,14 +371,9 @@ static int tgt_handle_request0(struct tgt_session_info *tsi,
 {
        int      serious = 0;
        int      rc;
-       __u32    flags;
 
        ENTRY;
 
-       LASSERT(h->th_act != NULL);
-       LASSERT(h->th_opc == lustre_msg_get_opc(req->rq_reqmsg));
-       LASSERT(current->journal_info == NULL);
-
        /*
         * Checking for various OBD_FAIL_$PREF_$OPC_NET codes. _Do_ not try
         * to put same checks into handlers like mdt_close(), mdt_reint(),
@@ -313,44 +388,21 @@ static int tgt_handle_request0(struct tgt_session_info *tsi,
        if (OBD_FAIL_CHECK_ORSET(h->th_fail_id, OBD_FAIL_ONCE))
                RETURN(0);
 
-       rc = 0;
-       flags = h->th_flags;
-       LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO),
-                    h->th_fmt != NULL));
-       if (h->th_fmt != NULL) {
-               req_capsule_set(tsi->tsi_pill, h->th_fmt);
-               rc = tgt_unpack_req_pack_rep(tsi, flags);
-       }
-
-       if (rc == 0 && flags & MUTABOR &&
-           tgt_conn_flags(tsi) & OBD_CONNECT_RDONLY)
-               rc = -EROFS;
-
-       if (rc == 0 && flags & HABEO_CLAVIS) {
-               struct ldlm_request *dlm_req;
-
-               LASSERT(h->th_fmt != NULL);
+       rc = tgt_request_preprocess(tsi, h, req);
+       /* pack reply if reply format is fixed */
+       if (rc == 0 && h->th_flags & HABEO_REFERO) {
+               /* Pack reply */
+               if (req_capsule_has_field(tsi->tsi_pill, &RMF_MDT_MD,
+                                         RCL_SERVER))
+                       req_capsule_set_size(tsi->tsi_pill, &RMF_MDT_MD,
+                                            RCL_SERVER,
+                                            tsi->tsi_mdt_body->eadatasize);
+               if (req_capsule_has_field(tsi->tsi_pill, &RMF_LOGCOOKIES,
+                                         RCL_SERVER))
+                       req_capsule_set_size(tsi->tsi_pill, &RMF_LOGCOOKIES,
+                                            RCL_SERVER, 0);
 
-               dlm_req = req_capsule_client_get(tsi->tsi_pill, &RMF_DLM_REQ);
-               if (dlm_req != NULL) {
-                       if (unlikely(dlm_req->lock_desc.l_resource.lr_type ==
-                                    LDLM_IBITS &&
-                                    dlm_req->lock_desc.l_policy_data.\
-                                    l_inodebits.bits == 0)) {
-                               /*
-                                * Lock without inodebits makes no sense and
-                                * will oops later in ldlm. If client miss to
-                                * set such bits, do not trigger ASSERTION.
-                                *
-                                * For liblustre flock case, it maybe zero.
-                                */
-                               rc = -EPROTO;
-                       } else {
-                               tsi->tsi_dlm_req = dlm_req;
-                       }
-               } else {
-                       rc = -EFAULT;
-               }
+               rc = req_capsule_server_pack(tsi->tsi_pill);
        }
 
        if (likely(rc == 0)) {
@@ -488,13 +540,45 @@ int tgt_handle_recovery(struct ptlrpc_request *req, int reply_fail_id)
        RETURN(+1);
 }
 
+/* Initial check for request, it is validation mostly */
+static struct tgt_handler *tgt_handler_find_check(struct ptlrpc_request *req)
+{
+       struct tgt_handler      *h;
+       struct tgt_opc_slice    *s;
+       struct lu_target        *tgt;
+       __u32                    opc = lustre_msg_get_opc(req->rq_reqmsg);
+
+       ENTRY;
+
+       tgt = class_exp2tgt(req->rq_export);
+
+       for (s = tgt->lut_slice; s->tos_hs != NULL; s++)
+               if (s->tos_opc_start <= opc && opc < s->tos_opc_end)
+                       break;
+
+       /* opcode was not found in slice */
+       if (unlikely(s->tos_hs == NULL)) {
+               CERROR("%s: no handlers for opcode 0x%x\n", tgt_name(tgt),
+                      opc);
+               RETURN(ERR_PTR(-ENOTSUPP));
+       }
+
+       LASSERT(opc >= s->tos_opc_start && opc < s->tos_opc_end);
+       h = s->tos_hs + (opc - s->tos_opc_start);
+       if (unlikely(h->th_opc == 0)) {
+               CERROR("%s: unsupported opcode 0x%x\n", tgt_name(tgt), opc);
+               RETURN(ERR_PTR(-ENOTSUPP));
+       }
+
+       RETURN(h);
+}
+
 int tgt_request_handle(struct ptlrpc_request *req)
 {
        struct tgt_session_info *tsi = tgt_ses_info(req->rq_svc_thread->t_env);
 
        struct lustre_msg       *msg = req->rq_reqmsg;
        struct tgt_handler      *h;
-       struct tgt_opc_slice    *s;
        struct lu_target        *tgt;
        int                      request_fail_id = 0;
        __u32                    opc = lustre_msg_get_opc(msg);
@@ -543,14 +627,9 @@ int tgt_request_handle(struct ptlrpc_request *req)
        request_fail_id = tgt->lut_request_fail_id;
        tsi->tsi_reply_fail_id = tgt->lut_reply_fail_id;
 
-       for (s = tgt->lut_slice; s->tos_hs != NULL; s++)
-               if (s->tos_opc_start <= opc && opc < s->tos_opc_end)
-                       break;
-
-       /* opcode was not found in slice */
-       if (unlikely(s->tos_hs == NULL)) {
-               CERROR("%s: no handlers for opcode 0x%x\n", tgt_name(tgt), opc);
-               req->rq_status = -ENOTSUPP;
+       h = tgt_handler_find_check(req);
+       if (IS_ERR(h)) {
+               req->rq_status = PTR_ERR(h);
                rc = ptlrpc_error(req);
                GOTO(out, rc);
        }
@@ -558,17 +637,6 @@ int tgt_request_handle(struct ptlrpc_request *req)
        if (CFS_FAIL_CHECK_ORSET(request_fail_id, CFS_FAIL_ONCE))
                GOTO(out, rc = 0);
 
-       LASSERT(current->journal_info == NULL);
-
-       LASSERT(opc >= s->tos_opc_start && opc < s->tos_opc_end);
-       h = s->tos_hs + (opc - s->tos_opc_start);
-       if (unlikely(h->th_opc == 0)) {
-               CERROR("%s: unsupported opcode 0x%x\n", tgt_name(tgt), opc);
-               req->rq_status = -ENOTSUPP;
-               rc = ptlrpc_error(req);
-               GOTO(out, rc);
-       }
-
        rc = lustre_msg_check_version(msg, h->th_version);
        if (unlikely(rc)) {
                DEBUG_REQ(D_ERROR, req, "%s: drop mal-formed request, version"
@@ -590,20 +658,48 @@ int tgt_request_handle(struct ptlrpc_request *req)
        EXIT;
 out:
        req_capsule_fini(tsi->tsi_pill);
-       tsi->tsi_pill = NULL;
        if (tsi->tsi_corpus != NULL) {
                lu_object_put(tsi->tsi_env, tsi->tsi_corpus);
                tsi->tsi_corpus = NULL;
        }
-       tsi->tsi_env = NULL;
-       tsi->tsi_mdt_body = NULL;
-       tsi->tsi_dlm_req = NULL;
-       fid_zero(&tsi->tsi_fid);
-       memset(&tsi->tsi_resid, 0, sizeof tsi->tsi_resid);
        return rc;
 }
 EXPORT_SYMBOL(tgt_request_handle);
 
+/** Assign high priority operations to the request if needed. */
+int tgt_hpreq_handler(struct ptlrpc_request *req)
+{
+       struct tgt_session_info *tsi = tgt_ses_info(req->rq_svc_thread->t_env);
+       struct tgt_handler      *h;
+       int                      rc;
+
+       ENTRY;
+
+       if (req->rq_export == NULL)
+               RETURN(0);
+
+       req_capsule_init(&req->rq_pill, req, RCL_SERVER);
+       tsi->tsi_pill = &req->rq_pill;
+       tsi->tsi_env = req->rq_svc_thread->t_env;
+       tsi->tsi_tgt = class_exp2tgt(req->rq_export);
+       tsi->tsi_exp = req->rq_export;
+
+       h = tgt_handler_find_check(req);
+       if (IS_ERR(h)) {
+               rc = PTR_ERR(h);
+               RETURN(rc);
+       }
+
+       rc = tgt_request_preprocess(tsi, h, req);
+       if (unlikely(rc != 0))
+               RETURN(rc);
+
+       if (h->th_hp != NULL)
+               h->th_hp(tsi);
+       RETURN(0);
+}
+EXPORT_SYMBOL(tgt_hpreq_handler);
+
 void tgt_counter_incr(struct obd_export *exp, int opcode)
 {
        lprocfs_counter_incr(exp->exp_obd->obd_stats, opcode);
@@ -1073,7 +1169,8 @@ int tgt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                if (unlikely(rc != 0))
                        RETURN(rc);
 
-               ost_fid_from_resid(&fid, &lock->l_resource->lr_name);
+               ost_fid_from_resid(&fid, &lock->l_resource->lr_name,
+                                  tgt->lut_lsd.lsd_osd_index);
                obj = dt_locate(&env, tgt->lut_bottom, &fid);
                if (IS_ERR(obj))
                        GOTO(err_env, rc = PTR_ERR(obj));
@@ -1083,8 +1180,11 @@ int tgt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
 
                rc = tgt_sync(&env, tgt, obj);
                if (rc < 0) {
-                       CERROR("%s: sync failed on lock cancel: rc = %d\n",
-                              tgt_name(tgt), rc);
+                       CERROR("%s: syncing "DFID" ("LPU64"-"LPU64") on lock "
+                              "cancel: rc = %d\n",
+                              tgt_name(tgt), PFID(&fid),
+                              lock->l_policy_data.l_extent.start,
+                              lock->l_policy_data.l_extent.end, rc);
                }
 err_put:
                lu_object_put(&env, &obj->do_lu);
@@ -1493,15 +1593,16 @@ int tgt_brw_read(struct tgt_session_info *tsi)
        if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
                RETURN(-EIO);
 
-       OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
+       OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, cfs_fail_val > 0 ?
+                        cfs_fail_val : (obd_timeout + 1) / 4);
 
        /* Check if there is eviction in progress, and if so, wait for it to
         * finish */
-       if (unlikely(cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
+       if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
                /* We do not care how long it takes */
                lwi = LWI_INTR(NULL, NULL);
                rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
-                        !cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress),
+                        !atomic_read(&exp->exp_obd->obd_evict_inprogress),
                         &lwi);
        }
 
@@ -1737,7 +1838,8 @@ int tgt_brw_write(struct tgt_session_info *tsi)
                RETURN(err_serious(-EFAULT));
 
        /* pause before transaction has been started */
-       OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
+       CFS_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, cfs_fail_val > 0 ?
+                        cfs_fail_val : (obd_timeout + 1) / 4);
 
        /* There must be big cache in current thread to process this request
         * if it is NULL then something went wrong and it wasn't allocated,