Whamcloud - gitweb
LU-6070 libcfs: provide separate buffers for libcfs_*2str()
[fs/lustre-release.git] / lustre / target / tgt_handler.c
index 6889115..f0f1ed6 100644 (file)
@@ -21,7 +21,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2011, 2012, Intel Corporation.
+ * Copyright (c) 2011, 2014, Intel Corporation.
  */
 /*
  * lustre/target/tgt_handler.c
@@ -37,6 +37,9 @@
 #include <obd.h>
 #include <obd_class.h>
 #include <obd_cksum.h>
+#include <md_object.h>
+#include <lustre_lfsck.h>
+#include <lustre_nodemap.h>
 
 #include "tgt_internal.h"
 
@@ -74,29 +77,29 @@ static int tgt_mdt_body_unpack(struct tgt_session_info *tsi, __u32 flags)
 
        tsi->tsi_mdt_body = body;
 
-       if (!(body->valid & OBD_MD_FLID))
+       if (!(body->mbo_valid & OBD_MD_FLID))
                RETURN(0);
 
        /* mdc_pack_body() doesn't check if fid is zero and set OBD_ML_FID
         * in any case in pre-2.5 clients. Fix that here if needed */
-       if (unlikely(fid_is_zero(&body->fid1)))
+       if (unlikely(fid_is_zero(&body->mbo_fid1)))
                RETURN(0);
 
-       if (!fid_is_sane(&body->fid1)) {
+       if (!fid_is_sane(&body->mbo_fid1)) {
                CERROR("%s: invalid FID: "DFID"\n", tgt_name(tsi->tsi_tgt),
-                      PFID(&body->fid1));
+                      PFID(&body->mbo_fid1));
                RETURN(-EINVAL);
        }
 
        obj = lu_object_find(tsi->tsi_env,
                             &tsi->tsi_tgt->lut_bottom->dd_lu_dev,
-                            &body->fid1, NULL);
+                            &body->mbo_fid1, NULL);
        if (!IS_ERR(obj)) {
                if ((flags & HABEO_CORPUS) && !lu_object_exists(obj)) {
                        lu_object_put(tsi->tsi_env, obj);
                        /* for capability renew ENOENT will be handled in
                         * mdt_renew_capa */
-                       if (body->valid & OBD_MD_FLOSSCAPA)
+                       if (body->mbo_valid & OBD_MD_FLOSSCAPA)
                                rc = 0;
                        else
                                rc = -ENOENT;
@@ -107,6 +110,9 @@ static int tgt_mdt_body_unpack(struct tgt_session_info *tsi, __u32 flags)
        } else {
                rc = PTR_ERR(obj);
        }
+
+       tsi->tsi_fid = body->mbo_fid1;
+
        RETURN(rc);
 }
 
@@ -127,53 +133,117 @@ static int tgt_mdt_body_unpack(struct tgt_session_info *tsi, __u32 flags)
  */
 int tgt_validate_obdo(struct tgt_session_info *tsi, struct obdo *oa)
 {
-       int rc;
-
+       struct ost_id   *oi     = &oa->o_oi;
+       u64              seq    = ostid_seq(oi);
+       u64              id     = ostid_id(oi);
+       int              rc;
        ENTRY;
 
        if (unlikely(!(exp_connect_flags(tsi->tsi_exp) & OBD_CONNECT_FID) &&
-                    fid_seq_is_echo(oa->o_oi.oi.oi_seq))) {
+                    fid_seq_is_echo(seq))) {
                /* Sigh 2.[123] client still sends echo req with oi_id = 0
                 * during create, and we will reset this to 1, since this
                 * oi_id is basically useless in the following create process,
                 * but oi_id == 0 will make it difficult to tell whether it is
                 * real FID or ost_id. */
-               oa->o_oi.oi_fid.f_oid = oa->o_oi.oi.oi_id ?: 1;
-               oa->o_oi.oi_fid.f_seq = FID_SEQ_ECHO;
-               oa->o_oi.oi_fid.f_ver = 0;
+               oi->oi_fid.f_seq = FID_SEQ_ECHO;
+               oi->oi_fid.f_oid = id ?: 1;
+               oi->oi_fid.f_ver = 0;
        } else {
-               if (unlikely((oa->o_valid & OBD_MD_FLID &&
-                             ostid_id(&oa->o_oi) == 0)))
+               struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env);
+
+               if (unlikely((oa->o_valid & OBD_MD_FLID) && id == 0))
                        GOTO(out, rc = -EPROTO);
 
                /* Note: this check might be forced in 2.5 or 2.6, i.e.
                 * all of the requests are required to setup FLGROUP */
                if (unlikely(!(oa->o_valid & OBD_MD_FLGROUP))) {
-                       ostid_set_seq_mdt0(&oa->o_oi);
+                       ostid_set_seq_mdt0(oi);
                        oa->o_valid |= OBD_MD_FLGROUP;
+                       seq = ostid_seq(oi);
                }
 
-               if (unlikely(!(fid_seq_is_idif(ostid_seq(&oa->o_oi)) ||
-                              fid_seq_is_mdt0(ostid_seq(&oa->o_oi)) ||
-                              fid_seq_is_norm(ostid_seq(&oa->o_oi)) ||
-                              fid_seq_is_echo(ostid_seq(&oa->o_oi)))))
+               if (unlikely(!(fid_seq_is_idif(seq) || fid_seq_is_mdt0(seq) ||
+                              fid_seq_is_norm(seq) || fid_seq_is_echo(seq))))
                        GOTO(out, rc = -EPROTO);
+
+               rc = ostid_to_fid(&tti->tti_fid1, oi,
+                                 tsi->tsi_tgt->lut_lsd.lsd_osd_index);
+               if (unlikely(rc != 0))
+                       GOTO(out, rc);
+
+               oi->oi_fid = tti->tti_fid1;
        }
