Whamcloud - gitweb
LU-9325 mdt: replace simple_strtol() with kstrtol()
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index 2cc8151..be9797b 100644 (file)
@@ -58,6 +58,7 @@
 #include <uapi/linux/lustre/lustre_param.h>
 #include <lustre_quota.h>
 #include <lustre_swab.h>
+#include <lustre_lmv.h>
 #include <obd.h>
 #include <obd_support.h>
 #include <lustre_barrier.h>
@@ -509,6 +510,58 @@ out:
        RETURN(rc);
 }
 
+__u32 mdt_lmm_dom_entry_check(struct lov_mds_md *lmm, int *is_dom_only)
+{
+       struct lov_comp_md_v1 *comp_v1;
+       struct lov_mds_md *v1;
+       __u32 off;
+       __u32 dom_stripesize = 0;
+       int i;
+       bool has_ost_stripes = false;
+
+       ENTRY;
+
+       if (is_dom_only)
+               *is_dom_only = 0;
+
+       if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_COMP_V1)
+               RETURN(0);
+
+       comp_v1 = (struct lov_comp_md_v1 *)lmm;
+       off = le32_to_cpu(comp_v1->lcm_entries[0].lcme_offset);
+       v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
+
+       /* Fast check for DoM entry with no mirroring, should be the first */
+       if (le16_to_cpu(comp_v1->lcm_mirror_count) == 0 &&
+           lov_pattern(le32_to_cpu(v1->lmm_pattern)) != LOV_PATTERN_MDT)
+               RETURN(0);
+
+       /* check all entries otherwise */
+       for (i = 0; i < le16_to_cpu(comp_v1->lcm_entry_count); i++) {
+               struct lov_comp_md_entry_v1 *lcme;
+
+               lcme = &comp_v1->lcm_entries[i];
+               if (!(le32_to_cpu(lcme->lcme_flags) & LCME_FL_INIT))
+                       continue;
+
+               off = le32_to_cpu(lcme->lcme_offset);
+               v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
+
+               if (lov_pattern(le32_to_cpu(v1->lmm_pattern)) ==
+                   LOV_PATTERN_MDT)
+                       dom_stripesize = le32_to_cpu(v1->lmm_stripe_size);
+               else
+                       has_ost_stripes = true;
+
+               if (dom_stripesize && has_ost_stripes)
+                       RETURN(dom_stripesize);
+       }
+       /* DoM-only case exits here */
+       if (is_dom_only && dom_stripesize)
+               *is_dom_only = 1;
+       RETURN(dom_stripesize);
+}
+
 /**
  * Pack size attributes into the reply.
  */
@@ -517,7 +570,7 @@ int mdt_pack_size2body(struct mdt_thread_info *info,
 {
        struct mdt_body *b;
        struct md_attr *ma = &info->mti_attr;
-       int dom_stripe;
+       __u32 dom_stripe;
        bool dom_lock = false;
 
        ENTRY;
@@ -528,9 +581,9 @@ int mdt_pack_size2body(struct mdt_thread_info *info,
            !(ma->ma_valid & MA_LOV && ma->ma_lmm != NULL))
                RETURN(-ENODATA);
 
-       dom_stripe = mdt_lmm_dom_entry(ma->ma_lmm);
+       dom_stripe = mdt_lmm_dom_stripesize(ma->ma_lmm);
        /* no DoM stripe, no size in reply */
-       if (dom_stripe == LMM_NO_DOM)
+       if (!dom_stripe)
                RETURN(-ENOENT);
 
        if (lustre_handle_is_used(lh)) {
@@ -598,8 +651,9 @@ again:
                            buf->lb_buf != info->mti_big_acl) {
                                if (info->mti_big_acl == NULL) {
                                        info->mti_big_aclsize =
-                                                       MIN(mdt->mdt_max_ea_size,
-                                                           XATTR_SIZE_MAX);
+                                                       min_t(unsigned int,
+                                                             mdt->mdt_max_ea_size,
+                                                             XATTR_SIZE_MAX);
                                        OBD_ALLOC_LARGE(info->mti_big_acl,
                                                        info->mti_big_aclsize);
                                        if (info->mti_big_acl == NULL) {
@@ -726,6 +780,10 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                b->mbo_ctime = attr->la_ctime;
                b->mbo_valid |= OBD_MD_FLCTIME;
        }
+       if (attr->la_valid & LA_BTIME) {
+               b->mbo_btime = attr->la_btime;
+               b->mbo_valid |= OBD_MD_FLBTIME;
+       }
        if (attr->la_valid & LA_FLAGS) {
                b->mbo_flags = attr->la_flags;
                b->mbo_valid |= OBD_MD_FLFLAGS;
@@ -922,8 +980,8 @@ int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
        RETURN(rc);
 }
 
-int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
-                  struct md_attr *ma, const char *name)
+int __mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
+                    struct md_attr *ma, const char *name)
 {
        struct md_object *next = mdt_object_child(o);
        struct lu_buf    *buf = &info->mti_buf;
@@ -999,6 +1057,40 @@ got:
        return rc;
 }
 
+int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
+                  struct md_attr *ma, const char *name)
+{
+       int rc;
+
+       if (!info->mti_big_lmm) {
+               OBD_ALLOC(info->mti_big_lmm, PAGE_SIZE);
+               if (!info->mti_big_lmm)
+                       return -ENOMEM;
+               info->mti_big_lmmsize = PAGE_SIZE;
+       }
+
+       if (strcmp(name, XATTR_NAME_LOV) == 0) {
+               ma->ma_lmm = info->mti_big_lmm;
+               ma->ma_lmm_size = info->mti_big_lmmsize;
+               ma->ma_valid &= ~MA_LOV;
+       } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
+               ma->ma_lmv = info->mti_big_lmm;
+               ma->ma_lmv_size = info->mti_big_lmmsize;
+               ma->ma_valid &= ~MA_LMV;
+       } else {
+               LBUG();
+       }
+
+       LASSERT(!info->mti_big_lmm_used);
+       rc = __mdt_stripe_get(info, o, ma, name);
+       /* since big_lmm is always used here, clear 'used' flag to avoid
+        * assertion in mdt_big_xattr_get().
+        */
+       info->mti_big_lmm_used = 0;
+
+       return rc;
+}
+
 int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
                      struct lu_fid *pfid)
 {
@@ -1046,6 +1138,51 @@ int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
        RETURN(0);
 }
 
