Whamcloud - gitweb
LU-3467 mdt: call MDT handlers via unified request handler
[fs/lustre-release.git] / lustre / target / tgt_handler.c
index c5a9fde..e3e63db 100644 (file)
@@ -46,25 +46,87 @@ char *tgt_name(struct lu_target *tgt)
 }
 EXPORT_SYMBOL(tgt_name);
 
+/*
+ * Generic code handling requests that have struct mdt_body passed in:
+ *
+ *  - extract mdt_body from request and save it in @tsi, if present;
+ *
+ *  - create lu_object, corresponding to the fid in mdt_body, and save it in
+ *  @tsi;
+ *
+ *  - if HABEO_CORPUS flag is set for this request type check whether object
+ *  actually exists on storage (lu_object_exists()).
+ *
+ */
+static int tgt_mdt_body_unpack(struct tgt_session_info *tsi, __u32 flags)
+{
+       const struct mdt_body   *body;
+       struct lu_object        *obj;
+       struct req_capsule      *pill = tsi->tsi_pill;
+       int                      rc;
+
+       ENTRY;
+
+       body = req_capsule_client_get(pill, &RMF_MDT_BODY);
+       if (body == NULL)
+               RETURN(-EFAULT);
+
+       tsi->tsi_mdt_body = body;
+
+       if (!(body->valid & OBD_MD_FLID))
+               RETURN(0);
+
+       /* mdc_pack_body() doesn't check if fid is zero and set OBD_ML_FID
+        * in any case in pre-2.5 clients. Fix that here if needed */
+       if (unlikely(fid_is_zero(&body->fid1)))
+               RETURN(0);
+
+       if (!fid_is_sane(&body->fid1)) {
+               CERROR("%s: invalid FID: "DFID"\n", tgt_name(tsi->tsi_tgt),
+                      PFID(&body->fid1));
+               RETURN(-EINVAL);
+       }
+
+       obj = lu_object_find(tsi->tsi_env,
+                            &tsi->tsi_tgt->lut_bottom->dd_lu_dev,
+                            &body->fid1, NULL);
+       if (!IS_ERR(obj)) {
+               if ((flags & HABEO_CORPUS) && !lu_object_exists(obj)) {
+                       lu_object_put(tsi->tsi_env, obj);
+                       /* for capability renew ENOENT will be handled in
+                        * mdt_renew_capa */
+                       if (body->valid & OBD_MD_FLOSSCAPA)
+                               rc = 0;
+                       else
+                               rc = -ENOENT;
+               } else {
+                       tsi->tsi_corpus = obj;
+                       rc = 0;
+               }
+       } else {
+               rc = PTR_ERR(obj);
+       }
+       RETURN(rc);
+}
+
 static int tgt_unpack_req_pack_rep(struct tgt_session_info *tsi, __u32 flags)
 {
        struct req_capsule      *pill = tsi->tsi_pill;
-       const struct mdt_body   *body = NULL;
-       int                      rc = 0;
+       int                      rc;
 
        ENTRY;
 
        if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) {
-               body = req_capsule_client_get(pill, &RMF_MDT_BODY);
-               if (body == NULL)
-                       RETURN(-EFAULT);
+               rc = tgt_mdt_body_unpack(tsi, flags);
+       } else {
+               rc = 0;
        }
 
        if (flags & HABEO_REFERO) {
                /* Pack reply */
                if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
                        req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
-                                            body ? body->eadatasize : 0);
+                                            tsi->tsi_mdt_body->eadatasize);
                if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
                        req_capsule_set_size(pill, &RMF_LOGCOOKIES,
                                             RCL_SERVER, 0);
@@ -93,6 +155,20 @@ static int tgt_handle_request0(struct tgt_session_info *tsi,
        LASSERT(h->th_opc == lustre_msg_get_opc(req->rq_reqmsg));
        LASSERT(current->journal_info == NULL);
 
+       /*
+        * Checking for various OBD_FAIL_$PREF_$OPC_NET codes. _Do_ not try
+        * to put same checks into handlers like mdt_close(), mdt_reint(),
+        * etc., without talking to mdt authors first. Checking same thing
+        * there again is useless and returning 0 error without packing reply
+        * is buggy! Handlers either pack reply or return error.
+        *
+        * We return 0 here and do not send any reply in order to emulate
+        * network failure. Do not send any reply in case any of NET related
+        * fail_id has occured.
+        */
+       if (OBD_FAIL_CHECK_ORSET(h->th_fail_id, OBD_FAIL_ONCE))
+               RETURN(0);
+
        rc = 0;
        flags = h->th_flags;
        LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO),
@@ -281,14 +357,12 @@ int tgt_request_handle(struct ptlrpc_request *req)
 
        ENTRY;
 
-       /* Refill(initilize) the context, in case it is
-        * not initialized yet. */
+       /* Refill the context, to make sure all thread keys are allocated */
        lu_env_refill(req->rq_svc_thread->t_env);
 
        req_capsule_init(&req->rq_pill, req, RCL_SERVER);
        tsi->tsi_pill = &req->rq_pill;
        tsi->tsi_env = req->rq_svc_thread->t_env;
-       tsi->tsi_dlm_req = NULL;
 
        /* if request has export then get handlers slice from corresponding
         * target, otherwise that should be connect operation */
@@ -364,6 +438,13 @@ int tgt_request_handle(struct ptlrpc_request *req)
 out:
        req_capsule_fini(tsi->tsi_pill);
        tsi->tsi_pill = NULL;
+       if (tsi->tsi_corpus != NULL) {
+               lu_object_put(tsi->tsi_env, tsi->tsi_corpus);
+               tsi->tsi_corpus = NULL;
+       }
+       tsi->tsi_env = NULL;
+       tsi->tsi_mdt_body = NULL;
+       tsi->tsi_dlm_req = NULL;
        return rc;
 }
 EXPORT_SYMBOL(tgt_request_handle);
@@ -583,6 +664,7 @@ int tgt_connect(struct tgt_session_info *tsi)
        reply = req_capsule_server_get(tsi->tsi_pill, &RMF_CONNECT_DATA);
        spin_lock(&tsi->tsi_exp->exp_lock);
        *exp_connect_flags_ptr(tsi->tsi_exp) = reply->ocd_connect_flags;
+       tsi->tsi_exp->exp_connect_data.ocd_brw_size = reply->ocd_brw_size;
        spin_unlock(&tsi->tsi_exp->exp_lock);
 
        RETURN(0);
@@ -635,8 +717,7 @@ int tgt_obd_qc_callback(struct tgt_session_info *tsi)
 }
 EXPORT_SYMBOL(tgt_obd_qc_callback);
 
-static int tgt_sendpage(struct tgt_session_info *tsi, struct lu_rdpg *rdpg,
-                       int nob)
+int tgt_sendpage(struct tgt_session_info *tsi, struct lu_rdpg *rdpg, int nob)
 {
        struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
        struct ptlrpc_request   *req = tgt_ses_req(tsi);
@@ -787,7 +868,6 @@ int tgt_enqueue(struct tgt_session_info *tsi)
         * tsi->tsi_dlm_cbs was set by the *_req_handle() function.
         */
        LASSERT(tsi->tsi_dlm_req != NULL);
-
        rc = ldlm_handle_enqueue0(tsi->tsi_exp->exp_obd->obd_namespace, req,
                                  tsi->tsi_dlm_req, &tgt_dlm_cbs);
        if (rc)