+
        RETURN(0);
+
 out:
        CERROR("%s: client %s sent bad object "DOSTID": rc = %d\n",
               tgt_name(tsi->tsi_tgt), obd_export_nid2str(tsi->tsi_exp),
-              ostid_seq(&oa->o_oi), ostid_id(&oa->o_oi), rc);
+              seq, id, rc);
        return rc;
 }
 EXPORT_SYMBOL(tgt_validate_obdo);
 
+static int tgt_io_data_unpack(struct tgt_session_info *tsi, struct ost_id *oi)
+{
+       unsigned                 max_brw;
+       struct niobuf_remote    *rnb;
+       struct obd_ioobj        *ioo;
+       int                      obj_count;
+
+       ENTRY;
+
+       ioo = req_capsule_client_get(tsi->tsi_pill, &RMF_OBD_IOOBJ);
+       if (ioo == NULL)
+               RETURN(-EPROTO);
+
+       rnb = req_capsule_client_get(tsi->tsi_pill, &RMF_NIOBUF_REMOTE);
+       if (rnb == NULL)
+               RETURN(-EPROTO);
+
+       max_brw = ioobj_max_brw_get(ioo);
+       if (unlikely((max_brw & (max_brw - 1)) != 0)) {
+               CERROR("%s: client %s sent bad ioobj max %u for "DOSTID
+                      ": rc = %d\n", tgt_name(tsi->tsi_tgt),
+                      obd_export_nid2str(tsi->tsi_exp), max_brw,
+                      POSTID(oi), -EPROTO);
+               RETURN(-EPROTO);
+       }
+       ioo->ioo_oid = *oi;
+
+       obj_count = req_capsule_get_size(tsi->tsi_pill, &RMF_OBD_IOOBJ,
+                                       RCL_CLIENT) / sizeof(*ioo);
+       if (obj_count == 0) {
+               CERROR("%s: short ioobj\n", tgt_name(tsi->tsi_tgt));
+               RETURN(-EPROTO);
+       } else if (obj_count > 1) {
+               CERROR("%s: too many ioobjs (%d)\n", tgt_name(tsi->tsi_tgt),
+                      obj_count);
+               RETURN(-EPROTO);
+       }
+
+       if (ioo->ioo_bufcnt == 0) {
+               CERROR("%s: ioo has zero bufcnt\n", tgt_name(tsi->tsi_tgt));
+               RETURN(-EPROTO);
+       }
+
+       if (ioo->ioo_bufcnt > PTLRPC_MAX_BRW_PAGES) {
+               DEBUG_REQ(D_RPCTRACE, tgt_ses_req(tsi),
+                         "bulk has too many pages (%d)",
+                         ioo->ioo_bufcnt);
+               RETURN(-EPROTO);
+       }
+
+       RETURN(0);
+}
+
 static int tgt_ost_body_unpack(struct tgt_session_info *tsi, __u32 flags)
 {
        struct ost_body         *body;
        struct req_capsule      *pill = tsi->tsi_pill;
        struct lustre_capa      *capa;
-       struct obd_ioobj        *ioo;
+       struct lu_nodemap       *nodemap;
        int                      rc;
 
        ENTRY;
@@ -186,8 +256,17 @@ static int tgt_ost_body_unpack(struct tgt_session_info *tsi, __u32 flags)
        if (rc)
                RETURN(rc);
 
+       nodemap = tsi->tsi_exp->exp_target_data.ted_nodemap;
+
+       body->oa.o_uid = nodemap_map_id(nodemap, NODEMAP_UID,
+                                       NODEMAP_CLIENT_TO_FS,
+                                       body->oa.o_uid);
+       body->oa.o_gid = nodemap_map_id(nodemap, NODEMAP_GID,
+                                       NODEMAP_CLIENT_TO_FS,
+                                       body->oa.o_gid);
+
        if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
-               capa = req_capsule_client_get(tsi->tsi_pill, &RMF_CAPA1);
+               capa = req_capsule_client_get(pill, &RMF_CAPA1);
                if (capa == NULL) {
                        CERROR("%s: OSSCAPA flag is set without capability\n",
                               tgt_name(tsi->tsi_tgt));
@@ -196,28 +275,12 @@ static int tgt_ost_body_unpack(struct tgt_session_info *tsi, __u32 flags)
        }
 
        tsi->tsi_ost_body = body;
+       tsi->tsi_fid = body->oa.o_oi.oi_fid;
 
        if (req_capsule_has_field(pill, &RMF_OBD_IOOBJ, RCL_CLIENT)) {
-               unsigned                 max_brw;
-               struct niobuf_remote    *rnb;
-
-               ioo = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
-               if (ioo == NULL)
-                       RETURN(-EPROTO);
-
-               rnb = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
-               if (rnb == NULL)
-                       RETURN(-EPROTO);
-
-               max_brw = ioobj_max_brw_get(ioo);
-               if (unlikely((max_brw & (max_brw - 1)) != 0)) {
-                       CERROR("%s: client %s sent bad ioobj max %u for "DOSTID
-                              ": rc = %d\n", tgt_name(tsi->tsi_tgt),
-                              obd_export_nid2str(tsi->tsi_exp), max_brw,
-                              POSTID(&body->oa.o_oi), -EPROTO);
-                       RETURN(-EPROTO);
-               }
-               ioo->ioo_oid = body->oa.o_oi;
+               rc = tgt_io_data_unpack(tsi, &body->oa.o_oi);
+               if (rc < 0)
+                       RETURN(rc);
        }
 
        if (!(body->oa.o_valid & OBD_MD_FLID)) {
@@ -231,16 +294,6 @@ static int tgt_ost_body_unpack(struct tgt_session_info *tsi, __u32 flags)
                }
        }
 