+int mdt_attr_get_pfid_name(struct mdt_thread_info *info, struct mdt_object *o,
+                          struct lu_fid *pfid, struct lu_name *lname)
+{
+       struct lu_buf *buf = &info->mti_buf;
+       struct link_ea_header *leh;
+       struct link_ea_entry *lee;
+       int reclen;
+       int rc;
+
+       buf->lb_buf = info->mti_xattr_buf;
+       buf->lb_len = sizeof(info->mti_xattr_buf);
+       rc = mo_xattr_get(info->mti_env, mdt_object_child(o), buf,
+                         XATTR_NAME_LINK);
+       if (rc == -ERANGE) {
+               rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
+               buf->lb_buf = info->mti_big_lmm;
+               buf->lb_len = info->mti_big_lmmsize;
+       }
+       if (rc < 0)
+               return rc;
+
+       if (rc < sizeof(*leh)) {
+               CERROR("short LinkEA on "DFID": rc = %d\n",
+                      PFID(mdt_object_fid(o)), rc);
+               return -ENODATA;
+       }
+
+       leh = (struct link_ea_header *)buf->lb_buf;
+       lee = (struct link_ea_entry *)(leh + 1);
+       if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
+               leh->leh_magic = LINK_EA_MAGIC;
+               leh->leh_reccount = __swab32(leh->leh_reccount);
+               leh->leh_len = __swab64(leh->leh_len);
+       }
+       if (leh->leh_magic != LINK_EA_MAGIC)
+               return -EINVAL;
+
+       if (leh->leh_reccount == 0)
+               return -ENODATA;
+
+       linkea_entry_unpack(lee, &reclen, lname, pfid);
+
+       return 0;
+}
+
 int mdt_attr_get_complex(struct mdt_thread_info *info,
                         struct mdt_object *o, struct md_attr *ma)
 {
@@ -1083,19 +1220,19 @@ int mdt_attr_get_complex(struct mdt_thread_info *info,
        }
 
        if (need & MA_LOV && (S_ISREG(mode) || S_ISDIR(mode))) {
-               rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LOV);
+               rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LOV);
                if (rc)
                        GOTO(out, rc);
        }
 
        if (need & MA_LMV && S_ISDIR(mode)) {
-               rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
+               rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
                if (rc != 0)
                        GOTO(out, rc);
        }
 
        if (need & MA_LMV_DEF && S_ISDIR(mode)) {
-               rc = mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV);
+               rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV);
                if (rc != 0)
                        GOTO(out, rc);
        }
@@ -1145,19 +1282,21 @@ out:
 }
 
 static int mdt_getattr_internal(struct mdt_thread_info *info,
-                                struct mdt_object *o, int ma_need)
+                               struct mdt_object *o, int ma_need)
 {
-       struct md_object        *next = mdt_object_child(o);
-       const struct mdt_body   *reqbody = info->mti_body;
-       struct ptlrpc_request   *req = mdt_info_req(info);
-       struct md_attr          *ma = &info->mti_attr;
-       struct lu_attr          *la = &ma->ma_attr;
-       struct req_capsule      *pill = info->mti_pill;
-       const struct lu_env     *env = info->mti_env;
-       struct mdt_body         *repbody;
-       struct lu_buf           *buffer = &info->mti_buf;
-       struct obd_export       *exp = info->mti_exp;
-       int                      rc;
+       struct mdt_device *mdt = info->mti_mdt;
+       struct md_object *next = mdt_object_child(o);
+       const struct mdt_body *reqbody = info->mti_body;
+       struct ptlrpc_request *req = mdt_info_req(info);
+       struct md_attr *ma = &info->mti_attr;
+       struct lu_attr *la = &ma->ma_attr;
+       struct req_capsule *pill = info->mti_pill;
+       const struct lu_env *env = info->mti_env;
+       struct mdt_body *repbody;
+       struct lu_buf *buffer = &info->mti_buf;
+       struct obd_export *exp = info->mti_exp;
+       int rc;
+
        ENTRY;
 
        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
@@ -1244,13 +1383,13 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                }
        }
 
