Whamcloud - gitweb
LU-930 misc: limit CDEBUG console message frequency
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index a093a49..bf1d0d7 100644 (file)
@@ -164,6 +164,13 @@ void mdt_lock_reg_init(struct mdt_lock_handle *lh, enum ldlm_mode lm)
        lh->mlh_type = MDT_REG_LOCK;
 }
 
+void mdt_lh_reg_init(struct mdt_lock_handle *lh, struct ldlm_lock *lock)
+{
+       mdt_lock_reg_init(lh, lock->l_req_mode);
+       if (lock->l_req_mode == LCK_GROUP)
+               lh->mlh_gid = lock->l_policy_data.l_inodebits.li_gid;
+}
+
 void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode,
                       const struct lu_name *lname)
 {
@@ -269,6 +276,7 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
 {
        struct mdt_device *mdt = info->mti_mdt;
        struct lu_name *lname = &info->mti_name;
+       const char *start = fileset;
        char *filename = info->mti_filename;
        struct mdt_object *parent;
        u32 mode;
@@ -283,8 +291,8 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
         */
        *fid = mdt->mdt_md_root_fid;
 
-       while (rc == 0 && fileset != NULL && *fileset != '\0') {
-               const char *s1 = fileset;
+       while (rc == 0 && start != NULL && *start != '\0') {
+               const char *s1 = start;
                const char *s2;
 
                while (*++s1 == '/')
@@ -296,7 +304,7 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
                if (s2 == s1)
                        break;
 
-               fileset = s2;
+               start = s2;
 
                lname->ln_namelen = s2 - s1;
                if (lname->ln_namelen > NAME_MAX) {
@@ -332,9 +340,21 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
                        rc = PTR_ERR(parent);
                else {
                        mode = lu_object_attr(&parent->mot_obj);
-                       mdt_object_put(info->mti_env, parent);
-                       if (!S_ISDIR(mode))
+                       if (!S_ISDIR(mode)) {
                                rc = -ENOTDIR;
+                       } else if (mdt_is_remote_object(info, parent, parent)) {
+                               if (!mdt->mdt_enable_remote_subdir_mount) {
+                                       rc = -EREMOTE;
+                                       LCONSOLE_WARN("%s: subdir mount '%s' refused because 'enable_remote_subdir_mount=0': rc = %d\n",
+                                                     mdt_obd_name(mdt),
+                                                     fileset, rc);
+                               } else {
+                                       LCONSOLE_INFO("%s: subdir mount '%s' is remote and may be slow\n",
+                                                     mdt_obd_name(mdt),
+                                                     fileset);
+                               }
+                       }
+                       mdt_object_put(info->mti_env, parent);
                }
        }
 
@@ -1396,10 +1416,10 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
 
        rc = mdt_attr_get_complex(info, o, ma);
        if (unlikely(rc)) {
-               CDEBUG(rc == -ENOENT ? D_OTHER : D_ERROR,
-                      "%s: getattr error for "DFID": rc = %d\n",
-                      mdt_obd_name(info->mti_mdt),
-                      PFID(mdt_object_fid(o)), rc);
+               CDEBUG_LIMIT(rc == -ENOENT ? D_OTHER : D_ERROR,
+                            "%s: getattr error for "DFID": rc = %d\n",
+                            mdt_obd_name(info->mti_mdt),
+                            PFID(mdt_object_fid(o)), rc);
                RETURN(rc);
        }
 
@@ -2162,37 +2182,38 @@ unlock_parent:
 static int mdt_getattr_name(struct tgt_session_info *tsi)
 {
        struct mdt_thread_info  *info = tsi2mdt_info(tsi);
-        struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
-        struct mdt_body        *reqbody;
-        struct mdt_body        *repbody;
-        int rc, rc2;
-        ENTRY;
+       struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
+       struct mdt_body *reqbody;
+       struct mdt_body *repbody;
+       int rc, rc2;
 
-        reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
-        LASSERT(reqbody != NULL);
-        repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
-        LASSERT(repbody != NULL);
+       ENTRY;
+
+       reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
+       LASSERT(reqbody != NULL);
+       repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+       LASSERT(repbody != NULL);
 
        info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
        repbody->mbo_eadatasize = 0;
        repbody->mbo_aclsize = 0;
 
-        rc = mdt_init_ucred_intent_getattr(info, reqbody);
-        if (unlikely(rc))
-                GOTO(out_shrink, rc);
+       rc = mdt_init_ucred(info, reqbody);
+       if (unlikely(rc))
+               GOTO(out_shrink, rc);
 
-        rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
-        if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
-                ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode);
-                lhc->mlh_reg_lh.cookie = 0;
-        }
-        mdt_exit_ucred(info);
-        EXIT;
+       rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
+       if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
+               ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode);
+               lhc->mlh_reg_lh.cookie = 0;
+       }
+       mdt_exit_ucred(info);
+       EXIT;
 out_shrink:
-        mdt_client_compatibility(info);
-        rc2 = mdt_fix_reply(info);
-        if (rc == 0)
-                rc = rc2;
+       mdt_client_compatibility(info);
+       rc2 = mdt_fix_reply(info);
+       if (rc == 0)
+               rc = rc2;
        mdt_thread_info_fini(info);
        return rc;
 }