-       rc = ostid_to_fid(&tsi->tsi_fid, &body->oa.o_oi, 0);
-       if (rc != 0)
-               RETURN(rc);
-
-       if (!fid_is_sane(&tsi->tsi_fid)) {
-               CERROR("%s: invalid FID: "DFID"\n", tgt_name(tsi->tsi_tgt),
-                      PFID(&tsi->tsi_fid));
-               RETURN(-EINVAL);
-       }
-
        ost_fid_build_resid(&tsi->tsi_fid, &tsi->tsi_resid);
 
        /*
@@ -251,32 +304,72 @@ static int tgt_ost_body_unpack(struct tgt_session_info *tsi, __u32 flags)
        RETURN(rc);
 }
 
-static int tgt_unpack_req_pack_rep(struct tgt_session_info *tsi, __u32 flags)
+/*
+ * Do necessary preprocessing according to handler ->th_flags.
+ */
+static int tgt_request_preprocess(struct tgt_session_info *tsi,
+                                 struct tgt_handler *h,
+                                 struct ptlrpc_request *req)
 {
        struct req_capsule      *pill = tsi->tsi_pill;
-       int                      rc;
+       __u32                    flags = h->th_flags;
+       int                      rc = 0;
 
        ENTRY;
 
-       if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) {
-               rc = tgt_mdt_body_unpack(tsi, flags);
-       } else if (req_capsule_has_field(pill, &RMF_OST_BODY, RCL_CLIENT)) {
-               rc = tgt_ost_body_unpack(tsi, flags);
-       } else {
-               rc = 0;
+       if (tsi->tsi_preprocessed)
+               RETURN(0);
+
+       LASSERT(h->th_act != NULL);
+       LASSERT(h->th_opc == lustre_msg_get_opc(req->rq_reqmsg));
+       LASSERT(current->journal_info == NULL);
+
+       LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO),
+                    h->th_fmt != NULL));
+       if (h->th_fmt != NULL) {
+               req_capsule_set(pill, h->th_fmt);
+               if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) {
+                       rc = tgt_mdt_body_unpack(tsi, flags);
+                       if (rc < 0)
+                               RETURN(rc);
+               } else if (req_capsule_has_field(pill, &RMF_OST_BODY,
+                                                RCL_CLIENT)) {
+                       rc = tgt_ost_body_unpack(tsi, flags);
+                       if (rc < 0)
+                               RETURN(rc);
+               }
        }
 
-       if (rc == 0 && flags & HABEO_REFERO) {
-               /* Pack reply */
-               if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
-                       req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
-                                            tsi->tsi_mdt_body->eadatasize);
-               if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
-                       req_capsule_set_size(pill, &RMF_LOGCOOKIES,
-                                            RCL_SERVER, 0);
+       if (flags & MUTABOR && tgt_conn_flags(tsi) & OBD_CONNECT_RDONLY)
+               RETURN(-EROFS);
+
+       if (flags & HABEO_CLAVIS) {
+               struct ldlm_request *dlm_req;
+
+               LASSERT(h->th_fmt != NULL);
 
-               rc = req_capsule_server_pack(pill);
+               dlm_req = req_capsule_client_get(pill, &RMF_DLM_REQ);
+               if (dlm_req != NULL) {
+                       if (unlikely(dlm_req->lock_desc.l_resource.lr_type ==
+                                    LDLM_IBITS &&
+                                    dlm_req->lock_desc.l_policy_data.\
+                                    l_inodebits.bits == 0)) {
+                               /*
+                                * Lock without inodebits makes no sense and
+                                * will oops later in ldlm. If client miss to
+                                * set such bits, do not trigger ASSERTION.
+                                *
+                                * For liblustre flock case, it maybe zero.
+                                */
+                               rc = -EPROTO;
+                       } else {
+                               tsi->tsi_dlm_req = dlm_req;
+                       }
+               } else {
+                       rc = -EFAULT;
+               }
        }
+       tsi->tsi_preprocessed = 1;
        RETURN(rc);
 }
 