-        if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
+       if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
            reqbody->mbo_valid & OBD_MD_FLDIREA  &&
-            lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) {
-                /* get default stripe info for this dir. */
-                ma->ma_need |= MA_LOV_DEF;
-        }
-        ma->ma_need |= ma_need;
+           lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) {
+               /* get default stripe info for this dir. */
+               ma->ma_need |= MA_LOV_DEF;
+       }
+       ma->ma_need |= ma_need;
 
        rc = mdt_attr_get_complex(info, o, ma);
        if (unlikely(rc)) {
@@ -1269,22 +1408,27 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                        repbody->mbo_t_state = MS_RESTORE;
        }
 
-        if (likely(ma->ma_valid & MA_INODE))
-                mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
-        else
-                RETURN(-EFAULT);
+       if (unlikely(!(ma->ma_valid & MA_INODE)))
+               RETURN(-EFAULT);
+
+       mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
+
+       if (mdt_body_has_lov(la, reqbody)) {
+               u32 stripe_count = 1;
 
-        if (mdt_body_has_lov(la, reqbody)) {
-                if (ma->ma_valid & MA_LOV) {
-                        LASSERT(ma->ma_lmm_size);
+               if (ma->ma_valid & MA_LOV) {
+                       LASSERT(ma->ma_lmm_size);
                        repbody->mbo_eadatasize = ma->ma_lmm_size;
                        if (S_ISDIR(la->la_mode))
                                repbody->mbo_valid |= OBD_MD_FLDIREA;
                        else
                                repbody->mbo_valid |= OBD_MD_FLEASIZE;
                        mdt_dump_lmm(D_INFO, ma->ma_lmm, repbody->mbo_valid);
-                }
+               }
                if (ma->ma_valid & MA_LMV) {
+                       struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
+                       u32 magic = le32_to_cpu(lmv->lmv_magic);
+
                        /* Return -ENOTSUPP for old client */
                        if (!mdt_is_striped_client(req->rq_export))
                                RETURN(-ENOTSUPP);
@@ -1293,6 +1437,13 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                        mdt_dump_lmv(D_INFO, ma->ma_lmv);
                        repbody->mbo_eadatasize = ma->ma_lmv_size;
                        repbody->mbo_valid |= (OBD_MD_FLDIREA|OBD_MD_MEA);
+
+                       stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
+                       if (magic == LMV_MAGIC_STRIPE && lmv_is_restriping(lmv))
+                               mdt_restripe_migrate_add(info, o);
+                       else if (magic == LMV_MAGIC_V1 &&
+                                lmv_is_restriping(lmv))
+                               mdt_restripe_update_add(info, o);
                }
                if (ma->ma_valid & MA_LMV_DEF) {
                        /* Return -ENOTSUPP for old client */
@@ -1309,6 +1460,18 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                        repbody->mbo_valid |= (OBD_MD_FLDIREA |
                                               OBD_MD_DEFAULT_MEA);
                }
+               CDEBUG(D_VFSTRACE,
+                      "dirent count %llu stripe count %u MDT count %d\n",
+                      ma->ma_attr.la_dirent_count, stripe_count,
+                      atomic_read(&mdt->mdt_mds_mds_conns) + 1);
+               if (ma->ma_attr.la_dirent_count != LU_DIRENT_COUNT_UNSET &&
+                   ma->ma_attr.la_dirent_count >
+                       mdt->mdt_restriper.mdr_dir_split_count &&
+                   !fid_is_root(mdt_object_fid(o)) &&
+                   mdt->mdt_enable_dir_auto_split &&
+                   !o->mot_restriping &&
+                   stripe_count < atomic_read(&mdt->mdt_mds_mds_conns) + 1)
+                       mdt_auto_split_add(info, o);
        } else if (S_ISLNK(la->la_mode) &&
                   reqbody->mbo_valid & OBD_MD_LINKNAME) {
                buffer->lb_buf = ma->ma_lmm;
@@ -1346,8 +1509,8 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                               print_limit < rc ? "..." : "", print_limit,
                               (char *)ma->ma_lmm + rc - print_limit, rc);
                        rc = 0;
-                }
-        }
+               }
+       }
 
        if (reqbody->mbo_valid & OBD_MD_FLMODEASIZE) {
                repbody->mbo_max_mdsize = info->mti_mdt->mdt_max_mdsize;
@@ -1369,10 +1532,10 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
 #endif
 
 out:
-        if (rc == 0)
+       if (rc == 0)
                mdt_counter_incr(req, LPROC_MDT_GETATTR);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static int mdt_getattr(struct tgt_session_info *tsi)
@@ -1497,10 +1660,15 @@ int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj,
 
        if (rc > 0) {
                /* not resent */
+               __u64 lockpart = MDS_INODELOCK_LAYOUT;
+
+               /* take layout lock to prepare layout change */
+               if (layout->mlc_opc == MD_LAYOUT_WRITE)
+                       lockpart |= MDS_INODELOCK_UPDATE;
+
                mdt_lock_handle_init(lhc);
                mdt_lock_reg_init(lhc, LCK_EX);
-               rc = mdt_reint_object_lock(info, obj, lhc, MDS_INODELOCK_LAYOUT,
-                                          false);
+               rc = mdt_reint_object_lock(info, obj, lhc, lockpart, false);
                if (rc)
                        RETURN(rc);
        }
@@ -2267,6 +2435,8 @@ static int mdt_set_info(struct tgt_session_info *tsi)
                        __swab32s(&cs->cs_id);
                }
 
