Whamcloud - gitweb
LU-9771 mdt: revise layout_change() to take md_layout_change
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index 7bd6531..c2533e4 100644 (file)
@@ -415,7 +415,8 @@ static int mdt_statfs(struct tgt_session_info *tsi)
 {
        struct ptlrpc_request           *req = tgt_ses_req(tsi);
        struct mdt_thread_info          *info = tsi2mdt_info(tsi);
-       struct md_device                *next = info->mti_mdt->mdt_child;
+       struct mdt_device               *mdt = info->mti_mdt;
+       struct tg_grants_data           *tgd = &mdt->mdt_lut.lut_tgd;
        struct ptlrpc_service_part      *svcpt;
        struct obd_statfs               *osfs;
        int                             rc;
@@ -440,24 +441,44 @@ static int mdt_statfs(struct tgt_session_info *tsi)
        if (!osfs)
                GOTO(out, rc = -EPROTO);
 
-       /** statfs information are cached in the mdt_device */
-       if (cfs_time_before_64(info->mti_mdt->mdt_osfs_age,
-                              cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS))) {
-               /** statfs data is too old, get up-to-date one */
-               rc = next->md_ops->mdo_statfs(info->mti_env, next, osfs);
-               if (rc)
-                       GOTO(out, rc);
-               spin_lock(&info->mti_mdt->mdt_lock);
-               info->mti_mdt->mdt_osfs = *osfs;
-               info->mti_mdt->mdt_osfs_age = cfs_time_current_64();
-               spin_unlock(&info->mti_mdt->mdt_lock);
-       } else {
-               /** use cached statfs data */
-               spin_lock(&info->mti_mdt->mdt_lock);
-               *osfs = info->mti_mdt->mdt_osfs;
-               spin_unlock(&info->mti_mdt->mdt_lock);
-       }
+       rc = tgt_statfs_internal(tsi->tsi_env, &mdt->mdt_lut, osfs,
+                                cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
+                                NULL);
+       if (unlikely(rc))
+               GOTO(out, rc);
 
+       /* at least try to account for cached pages.  its still racy and
+        * might be under-reporting if clients haven't announced their
+        * caches with brw recently */
+       CDEBUG(D_SUPER | D_CACHE, "blocks cached %llu granted %llu"
+              " pending %llu free %llu avail %llu\n",
+              tgd->tgd_tot_dirty, tgd->tgd_tot_granted,
+              tgd->tgd_tot_pending,
+              osfs->os_bfree << tgd->tgd_blockbits,
+              osfs->os_bavail << tgd->tgd_blockbits);
+
+       osfs->os_bavail -= min_t(u64, osfs->os_bavail,
+                                ((tgd->tgd_tot_dirty + tgd->tgd_tot_pending +
+                                  osfs->os_bsize - 1) >> tgd->tgd_blockbits));
+
+       tgt_grant_sanity_check(mdt->mdt_lu_dev.ld_obd, __func__);
+       CDEBUG(D_CACHE, "%llu blocks: %llu free, %llu avail; "
+              "%llu objects: %llu free; state %x\n",
+              osfs->os_blocks, osfs->os_bfree, osfs->os_bavail,
+              osfs->os_files, osfs->os_ffree, osfs->os_state);
+
+       if (!exp_grant_param_supp(tsi->tsi_exp) &&
+           tgd->tgd_blockbits > COMPAT_BSIZE_SHIFT) {
+               /* clients which don't support OBD_CONNECT_GRANT_PARAM
+                * should not see a block size > page size, otherwise
+                * cl_lost_grant goes mad. Therefore, we emulate a 4KB (=2^12)
+                * block size which is the biggest block size known to work
+                * with all client's page size. */
+               osfs->os_blocks <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT;
+               osfs->os_bfree  <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT;
+               osfs->os_bavail <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT;
+               osfs->os_bsize = 1 << COMPAT_BSIZE_SHIFT;
+       }
        if (rc == 0)
                mdt_counter_incr(req, LPROC_MDT_STATFS);
 out:
@@ -465,6 +486,41 @@ out:
        RETURN(rc);
 }
 
+/**
+ * Pack size attributes into the reply.
+ */
+int mdt_pack_size2body(struct mdt_thread_info *info,
+                       const struct lu_fid *fid, bool dom_lock)
+{
+       struct mdt_body *b;
+       struct md_attr *ma = &info->mti_attr;
+       int dom_stripe;
+
+       ENTRY;
+
+       LASSERT(ma->ma_attr.la_valid & LA_MODE);
+
+       if (!S_ISREG(ma->ma_attr.la_mode) ||
+           !(ma->ma_valid & MA_LOV && ma->ma_lmm != NULL))
+               RETURN(-ENODATA);
+
+       dom_stripe = mdt_lmm_dom_entry(ma->ma_lmm);
+       /* no DoM stripe, no size in reply */
+       if (dom_stripe == LMM_NO_DOM)
+               RETURN(-ENOENT);
+
+       /* no DoM lock, no size in reply */
+       if (!dom_lock)
+               RETURN(0);
+
+       /* Either DoM lock exists or LMM has only DoM stripe then
+        * return size on body. */
+       b = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+
+       mdt_dom_object_size(info->mti_env, info->mti_mdt, fid, b, dom_lock);
+       RETURN(0);
+}
+
 #ifdef CONFIG_FS_POSIX_ACL
 /*
  * Pack ACL data into the reply. UIDs/GIDs are mapped and filtered by nodemap.
@@ -676,6 +732,8 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                        else
                                b->mbo_blocks = 1;
                        b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+               } else if (info->mti_som_valid) { /* som is valid */
+                       b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
                }
        }
 
@@ -936,6 +994,9 @@ int mdt_attr_get_complex(struct mdt_thread_info *info,
                rc = mo_attr_get(env, next, ma);
                if (rc)
                        GOTO(out, rc);
+
+               if (S_ISREG(mode))
+                       (void) mdt_get_som(info, o, &ma->ma_attr);
                ma->ma_valid |= MA_INODE;
        }
 
@@ -1280,32 +1341,33 @@ out:
  *
  * \param[in] info     thread environment
  * \param[in] obj      object
- * \param[in] layout   layout intent
- * \param[in] buf      buffer containing client's lovea, could be empty
+ * \param[in] layout   layout change descriptor
  *
  * \retval 0   on success
  * \retval < 0 error code
  */
 static int mdt_layout_change(struct mdt_thread_info *info,
                             struct mdt_object *obj,
-                            struct layout_intent *layout,
-                            const struct lu_buf *buf)
+                            struct md_layout_change *layout)
 {
        struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
+       struct layout_intent *intent = layout->mlc_intent;
        int rc;
        ENTRY;
 
        CDEBUG(D_INFO, "got layout change request from client: "
-              "opc:%u flags:%#x extent[%#llx,%#llx)\n",
-              layout->li_opc, layout->li_flags,
-              layout->li_start, layout->li_end);
-       if (layout->li_start >= layout->li_end) {
-               CERROR("Recieved an invalid layout change range [%llu, %llu) "
-                      "for "DFID"\n", layout->li_start, layout->li_end,
-                      PFID(mdt_object_fid(obj)));
+              "opc:%u flags:%#x extent "DEXT"\n",
+              intent->li_opc, intent->li_flags, PEXT(&intent->li_extent));
+
+       if (intent->li_extent.e_start >= intent->li_extent.e_end) {
+               CERROR(DFID ":invalid range of layout change "DEXT"\n",
+                      PFID(mdt_object_fid(obj)), PEXT(&intent->li_extent));
                RETURN(-EINVAL);
        }
 
+       if (!mdt_object_exists(obj))
+               GOTO(out, rc = -ENOENT);
+
        if (!S_ISREG(lu_object_attr(&obj->mot_obj)))
                GOTO(out, rc = -EINVAL);
 
@@ -1316,13 +1378,11 @@ static int mdt_layout_change(struct mdt_thread_info *info,
 
        /* take layout lock to prepare layout change */
        mdt_lock_reg_init(lh, LCK_EX);
-       rc = mdt_object_lock(info, obj, lh,
-                            MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
+       rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_LAYOUT);
        if (rc)
                GOTO(out, rc);
 
-       rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout,
-                             buf);
+       rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout);
 
        mdt_object_unlock(info, obj, lh, 1);
 out:
@@ -1684,12 +1744,16 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                /* layout lock must be granted in a best-effort way
                 * for IT operations */
                LASSERT(!(child_bits & MDS_INODELOCK_LAYOUT));
-               if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) &&
-                   exp_connect_layout(info->mti_exp) &&
-                   S_ISREG(lu_object_attr(&child->mot_obj)) &&
+               if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
                    !mdt_object_remote(child) && ldlm_rep != NULL) {
-                       /* try to grant layout lock for regular file. */
-                       try_bits = MDS_INODELOCK_LAYOUT;
+                       if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) &&
+                           exp_connect_layout(info->mti_exp)) {
+                               /* try to grant layout lock for regular file. */
+                               try_bits = MDS_INODELOCK_LAYOUT;
+                       }
+                       /* Acquire DOM lock in advance for data-on-mdt file */
+                       if (child != parent)
+                               try_bits |= MDS_INODELOCK_DOM;
                }
 
                if (try_bits != 0) {
@@ -1724,6 +1788,27 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                         "Lock res_id: "DLDLMRES", fid: "DFID"\n",
                         PLDLMRES(lock->l_resource),
                         PFID(mdt_object_fid(child)));
+
+               if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
+                   mdt_object_exists(child) && !mdt_object_remote(child) &&
+                   child != parent) {
+                       LDLM_LOCK_PUT(lock);
+                       mdt_object_put(info->mti_env, child);
+                       /* NB: call the mdt_pack_size2body always after
+                        * mdt_object_put(), that is why this speacial
+                        * exit path is used. */
+                       rc = mdt_pack_size2body(info, child_fid,
+                                               child_bits & MDS_INODELOCK_DOM);
+                       if (rc != 0 && child_bits & MDS_INODELOCK_DOM) {
+                               /* DOM lock was taken in advance but this is
+                                * not DoM file. Drop the lock. */
+                               lock_res_and_lock(lock);
+                               ldlm_inodebits_drop(lock, MDS_INODELOCK_DOM);
+                               unlock_res_and_lock(lock);
+                       }
+
+                       GOTO(out_parent, rc = 0);
+               }
         }
         if (lock)
                 LDLM_LOCK_PUT(lock);