@@ -291,14 +384,9 @@ static int tgt_handle_request0(struct tgt_session_info *tsi,
 {
        int      serious = 0;
        int      rc;
-       __u32    flags;
 
        ENTRY;
 
-       LASSERT(h->th_act != NULL);
-       LASSERT(h->th_opc == lustre_msg_get_opc(req->rq_reqmsg));
-       LASSERT(current->journal_info == NULL);
-
        /*
         * Checking for various OBD_FAIL_$PREF_$OPC_NET codes. _Do_ not try
         * to put same checks into handlers like mdt_close(), mdt_reint(),
@@ -313,44 +401,21 @@ static int tgt_handle_request0(struct tgt_session_info *tsi,
        if (OBD_FAIL_CHECK_ORSET(h->th_fail_id, OBD_FAIL_ONCE))
                RETURN(0);
 
-       rc = 0;
-       flags = h->th_flags;
-       LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO),
-                    h->th_fmt != NULL));
-       if (h->th_fmt != NULL) {
-               req_capsule_set(tsi->tsi_pill, h->th_fmt);
-               rc = tgt_unpack_req_pack_rep(tsi, flags);
-       }
-
-       if (rc == 0 && flags & MUTABOR &&
-           tgt_conn_flags(tsi) & OBD_CONNECT_RDONLY)
-               rc = -EROFS;
-
-       if (rc == 0 && flags & HABEO_CLAVIS) {
-               struct ldlm_request *dlm_req;
-
-               LASSERT(h->th_fmt != NULL);
+       rc = tgt_request_preprocess(tsi, h, req);
+       /* pack reply if reply format is fixed */
+       if (rc == 0 && h->th_flags & HABEO_REFERO) {
+               /* Pack reply */
+               if (req_capsule_has_field(tsi->tsi_pill, &RMF_MDT_MD,
+                                         RCL_SERVER))
+                       req_capsule_set_size(tsi->tsi_pill, &RMF_MDT_MD,
+                                            RCL_SERVER,
+                                            tsi->tsi_mdt_body->mbo_eadatasize);
+               if (req_capsule_has_field(tsi->tsi_pill, &RMF_LOGCOOKIES,
+                                         RCL_SERVER))
+                       req_capsule_set_size(tsi->tsi_pill, &RMF_LOGCOOKIES,
+                                            RCL_SERVER, 0);
 
-               dlm_req = req_capsule_client_get(tsi->tsi_pill, &RMF_DLM_REQ);
-               if (dlm_req != NULL) {
-                       if (unlikely(dlm_req->lock_desc.l_resource.lr_type ==
-                                    LDLM_IBITS &&
-                                    dlm_req->lock_desc.l_policy_data.\
-                                    l_inodebits.bits == 0)) {
-                               /*
-                                * Lock without inodebits makes no sense and
-                                * will oops later in ldlm. If client miss to
-                                * set such bits, do not trigger ASSERTION.
-                                *
-                                * For liblustre flock case, it maybe zero.
-                                */
-                               rc = -EPROTO;
-                       } else {
-                               tsi->tsi_dlm_req = dlm_req;
-                       }
-               } else {
-                       rc = -EFAULT;
-               }
+               rc = req_capsule_server_pack(tsi->tsi_pill);
        }
 
        if (likely(rc == 0)) {
@@ -407,9 +472,10 @@ static int tgt_filter_recovery_request(struct ptlrpc_request *req,
        case MDS_SYNC: /* used in unmounting */
        case OBD_PING:
        case MDS_REINT:
-       case UPDATE_OBJ:
+       case OUT_UPDATE:
        case SEQ_QUERY:
        case FLD_QUERY:
+       case FLD_READ:
        case LDLM_ENQUEUE:
        case OST_CREATE:
        case OST_DESTROY:
@@ -433,7 +499,7 @@ static int tgt_filter_recovery_request(struct ptlrpc_request *req,
  *       -ve: abort immediately with the given error code;
  *         0: send reply with error code in req->rq_status;
  */
-int tgt_handle_recovery(struct ptlrpc_request *req, int reply_fail_id)
+static int tgt_handle_recovery(struct ptlrpc_request *req, int reply_fail_id)
 {
        ENTRY;
 
@@ -488,13 +554,45 @@ int tgt_handle_recovery(struct ptlrpc_request *req, int reply_fail_id)
        RETURN(+1);
 }
 
+/* Initial check for request, it is validation mostly */
+static struct tgt_handler *tgt_handler_find_check(struct ptlrpc_request *req)
+{
+       struct tgt_handler      *h;
+       struct tgt_opc_slice    *s;
+       struct lu_target        *tgt;
+       __u32                    opc = lustre_msg_get_opc(req->rq_reqmsg);
+
+       ENTRY;
+
+       tgt = class_exp2tgt(req->rq_export);
+
+       for (s = tgt->lut_slice; s->tos_hs != NULL; s++)
+               if (s->tos_opc_start <= opc && opc < s->tos_opc_end)
+                       break;
+
+       /* opcode was not found in slice */
+       if (unlikely(s->tos_hs == NULL)) {
+               CERROR("%s: no handlers for opcode 0x%x\n", tgt_name(tgt),
+                      opc);
+               RETURN(ERR_PTR(-ENOTSUPP));
+       }
+
+       LASSERT(opc >= s->tos_opc_start && opc < s->tos_opc_end);
+       h = s->tos_hs + (opc - s->tos_opc_start);
+       if (unlikely(h->th_opc == 0)) {
+               CERROR("%s: unsupported opcode 0x%x\n", tgt_name(tgt), opc);
+               RETURN(ERR_PTR(-ENOTSUPP));
+       }
+
+       RETURN(h);
+}
+
 int tgt_request_handle(struct ptlrpc_request *req)
 {
        struct tgt_session_info *tsi = tgt_ses_info(req->rq_svc_thread->t_env);
 
        struct lustre_msg       *msg = req->rq_reqmsg;
        struct tgt_handler      *h;
-       struct tgt_opc_slice    *s;
        struct lu_target        *tgt;
        int                      request_fail_id = 0;
        __u32                    opc = lustre_msg_get_opc(msg);
@@ -543,14 +641,9 @@ int tgt_request_handle(struct ptlrpc_request *req)
        request_fail_id = tgt->lut_request_fail_id;
        tsi->tsi_reply_fail_id = tgt->lut_reply_fail_id;
 
-       for (s = tgt->lut_slice; s->tos_hs != NULL; s++)
-               if (s->tos_opc_start <= opc && opc < s->tos_opc_end)
-                       break;
-
-       /* opcode was not found in slice */
-       if (unlikely(s->tos_hs == NULL)) {
-               CERROR("%s: no handlers for opcode 0x%x\n", tgt_name(tgt), opc);
-               req->rq_status = -ENOTSUPP;
+       h = tgt_handler_find_check(req);
+       if (IS_ERR(h)) {
+               req->rq_status = PTR_ERR(h);
                rc = ptlrpc_error(req);
                GOTO(out, rc);
        }
@@ -558,17 +651,6 @@ int tgt_request_handle(struct ptlrpc_request *req)
        if (CFS_FAIL_CHECK_ORSET(request_fail_id, CFS_FAIL_ONCE))
                GOTO(out, rc = 0);
 
-       LASSERT(current->journal_info == NULL);
-
-       LASSERT(opc >= s->tos_opc_start && opc < s->tos_opc_end);
-       h = s->tos_hs + (opc - s->tos_opc_start);
-       if (unlikely(h->th_opc == 0)) {
-               CERROR("%s: unsupported opcode 0x%x\n", tgt_name(tgt), opc);
-               req->rq_status = -ENOTSUPP;
-               rc = ptlrpc_error(req);
-               GOTO(out, rc);
-       }
-
        rc = lustre_msg_check_version(msg, h->th_version);
        if (unlikely(rc)) {
                DEBUG_REQ(D_ERROR, req, "%s: drop mal-formed request, version"
@@ -590,20 +672,48 @@ int tgt_request_handle(struct ptlrpc_request *req)
        EXIT;
 out:
        req_capsule_fini(tsi->tsi_pill);
-       tsi->tsi_pill = NULL;
        if (tsi->tsi_corpus != NULL) {
                lu_object_put(tsi->tsi_env, tsi->tsi_corpus);
                tsi->tsi_corpus = NULL;
        }
-       tsi->tsi_env = NULL;
-       tsi->tsi_mdt_body = NULL;
-       tsi->tsi_dlm_req = NULL;
-       fid_zero(&tsi->tsi_fid);
-       memset(&tsi->tsi_resid, 0, sizeof tsi->tsi_resid);
        return rc;
 }
 EXPORT_SYMBOL(tgt_request_handle);
 
+/** Assign high priority operations to the request if needed. */
+int tgt_hpreq_handler(struct ptlrpc_request *req)
+{
+       struct tgt_session_info *tsi = tgt_ses_info(req->rq_svc_thread->t_env);
+       struct tgt_handler      *h;
+       int                      rc;
+
+       ENTRY;
+
+       if (req->rq_export == NULL)
+               RETURN(0);
+
+       req_capsule_init(&req->rq_pill, req, RCL_SERVER);
+       tsi->tsi_pill = &req->rq_pill;
+       tsi->tsi_env = req->rq_svc_thread->t_env;
+       tsi->tsi_tgt = class_exp2tgt(req->rq_export);
+       tsi->tsi_exp = req->rq_export;
+
+       h = tgt_handler_find_check(req);
+       if (IS_ERR(h)) {
+               rc = PTR_ERR(h);
+               RETURN(rc);
+       }
+
+       rc = tgt_request_preprocess(tsi, h, req);
+       if (unlikely(rc != 0))
+               RETURN(rc);
+
+       if (h->th_hp != NULL)
+               h->th_hp(tsi);
+       RETURN(0);
+}
+EXPORT_SYMBOL(tgt_hpreq_handler);
+
 void tgt_counter_incr(struct obd_export *exp, int opcode)
 {
        lprocfs_counter_incr(exp->exp_obd->obd_stats, opcode);
@@ -630,11 +740,10 @@ static inline void tgt_init_sec_none(struct obd_connect_data *reply)
 static int tgt_init_sec_level(struct ptlrpc_request *req)
 {
        struct lu_target        *tgt = class_exp2tgt(req->rq_export);
-       char                    *client = libcfs_nid2str(req->rq_peer.nid);
+       char                    *client;
        struct obd_connect_data *data, *reply;
        int                      rc = 0;
        bool                     remote;
-
        ENTRY;
 
        data = req_capsule_client_get(&req->rq_pill, &RMF_CONNECT_DATA);
@@ -648,6 +757,7 @@ static int tgt_init_sec_level(struct ptlrpc_request *req)
                RETURN(0);
        }
 
+       client = libcfs_nid2str(req->rq_peer.nid);
        /* no GSS support case */
        if (!req->rq_auth_gss) {
                if (tgt->lut_sec_level > LUSTRE_SEC_NONE) {
@@ -699,7 +809,7 @@ static int tgt_init_sec_level(struct ptlrpc_request *req)
                        RETURN(-EACCES);
                }
        } else {
-               if (req->rq_auth_uid == INVALID_UID) {
+               if (!uid_valid(make_kuid(&init_user_ns, req->rq_auth_uid))) {
                        CDEBUG(D_SEC, "client %s -> target %s: user is not "
                               "authenticated!\n", client, tgt_name(tgt));
                        RETURN(-EACCES);
@@ -888,13 +998,11 @@ int tgt_obd_log_cancel(struct tgt_session_info *tsi)
 {
        return err_serious(-EOPNOTSUPP);
 }
-EXPORT_SYMBOL(tgt_obd_log_cancel);
 
 int tgt_obd_qc_callback(struct tgt_session_info *tsi)
 {
        return err_serious(-EOPNOTSUPP);
 }
-EXPORT_SYMBOL(tgt_obd_qc_callback);
 
 int tgt_sendpage(struct tgt_session_info *tsi, struct lu_rdpg *rdpg, int nob)
 {
@@ -936,7 +1044,7 @@ EXPORT_SYMBOL(tgt_sendpage);
 /*
  * OBD_IDX_READ handler
  */
-int tgt_obd_idx_read(struct tgt_session_info *tsi)
+static int tgt_obd_idx_read(struct tgt_session_info *tsi)
 {
        struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
        struct lu_rdpg          *rdpg = &tti->tti_u.rdpg.tti_rdpg;
@@ -1017,7 +1125,6 @@ out:
        }
        return rc;
 }
-EXPORT_SYMBOL(tgt_obd_idx_read);
 
 struct tgt_handler tgt_obd_handlers[] = {
 TGT_OBD_HDL    (0,     OBD_PING,               tgt_obd_ping),
@@ -1028,7 +1135,7 @@ TGT_OBD_HDL    (0,        OBD_IDX_READ,           tgt_obd_idx_read)
 EXPORT_SYMBOL(tgt_obd_handlers);
 
 int tgt_sync(const struct lu_env *env, struct lu_target *tgt,
-            struct dt_object *obj)
+            struct dt_object *obj, __u64 start, __u64 end)
 {
        int rc = 0;
 
@@ -1039,7 +1146,7 @@ int tgt_sync(const struct lu_env *env, struct lu_target *tgt,
                rc = dt_sync(env, tgt->lut_bottom);
        } else if (dt_version_get(env, obj) >
                   tgt->lut_obd->obd_last_committed) {
-               rc = dt_object_sync(env, obj);
+               rc = dt_object_sync(env, obj, start, end);
        }
 
        RETURN(rc);
@@ -1051,8 +1158,8 @@ EXPORT_SYMBOL(tgt_sync);
 
 /* Ensure that data and metadata are synced to the disk when lock is cancelled
  * (if requested) */
-int tgt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
-                    void *data, int flag)
+static int tgt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+                           void *data, int flag)
 {
        struct lu_env            env;
        struct lu_target        *tgt;
@@ -1069,11 +1176,15 @@ int tgt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
            (tgt->lut_sync_lock_cancel == ALWAYS_SYNC_ON_CANCEL ||
             (tgt->lut_sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL &&
              lock->l_flags & LDLM_FL_CBPENDING))) {
+               __u64 start = 0;
+               __u64 end = OBD_OBJECT_EOF;
+
                rc = lu_env_init(&env, LCT_DT_THREAD);
                if (unlikely(rc != 0))
                        RETURN(rc);
 
-               ost_fid_from_resid(&fid, &lock->l_resource->lr_name);
+               ost_fid_from_resid(&fid, &lock->l_resource->lr_name,
+                                  tgt->lut_lsd.lsd_osd_index);
                obj = dt_locate(&env, tgt->lut_bottom, &fid);
                if (IS_ERR(obj))
                        GOTO(err_env, rc = PTR_ERR(obj));
@@ -1081,10 +1192,18 @@ int tgt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                if (!dt_object_exists(obj))
                        GOTO(err_put, rc = -ENOENT);
 
-               rc = tgt_sync(&env, tgt, obj);
+               if (lock->l_resource->lr_type == LDLM_EXTENT) {
+                       start = lock->l_policy_data.l_extent.start;
+                       end = lock->l_policy_data.l_extent.end;
+               }
+
+               rc = tgt_sync(&env, tgt, obj, start, end);
                if (rc < 0) {
-                       CERROR("%s: sync failed on lock cancel: rc = %d\n",
-                              tgt_name(tgt), rc);
+                       CERROR("%s: syncing "DFID" ("LPU64"-"LPU64") on lock "
+                              "cancel: rc = %d\n",
+                              tgt_name(tgt), PFID(&fid),
+                              lock->l_policy_data.l_extent.start,
+                              lock->l_policy_data.l_extent.end, rc);
                }
 err_put:
                lu_object_put(&env, &obj->do_lu);
@@ -1096,7 +1215,7 @@ err_env:
        RETURN(rc);
 }
 
-struct ldlm_callback_suite tgt_dlm_cbs = {
+static struct ldlm_callback_suite tgt_dlm_cbs = {
        .lcs_completion = ldlm_server_completion_ast,
        .lcs_blocking   = tgt_blocking_ast,
        .lcs_glimpse    = ldlm_server_glimpse_ast
@@ -1118,6 +1237,20 @@ int tgt_enqueue(struct tgt_session_info *tsi)
        if (rc)
                RETURN(err_serious(rc));
 
+       switch (LUT_FAIL_CLASS(tsi->tsi_reply_fail_id)) {
+       case LUT_FAIL_MDT:
+               tsi->tsi_reply_fail_id = OBD_FAIL_MDS_LDLM_REPLY_NET;
+               break;
+       case LUT_FAIL_OST:
+               tsi->tsi_reply_fail_id = OBD_FAIL_OST_LDLM_REPLY_NET;
+               break;
+       case LUT_FAIL_MGT:
+               tsi->tsi_reply_fail_id = OBD_FAIL_MGS_LDLM_REPLY_NET;
+               break;
+       default:
+               tsi->tsi_reply_fail_id = OBD_FAIL_LDLM_REPLY;
+               break;
+       }
        RETURN(req->rq_status);
 }
 EXPORT_SYMBOL(tgt_enqueue);
@@ -1135,19 +1268,16 @@ int tgt_convert(struct tgt_session_info *tsi)
 
        RETURN(req->rq_status);
 }
-EXPORT_SYMBOL(tgt_convert);
 
 int tgt_bl_callback(struct tgt_session_info *tsi)
 {
        return err_serious(-EOPNOTSUPP);
 }
-EXPORT_SYMBOL(tgt_bl_callback);
 
 int tgt_cp_callback(struct tgt_session_info *tsi)
 {
        return err_serious(-EOPNOTSUPP);
 }
-EXPORT_SYMBOL(tgt_cp_callback);
 
 /* generic LDLM target handler */
 struct tgt_handler tgt_dlm_handlers[] = {
@@ -1196,7 +1326,6 @@ int tgt_llog_destroy(struct tgt_session_info *tsi)
 
        RETURN(rc);
 }
-EXPORT_SYMBOL(tgt_llog_destroy);
 
 int tgt_llog_read_header(struct tgt_session_info *tsi)
 {
@@ -1249,7 +1378,7 @@ EXPORT_SYMBOL(tgt_llog_handlers);
  * sec context handlers
  */
 /* XXX: Implement based on mdt_sec_ctx_handle()? */
-int tgt_sec_ctx_handle(struct tgt_session_info *tsi)
+static int tgt_sec_ctx_handle(struct tgt_session_info *tsi)
 {
        return 0;
 }
@@ -1261,6 +1390,77 @@ TGT_SEC_HDL_VAR(0,       SEC_CTX_FINI,           tgt_sec_ctx_handle),
 };
 EXPORT_SYMBOL(tgt_sec_ctx_handlers);
 
+int (*tgt_lfsck_in_notify)(const struct lu_env *env,
+                          struct dt_device *key,
+                          struct lfsck_request *lr,
+                          struct thandle *th) = NULL;
+
+void tgt_register_lfsck_in_notify(int (*notify)(const struct lu_env *,
+                                               struct dt_device *,
+                                               struct lfsck_request *,
+                                               struct thandle *))
+{
+       tgt_lfsck_in_notify = notify;
+}
+EXPORT_SYMBOL(tgt_register_lfsck_in_notify);
+
+static int (*tgt_lfsck_query)(const struct lu_env *env,
+                             struct dt_device *key,
+                             struct lfsck_request *lr) = NULL;
+
+void tgt_register_lfsck_query(int (*query)(const struct lu_env *,
+                                          struct dt_device *,
+                                          struct lfsck_request *))
+{
+       tgt_lfsck_query = query;
+}
+EXPORT_SYMBOL(tgt_register_lfsck_query);
+
+/* LFSCK request handlers */
+static int tgt_handle_lfsck_notify(struct tgt_session_info *tsi)
+{
+       const struct lu_env     *env = tsi->tsi_env;
+       struct dt_device        *key = tsi->tsi_tgt->lut_bottom;
+       struct lfsck_request    *lr;
+       int                      rc;
+       ENTRY;
+
+       lr = req_capsule_client_get(tsi->tsi_pill, &RMF_LFSCK_REQUEST);
+       if (lr == NULL)
+               RETURN(-EPROTO);
+
+       rc = tgt_lfsck_in_notify(env, key, lr, NULL);
+
+       RETURN(rc);
+}
+
+static int tgt_handle_lfsck_query(struct tgt_session_info *tsi)
+{
+       struct lfsck_request    *request;
+       struct lfsck_reply      *reply;
+       int                      rc;
+       ENTRY;
+
+       request = req_capsule_client_get(tsi->tsi_pill, &RMF_LFSCK_REQUEST);
+       if (request == NULL)
+               RETURN(-EPROTO);
+
+       reply = req_capsule_server_get(tsi->tsi_pill, &RMF_LFSCK_REPLY);
+       if (reply == NULL)
+               RETURN(-ENOMEM);
+
+       rc = tgt_lfsck_query(tsi->tsi_env, tsi->tsi_tgt->lut_bottom, request);
+       reply->lr_status = rc;
+
+       RETURN(rc < 0 ? rc : 0);
+}
+
+struct tgt_handler tgt_lfsck_handlers[] = {
+TGT_LFSCK_HDL(HABEO_REFERO,    LFSCK_NOTIFY,   tgt_handle_lfsck_notify),
+TGT_LFSCK_HDL(HABEO_REFERO,    LFSCK_QUERY,    tgt_handle_lfsck_query),
+};
+EXPORT_SYMBOL(tgt_lfsck_handlers);
+
 /*
  * initialize per-thread page pool (bug 5137).
  */
@@ -1361,18 +1561,18 @@ int tgt_brw_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
        LASSERT(mode == LCK_PR || mode == LCK_PW);
        LASSERT(!lustre_handle_is_used(lh));
 
-       if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
+       if (nrbufs == 0 || !(nb[0].rnb_flags & OBD_BRW_SRVLOCK))
                RETURN(0);
 
        for (i = 1; i < nrbufs; i++)
-               if (!(nb[i].flags & OBD_BRW_SRVLOCK))
+               if (!(nb[i].rnb_flags & OBD_BRW_SRVLOCK))
                        RETURN(-EFAULT);
 
-       RETURN(tgt_extent_lock(ns, res_id, nb[0].offset,
-                              nb[nrbufs - 1].offset + nb[nrbufs - 1].len - 1,
+       RETURN(tgt_extent_lock(ns, res_id, nb[0].rnb_offset,
+                              nb[nrbufs - 1].rnb_offset +
+                              nb[nrbufs - 1].rnb_len - 1,
                               lh, mode, &flags));
 }
-EXPORT_SYMBOL(tgt_brw_lock);
 
 void tgt_brw_unlock(struct obd_ioobj *obj, struct niobuf_remote *niob,
                    struct lustre_handle *lh, int mode)
@@ -1380,13 +1580,14 @@ void tgt_brw_unlock(struct obd_ioobj *obj, struct niobuf_remote *niob,
        ENTRY;
 
        LASSERT(mode == LCK_PR || mode == LCK_PW);
-       LASSERT((obj->ioo_bufcnt > 0 && (niob[0].flags & OBD_BRW_SRVLOCK)) ==
+       LASSERT((obj->ioo_bufcnt > 0 &&
+                (niob[0].rnb_flags & OBD_BRW_SRVLOCK)) ==
                lustre_handle_is_used(lh));
+
        if (lustre_handle_is_used(lh))
                tgt_extent_unlock(lh, mode);
        EXIT;
 }
-EXPORT_SYMBOL(tgt_brw_unlock);
 
 static __u32 tgt_checksum_bulk(struct lu_target *tgt,
                               struct ptlrpc_bulk_desc *desc, int opc,
@@ -1455,10 +1656,8 @@ static __u32 tgt_checksum_bulk(struct lu_target *tgt,
                }
        }
 
-       bufsize = 4;
+       bufsize = sizeof(cksum);
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
-       if (err)
-               cfs_crypto_hash_final(hdesc, NULL, NULL);
 
        return cksum;
 }
@@ -1474,8 +1673,7 @@ int tgt_brw_read(struct tgt_session_info *tsi)
        struct ost_body         *body, *repbody;
        struct l_wait_info       lwi;
        struct lustre_handle     lockh = { 0 };
-       int                      niocount, npages, nob = 0, rc, i;
-       int                      no_reply = 0;
+       int                      npages, nob = 0, rc, i, no_reply = 0;
        struct tgt_thread_big_cache *tbc = req->rq_svc_thread->t_data;
 
        ENTRY;
@@ -1493,15 +1691,16 @@ int tgt_brw_read(struct tgt_session_info *tsi)
        if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
                RETURN(-EIO);
 
-       OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
+       OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, cfs_fail_val > 0 ?
+                        cfs_fail_val : (obd_timeout + 1) / 4);
 
        /* Check if there is eviction in progress, and if so, wait for it to
         * finish */
-       if (unlikely(cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
+       if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
                /* We do not care how long it takes */
                lwi = LWI_INTR(NULL, NULL);
                rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
-                        !cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress),
+                        !atomic_read(&exp->exp_obd->obd_evict_inprogress),
                         &lwi);
        }
 
@@ -1517,7 +1716,6 @@ int tgt_brw_read(struct tgt_session_info *tsi)
        ioo = req_capsule_client_get(tsi->tsi_pill, &RMF_OBD_IOOBJ);
        LASSERT(ioo != NULL); /* must exists after tgt_ost_body_unpack */
 
-       niocount = ioo->ioo_bufcnt;
        remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
        LASSERT(remote_nb != NULL); /* must exists after tgt_ost_body_unpack */
 
@@ -1559,7 +1757,7 @@ int tgt_brw_read(struct tgt_session_info *tsi)
 
        nob = 0;
        for (i = 0; i < npages; i++) {
-               int page_rc = local_nb[i].rc;
+               int page_rc = local_nb[i].lnb_rc;
 
                if (page_rc < 0) {
                        rc = page_rc;
@@ -1568,16 +1766,16 @@ int tgt_brw_read(struct tgt_session_info *tsi)
 
                nob += page_rc;
                if (page_rc != 0) { /* some data! */
-                       LASSERT(local_nb[i].page != NULL);
-                       ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].page,
+                       LASSERT(local_nb[i].lnb_page != NULL);
+                       ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].lnb_page,
                                                    local_nb[i].lnb_page_offset,
                                                    page_rc);
                }
 
-               if (page_rc != local_nb[i].len) { /* short read */
+               if (page_rc != local_nb[i].lnb_len) { /* short read */
                        /* All subsequent pages should be 0 */
                        while (++i < npages)
-                               LASSERT(local_nb[i].rc == 0);
+                               LASSERT(local_nb[i].lnb_rc == 0);
                        break;
                }
        }
@@ -1654,20 +1852,18 @@ EXPORT_SYMBOL(tgt_brw_read);
 static void tgt_warn_on_cksum(struct ptlrpc_request *req,
                              struct ptlrpc_bulk_desc *desc,
                              struct niobuf_local *local_nb, int npages,
-                             obd_count client_cksum, obd_count server_cksum,
+                             u32 client_cksum, u32 server_cksum,
                              bool mmap)
 {
        struct obd_export *exp = req->rq_export;
        struct ost_body *body;
-       char *router;
-       char *via;
+       char *router = "";
+       char *via = "";
 
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body != NULL);
 
-       if (req->rq_peer.nid == desc->bd_sender) {
-               via = router = "";
-       } else {
+       if (req->rq_peer.nid != desc->bd_sender) {
                via = " via ";
                router = libcfs_nid2str(desc->bd_sender);
        }
@@ -1692,7 +1888,7 @@ static void tgt_warn_on_cksum(struct ptlrpc_request *req,
                           POSTID(&body->oa.o_oi),
                           local_nb[0].lnb_file_offset,
                           local_nb[npages-1].lnb_file_offset +
-                          local_nb[npages-1].len - 1,
+                          local_nb[npages - 1].lnb_len - 1,
                           client_cksum, server_cksum);
 }
 
@@ -1737,7 +1933,8 @@ int tgt_brw_write(struct tgt_session_info *tsi)
                RETURN(err_serious(-EFAULT));
 
        /* pause before transaction has been started */
-       OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
+       CFS_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, cfs_fail_val > 0 ?
+                        cfs_fail_val : (obd_timeout + 1) / 4);
 
        /* There must be big cache in current thread to process this request
         * if it is NULL then something went wrong and it wasn't allocated,
@@ -1764,7 +1961,7 @@ int tgt_brw_write(struct tgt_session_info *tsi)
                        sizeof(*remote_nb))
                RETURN(err_serious(-EPROTO));
 
-       if ((remote_nb[0].flags & OBD_BRW_MEMALLOC) &&
+       if ((remote_nb[0].rnb_flags & OBD_BRW_MEMALLOC) &&
            (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self))
                memory_pressure_set();
 
@@ -1827,9 +2024,9 @@ int tgt_brw_write(struct tgt_session_info *tsi)
 
        /* NB Having prepped, we must commit... */
        for (i = 0; i < npages; i++)
-               ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].page,
+               ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].lnb_page,
                                            local_nb[i].lnb_page_offset,
-                                           local_nb[i].len);
+                                           local_nb[i].lnb_len);
 
        rc = sptlrpc_svc_prep_bulk(req, desc);
        if (rc != 0)
@@ -1890,15 +2087,15 @@ skip_transfer:
 
                /* set per-requested niobuf return codes */
                for (i = j = 0; i < niocount; i++) {
-                       int len = remote_nb[i].len;
+                       int len = remote_nb[i].rnb_len;
 
                        nob += len;
                        rcs[i] = 0;
                        do {
                                LASSERT(j < npages);
-                               if (local_nb[j].rc < 0)
-                                       rcs[i] = local_nb[j].rc;
-                               len -= local_nb[j].len;
+                               if (local_nb[j].lnb_rc < 0)
+                                       rcs[i] = local_nb[j].lnb_rc;
+                               len -= local_nb[j].lnb_len;
                                j++;
                        } while (len > 0);
                        LASSERT(len == 0);