+               if (!mdt_is_rootadmin(tsi2mdt_info(tsi)))
+                       RETURN(-EACCES);
                rc = mdt_iocontrol(OBD_IOC_CHANGELOG_CLEAR, req->rq_export,
                                   vallen, val, NULL);
        } else if (KEY_IS(KEY_EVICT_BY_NID)) {
@@ -2316,17 +2486,17 @@ static int mdt_readpage(struct tgt_session_info *tsi)
                                exp_max_brw_size(tsi->tsi_exp));
        rdpg->rp_npages = (rdpg->rp_count + PAGE_SIZE - 1) >>
                          PAGE_SHIFT;
-        OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
-        if (rdpg->rp_pages == NULL)
-                RETURN(-ENOMEM);
+       OBD_ALLOC_PTR_ARRAY(rdpg->rp_pages, rdpg->rp_npages);
+       if (rdpg->rp_pages == NULL)
+               RETURN(-ENOMEM);
 
-        for (i = 0; i < rdpg->rp_npages; ++i) {
+       for (i = 0; i < rdpg->rp_npages; ++i) {
                rdpg->rp_pages[i] = alloc_page(GFP_NOFS);
-                if (rdpg->rp_pages[i] == NULL)
-                        GOTO(free_rdpg, rc = -ENOMEM);
-        }
+               if (rdpg->rp_pages[i] == NULL)
+                       GOTO(free_rdpg, rc = -ENOMEM);
+       }
 
-        /* call lower layers to fill allocated pages with directory data */
+       /* call lower layers to fill allocated pages with directory data */
        rc = mo_readpage(tsi->tsi_env, mdt_object_child(object), rdpg);
        if (rc < 0)
                GOTO(free_rdpg, rc);
@@ -2340,7 +2510,7 @@ free_rdpg:
        for (i = 0; i < rdpg->rp_npages; i++)
                if (rdpg->rp_pages[i] != NULL)
                        __free_page(rdpg->rp_pages[i]);
-       OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
+       OBD_FREE_PTR_ARRAY(rdpg->rp_pages, rdpg->rp_npages);
 
        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
                RETURN(0);
@@ -2485,14 +2655,11 @@ out_shrink:
 
        /*
         * Data-on-MDT optimization - read data along with OPEN and return it
-        * in reply. Do that only if we have both DOM and LAYOUT locks.
+        * in reply when possible.
         */
-       if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req) &&
-           info->mti_attr.ma_lmm != NULL &&
-           mdt_lmm_dom_entry(info->mti_attr.ma_lmm) == LMM_DOM_ONLY) {
+       if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req))
                rc = mdt_dom_read_on_open(info, info->mti_mdt,
                                          &lhc->mlh_reg_lh);
-       }
 
        return rc;
 }