@@ -2287,7 +2308,7 @@ static int mdt_rmfid_check_permission(struct mdt_thread_info *info,
        if (la->la_flags & LUSTRE_IMMUTABLE_FL)
                        rc = -EACCES;
 
-       if (md_capable(uc, CFS_CAP_DAC_OVERRIDE))
+       if (md_capable(uc, CAP_DAC_OVERRIDE))
                RETURN(0);
        if (uc->uc_fsuid == la->la_uid) {
                if ((la->la_mode & S_IWUSR) == 0)
@@ -2514,7 +2535,7 @@ static int mdt_readpage(struct tgt_session_info *tsi)
                                exp_max_brw_size(tsi->tsi_exp));
        rdpg->rp_npages = (rdpg->rp_count + PAGE_SIZE - 1) >>
                          PAGE_SHIFT;
-       OBD_ALLOC_PTR_ARRAY(rdpg->rp_pages, rdpg->rp_npages);
+       OBD_ALLOC_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages);
        if (rdpg->rp_pages == NULL)
                RETURN(-ENOMEM);
 
@@ -2538,7 +2559,7 @@ free_rdpg:
        for (i = 0; i < rdpg->rp_npages; i++)
                if (rdpg->rp_pages[i] != NULL)
                        __free_page(rdpg->rp_pages[i]);
-       OBD_FREE_PTR_ARRAY(rdpg->rp_pages, rdpg->rp_npages);
+       OBD_FREE_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages);
 
        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
                RETURN(0);
@@ -3527,6 +3548,7 @@ int mdt_object_local_lock(struct mdt_thread_info *info, struct mdt_object *o,
 
        policy->l_inodebits.bits = *ibits;
        policy->l_inodebits.try_bits = trybits;
+       policy->l_inodebits.li_gid = lh->mlh_gid;
 
         /*
          * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
@@ -4259,7 +4281,7 @@ static int mdt_intent_getattr(enum ldlm_intent_flags it_opc,
                GOTO(out_shrink, rc = -EINVAL);
        }
 
-       rc = mdt_init_ucred_intent_getattr(info, reqbody);
+       rc = mdt_init_ucred(info, reqbody);
        if (rc)
                GOTO(out_shrink, rc);
 
@@ -4441,6 +4463,7 @@ static int mdt_intent_open(enum ldlm_intent_flags it_opc,
         struct ldlm_reply      *rep = NULL;
         long                    opc;
         int                     rc;
+       struct ptlrpc_request  *req = mdt_info_req(info);
 
         static const struct req_format *intent_fmts[REINT_MAX] = {
                 [REINT_CREATE]  = &RQF_LDLM_INTENT_CREATE,
@@ -4458,6 +4481,9 @@ static int mdt_intent_open(enum ldlm_intent_flags it_opc,
 
         rc = mdt_reint_internal(info, lhc, opc);
 
+       if (rc < 0 && lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
+               DEBUG_REQ(D_ERROR, req, "Replay open failed with %d", rc);
+
        /* Check whether the reply has been packed successfully. */
        if (mdt_info_req(info)->rq_repmsg != NULL)
                rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
@@ -4670,6 +4696,23 @@ static int mdt_intent_policy(const struct lu_env *env,
                } else {
                        rc = err_serious(-EFAULT);
                }
+       } else if (ldesc->l_resource.lr_type == LDLM_IBITS &&
+                  ldesc->l_policy_data.l_inodebits.bits == MDS_INODELOCK_DOM) {
+               struct ldlm_reply *rep;
+
+               /* No intent was provided but INTENT flag is set along with
+                * DOM bit, this is considered as GLIMPSE request.
+                * This logic is common for MDT and OST glimpse
+                */
+               mdt_ptlrpc_stats_update(req, IT_GLIMPSE);
+               rc = mdt_glimpse_enqueue(info, ns, lockp, flags);
+               /* Check whether the reply has been packed successfully. */
+               if (req->rq_repmsg != NULL) {
+                       rep = req_capsule_server_get(info->mti_pill,
+                                                    &RMF_DLM_REP);
+                       rep->lock_policy_res2 =
+                               ptlrpc_status_hton(rep->lock_policy_res2);
+               }
        } else {
                /* No intent was provided */
                req_capsule_set_size(pill, &RMF_DLM_LVB, RCL_SERVER, 0);
@@ -5423,6 +5466,7 @@ TGT_OST_HDL_HP(HAS_BODY | HAS_REPLY | IS_MUTABLE,
                                         OST_PUNCH,     mdt_punch_hdl,
                                                        mdt_hp_punch),
 TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_SYNC,    mdt_data_sync),
+TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_SEEK, tgt_lseek),
 };
 
 static struct tgt_handler mdt_sec_ctx_ops[] = {
@@ -5649,8 +5693,9 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                        m->mdt_skip_lfsck = 1;
        }
 
-       /* DoM files get IO lock at open optionally by default */
-       m->mdt_opts.mo_dom_lock = ALWAYS_DOM_LOCK_ON_OPEN;
+       /* Just try to get a DoM lock by default. Otherwise, having a group
+        * lock granted, it may get blocked for a long time. */
+       m->mdt_opts.mo_dom_lock = TRYLOCK_DOM_ON_OPEN;
        /* DoM files are read at open and data is packed in the reply */
        m->mdt_opts.mo_dom_read_open = 1;
 
@@ -5668,6 +5713,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        m->mdt_enable_chprojid_gid = 0;
        m->mdt_enable_remote_rename = 1;
        m->mdt_dir_restripe_nsonly = 1;
+       m->mdt_enable_remote_subdir_mount = 1;
 
        atomic_set(&m->mdt_mds_mds_conns, 0);
        atomic_set(&m->mdt_async_commit_count, 0);
@@ -5723,8 +5769,13 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                                              LDLM_NAMESPACE_SERVER,
                                              LDLM_NAMESPACE_GREEDY,
                                              LDLM_NS_TYPE_MDT);
-       if (m->mdt_namespace == NULL)
-               GOTO(err_fini_seq, rc = -ENOMEM);
+       if (IS_ERR(m->mdt_namespace)) {
+               rc = PTR_ERR(m->mdt_namespace);
+               CERROR("%s: unable to create server namespace: rc = %d\n",
+                      obd->obd_name, rc);
+               m->mdt_namespace = NULL;
+               GOTO(err_fini_seq, rc);
+       }
 
        m->mdt_namespace->ns_lvbp = m;
        m->mdt_namespace->ns_lvbo = &mdt_lvbo;