@@ -3201,6 +3286,7 @@ void mdt_thread_info_init(struct ptlrpc_request *req,
         info->mti_opdata = 0;
        info->mti_big_lmm_used = 0;
        info->mti_big_acl_used = 0;
+       info->mti_som_valid = 0;
 
         info->mti_spec.no_create = 0;
        info->mti_spec.sp_rm_entry = 0;
@@ -3657,10 +3743,10 @@ static int mdt_intent_layout(enum mdt_it_code opcode,
                             __u64 flags)
 {
        struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_LAYOUT];
-       struct layout_intent *layout;
+       struct md_layout_change layout = { .mlc_opc = MD_LAYOUT_NOP };
+       struct layout_intent *intent;
        struct lu_fid *fid;
        struct mdt_object *obj = NULL;
-       bool layout_change = false;
        int layout_size = 0;
        int rc = 0;
        ENTRY;
@@ -3671,14 +3757,15 @@ static int mdt_intent_layout(enum mdt_it_code opcode,
                RETURN(-EINVAL);
        }
 
-       layout = req_capsule_client_get(info->mti_pill, &RMF_LAYOUT_INTENT);
-       if (layout == NULL)
+       intent = req_capsule_client_get(info->mti_pill, &RMF_LAYOUT_INTENT);
+       if (intent == NULL)
                RETURN(-EPROTO);
 
-       switch (layout->li_opc) {
+       switch (intent->li_opc) {
        case LAYOUT_INTENT_TRUNC:
        case LAYOUT_INTENT_WRITE:
-               layout_change = true;
+               layout.mlc_opc = MD_LAYOUT_WRITE;
+               layout.mlc_intent = intent;
                break;
        case LAYOUT_INTENT_ACCESS:
                break;
@@ -3687,12 +3774,12 @@ static int mdt_intent_layout(enum mdt_it_code opcode,
        case LAYOUT_INTENT_RELEASE:
        case LAYOUT_INTENT_RESTORE:
                CERROR("%s: Unsupported layout intent opc %d\n",
-                      mdt_obd_name(info->mti_mdt), layout->li_opc);
+                      mdt_obd_name(info->mti_mdt), intent->li_opc);
                rc = -ENOTSUPP;
                break;
        default:
                CERROR("%s: Unknown layout intent opc %d\n",
-                      mdt_obd_name(info->mti_mdt), layout->li_opc);
+                      mdt_obd_name(info->mti_mdt), intent->li_opc);
                rc = -EINVAL;
                break;
        }
@@ -3730,8 +3817,8 @@ static int mdt_intent_layout(enum mdt_it_code opcode,
                GOTO(out_obj, rc);
 
 
-       if (layout_change) {
-               struct lu_buf *buf = &info->mti_buf;
+       if (layout.mlc_opc != MD_LAYOUT_NOP) {
+               struct lu_buf *buf = &layout.mlc_buf;
 
                /**
                 * mdt_layout_change is a reint operation, when the request
@@ -3775,7 +3862,7 @@ static int mdt_intent_layout(enum mdt_it_code opcode,
                 * lovea, then it's a replay of the layout intent write
                 * RPC.
                 */
-               rc = mdt_layout_change(info, obj, layout, buf);
+               rc = mdt_layout_change(info, obj, &layout);
                if (rc)
                        GOTO(out_obj, rc);
        }
@@ -3983,6 +4070,18 @@ static int mdt_intent_opc(enum ldlm_intent_flags itopc,
        RETURN(rc);
 }
 
+static void mdt_ptlrpc_stats_update(struct ptlrpc_request *req,
+                                   enum ldlm_intent_flags it_opc)
+{
+       struct lprocfs_stats *srv_stats = ptlrpc_req2svc(req)->srv_stats;
+
+       /* update stats when IT code is known */
+       if (srv_stats != NULL)
+               lprocfs_counter_incr(srv_stats,
+                               PTLRPC_LAST_CNTR + (it_opc == IT_GLIMPSE ?
+                               LDLM_GLIMPSE_ENQUEUE : LDLM_IBITS_ENQUEUE));
+}
+
 static int mdt_intent_policy(struct ldlm_namespace *ns,
                             struct ldlm_lock **lockp, void *req_cookie,
                             enum ldlm_mode mode, __u64 flags, void *data)
@@ -4009,17 +4108,18 @@ static int mdt_intent_policy(struct ldlm_namespace *ns,
 
        if (req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) {
                req_capsule_extend(pill, &RQF_LDLM_INTENT_BASIC);
-                it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
-                if (it != NULL) {
-                        rc = mdt_intent_opc(it->opc, info, lockp, flags);
-                        if (rc == 0)
-                                rc = ELDLM_OK;
-
-                        /* Lock without inodebits makes no sense and will oops
-                         * later in ldlm. Let's check it now to see if we have
-                         * ibits corrupted somewhere in mdt_intent_opc().
-                         * The case for client miss to set ibits has been
-                         * processed by others. */
+               it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
+               if (it != NULL) {
+                       mdt_ptlrpc_stats_update(req, it->opc);
+                       rc = mdt_intent_opc(it->opc, info, lockp, flags);
+                       if (rc == 0)
+                               rc = ELDLM_OK;
+
+                       /* Lock without inodebits makes no sense and will oops
+                        * later in ldlm. Let's check it now to see if we have
+                        * ibits corrupted somewhere in mdt_intent_opc().
+                        * The case for client miss to set ibits has been
+                        * processed by others. */
                        LASSERT(ergo(ldesc->l_resource.lr_type == LDLM_IBITS,
                                ldesc->l_policy_data.l_inodebits.bits != 0));
                } else {
@@ -4916,67 +5016,71 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
 static int mdt_postrecov(const struct lu_env *, struct mdt_device *);
 
 static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
-                     struct lu_device_type *ldt, struct lustre_cfg *cfg)
-{
-        struct mdt_thread_info    *info;
-        struct obd_device         *obd;
-        const char                *dev = lustre_cfg_string(cfg, 0);
-        const char                *num = lustre_cfg_string(cfg, 2);
-        struct lustre_mount_info  *lmi = NULL;
-        struct lustre_sb_info     *lsi;
-        struct lu_site            *s;
-       struct seq_server_site    *ss_site;
-        const char                *identity_upcall = "NONE";
-        struct md_device          *next;
-        int                        rc;
-       long                       node_id;
-        mntopt_t                   mntopts;
-        ENTRY;
+                    struct lu_device_type *ldt, struct lustre_cfg *cfg)
+{
+       const struct dt_device_param *dt_conf;
+       struct mdt_thread_info *info;
+       struct obd_device *obd;
+       const char *dev = lustre_cfg_string(cfg, 0);
+       const char *num = lustre_cfg_string(cfg, 2);
+       struct tg_grants_data *tgd = &m->mdt_lut.lut_tgd;
+       struct lustre_mount_info *lmi = NULL;
+       struct lustre_sb_info *lsi;
+       struct lu_site *s;
+       struct seq_server_site *ss_site;
+       const char *identity_upcall = "NONE";
+       struct md_device *next;
+       int rc;
+       long node_id;
+       mntopt_t mntopts;
+       ENTRY;
 
        lu_device_init(&m->mdt_lu_dev, ldt);
-        /*
-         * Environment (env) might be missing mdt_thread_key values at that
-         * point, if device is allocated when mdt_thread_key is in QUIESCENT
-         * mode.
-         *
-         * Usually device allocation path doesn't use module key values, but
-         * mdt has to do a lot of work here, so allocate key value.
-         */
-        rc = lu_env_refill((struct lu_env *)env);
-        if (rc != 0)
-                RETURN(rc);
+       /*
+        * Environment (env) might be missing mdt_thread_key values at that
+        * point, if device is allocated when mdt_thread_key is in QUIESCENT
+        * mode.
+        *
+        * Usually device allocation path doesn't use module key values, but
+        * mdt has to do a lot of work here, so allocate key value.
+        */
+       rc = lu_env_refill((struct lu_env *)env);
+       if (rc != 0)
+               RETURN(rc);
 
-        info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
-        LASSERT(info != NULL);
+       info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+       LASSERT(info != NULL);
 
-        obd = class_name2obd(dev);
-        LASSERT(obd != NULL);
+       obd = class_name2obd(dev);
+       LASSERT(obd != NULL);
 
-        m->mdt_max_mdsize = MAX_MD_SIZE; /* 4 stripes */
+       m->mdt_max_mdsize = MAX_MD_SIZE; /* 4 stripes */
        m->mdt_opts.mo_evict_tgt_nids = 1;
-        m->mdt_opts.mo_cos = MDT_COS_DEFAULT;
+       m->mdt_opts.mo_cos = MDT_COS_DEFAULT;
 
        lmi = server_get_mount(dev);
-        if (lmi == NULL) {
-                CERROR("Cannot get mount info for %s!\n", dev);
-                RETURN(-EFAULT);
-        } else {
-                lsi = s2lsi(lmi->lmi_sb);
-                /* CMD is supported only in IAM mode */
-                LASSERT(num);
-                node_id = simple_strtol(num, NULL, 10);
+       if (lmi == NULL) {
+               CERROR("Cannot get mount info for %s!\n", dev);
+               RETURN(-EFAULT);
+       } else {
+               lsi = s2lsi(lmi->lmi_sb);
+               /* CMD is supported only in IAM mode */
+               LASSERT(num);
+               node_id = simple_strtol(num, NULL, 10);
                obd->u.obt.obt_magic = OBT_MAGIC;
                if (lsi->lsi_lmd != NULL &&
                    lsi->lsi_lmd->lmd_flags & LMD_FLG_SKIP_LFSCK)
                        m->mdt_skip_lfsck = 1;
        }
 
+       /* DoM files get IO lock at open by default */
+       m->mdt_opts.mo_dom_lock = 1;
+
        m->mdt_squash.rsi_uid = 0;
        m->mdt_squash.rsi_gid = 0;
        INIT_LIST_HEAD(&m->mdt_squash.rsi_nosquash_nids);
        init_rwsem(&m->mdt_squash.rsi_sem);
        spin_lock_init(&m->mdt_lock);
-       m->mdt_osfs_age = cfs_time_shift_64(-1000);
        m->mdt_enable_remote_dir = 0;
        m->mdt_enable_remote_dir_gid = 0;
 
@@ -5001,16 +5105,16 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        s->ld_seq_site = ss_site;
        ss_site->ss_lu = s;
 
-        /* set server index */
+       /* set server index */
        ss_site->ss_node_id = node_id;
 
        /* failover is the default
         * FIXME: we do not failout mds0/mgs, which may cause some problems.
         * assumed whose ss_node_id == 0 XXX
         * */
-        obd->obd_replayable = 1;
-        /* No connection accepted until configurations will finish */
-        obd->obd_no_conn = 1;
+       obd->obd_replayable = 1;
+       /* No connection accepted until configurations will finish */
+       obd->obd_no_conn = 1;
 
        if (cfg->lcfg_bufcount > 4 && LUSTRE_CFG_BUFLEN(cfg, 4) > 0) {
                char *str = lustre_cfg_string(cfg, 4);
@@ -5030,25 +5134,25 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
 
        snprintf(info->mti_u.ns_name, sizeof(info->mti_u.ns_name), "%s-%s",
                 LUSTRE_MDT_NAME, obd->obd_uuid.uuid);
-        m->mdt_namespace = ldlm_namespace_new(obd, info->mti_u.ns_name,
-                                              LDLM_NAMESPACE_SERVER,
-                                              LDLM_NAMESPACE_GREEDY,
-                                              LDLM_NS_TYPE_MDT);
-        if (m->mdt_namespace == NULL)
-                GOTO(err_fini_seq, rc = -ENOMEM);
+       m->mdt_namespace = ldlm_namespace_new(obd, info->mti_u.ns_name,
+                                             LDLM_NAMESPACE_SERVER,
+                                             LDLM_NAMESPACE_GREEDY,
+                                             LDLM_NS_TYPE_MDT);
+       if (m->mdt_namespace == NULL)
+               GOTO(err_fini_seq, rc = -ENOMEM);
 
        m->mdt_namespace->ns_lvbp = m;
        m->mdt_namespace->ns_lvbo = &mdt_lvbo;
 
-        ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
-        /* set obd_namespace for compatibility with old code */
-        obd->obd_namespace = m->mdt_namespace;
+       ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
+       /* set obd_namespace for compatibility with old code */
+       obd->obd_namespace = m->mdt_namespace;
 
        rc = mdt_hsm_cdt_init(m);
        if (rc != 0) {
                CERROR("%s: error initializing coordinator, rc %d\n",
                       mdt_obd_name(m), rc);
-                GOTO(err_free_ns, rc);
+               GOTO(err_free_ns, rc);
        }
 
        rc = tgt_init(env, &m->mdt_lut, obd, m->mdt_bottom, mdt_common_slice,
@@ -5057,31 +5161,37 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        if (rc)
                GOTO(err_free_hsm, rc);
 
+       /* Amount of available space excluded from granting and reserved
+        * for metadata. It is in percentage and 50% is default value. */
+       tgd->tgd_reserved_pcnt = 50;
+
+       if (ONE_MB_BRW_SIZE < (1U << tgd->tgd_blockbits))
+               m->mdt_brw_size = 1U << tgd->tgd_blockbits;
+       else
+               m->mdt_brw_size = ONE_MB_BRW_SIZE;
+
        rc = mdt_fs_setup(env, m, obd, lsi);
        if (rc)
                GOTO(err_tgt, rc);
 
        tgt_adapt_sptlrpc_conf(&m->mdt_lut);
 
-        next = m->mdt_child;
-        rc = next->md_ops->mdo_iocontrol(env, next, OBD_IOC_GET_MNTOPT, 0,
-                                         &mntopts);
-        if (rc)
-               GOTO(err_fs_cleanup, rc);
+       next = m->mdt_child;
+       dt_conf = next->md_ops->mdo_dtconf_get(env, next);
 
-        if (mntopts & MNTOPT_USERXATTR)
-                m->mdt_opts.mo_user_xattr = 1;
-        else
-                m->mdt_opts.mo_user_xattr = 0;
+       mntopts = dt_conf->ddp_mntopts;
 
-       rc = next->md_ops->mdo_maxeasize_get(env, next, &m->mdt_max_ea_size);
-       if (rc)
-               GOTO(err_fs_cleanup, rc);
+       if (mntopts & MNTOPT_USERXATTR)
+               m->mdt_opts.mo_user_xattr = 1;
+       else
+               m->mdt_opts.mo_user_xattr = 0;
 
-        if (mntopts & MNTOPT_ACL)
-                m->mdt_opts.mo_acl = 1;
-        else
-                m->mdt_opts.mo_acl = 0;
+       m->mdt_max_ea_size = dt_conf->ddp_max_ea_size;
+
+       if (mntopts & MNTOPT_ACL)
+               m->mdt_opts.mo_acl = 1;
+       else
+               m->mdt_opts.mo_acl = 0;
 
        /* XXX: to support suppgid for ACL, we enable identity_upcall
         * by default, otherwise, maybe got unexpected -EACCESS. */
@@ -5097,11 +5207,11 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                GOTO(err_fs_cleanup, rc);
        }
 
-        rc = mdt_procfs_init(m, dev);
-        if (rc) {
-                CERROR("Can't init MDT lprocfs, rc %d\n", rc);
-                GOTO(err_recovery, rc);
-        }
+       rc = mdt_procfs_init(m, dev);
+       if (rc) {
+               CERROR("Can't init MDT lprocfs, rc %d\n", rc);
+               GOTO(err_recovery, rc);
+       }
 
        rc = mdt_quota_init(env, m, cfg);
        if (rc)
@@ -5117,13 +5227,13 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         * when the whole stack is complete and ready
         * to serve the requests */
 
-        /* Reduce the initial timeout on an MDS because it doesn't need such
-         * a long timeout as an OST does. Adaptive timeouts will adjust this
-         * value appropriately. */
-        if (ldlm_timeout == LDLM_TIMEOUT_DEFAULT)
-                ldlm_timeout = MDS_LDLM_TIMEOUT_DEFAULT;
+       /* Reduce the initial timeout on an MDS because it doesn't need such
+        * a long timeout as an OST does. Adaptive timeouts will adjust this
+        * value appropriately. */
+       if (ldlm_timeout == LDLM_TIMEOUT_DEFAULT)
+               ldlm_timeout = MDS_LDLM_TIMEOUT_DEFAULT;
 
-        RETURN(0);
+       RETURN(0);
 err_procfs:
        mdt_procfs_fini(m);
 err_recovery:
@@ -5461,7 +5571,8 @@ static int mdt_connect_internal(const struct lu_env *env,
                data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
 
        if (OCD_HAS_FLAG(data, BRW_SIZE)) {
-               data->ocd_brw_size = min(data->ocd_brw_size, MD_MAX_BRW_SIZE);
+               data->ocd_brw_size = min(data->ocd_brw_size,
+                                        mdt->mdt_brw_size);
                if (data->ocd_brw_size == 0) {
                        CERROR("%s: cli %s/%p ocd_connect_flags: %#llx "
                               "ocd_version: %x ocd_grant: %d ocd_index: %u "
@@ -5475,9 +5586,29 @@ static int mdt_connect_internal(const struct lu_env *env,
                }
        }
 
-       if (OCD_HAS_FLAG(data, GRANT))
-               data->ocd_grant = mdt_grant_connect(env, exp, data->ocd_grant,
-                                                   !reconnect);
+       if (OCD_HAS_FLAG(data, GRANT_PARAM)) {
+               struct dt_device_param *ddp = &mdt->mdt_lut.lut_dt_conf;
+
+               /* client is reporting its page size, for future use */
+               exp->exp_target_data.ted_pagebits = data->ocd_grant_blkbits;
+               data->ocd_grant_blkbits  = mdt->mdt_lut.lut_tgd.tgd_blockbits;
+               /* ddp_inodespace may not be power-of-two value, eg. for ldiskfs
+                * it's LDISKFS_DIR_REC_LEN(20) = 28. */
+               data->ocd_grant_inobits = fls(ddp->ddp_inodespace - 1);
+               /* ocd_grant_tax_kb is in 1K byte blocks */
+               data->ocd_grant_tax_kb = ddp->ddp_extent_tax >> 10;
+               data->ocd_grant_max_blks = ddp->ddp_max_extent_blks;
+       }
+
+       if (OCD_HAS_FLAG(data, GRANT)) {
+               /* Save connect_data we have so far because tgt_grant_connect()
+                * uses it to calculate grant. */
+               exp->exp_connect_data = *data;
+               tgt_grant_connect(env, exp, data, !reconnect);
+       }
+
+       if (OCD_HAS_FLAG(data, MAXBYTES))
+               data->ocd_maxbytes = mdt->mdt_lut.lut_dt_conf.ddp_maxbytes;
 
        /* NB: Disregard the rule against updating
         * exp_connect_data.ocd_connect_flags in this case, since
@@ -5672,11 +5803,15 @@ static inline void mdt_disable_slc(struct mdt_device *mdt)
 
 static int mdt_obd_disconnect(struct obd_export *exp)
 {
-        int rc;
-        ENTRY;
+       int rc;
+
+       ENTRY;
 
-        LASSERT(exp);
-        class_export_get(exp);
+       LASSERT(exp);
+       class_export_get(exp);
+
+       if (!(exp->exp_flags & OBD_OPT_FORCE))
+               tgt_grant_sanity_check(exp->exp_obd, __func__);
 
        if ((exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS) &&
            !(exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)) {
@@ -5690,6 +5825,8 @@ static int mdt_obd_disconnect(struct obd_export *exp)
        if (rc != 0)
                CDEBUG(D_IOCTL, "server disconnect error: rc = %d\n", rc);
 
+       tgt_grant_discard(exp);
+
        rc = mdt_export_cleanup(exp);
        nodemap_del_member(exp);
        class_export_put(exp);
@@ -5860,6 +5997,17 @@ static int mdt_destroy_export(struct obd_export *exp)
        LASSERT(list_empty(&exp->exp_outstanding_replies));
        LASSERT(list_empty(&exp->exp_mdt_data.med_open_head));
 
+       /*
+        * discard grants once we're sure no more
+        * interaction with the client is possible
+        */
+       tgt_grant_discard(exp);
+       if (exp_connect_flags(exp) & OBD_CONNECT_GRANT)
+               exp->exp_obd->u.obt.obt_lut->lut_tgd.tgd_tot_granted_clients--;
+
+       if (!(exp->exp_flags & OBD_OPT_FORCE))
+               tgt_grant_sanity_check(exp->exp_obd, __func__);
+
        RETURN(0);
 }