@@ -2699,17 +2866,17 @@ put:
  */
 static int mdt_quotactl(struct tgt_session_info *tsi)
 {
-       struct obd_export       *exp  = tsi->tsi_exp;
-       struct req_capsule      *pill = tsi->tsi_pill;
-       struct obd_quotactl     *oqctl, *repoqc;
-       int                      id, rc;
-       struct mdt_device       *mdt = mdt_exp2dev(exp);
-       struct lu_device        *qmt = mdt->mdt_qmt_dev;
-       struct lu_nodemap       *nodemap;
+       struct obd_export *exp  = tsi->tsi_exp;
+       struct req_capsule *pill = tsi->tsi_pill;
+       struct obd_quotactl *oqctl, *repoqc;
+       int id, rc;
+       struct mdt_device *mdt = mdt_exp2dev(exp);
+       struct lu_device *qmt = mdt->mdt_qmt_dev;
+       struct lu_nodemap *nodemap;
        ENTRY;
 
        oqctl = req_capsule_client_get(pill, &RMF_OBD_QUOTACTL);
-       if (oqctl == NULL)
+       if (!oqctl)
                RETURN(err_serious(-EPROTO));
 
        rc = req_capsule_server_pack(pill);
@@ -2725,12 +2892,16 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
        case Q_SETINFO:
        case Q_SETQUOTA:
        case LUSTRE_Q_SETDEFAULT:
+       case LUSTRE_Q_SETQUOTAPOOL:
+       case LUSTRE_Q_SETINFOPOOL:
                if (!nodemap_can_setquota(nodemap))
                        GOTO(out_nodemap, rc = -EPERM);
                /* fallthrough */
        case Q_GETINFO:
        case Q_GETQUOTA:
        case LUSTRE_Q_GETDEFAULT:
+       case LUSTRE_Q_GETQUOTAPOOL:
+       case LUSTRE_Q_GETINFOPOOL:
                if (qmt == NULL)
                        GOTO(out_nodemap, rc = -EOPNOTSUPP);
                /* slave quotactl */
@@ -2739,8 +2910,10 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
        case Q_GETOQUOTA:
                break;
        default:
-               CERROR("Unsupported quotactl command: %d\n", oqctl->qc_cmd);
-               GOTO(out_nodemap, rc = -EFAULT);
+               rc = -EFAULT;
+               CERROR("%s: unsupported quotactl command %d: rc = %d\n",
+                      mdt_obd_name(mdt), oqctl->qc_cmd, rc);
+               GOTO(out_nodemap, rc);
        }
 
        id = oqctl->qc_id;
@@ -2783,6 +2956,10 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
        case Q_GETQUOTA:
        case LUSTRE_Q_SETDEFAULT:
        case LUSTRE_Q_GETDEFAULT:
+       case LUSTRE_Q_SETQUOTAPOOL:
+       case LUSTRE_Q_GETQUOTAPOOL:
+       case LUSTRE_Q_SETINFOPOOL:
+       case LUSTRE_Q_GETINFOPOOL:
                /* forward quotactl request to QMT */
                rc = qmt_hdls.qmth_quotactl(tsi->tsi_env, qmt, oqctl);
                break;
@@ -2802,8 +2979,7 @@ static int mdt_quotactl(struct tgt_session_info *tsi)
        if (oqctl->qc_id != id)
                swap(oqctl->qc_id, id);
 
-       *repoqc = *oqctl;
-
+       QCTL_COPY(repoqc, oqctl);
        EXIT;
 
 out_nodemap:
@@ -3103,10 +3279,9 @@ int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
 
                        rc = lu_env_init(&env, LCT_MD_THREAD);
                        if (unlikely(rc != 0)) {
-                               CWARN("%s: lu_env initialization failed, object"
-                                     "%p "DFID" is leaked!\n",
+                               CWARN("%s: lu_env initialization failed, object %p "DFID" is leaked!: rc = %d\n",
                                      obd->obd_name, mo,
-                                     PFID(mdt_object_fid(mo)));
+                                     PFID(mdt_object_fid(mo)), rc);
                                RETURN(rc);
                        }
 
