Whamcloud - gitweb
LU-8592 mdt: hold mdt_device::mdt_md_root until service stop
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index acc008f..c3e8cf1 100644 (file)
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -49,6 +45,7 @@
 #define DEBUG_SUBSYSTEM S_MDS
 
 #include <linux/module.h>
+#include <linux/pagemap.h>
 
 #include <dt_object.h>
 #include <lustre_acl.h>
@@ -379,6 +376,7 @@ static int mdt_get_root(struct tgt_session_info *tsi)
 
        nodemap_fileset = nodemap_get_fileset(exp->exp_target_data.ted_nodemap);
        if (nodemap_fileset && nodemap_fileset[0]) {
+               CDEBUG(D_INFO, "nodemap fileset is %s\n", nodemap_fileset);
                if (fileset) {
                        /* consider fileset from client as a sub-fileset
                         * of the nodemap one */
@@ -396,6 +394,7 @@ static int mdt_get_root(struct tgt_session_info *tsi)
        }
 
        if (fileset) {
+               CDEBUG(D_INFO, "Getting fileset %s\n", fileset);
                rc = mdt_lookup_fileset(info, fileset, &repbody->mbo_fid1);
                if (rc < 0)
                        GOTO(out, rc = err_serious(rc));
@@ -578,7 +577,7 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
        if (fid != NULL) {
                b->mbo_fid1 = *fid;
                b->mbo_valid |= OBD_MD_FLID;
-               CDEBUG(D_INODE, DFID": nlink=%d, mode=%o, valid="LPX64"\n",
+               CDEBUG(D_INODE, DFID": nlink=%d, mode=%o, valid=%#llx\n",
                       PFID(fid), b->mbo_nlink, b->mbo_mode, b->mbo_valid);
        }
 
@@ -928,7 +927,7 @@ int mdt_attr_get_complex(struct mdt_thread_info *info,
 #endif
 out:
        ma->ma_need = need;
-       CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
+       CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = %#llx ma_lmm=%p\n",
               rc, ma->ma_valid, ma->ma_lmm);
        RETURN(rc);
 }
@@ -1014,7 +1013,8 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
 
        rc = mdt_attr_get_complex(info, o, ma);
        if (unlikely(rc)) {
-               CERROR("%s: getattr error for "DFID": rc = %d\n",
+               CDEBUG(rc == -ENOENT ? D_OTHER : D_ERROR,
+                      "%s: getattr error for "DFID": rc = %d\n",
                       mdt_obd_name(info->mti_mdt),
                       PFID(mdt_object_fid(o)), rc);
                RETURN(rc);
@@ -1101,7 +1101,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                               PFID(mdt_object_fid(o)), rc);
                        rc = -EFAULT;
                } else {
-                       int print_limit = min_t(int, PAGE_CACHE_SIZE - 128, rc);
+                       int print_limit = min_t(int, PAGE_SIZE - 128, rc);
 
                        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READLINK_EPROTO))
                                rc -= 2;
@@ -1771,7 +1771,7 @@ static int mdt_readpage(struct tgt_session_info *tsi)
          */
        rdpg->rp_hash = reqbody->mbo_size;
        if (rdpg->rp_hash != reqbody->mbo_size) {
-               CERROR("Invalid hash: "LPX64" != "LPX64"\n",
+               CERROR("Invalid hash: %#llx != %#llx\n",
                       rdpg->rp_hash, reqbody->mbo_size);
                RETURN(-EFAULT);
        }
@@ -1781,14 +1781,14 @@ static int mdt_readpage(struct tgt_session_info *tsi)
                rdpg->rp_attrs |= LUDA_64BITHASH;
        rdpg->rp_count  = min_t(unsigned int, reqbody->mbo_nlink,
                                exp_max_brw_size(tsi->tsi_exp));
-       rdpg->rp_npages = (rdpg->rp_count + PAGE_CACHE_SIZE - 1) >>
-                         PAGE_CACHE_SHIFT;
+       rdpg->rp_npages = (rdpg->rp_count + PAGE_SIZE - 1) >>
+                         PAGE_SHIFT;
         OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
         if (rdpg->rp_pages == NULL)
                 RETURN(-ENOMEM);
 
         for (i = 0; i < rdpg->rp_npages; ++i) {
-               rdpg->rp_pages[i] = alloc_page(GFP_IOFS);
+               rdpg->rp_pages[i] = alloc_page(GFP_NOFS);
                 if (rdpg->rp_pages[i] == NULL)
                         GOTO(free_rdpg, rc = -ENOMEM);
         }
@@ -2388,11 +2388,15 @@ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                break;
        }
        case LDLM_CB_CANCELING: {
+               struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd;
+               struct mdt_device *mdt =
+                               mdt_dev(obd->obd_lu_dev->ld_site->ls_top_dev);
+
                LDLM_DEBUG(lock, "Revoke remote lock\n");
 
                /* discard slc lock here so that it can be cleaned anytime,
                 * especially for cleanup_resource() */
-               tgt_discard_slc_lock(lock);
+               tgt_discard_slc_lock(&mdt->mdt_lut, lock);
 
                /* once we cache lock, l_ast_data is set to mdt_object */
                if (lock->l_ast_data != NULL) {
@@ -2401,9 +2405,6 @@ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
 
                        rc = lu_env_init(&env, LCT_MD_THREAD);
                        if (unlikely(rc != 0)) {
-                               struct obd_device *obd;
-
-                               obd = ldlm_lock_to_ns(lock)->ns_obd;
                                CWARN("%s: lu_env initialization failed, object"
                                      "%p "DFID" is leaked!\n",
                                      obd->obd_name, mo,
@@ -2443,7 +2444,7 @@ int mdt_check_resent_lock(struct mdt_thread_info *info,
                        /* Lock is pinned by ldlm_handle_enqueue0() as it is
                         * a resend case, however, it could be already destroyed
                         * due to client eviction or a raced cancel RPC. */
-                       LDLM_DEBUG_NOLOCK("Invalid lock handle "LPX64,
+                       LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx",
                                          lhc->mlh_reg_lh.cookie);
                        RETURN(-ESTALE);
                }
@@ -2739,14 +2740,14 @@ static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
                        cos = (mdt_cos_is_enabled(mdt) ||
                               mdt_slc_is_enabled(mdt));
 
-                       LASSERTF(lock != NULL, "no lock for cookie "LPX64"\n",
+                       LASSERTF(lock != NULL, "no lock for cookie %#llx\n",
                                 h->cookie);
 
                        /* there is no request if mdt_object_unlock() is called
                         * from mdt_export_cleanup()->mdt_add_dirty_flag() */
                        if (likely(req != NULL)) {
                                CDEBUG(D_HA, "request = %p reply state = %p"
-                                      " transno = "LPD64"\n", req,
+                                      " transno = %lld\n", req,
                                       req->rq_reply_state, req->rq_transno);
                                if (cos) {
                                        ldlm_lock_downgrade(lock, LCK_COS);
@@ -2803,7 +2804,8 @@ static void mdt_save_remote_lock(struct mdt_thread_info *info,
                        struct ptlrpc_request *req = mdt_info_req(info);
 
                        LASSERT(req != NULL);
-                       tgt_save_slc_lock(lock, req->rq_transno);
+                       tgt_save_slc_lock(&info->mti_mdt->mdt_lut, lock,
+                                         req->rq_transno);
                        ldlm_lock_decref(h, mode);
                }
                h->cookie = 0ull;
@@ -3181,14 +3183,14 @@ mdt_intent_lock_replace(struct mdt_thread_info *info,
                /* Lock is pinned by ldlm_handle_enqueue0() as it is
                 * a resend case, however, it could be already destroyed
                 * due to client eviction or a raced cancel RPC. */
-               LDLM_DEBUG_NOLOCK("Invalid lock handle "LPX64"\n",
+               LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx\n",
                                  lh->mlh_reg_lh.cookie);
                lh->mlh_reg_lh.cookie = 0;
                RETURN(-ESTALE);
        }
 
        LASSERTF(new_lock != NULL,
-                "lockh "LPX64" flags "LPX64" rc %d\n",
+                "lockh %#llx flags %#llx : rc = %d\n",
                 lh->mlh_reg_lh.cookie, flags, result);
 
         /*
@@ -3275,7 +3277,7 @@ static void mdt_intent_fixup_resent(struct mdt_thread_info *info,
                lh->mlh_reg_mode = new_lock->l_granted_mode;
 
                LDLM_DEBUG(new_lock, "Restoring lock cookie");
-               DEBUG_REQ(D_DLMTRACE, req, "restoring lock cookie "LPX64,
+               DEBUG_REQ(D_DLMTRACE, req, "restoring lock cookie %#llx",
                          lh->mlh_reg_lh.cookie);
                return;
        }
@@ -3294,7 +3296,7 @@ static void mdt_intent_fixup_resent(struct mdt_thread_info *info,
          */
         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
 
-       DEBUG_REQ(D_DLMTRACE, req, "no existing lock with rhandle "LPX64,
+       DEBUG_REQ(D_DLMTRACE, req, "no existing lock with rhandle %#llx",
                  dlmreq->lock_handle[0].cookie);
 }
 
@@ -3305,7 +3307,8 @@ static int mdt_intent_getxattr(enum mdt_it_code opcode,
 {
        struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
        struct ldlm_reply      *ldlm_rep = NULL;
-       int rc, grc;
+       int rc;
+       ENTRY;
 
        /*
         * Initialize lhc->mlh_reg_lh either from a previously granted lock
@@ -3321,18 +3324,30 @@ static int mdt_intent_getxattr(enum mdt_it_code opcode,
                        return rc;
        }
 
-       grc = mdt_getxattr(info);
-
-       rc = mdt_intent_lock_replace(info, lockp, lhc, flags, 0);
+       rc = mdt_getxattr(info);
 
        if (mdt_info_req(info)->rq_repmsg != NULL)
                ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
-       if (ldlm_rep == NULL)
+
+       if (ldlm_rep == NULL ||
+           OBD_FAIL_CHECK(OBD_FAIL_MDS_XATTR_REP)) {
+               mdt_object_unlock(info,  info->mti_object, lhc, 1);
                RETURN(err_serious(-EFAULT));
+       }
 
-       ldlm_rep->lock_policy_res2 = grc;
+       ldlm_rep->lock_policy_res2 = clear_serious(rc);
 
-       return rc;
+       /* This is left for interop instead of adding a new interop flag.
+        * LU-7433 */
+#if LUSTRE_VERSION_CODE > OBD_OCD_VERSION(3, 0, 0, 0)
+       if (ldlm_rep->lock_policy_res2) {
+               mdt_object_unlock(info, info->mti_object, lhc, 1);
+               RETURN(ELDLM_LOCK_ABORTED);
+       }
+#endif
+
+       rc = mdt_intent_lock_replace(info, lockp, lhc, flags, rc);
+       RETURN(rc);
 }
 
 static int mdt_intent_getattr(enum mdt_it_code opcode,
@@ -4494,17 +4509,11 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
        struct lfsck_stop        stop;
        ENTRY;
 
-       if (m->mdt_md_root != NULL) {
-               mdt_object_put(env, m->mdt_md_root);
-               m->mdt_md_root = NULL;
-       }
-
        stop.ls_status = LS_PAUSED;
        stop.ls_flags = 0;
        next->md_ops->mdo_iocontrol(env, next, OBD_IOC_STOP_LFSCK, 0, &stop);
 
        mdt_stack_pre_fini(env, m, md2lu_dev(m->mdt_child));
-       target_recovery_fini(obd);
        ping_evictor_stop();
 
        if (m->mdt_opts.mo_coordinator)
@@ -4534,6 +4543,11 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
                d->ld_obd->obd_namespace = m->mdt_namespace = NULL;
        }
 
+       if (m->mdt_md_root != NULL) {
+               mdt_object_put(env, m->mdt_md_root);
+               m->mdt_md_root = NULL;
+       }
+
        mdt_quota_fini(env, m);
 
        cfs_free_nidlist(&m->mdt_squash.rsi_nosquash_nids);
@@ -5056,6 +5070,10 @@ static int mdt_obd_set_info_async(const struct lu_env *env,
  * connect flags from the obd_connect_data::ocd_connect_flags field of the
  * reply. \see mdt_connect().
  *
+ * Before 2.7.50 clients will send a struct obd_connect_data_v1 rather than a
+ * full struct obd_connect_data. So care must be taken when accessing fields
+ * that are not present in struct obd_connect_data_v1. See LU-16.
+ *
  * \param exp   the obd_export associated with this client/target pair
  * \param mdt   the target device for the connection
  * \param data  stores data for this connect request
@@ -5071,13 +5089,16 @@ static int mdt_connect_internal(struct obd_export *exp,
        LASSERT(data != NULL);
 
        data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
-       data->ocd_connect_flags2 &= MDT_CONNECT_SUPPORTED2;
+
+       if (data->ocd_connect_flags & OBD_CONNECT_FLAGS2)
+               data->ocd_connect_flags2 &= MDT_CONNECT_SUPPORTED2;
+
        data->ocd_ibits_known &= MDS_INODELOCK_FULL;
 
        if (!(data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) &&
            !(data->ocd_connect_flags & OBD_CONNECT_IBITS)) {
                CWARN("%s: client %s does not support ibits lock, either "
-                     "very old or an invalid client: flags "LPX64"\n",
+                     "very old or an invalid client: flags %#llx\n",
                      mdt_obd_name(mdt), exp->exp_client_uuid.uuid,
                      data->ocd_connect_flags);
                return -EBADE;
@@ -5093,7 +5114,7 @@ static int mdt_connect_internal(struct obd_export *exp,
                data->ocd_brw_size = min(data->ocd_brw_size,
                                         (__u32)MD_MAX_BRW_SIZE);
                if (data->ocd_brw_size == 0) {
-                       CERROR("%s: cli %s/%p ocd_connect_flags: "LPX64
+                       CERROR("%s: cli %s/%p ocd_connect_flags: %#llx"
                               " ocd_version: %x ocd_grant: %d "
                               "ocd_index: %u ocd_brw_size is "
                               "unexpectedly zero, network data "
@@ -5694,16 +5715,16 @@ static int mdt_fid2path(struct mdt_thread_info *info,
        int    rc;
        ENTRY;
 
-       CDEBUG(D_IOCTL, "path get "DFID" from "LPU64" #%d\n",
+       CDEBUG(D_IOCTL, "path get "DFID" from %llu #%d\n",
                PFID(&fp->gf_fid), fp->gf_recno, fp->gf_linkno);
 
        if (!fid_is_sane(&fp->gf_fid))
                RETURN(-EINVAL);
 
        if (!fid_is_namespace_visible(&fp->gf_fid)) {
-               CWARN("%s: "DFID" is invalid, sequence should be "
-                     ">= "LPX64"\n", mdt_obd_name(mdt),
-                     PFID(&fp->gf_fid), (__u64)FID_SEQ_NORMAL);
+               CDEBUG(D_INFO, "%s: "DFID" is invalid, f_seq should be >= %#llx"
+                      ", or f_oid != 0, or f_ver == 0\n", mdt_obd_name(mdt),
+                      PFID(&fp->gf_fid), (__u64)FID_SEQ_NORMAL);
                RETURN(-EINVAL);
        }
 
@@ -5731,7 +5752,7 @@ static int mdt_fid2path(struct mdt_thread_info *info,
 
        rc = mdt_path(info, obj, fp, root_fid);
 
-       CDEBUG(D_INFO, "fid "DFID", path %s recno "LPX64" linkno %u\n",
+       CDEBUG(D_INFO, "fid "DFID", path %s recno %#llx linkno %u\n",
               PFID(&fp->gf_fid), fp->gf_u.gf_path,
               fp->gf_recno, fp->gf_linkno);