@@ -6285,11 +6336,7 @@ static int mdt_connect_internal(const struct lu_env *env,
        if (OCD_HAS_FLAG(data, CKSUM)) {
                __u32 cksum_types = data->ocd_cksum_types;
 
-               /* The client set in ocd_cksum_types the checksum types it
-                * supports. We have to mask off the algorithms that we don't
-                * support */
-               data->ocd_cksum_types &=
-                       obd_cksum_types_supported_server(obd_name);
+               tgt_mask_cksum_types(&mdt->mdt_lut, &data->ocd_cksum_types);
 
                if (unlikely(data->ocd_cksum_types == 0)) {
                        CERROR("%s: Connect with checksum support but no "
@@ -6315,6 +6362,9 @@ static int mdt_connect_internal(const struct lu_env *env,
                mdt_enable_slc(mdt);
        }
 
+       if (!mdt->mdt_lut.lut_dt_conf.ddp_has_lseek_data_hole)
+               data->ocd_connect_flags2 &= ~OBD_CONNECT2_LSEEK;
+
        return 0;
 }
 
@@ -6691,18 +6741,20 @@ static int mdt_path_current(struct mdt_thread_info *info,
                            struct getinfo_fid2path *fp,
                            struct lu_fid *root_fid)
 {
-       struct mdt_device       *mdt = info->mti_mdt;
-       struct mdt_object       *mdt_obj;
-       struct link_ea_header   *leh;
-       struct link_ea_entry    *lee;
-       struct lu_name          *tmpname = &info->mti_name;
-       struct lu_fid           *tmpfid = &info->mti_tmp_fid1;
-       struct lu_buf           *buf = &info->mti_big_buf;
-       char                    *ptr;
-       int                     reclen;
-       struct linkea_data      ldata = { NULL };
-       int                     rc = 0;
-       bool                    first = true;
+       struct mdt_device *mdt = info->mti_mdt;
+       struct lu_name *tmpname = &info->mti_name;
+       struct lu_fid *tmpfid = &info->mti_tmp_fid1;
+       struct lu_buf *buf = &info->mti_big_buf;
+       struct md_attr *ma = &info->mti_attr;
+       struct linkea_data ldata = { NULL };
+       bool first = true;
+       struct mdt_object *mdt_obj;
+       struct link_ea_header *leh;
+       struct link_ea_entry *lee;
+       char *ptr;
+       int reclen;
+       int rc = 0;
+
        ENTRY;
 
        /* temp buffer for path element, the buffer will be finally freed
@@ -6718,8 +6770,6 @@ static int mdt_path_current(struct mdt_thread_info *info,
        *tmpfid = fp->gf_fid = *mdt_object_fid(obj);
 
        while (!lu_fid_eq(root_fid, &fp->gf_fid)) {
-               struct lu_buf           lmv_buf;
-
                if (!lu_fid_eq(root_fid, &mdt->mdt_md_root_fid) &&
                    lu_fid_eq(&mdt->mdt_md_root_fid, &fp->gf_fid))
                        GOTO(out, rc = -ENOENT);
@@ -6767,26 +6817,25 @@ static int mdt_path_current(struct mdt_thread_info *info,
                                fp->gf_linkno++;
                }
 
-               lmv_buf.lb_buf = info->mti_xattr_buf;
-               lmv_buf.lb_len = sizeof(info->mti_xattr_buf);
                /* Check if it is slave stripes */
-               rc = mo_xattr_get(info->mti_env, mdt_object_child(mdt_obj),
-                                 &lmv_buf, XATTR_NAME_LMV);
+               rc = mdt_stripe_get(info, mdt_obj, ma, XATTR_NAME_LMV);
                mdt_object_put(info->mti_env, mdt_obj);
-               if (rc > 0) {
-                       union lmv_mds_md *lmm = lmv_buf.lb_buf;
+               if (rc < 0)
+                       GOTO(out, rc);
+
+               if (ma->ma_valid & MA_LMV) {
+                       struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
+
+                       if (!lmv_is_sane2(lmv))
+                               GOTO(out, rc = -EBADF);
 
                        /* For slave stripes, get its master */
-                       if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_STRIPE) {
+                       if (le32_to_cpu(lmv->lmv_magic) == LMV_MAGIC_STRIPE) {
                                fp->gf_fid = *tmpfid;
                                continue;
                        }
-               } else if (rc < 0 && rc != -ENODATA) {
-                       GOTO(out, rc);
                }
 
-               rc = 0;
-
                /* Pack the name in the end of the buffer */
                ptr -= tmpname->ln_namelen;
                if (ptr - 1 <= fp->gf_u.gf_path)
@@ -6802,6 +6851,9 @@ static int mdt_path_current(struct mdt_thread_info *info,
                first = false;
        }
 
+       /* non-zero will be treated as an error */
+       rc = 0;
+
 remote_out:
        ptr++; /* skip leading / */
        memmove(fp->gf_u.gf_path, ptr,
@@ -7076,12 +7128,20 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                if (rc == 0)
                        rc = dt_ro(&env, dt);
                break;
-       case OBD_IOC_ABORT_RECOVERY:
+       case OBD_IOC_ABORT_RECOVERY: {
+               struct obd_ioctl_data *data = karg;
+
                CERROR("%s: Aborting recovery for device\n", mdt_obd_name(mdt));
-               obd->obd_abort_recovery = 1;
+               if (data->ioc_type & OBD_FLG_ABORT_RECOV_MDT)
+                       obd->obd_abort_recov_mdt = 1;
+               else /* if (data->ioc_type & OBD_FLG_ABORT_RECOV_OST) */
+                       /* lctl didn't set OBD_FLG_ABORT_RECOV_OST < 2.13.57 */
+                       obd->obd_abort_recovery = 1;
+
                target_stop_recovery_thread(obd);
                rc = 0;
                break;
+       }
         case OBD_IOC_CHANGELOG_REG:
         case OBD_IOC_CHANGELOG_DEREG:
         case OBD_IOC_CHANGELOG_CLEAR:
@@ -7326,10 +7386,10 @@ int mdt_cos_is_enabled(struct mdt_device *mdt)
         return mdt->mdt_opts.mo_cos != 0;
 }
 
-static struct lu_device_type_operations mdt_device_type_ops = {
-        .ldto_device_alloc = mdt_device_alloc,
-        .ldto_device_free  = mdt_device_free,
-        .ldto_device_fini  = mdt_device_fini
+static const struct lu_device_type_operations mdt_device_type_ops = {
+       .ldto_device_alloc = mdt_device_alloc,
+       .ldto_device_free  = mdt_device_free,
+       .ldto_device_fini  = mdt_device_fini
 };
 
 static struct lu_device_type mdt_device_type = {
@@ -7355,7 +7415,7 @@ static int __init mdt_init(void)
        if (rc)
                GOTO(lu_fini, rc);
 
-       rc = class_register_type(&mdt_obd_device_ops, NULL, true, NULL,
+       rc = class_register_type(&mdt_obd_device_ops, NULL, true,
                                 LUSTRE_MDT_NAME, &mdt_device_type);
        if (rc)
                GOTO(mds_fini, rc);