@@ -3520,20 +3695,18 @@ static void mdt_save_remote_lock(struct mdt_thread_info *info,
 
        if (lustre_handle_is_used(h)) {
                struct ldlm_lock *lock = ldlm_handle2lock(h);
+               struct ptlrpc_request *req = mdt_info_req(info);
 
                if (o != NULL &&
                    (lock->l_policy_data.l_inodebits.bits &
                     (MDS_INODELOCK_XATTR | MDS_INODELOCK_UPDATE)))
                        mo_invalidate(info->mti_env, mdt_object_child(o));
 
-               if (decref || !info->mti_has_trans ||
+               if (decref || !info->mti_has_trans || !req ||
                    !(mode & (LCK_PW | LCK_EX))) {
                        ldlm_lock_decref_and_cancel(h, mode);
                        LDLM_LOCK_PUT(lock);
                } else {
-                       struct ptlrpc_request *req = mdt_info_req(info);
-
-                       LASSERT(req != NULL);
                        tgt_save_slc_lock(&info->mti_mdt->mdt_lut, lock,
                                          req->rq_transno);
                        ldlm_lock_decref(h, mode);
@@ -3748,7 +3921,6 @@ void mdt_thread_info_init(struct ptlrpc_request *req,
         info->mti_spec.no_create = 0;
        info->mti_spec.sp_rm_entry = 0;
        info->mti_spec.sp_permitted = 0;
-       info->mti_spec.sp_migrate_close = 0;
 
        info->mti_spec.u.sp_ea.eadata = NULL;
        info->mti_spec.u.sp_ea.eadatalen = 0;
@@ -3768,6 +3940,7 @@ void mdt_thread_info_fini(struct mdt_thread_info *info)
        info->mti_env = NULL;
        info->mti_pill = NULL;
        info->mti_exp = NULL;
+       info->mti_mdt = NULL;
 
        if (unlikely(info->mti_big_buf.lb_buf != NULL))
                lu_buf_free(&info->mti_big_buf);
@@ -3795,10 +3968,8 @@ static int mdt_tgt_connect(struct tgt_session_info *tsi)
 {
        if (OBD_FAIL_CHECK(OBD_FAIL_TGT_DELAY_CONDITIONAL) &&
            cfs_fail_val ==
-           tsi2mdt_info(tsi)->mti_mdt->mdt_seq_site.ss_node_id) {
-               set_current_state(TASK_UNINTERRUPTIBLE);
-               schedule_timeout(cfs_time_seconds(3));
-       }
+           tsi2mdt_info(tsi)->mti_mdt->mdt_seq_site.ss_node_id)
+               schedule_timeout_uninterruptible(cfs_time_seconds(3));
 
        return tgt_connect(tsi);
 }
@@ -3943,7 +4114,7 @@ void mdt_intent_fixup_resent(struct mdt_thread_info *info,
         * If the xid matches, then we know this is a resent request, and allow
         * it. (It's probably an OPEN, for which we don't send a lock.
         */
-       if (req_can_reconstruct(req, NULL) == 1)
+       if (req_can_reconstruct(req, NULL) != 0)
                return;
 
         /*
@@ -4590,9 +4761,8 @@ out_free:
  */
 static int mdt_seq_init_cli(const struct lu_env *env, struct mdt_device *mdt)
 {
-       struct seq_server_site  *ss = mdt_seq_site(mdt);
-       int                     rc;
-       char                    *prefix;
+       struct seq_server_site *ss = mdt_seq_site(mdt);
+       char *prefix;
        ENTRY;
 
        /* check if this is adding the first MDC and controller is not yet
@@ -4610,19 +4780,12 @@ static int mdt_seq_init_cli(const struct lu_env *env, struct mdt_device *mdt)
 
        /* Note: seq_client_fini will be called in seq_site_fini */
        snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s", mdt_obd_name(mdt));
-       rc = seq_client_init(ss->ss_client_seq, NULL, LUSTRE_SEQ_METADATA,
-                            prefix, ss->ss_node_id == 0 ?  ss->ss_control_seq :
+       seq_client_init(ss->ss_client_seq, NULL, LUSTRE_SEQ_METADATA,
+                       prefix, ss->ss_node_id == 0 ?  ss->ss_control_seq :
                                                            NULL);
        OBD_FREE(prefix, MAX_OBD_NAME + 5);
-       if (rc != 0) {
-               OBD_FREE_PTR(ss->ss_client_seq);
-               ss->ss_client_seq = NULL;
-               RETURN(rc);
-       }
-
-       rc = seq_server_set_cli(env, ss->ss_server_seq, ss->ss_client_seq);
 
-       RETURN(rc);
+       RETURN(seq_server_set_cli(env, ss->ss_server_seq, ss->ss_client_seq));
 }
 
 static int mdt_seq_init(const struct lu_env *env, struct mdt_device *mdt)
@@ -5154,6 +5317,16 @@ static int mdt_tgt_getxattr(struct tgt_session_info *tsi)
        return rc;
 }
 
+static int mdt_llog_open(struct tgt_session_info *tsi)
+{
+       ENTRY;
+
+       if (!mdt_is_rootadmin(tsi2mdt_info(tsi)))
+               RETURN(err_serious(-EACCES));
+
+       RETURN(tgt_llog_open(tsi));
+}
+
 #define OBD_FAIL_OST_READ_NET  OBD_FAIL_OST_BRW_NET
 #define OBD_FAIL_OST_WRITE_NET OBD_FAIL_OST_BRW_NET
 #define OST_BRW_READ   OST_READ
@@ -5168,7 +5341,7 @@ TGT_RPC_HANDLER(MDS_FIRST_OPC,
                &RQF_MDS_DISCONNECT, LUSTRE_OBD_VERSION),
 TGT_RPC_HANDLER(MDS_FIRST_OPC,
                HAS_REPLY,              MDS_SET_INFO,   mdt_set_info,
-               &RQF_OBD_SET_INFO, LUSTRE_MDS_VERSION),
+               &RQF_MDT_SET_INFO, LUSTRE_MDS_VERSION),
 TGT_MDT_HDL(0,                         MDS_GET_INFO,   mdt_get_info),
 TGT_MDT_HDL(HAS_REPLY,         MDS_GET_ROOT,   mdt_get_root),
 TGT_MDT_HDL(HAS_BODY,          MDS_GETATTR,    mdt_getattr),
@@ -5221,6 +5394,13 @@ static struct tgt_handler mdt_quota_ops[] = {
 TGT_QUOTA_HDL(HAS_REPLY,               QUOTA_DQACQ,      mdt_quota_dqacq),
 };
 
+static struct tgt_handler mdt_llog_handlers[] = {
+       TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_CREATE,      mdt_llog_open),
+       TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_NEXT_BLOCK,  tgt_llog_next_block),
+       TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_READ_HEADER, tgt_llog_read_header),
+       TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_PREV_BLOCK,  tgt_llog_prev_block),
+};
+
 static struct tgt_opc_slice mdt_common_slice[] = {
        {
                .tos_opc_start  = MDS_FIRST_OPC,
@@ -5265,7 +5445,7 @@ static struct tgt_opc_slice mdt_common_slice[] = {
        {
                .tos_opc_start  = LLOG_FIRST_OPC,
                .tos_opc_end    = LLOG_LAST_OPC,
-               .tos_hs         = tgt_llog_handlers
+               .tos_hs         = mdt_llog_handlers
        },
        {
                .tos_opc_start  = LFSCK_FIRST_OPC,
@@ -5295,6 +5475,8 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
        next->md_ops->mdo_iocontrol(env, next, OBD_IOC_STOP_LFSCK, 0, &stop);
 
        mdt_stack_pre_fini(env, m, md2lu_dev(m->mdt_child));
+
+       mdt_restriper_stop(m);
        ping_evictor_stop();
 
        /* Remove the HSM /proc entry so the coordinator cannot be
@@ -5414,16 +5596,19 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                RETURN(-EFAULT);
        } else {
                lsi = s2lsi(lmi->lmi_sb);
+               LASSERT(lsi->lsi_lmd);
                /* CMD is supported only in IAM mode */
                LASSERT(num);
-               node_id = simple_strtol(num, NULL, 10);
+               rc = kstrtol(num, 10, &node_id);
+               if (rc)
+                       RETURN(rc);
+
                obd->u.obt.obt_magic = OBT_MAGIC;
-               if (lsi->lsi_lmd != NULL &&
-                   lsi->lsi_lmd->lmd_flags & LMD_FLG_SKIP_LFSCK)
+               if (lsi->lsi_lmd->lmd_flags & LMD_FLG_SKIP_LFSCK)
                        m->mdt_skip_lfsck = 1;
        }
 
-       /* DoM files get IO lock at open by default */
+       /* DoM files get IO lock at open optionally by default */
        m->mdt_opts.mo_dom_lock = ALWAYS_DOM_LOCK_ON_OPEN;
        /* DoM files are read at open and data is packed in the reply */
        m->mdt_opts.mo_dom_read_open = 1;
@@ -5436,9 +5621,12 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        m->mdt_enable_remote_dir = 1;
        m->mdt_enable_striped_dir = 1;
        m->mdt_enable_dir_migration = 1;
+       m->mdt_enable_dir_restripe = 0;
+       m->mdt_enable_dir_auto_split = 0;
        m->mdt_enable_remote_dir_gid = 0;
        m->mdt_enable_chprojid_gid = 0;
        m->mdt_enable_remote_rename = 1;
+       m->mdt_dir_restripe_nsonly = 1;
 
        atomic_set(&m->mdt_mds_mds_conns, 0);
        atomic_set(&m->mdt_async_commit_count, 0);
@@ -5511,8 +5699,8 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                GOTO(err_free_ns, rc);
 
        /* Amount of available space excluded from granting and reserved
-        * for metadata. It is in percentage and 50% is default value. */
-       tgd->tgd_reserved_pcnt = 50;
+        * for metadata. It is a percentage of the total MDT size. */
+       tgd->tgd_reserved_pcnt = 10;
 
        if (ONE_MB_BRW_SIZE < (1U << tgd->tgd_blockbits))
                m->mdt_brw_size = 1U << tgd->tgd_blockbits;
@@ -5596,7 +5784,17 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        if (ldlm_timeout == LDLM_TIMEOUT_DEFAULT)
                ldlm_timeout = MDS_LDLM_TIMEOUT_DEFAULT;
 
+       if ((lsi->lsi_lmd->lmd_flags & LMD_FLG_LOCAL_RECOV))
+               m->mdt_lut.lut_local_recovery = 1;
+
+       rc = mdt_restriper_start(m);
+       if (rc)
+               GOTO(err_ping_evictor, rc);
+
        RETURN(0);
+
+err_ping_evictor:
+       ping_evictor_stop();
 err_procfs:
        mdt_tunables_fini(m);
 err_recovery:
@@ -5746,6 +5944,8 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env,
                init_rwsem(&mo->mot_dom_sem);
                init_rwsem(&mo->mot_open_sem);
                atomic_set(&mo->mot_open_count, 0);
+               mo->mot_restripe_offset = 0;
+               INIT_LIST_HEAD(&mo->mot_restripe_linkage);
                RETURN(o);
        }
        RETURN(NULL);
@@ -5773,22 +5973,31 @@ static int mdt_object_init(const struct lu_env *env, struct lu_object *o,
         RETURN(rc);
 }
 
+static void mdt_object_free_rcu(struct rcu_head *head)
+{
+       struct mdt_object *mo = container_of(head, struct mdt_object,
+                                            mot_header.loh_rcu);
+
+       kmem_cache_free(mdt_object_kmem, mo);
+}
+
 static void mdt_object_free(const struct lu_env *env, struct lu_object *o)
 {
-        struct mdt_object *mo = mdt_obj(o);
-        struct lu_object_header *h;
-        ENTRY;
+       struct mdt_object *mo = mdt_obj(o);
+       struct lu_object_header *h;
+       ENTRY;
 
-        h = o->lo_header;
-        CDEBUG(D_INFO, "object free, fid = "DFID"\n",
-               PFID(lu_object_fid(o)));
+       h = o->lo_header;
+       CDEBUG(D_INFO, "object free, fid = "DFID"\n",
+              PFID(lu_object_fid(o)));
 
        LASSERT(atomic_read(&mo->mot_open_count) == 0);
        LASSERT(atomic_read(&mo->mot_lease_count) == 0);
 
        lu_object_fini(o);
        lu_object_header_fini(h);
-       OBD_SLAB_FREE_PTR(mo, mdt_object_kmem);
+       OBD_FREE_PRE(mo, sizeof(*mo), "slab-freed");
+       call_rcu(&mo->mot_header.loh_rcu, mdt_object_free_rcu);
 
        EXIT;
 }
@@ -5887,6 +6096,18 @@ static int mdt_obd_set_info_async(const struct lu_env *env,
        RETURN(0);
 }
 
+static inline void mdt_enable_slc(struct mdt_device *mdt)
+{
+       if (mdt->mdt_lut.lut_sync_lock_cancel == SYNC_LOCK_CANCEL_NEVER)
+               mdt->mdt_lut.lut_sync_lock_cancel = SYNC_LOCK_CANCEL_BLOCKING;
+}
+
+static inline void mdt_disable_slc(struct mdt_device *mdt)
+{
+       if (mdt->mdt_lut.lut_sync_lock_cancel == SYNC_LOCK_CANCEL_BLOCKING)
+               mdt->mdt_lut.lut_sync_lock_cancel = SYNC_LOCK_CANCEL_NEVER;
+}
+
 /**
  * Match client and server connection feature flags.
  *
@@ -6047,6 +6268,12 @@ static int mdt_connect_internal(const struct lu_env *env,
                       exp->exp_obd->obd_name, obd_export_nid2str(exp));
        }
 
+       if ((data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) &&
+           !(data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT)) {
+               atomic_inc(&mdt->mdt_mds_mds_conns);
+               mdt_enable_slc(mdt);
+       }
+
        return 0;
 }
 
@@ -6077,7 +6304,7 @@ static int mdt_ctxt_add_dirty_flag(struct lu_env *env,
 
 static int mdt_export_cleanup(struct obd_export *exp)
 {
-       struct list_head         closing_list;
+       LIST_HEAD(closing_list);
        struct mdt_export_data  *med = &exp->exp_mdt_data;
        struct obd_device       *obd = exp->exp_obd;
        struct mdt_device       *mdt;
@@ -6087,7 +6314,6 @@ static int mdt_export_cleanup(struct obd_export *exp)
        int rc = 0;
        ENTRY;
 
-       INIT_LIST_HEAD(&closing_list);
        spin_lock(&med->med_open_lock);
        while (!list_empty(&med->med_open_head)) {
                struct list_head *tmp = med->med_open_head.next;
@@ -6158,18 +6384,6 @@ static int mdt_export_cleanup(struct obd_export *exp)
         RETURN(rc);
 }
 
-static inline void mdt_enable_slc(struct mdt_device *mdt)
-{
-       if (mdt->mdt_lut.lut_sync_lock_cancel == SYNC_LOCK_CANCEL_NEVER)
-               mdt->mdt_lut.lut_sync_lock_cancel = SYNC_LOCK_CANCEL_BLOCKING;
-}
-
-static inline void mdt_disable_slc(struct mdt_device *mdt)
-{
-       if (mdt->mdt_lut.lut_sync_lock_cancel == SYNC_LOCK_CANCEL_BLOCKING)
-               mdt->mdt_lut.lut_sync_lock_cancel = SYNC_LOCK_CANCEL_NEVER;
-}
-
 static int mdt_obd_disconnect(struct obd_export *exp)
 {
        int rc;
@@ -6224,12 +6438,6 @@ static int mdt_obd_connect(const struct lu_env *env,
 
        mdt = mdt_dev(obd->obd_lu_dev);
 
-       if ((data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) &&
-           !(data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT)) {
-               atomic_inc(&mdt->mdt_mds_mds_conns);
-               mdt_enable_slc(mdt);
-       }
-
        /*
         * first, check whether the stack is ready to handle requests
         * XXX: probably not very appropriate method is used now