*/
/*
* This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
*
* lustre/mdt/mdt_handler.c
*
#include <uapi/linux/lustre/lustre_param.h>
#include <lustre_quota.h>
#include <lustre_swab.h>
+#include <lustre_lmv.h>
#include <obd.h>
#include <obd_support.h>
#include <lustre_barrier.h>
lh->mlh_type = MDT_REG_LOCK;
}
+/* Initialize lock handle \a lh from an existing LDLM lock: take over the
+ * lock's requested mode, and for LCK_GROUP locks also copy the group ID
+ * carried in the inodebits policy data so group-lock matching works.
+ */
+void mdt_lh_reg_init(struct mdt_lock_handle *lh, struct ldlm_lock *lock)
+{
+ mdt_lock_reg_init(lh, lock->l_req_mode);
+ if (lock->l_req_mode == LCK_GROUP)
+ lh->mlh_gid = lock->l_policy_data.l_inodebits.li_gid;
+}
+
void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode,
const struct lu_name *lname)
{
EXIT;
}
+/**
+ * Check whether \a o is directory stripe object.
+ *
+ * \param[in] info thread environment
+ * \param[in] o MDT object
+ *
+ * \retval 1 is directory stripe.
+ * \retval 0 isn't directory stripe.
+ * \retval < 0 error code
+ */
+static int mdt_is_dir_stripe(struct mdt_thread_info *info,
+ struct mdt_object *o)
+{
+ struct md_attr *ma = &info->mti_attr;
+ struct lmv_mds_md_v1 *lmv;
+ int rc;
+
+ rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
+ if (rc < 0)
+ return rc;
+
+ /* no LMV xattr at all: a plain directory, not a stripe */
+ if (!(ma->ma_valid & MA_LMV))
+ return 0;
+
+ lmv = &ma->ma_lmv->lmv_md_v1;
+
+ /* reject corrupt/unsupported LMV before trusting its magic */
+ if (!lmv_is_sane2(lmv))
+ return -EBADF;
+
+ /* LMV_MAGIC_STRIPE marks a slave stripe of a striped directory */
+ if (le32_to_cpu(lmv->lmv_magic) == LMV_MAGIC_STRIPE)
+ return 1;
+
+ return 0;
+}
+
static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
struct lu_fid *fid)
{
struct mdt_device *mdt = info->mti_mdt;
struct lu_name *lname = &info->mti_name;
+ const char *start = fileset;
char *filename = info->mti_filename;
struct mdt_object *parent;
u32 mode;
*/
*fid = mdt->mdt_md_root_fid;
- while (rc == 0 && fileset != NULL && *fileset != '\0') {
- const char *s1 = fileset;
+ while (rc == 0 && start != NULL && *start != '\0') {
+ const char *s1 = start;
const char *s2;
while (*++s1 == '/')
if (s2 == s1)
break;
- fileset = s2;
+ start = s2;
lname->ln_namelen = s2 - s1;
if (lname->ln_namelen > NAME_MAX) {
rc = PTR_ERR(parent);
else {
mode = lu_object_attr(&parent->mot_obj);
- mdt_object_put(info->mti_env, parent);
- if (!S_ISDIR(mode))
+ if (!S_ISDIR(mode)) {
rc = -ENOTDIR;
+ } else if (mdt_is_remote_object(info, parent, parent)) {
+ if (!mdt->mdt_enable_remote_subdir_mount) {
+ rc = -EREMOTE;
+ LCONSOLE_WARN("%s: subdir mount '%s' refused because 'enable_remote_subdir_mount=0': rc = %d\n",
+ mdt_obd_name(mdt),
+ fileset, rc);
+ } else {
+ LCONSOLE_INFO("%s: subdir mount '%s' is remote and may be slow\n",
+ mdt_obd_name(mdt),
+ fileset);
+ }
+ }
+ mdt_object_put(info->mti_env, parent);
}
}
struct obd_statfs *osfs;
struct mdt_body *reqbody = NULL;
struct mdt_statfs_cache *msf;
+ ktime_t kstart = ktime_get();
+ int current_blockbits;
int rc;
ENTRY;
spin_unlock(&mdt->mdt_lock);
}
+ /* tgd_blockbit is recordsize bits set during mkfs.
+ * This once set does not change. However, 'zfs set'
+ * can be used to change the MDT blocksize. Instead
+ * of using cached value of 'tgd_blockbit' always
+ * calculate the blocksize bits which may have
+ * changed.
+ */
+ current_blockbits = fls64(osfs->os_bsize) - 1;
+
/* at least try to account for cached pages. its still racy and
* might be under-reporting if clients haven't announced their
* caches with brw recently */
" pending %llu free %llu avail %llu\n",
tgd->tgd_tot_dirty, tgd->tgd_tot_granted,
tgd->tgd_tot_pending,
- osfs->os_bfree << tgd->tgd_blockbits,
- osfs->os_bavail << tgd->tgd_blockbits);
+ osfs->os_bfree << current_blockbits,
+ osfs->os_bavail << current_blockbits);
osfs->os_bavail -= min_t(u64, osfs->os_bavail,
((tgd->tgd_tot_dirty + tgd->tgd_tot_pending +
- osfs->os_bsize - 1) >> tgd->tgd_blockbits));
+ osfs->os_bsize - 1) >> current_blockbits));
tgt_grant_sanity_check(mdt->mdt_lu_dev.ld_obd, __func__);
CDEBUG(D_CACHE, "%llu blocks: %llu free, %llu avail; "
osfs->os_files, osfs->os_ffree, osfs->os_state);
if (!exp_grant_param_supp(tsi->tsi_exp) &&
- tgd->tgd_blockbits > COMPAT_BSIZE_SHIFT) {
+ current_blockbits > COMPAT_BSIZE_SHIFT) {
/* clients which don't support OBD_CONNECT_GRANT_PARAM
* should not see a block size > page size, otherwise
* cl_lost_grant goes mad. Therefore, we emulate a 4KB (=2^12)
* block size which is the biggest block size known to work
* with all client's page size. */
- osfs->os_blocks <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT;
- osfs->os_bfree <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT;
- osfs->os_bavail <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT;
+ osfs->os_blocks <<= current_blockbits - COMPAT_BSIZE_SHIFT;
+ osfs->os_bfree <<= current_blockbits - COMPAT_BSIZE_SHIFT;
+ osfs->os_bavail <<= current_blockbits - COMPAT_BSIZE_SHIFT;
osfs->os_bsize = 1 << COMPAT_BSIZE_SHIFT;
}
if (rc == 0)
- mdt_counter_incr(req, LPROC_MDT_STATFS);
+ mdt_counter_incr(req, LPROC_MDT_STATFS,
+ ktime_us_delta(ktime_get(), kstart));
out:
mdt_thread_info_fini(info);
RETURN(rc);
}
+/**
+ * Look for a DoM (Data-on-MDT) component in composite layout \a lmm.
+ *
+ * \a lmm is in on-disk (little-endian) format, hence the le*_to_cpu()
+ * conversions on every field read.
+ *
+ * \param[in] lmm layout EA to inspect
+ * \param[out] is_dom_only if non-NULL, set to 1 when the layout has a
+ * DoM component but no initialized OST component
+ *
+ * \retval stripe size of the DoM component, or 0 if there is none
+ * (non-composite layouts cannot contain a DoM component)
+ */
+__u32 mdt_lmm_dom_entry_check(struct lov_mds_md *lmm, int *is_dom_only)
+{
+ struct lov_comp_md_v1 *comp_v1;
+ struct lov_mds_md *v1;
+ __u32 off;
+ __u32 dom_stripesize = 0;
+ int i;
+ bool has_ost_stripes = false;
+
+ ENTRY;
+
+ if (is_dom_only)
+ *is_dom_only = 0;
+
+ /* only composite layouts can carry a DoM component */
+ if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_COMP_V1)
+ RETURN(0);
+
+ comp_v1 = (struct lov_comp_md_v1 *)lmm;
+ off = le32_to_cpu(comp_v1->lcm_entries[0].lcme_offset);
+ v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
+
+ /* Fast check for DoM entry with no mirroring, should be the first */
+ if (le16_to_cpu(comp_v1->lcm_mirror_count) == 0 &&
+ lov_pattern(le32_to_cpu(v1->lmm_pattern)) != LOV_PATTERN_MDT)
+ RETURN(0);
+
+ /* check all entries otherwise */
+ for (i = 0; i < le16_to_cpu(comp_v1->lcm_entry_count); i++) {
+ struct lov_comp_md_entry_v1 *lcme;
+
+ lcme = &comp_v1->lcm_entries[i];
+ /* uninitialized components have no data yet; skip them */
+ if (!(le32_to_cpu(lcme->lcme_flags) & LCME_FL_INIT))
+ continue;
+
+ off = le32_to_cpu(lcme->lcme_offset);
+ v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
+
+ if (lov_pattern(le32_to_cpu(v1->lmm_pattern)) ==
+ LOV_PATTERN_MDT)
+ dom_stripesize = le32_to_cpu(v1->lmm_stripe_size);
+ else
+ has_ost_stripes = true;
+
+ /* both kinds seen: layout is mixed, so not DoM-only */
+ if (dom_stripesize && has_ost_stripes)
+ RETURN(dom_stripesize);
+ }
+ /* DoM-only case exits here */
+ if (is_dom_only && dom_stripesize)
+ *is_dom_only = 1;
+ RETURN(dom_stripesize);
+}
+
/**
* Pack size attributes into the reply.
*/
{
struct mdt_body *b;
struct md_attr *ma = &info->mti_attr;
- int dom_stripe;
+ __u32 dom_stripe;
bool dom_lock = false;
ENTRY;
!(ma->ma_valid & MA_LOV && ma->ma_lmm != NULL))
RETURN(-ENODATA);
- dom_stripe = mdt_lmm_dom_entry(ma->ma_lmm);
+ dom_stripe = mdt_lmm_dom_stripesize(ma->ma_lmm);
/* no DoM stripe, no size in reply */
- if (dom_stripe == LMM_NO_DOM)
+ if (!dom_stripe)
RETURN(-ENOENT);
if (lustre_handle_is_used(lh)) {
if (buf->lb_len == 0)
RETURN(0);
+ LASSERT(!info->mti_big_acl_used);
again:
rc = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_ACCESS);
if (rc < 0) {
rc = 0;
} else if (rc == -EOPNOTSUPP) {
rc = 0;
- } else {
- if (rc == -ERANGE &&
- exp_connect_large_acl(info->mti_exp) &&
- buf->lb_buf != info->mti_big_acl) {
+ } else if (rc == -ERANGE) {
+ if (exp_connect_large_acl(info->mti_exp) &&
+ !info->mti_big_acl_used) {
if (info->mti_big_acl == NULL) {
info->mti_big_aclsize =
- MIN(mdt->mdt_max_ea_size,
- XATTR_SIZE_MAX);
+ min_t(unsigned int,
+ mdt->mdt_max_ea_size,
+ XATTR_SIZE_MAX);
OBD_ALLOC_LARGE(info->mti_big_acl,
info->mti_big_aclsize);
if (info->mti_big_acl == NULL) {
buf->lb_buf = info->mti_big_acl;
buf->lb_len = info->mti_big_aclsize;
-
+ info->mti_big_acl_used = 1;
goto again;
}
-
+ /* FS has ACL bigger than our limits */
+ CDEBUG(D_INODE, "%s: "DFID" ACL can't fit into %d\n",
+ mdt_obd_name(mdt), PFID(mdt_object_fid(o)),
+ info->mti_big_aclsize);
+ rc = -E2BIG;
+ } else {
CERROR("%s: unable to read "DFID" ACL: rc = %d\n",
mdt_obd_name(mdt), PFID(mdt_object_fid(o)), rc);
}
} else {
- int client;
- int server;
- int acl_buflen;
- int lmm_buflen = 0;
- int lmmsize = 0;
-
- acl_buflen = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER);
- if (acl_buflen >= rc)
- goto map;
-
- /* If LOV/LMA EA is small, we can reuse part of their buffer */
- client = ptlrpc_req_get_repsize(pill->rc_req);
- server = lustre_packed_msg_size(pill->rc_req->rq_repmsg);
- if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) {
- lmm_buflen = req_capsule_get_size(pill, &RMF_MDT_MD,
- RCL_SERVER);
- lmmsize = repbody->mbo_eadatasize;
- }
-
- if (client < server - acl_buflen - lmm_buflen + rc + lmmsize) {
- CDEBUG(D_INODE, "%s: client prepared buffer size %d "
- "is not big enough with the ACL size %d (%d)\n",
- mdt_obd_name(mdt), client, rc,
- server - acl_buflen - lmm_buflen + rc + lmmsize);
- repbody->mbo_aclsize = 0;
- repbody->mbo_valid &= ~OBD_MD_FLACL;
- RETURN(-ERANGE);
- }
-
-map:
- if (buf->lb_buf == info->mti_big_acl)
- info->mti_big_acl_used = 1;
-
rc = nodemap_map_acl(nodemap, buf->lb_buf,
rc, NODEMAP_FS_TO_CLIENT);
/* if all ACLs mapped out, rc is still >= 0 */
b->mbo_ctime = attr->la_ctime;
b->mbo_valid |= OBD_MD_FLCTIME;
}
+ if (attr->la_valid & LA_BTIME) {
+ b->mbo_btime = attr->la_btime;
+ b->mbo_valid |= OBD_MD_FLBTIME;
+ }
if (attr->la_valid & LA_FLAGS) {
b->mbo_flags = attr->la_flags;
b->mbo_valid |= OBD_MD_FLFLAGS;
RETURN(rc);
}
-int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
- struct md_attr *ma, const char *name)
+int __mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
+ struct md_attr *ma, const char *name)
{
struct md_object *next = mdt_object_child(o);
struct lu_buf *buf = &info->mti_buf;
return rc;
}
+/* Fetch a striping xattr (LOV or LMV) of \a o into the thread-shared
+ * mti_big_lmm buffer, allocating it lazily (PAGE_SIZE) on first use.
+ * Points ma->ma_lmm or ma->ma_lmv at that buffer depending on \a name
+ * and clears the matching MA_* valid bit before calling the real getter.
+ * Only XATTR_NAME_LOV and XATTR_NAME_LMV are supported; any other name
+ * is a caller bug (LBUG).
+ */
+int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
+ struct md_attr *ma, const char *name)
+{
+ int rc;
+
+ if (!info->mti_big_lmm) {
+ OBD_ALLOC(info->mti_big_lmm, PAGE_SIZE);
+ if (!info->mti_big_lmm)
+ return -ENOMEM;
+ info->mti_big_lmmsize = PAGE_SIZE;
+ }
+
+ if (strcmp(name, XATTR_NAME_LOV) == 0) {
+ ma->ma_lmm = info->mti_big_lmm;
+ ma->ma_lmm_size = info->mti_big_lmmsize;
+ ma->ma_valid &= ~MA_LOV;
+ } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
+ ma->ma_lmv = info->mti_big_lmm;
+ ma->ma_lmv_size = info->mti_big_lmmsize;
+ ma->ma_valid &= ~MA_LMV;
+ } else {
+ LBUG();
+ }
+
+ LASSERT(!info->mti_big_lmm_used);
+ rc = __mdt_stripe_get(info, o, ma, name);
+ /* since big_lmm is always used here, clear 'used' flag to avoid
+ * assertion in mdt_big_xattr_get().
+ */
+ info->mti_big_lmm_used = 0;
+
+ return rc;
+}
+
int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
struct lu_fid *pfid)
{
RETURN(0);
}
+/* Read the linkEA of \a o and unpack its first entry into the parent
+ * FID \a pfid and entry name \a lname. Falls back to the big xattr
+ * buffer when the stack-sized mti_xattr_buf is too small (-ERANGE).
+ * If the stored header magic is byte-swapped, the header fields are
+ * converted in place before use.
+ *
+ * \retval 0 on success
+ * \retval -ENODATA linkEA missing, too short, or has no entries
+ * \retval -EINVAL bad linkEA magic
+ * \retval < 0 other error from xattr retrieval
+ */
+int mdt_attr_get_pfid_name(struct mdt_thread_info *info, struct mdt_object *o,
+ struct lu_fid *pfid, struct lu_name *lname)
+{
+ struct lu_buf *buf = &info->mti_buf;
+ struct link_ea_header *leh;
+ struct link_ea_entry *lee;
+ int reclen;
+ int rc;
+
+ buf->lb_buf = info->mti_xattr_buf;
+ buf->lb_len = sizeof(info->mti_xattr_buf);
+ rc = mo_xattr_get(info->mti_env, mdt_object_child(o), buf,
+ XATTR_NAME_LINK);
+ if (rc == -ERANGE) {
+ /* linkEA larger than inline buffer: retry via big buffer */
+ rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
+ buf->lb_buf = info->mti_big_lmm;
+ buf->lb_len = info->mti_big_lmmsize;
+ }
+ if (rc < 0)
+ return rc;
+
+ /* rc is the number of bytes read; must at least hold the header */
+ if (rc < sizeof(*leh)) {
+ CERROR("short LinkEA on "DFID": rc = %d\n",
+ PFID(mdt_object_fid(o)), rc);
+ return -ENODATA;
+ }
+
+ leh = (struct link_ea_header *)buf->lb_buf;
+ lee = (struct link_ea_entry *)(leh + 1);
+ /* linkEA written on an opposite-endian host: fix up the header */
+ if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
+ leh->leh_magic = LINK_EA_MAGIC;
+ leh->leh_reccount = __swab32(leh->leh_reccount);
+ leh->leh_len = __swab64(leh->leh_len);
+ }
+ if (leh->leh_magic != LINK_EA_MAGIC)
+ return -EINVAL;
+
+ if (leh->leh_reccount == 0)
+ return -ENODATA;
+
+ /* only the first link entry is returned to the caller */
+ linkea_entry_unpack(lee, &reclen, lname, pfid);
+
+ return 0;
+}
+
int mdt_attr_get_complex(struct mdt_thread_info *info,
struct mdt_object *o, struct md_attr *ma)
{
}
if (need & MA_LOV && (S_ISREG(mode) || S_ISDIR(mode))) {
- rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LOV);
+ rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LOV);
if (rc)
GOTO(out, rc);
}
if (need & MA_LMV && S_ISDIR(mode)) {
- rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
+ rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
if (rc != 0)
GOTO(out, rc);
}
if (need & MA_LMV_DEF && S_ISDIR(mode)) {
- rc = mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV);
+ rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV);
if (rc != 0)
GOTO(out, rc);
}
}
static int mdt_getattr_internal(struct mdt_thread_info *info,
- struct mdt_object *o, int ma_need)
+ struct mdt_object *o, int ma_need)
{
- struct md_object *next = mdt_object_child(o);
- const struct mdt_body *reqbody = info->mti_body;
- struct ptlrpc_request *req = mdt_info_req(info);
- struct md_attr *ma = &info->mti_attr;
- struct lu_attr *la = &ma->ma_attr;
- struct req_capsule *pill = info->mti_pill;
- const struct lu_env *env = info->mti_env;
- struct mdt_body *repbody;
- struct lu_buf *buffer = &info->mti_buf;
- struct obd_export *exp = info->mti_exp;
- int rc;
+ struct mdt_device *mdt = info->mti_mdt;
+ struct md_object *next = mdt_object_child(o);
+ const struct mdt_body *reqbody = info->mti_body;
+ struct ptlrpc_request *req = mdt_info_req(info);
+ struct md_attr *ma = &info->mti_attr;
+ struct lu_attr *la = &ma->ma_attr;
+ struct req_capsule *pill = info->mti_pill;
+ const struct lu_env *env = info->mti_env;
+ struct mdt_body *repbody;
+ struct lu_buf *buffer = &info->mti_buf;
+ struct obd_export *exp = info->mti_exp;
+ ktime_t kstart = ktime_get();
+ int rc;
+
ENTRY;
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
}
}
- if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
+ if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
reqbody->mbo_valid & OBD_MD_FLDIREA &&
- lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) {
- /* get default stripe info for this dir. */
- ma->ma_need |= MA_LOV_DEF;
- }
- ma->ma_need |= ma_need;
+ lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) {
+ /* get default stripe info for this dir. */
+ ma->ma_need |= MA_LOV_DEF;
+ }
+ ma->ma_need |= ma_need;
rc = mdt_attr_get_complex(info, o, ma);
if (unlikely(rc)) {
- CDEBUG(rc == -ENOENT ? D_OTHER : D_ERROR,
- "%s: getattr error for "DFID": rc = %d\n",
- mdt_obd_name(info->mti_mdt),
- PFID(mdt_object_fid(o)), rc);
+ CDEBUG_LIMIT(rc == -ENOENT ? D_OTHER : D_ERROR,
+ "%s: getattr error for "DFID": rc = %d\n",
+ mdt_obd_name(info->mti_mdt),
+ PFID(mdt_object_fid(o)), rc);
RETURN(rc);
}
repbody->mbo_t_state = MS_RESTORE;
}
- if (likely(ma->ma_valid & MA_INODE))
- mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
- else
- RETURN(-EFAULT);
+ if (unlikely(!(ma->ma_valid & MA_INODE)))
+ RETURN(-EFAULT);
+
+ mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
+
+ if (mdt_body_has_lov(la, reqbody)) {
+ u32 stripe_count = 1;
+ bool fixed_layout = false;
- if (mdt_body_has_lov(la, reqbody)) {
- if (ma->ma_valid & MA_LOV) {
- LASSERT(ma->ma_lmm_size);
+ if (ma->ma_valid & MA_LOV) {
+ LASSERT(ma->ma_lmm_size);
repbody->mbo_eadatasize = ma->ma_lmm_size;
if (S_ISDIR(la->la_mode))
repbody->mbo_valid |= OBD_MD_FLDIREA;
else
repbody->mbo_valid |= OBD_MD_FLEASIZE;
mdt_dump_lmm(D_INFO, ma->ma_lmm, repbody->mbo_valid);
- }
+ }
if (ma->ma_valid & MA_LMV) {
+ struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
+ u32 magic = le32_to_cpu(lmv->lmv_magic);
+
/* Return -ENOTSUPP for old client */
if (!mdt_is_striped_client(req->rq_export))
RETURN(-ENOTSUPP);
mdt_dump_lmv(D_INFO, ma->ma_lmv);
repbody->mbo_eadatasize = ma->ma_lmv_size;
repbody->mbo_valid |= (OBD_MD_FLDIREA|OBD_MD_MEA);
+
+ stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
+ fixed_layout = lmv_is_fixed(lmv);
+ if (magic == LMV_MAGIC_STRIPE && lmv_is_restriping(lmv))
+ mdt_restripe_migrate_add(info, o);
+ else if (magic == LMV_MAGIC_V1 &&
+ lmv_is_restriping(lmv))
+ mdt_restripe_update_add(info, o);
}
if (ma->ma_valid & MA_LMV_DEF) {
/* Return -ENOTSUPP for old client */
repbody->mbo_valid |= (OBD_MD_FLDIREA |
OBD_MD_DEFAULT_MEA);
}
+ CDEBUG(D_VFSTRACE,
+ "dirent count %llu stripe count %u MDT count %d\n",
+ ma->ma_attr.la_dirent_count, stripe_count,
+ atomic_read(&mdt->mdt_mds_mds_conns) + 1);
+ if (ma->ma_attr.la_dirent_count != LU_DIRENT_COUNT_UNSET &&
+ ma->ma_attr.la_dirent_count >
+ mdt->mdt_restriper.mdr_dir_split_count &&
+ !fid_is_root(mdt_object_fid(o)) &&
+ mdt->mdt_enable_dir_auto_split &&
+ !o->mot_restriping &&
+ stripe_count < atomic_read(&mdt->mdt_mds_mds_conns) + 1 &&
+ !fixed_layout)
+ mdt_auto_split_add(info, o);
} else if (S_ISLNK(la->la_mode) &&
reqbody->mbo_valid & OBD_MD_LINKNAME) {
buffer->lb_buf = ma->ma_lmm;
print_limit < rc ? "..." : "", print_limit,
(char *)ma->ma_lmm + rc - print_limit, rc);
rc = 0;
- }
- }
+ }
+ }
if (reqbody->mbo_valid & OBD_MD_FLMODEASIZE) {
repbody->mbo_max_mdsize = info->mti_mdt->mdt_max_mdsize;
#endif
out:
- if (rc == 0)
- mdt_counter_incr(req, LPROC_MDT_GETATTR);
+ if (rc == 0)
+ mdt_counter_incr(req, LPROC_MDT_GETATTR,
+ ktime_us_delta(ktime_get(), kstart));
- RETURN(rc);
+ RETURN(rc);
}
static int mdt_getattr(struct tgt_session_info *tsi)
/**
* Handler of layout intent RPC requiring the layout modification
*
- * \param[in] info thread environment
- * \param[in] obj object
- * \param[in] layout layout change descriptor
+ * \param[in] info thread environment
+ * \param[in] obj object
+ * \param[out] lhc object ldlm lock handle
+ * \param[in] layout layout change descriptor
*
* \retval 0 on success
* \retval < 0 error code
*/
int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj,
+ struct mdt_lock_handle *lhc,
struct md_layout_change *layout)
{
- struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
int rc;
+
ENTRY;
if (!mdt_object_exists(obj))
- GOTO(out, rc = -ENOENT);
+ RETURN(-ENOENT);
if (!S_ISREG(lu_object_attr(&obj->mot_obj)))
- GOTO(out, rc = -EINVAL);
+ RETURN(-EINVAL);
rc = mo_permission(info->mti_env, NULL, mdt_object_child(obj), NULL,
MAY_WRITE);
if (rc)
- GOTO(out, rc);
+ RETURN(rc);
- /* take layout lock to prepare layout change */
- mdt_lock_reg_init(lh, LCK_EX);
- rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_LAYOUT);
- if (rc)
- GOTO(out, rc);
+ rc = mdt_check_resent_lock(info, obj, lhc);
+ if (rc < 0)
+ RETURN(rc);
+
+ if (rc > 0) {
+ /* not resent */
+ __u64 lockpart = MDS_INODELOCK_LAYOUT;
+
+ /* take layout lock to prepare layout change */
+ if (layout->mlc_opc == MD_LAYOUT_WRITE)
+ lockpart |= MDS_INODELOCK_UPDATE;
+
+ mdt_lock_handle_init(lhc);
+ mdt_lock_reg_init(lhc, LCK_EX);
+ rc = mdt_reint_object_lock(info, obj, lhc, lockpart, false);
+ if (rc)
+ RETURN(rc);
+ }
mutex_lock(&obj->mot_som_mutex);
rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout);
mutex_unlock(&obj->mot_som_mutex);
- mdt_object_unlock(info, obj, lh, 1);
-out:
+
+ if (rc)
+ mdt_object_unlock(info, obj, lhc, 1);
+
RETURN(rc);
}
static int mdt_raw_lookup(struct mdt_thread_info *info,
struct mdt_object *parent,
- const struct lu_name *lname,
- struct ldlm_reply *ldlm_rep)
+ const struct lu_name *lname)
{
- struct lu_fid *child_fid = &info->mti_tmp_fid1;
- int rc;
+ struct lu_fid *fid = &info->mti_tmp_fid1;
+ struct mdt_body *repbody;
+ bool is_dotdot = false;
+ bool is_old_parent_stripe = false;
+ bool is_new_parent_checked = false;
+ int rc;
+
ENTRY;
LASSERT(!info->mti_cross_ref);
+ /* Always allow to lookup ".." */
+ if (lname->ln_namelen == 2 &&
+ lname->ln_name[0] == '.' && lname->ln_name[1] == '.') {
+ info->mti_spec.sp_permitted = 1;
+ is_dotdot = true;
+ if (mdt_is_dir_stripe(info, parent) == 1)
+ is_old_parent_stripe = true;
+ }
+ mdt_object_get(info->mti_env, parent);
+lookup:
/* Only got the fid of this obj by name */
- fid_zero(child_fid);
- rc = mdo_lookup(info->mti_env, mdt_object_child(info->mti_object),
- lname, child_fid, &info->mti_spec);
- if (rc == 0) {
- struct mdt_body *repbody;
+ fid_zero(fid);
+ rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname, fid,
+ &info->mti_spec);
+ mdt_object_put(info->mti_env, parent);
+ if (rc)
+ RETURN(rc);
- repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
- repbody->mbo_fid1 = *child_fid;
- repbody->mbo_valid = OBD_MD_FLID;
- mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
- } else if (rc == -ENOENT) {
- mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
+ /* getattr_name("..") should return master object FID for striped dir */
+ if (is_dotdot && (is_old_parent_stripe || !is_new_parent_checked)) {
+ parent = mdt_object_find(info->mti_env, info->mti_mdt, fid);
+ if (IS_ERR(parent))
+ RETURN(PTR_ERR(parent));
+
+ /* old client getattr_name("..") with stripe FID */
+ if (unlikely(is_old_parent_stripe)) {
+ is_old_parent_stripe = false;
+ goto lookup;
+ }
+
+ /* ".." may be a stripe */
+ if (unlikely(mdt_is_dir_stripe(info, parent) == 1)) {
+ is_new_parent_checked = true;
+ goto lookup;
+ }
+
+ mdt_object_put(info->mti_env, parent);
}
+ repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+ repbody->mbo_fid1 = *fid;
+ repbody->mbo_valid = OBD_MD_FLID;
+
RETURN(rc);
}
__u64 child_bits,
struct ldlm_reply *ldlm_rep)
{
- struct ptlrpc_request *req = mdt_info_req(info);
- struct mdt_body *reqbody = NULL;
- struct mdt_object *parent = info->mti_object;
- struct mdt_object *child;
- struct lu_fid *child_fid = &info->mti_tmp_fid1;
- struct lu_name *lname = NULL;
+ struct ptlrpc_request *req = mdt_info_req(info);
+ struct mdt_body *reqbody = NULL;
+ struct mdt_object *parent = info->mti_object;
+ struct mdt_object *child = NULL;
+ struct lu_fid *child_fid = &info->mti_tmp_fid1;
+ struct lu_name *lname = NULL;
struct mdt_lock_handle *lhp = NULL;
- struct ldlm_lock *lock;
+ struct ldlm_lock *lock;
struct req_capsule *pill = info->mti_pill;
__u64 try_bits = 0;
bool is_resent;
}
rc = mdt_getattr_internal(info, child, 0);
- if (unlikely(rc != 0))
+ if (unlikely(rc != 0)) {
mdt_object_unlock(info, child, lhc, 1);
+ RETURN(rc);
+ }
- mdt_pack_secctx_in_reply(info, child);
+ rc = mdt_pack_secctx_in_reply(info, child);
+ if (unlikely(rc)) {
+ mdt_object_unlock(info, child, lhc, 1);
+ RETURN(rc);
+ }
+ rc = mdt_pack_encctx_in_reply(info, child);
+ if (unlikely(rc))
+ mdt_object_unlock(info, child, lhc, 1);
RETURN(rc);
}
mdt_name_unpack(pill, &RMF_NAME, lname, MNF_FIX_ANON);
if (lu_name_is_valid(lname)) {
+ if (mdt_object_remote(parent)) {
+ CERROR("%s: parent "DFID" is on remote target\n",
+ mdt_obd_name(info->mti_mdt),
+ PFID(mdt_object_fid(parent)));
+ RETURN(-EPROTO);
+ }
+
CDEBUG(D_INODE, "getattr with lock for "DFID"/"DNAME", "
"ldlm_rep = %p\n", PFID(mdt_object_fid(parent)),
PNAME(lname), ldlm_rep);
RETURN(err_serious(-EPROTO));
*child_fid = reqbody->mbo_fid2;
-
if (unlikely(!fid_is_sane(child_fid)))
RETURN(err_serious(-EINVAL));
+ if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
+ mdt_object_get(info->mti_env, parent);
+ child = parent;
+ } else {
+ child = mdt_object_find(info->mti_env, info->mti_mdt,
+ child_fid);
+ if (IS_ERR(child))
+ RETURN(PTR_ERR(child));
+ }
+
+ if (mdt_object_remote(child)) {
+ CERROR("%s: child "DFID" is on remote target\n",
+ mdt_obd_name(info->mti_mdt),
+ PFID(mdt_object_fid(child)));
+ GOTO(out_child, rc = -EPROTO);
+ }
+
+ /* don't fetch LOOKUP lock if it's remote object */
+ rc = mdt_is_remote_object(info, parent, child);
+ if (rc < 0)
+ GOTO(out_child, rc);
+ if (rc)
+ child_bits &= ~MDS_INODELOCK_LOOKUP;
+
CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
"ldlm_rep = %p\n",
PFID(mdt_object_fid(parent)),
LU_OBJECT_DEBUG(D_INODE, info->mti_env,
&parent->mot_obj,
"Parent doesn't exist!");
- RETURN(-ESTALE);
- }
-
- if (mdt_object_remote(parent)) {
- CERROR("%s: parent "DFID" is on remote target\n",
- mdt_obd_name(info->mti_mdt),
- PFID(mdt_object_fid(parent)));
- RETURN(-EIO);
+ GOTO(out_child, rc = -ESTALE);
}
if (lu_name_is_valid(lname)) {
- /* Always allow to lookup ".." */
- if (unlikely(lname->ln_namelen == 2 &&
- lname->ln_name[0] == '.' &&
- lname->ln_name[1] == '.'))
- info->mti_spec.sp_permitted = 1;
-
if (info->mti_body->mbo_valid == OBD_MD_FLID) {
- rc = mdt_raw_lookup(info, parent, lname, ldlm_rep);
+ rc = mdt_raw_lookup(info, parent, lname);
RETURN(rc);
}
mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
if (rc != 0)
- GOTO(out_parent, rc);
- }
-
- mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
+ GOTO(unlock_parent, rc);
- /*
- *step 3: find the child object by fid & lock it.
- * regardless if it is local or remote.
- *
- *Note: LU-3240 (commit 762f2114d282a98ebfa4dbbeea9298a8088ad24e)
- * set parent dir fid the same as child fid in getattr by fid case
- * we should not lu_object_find() the object again, could lead
- * to hung if there is a concurrent unlink destroyed the object.
- */
- if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
- mdt_object_get(info->mti_env, parent);
- child = parent;
- } else {
child = mdt_object_find(info->mti_env, info->mti_mdt,
child_fid);
+ if (unlikely(IS_ERR(child)))
+ GOTO(unlock_parent, rc = PTR_ERR(child));
}
- if (unlikely(IS_ERR(child)))
- GOTO(out_parent, rc = PTR_ERR(child));
+ mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
+
+ /* step 3: lock child regardless if it is local or remote. */
+ LASSERT(child);
OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2);
if (!mdt_object_exists(child)) {
mdt_lock_reg_init(lhc, LCK_PR);
if (!(child_bits & MDS_INODELOCK_UPDATE) &&
- mdt_object_exists(child) && !mdt_object_remote(child)) {
+ !mdt_object_remote(child)) {
struct md_attr *ma = &info->mti_attr;
ma->ma_valid = 0;
* lock and this might save us RPC on later STAT. For
* directories, it also let negative dentry cache start
* working for this dir. */
- if (ma->ma_valid & MA_INODE &&
- ma->ma_attr.la_valid & LA_CTIME &&
- info->mti_mdt->mdt_namespace->ns_ctime_age_limit +
- ma->ma_attr.la_ctime < ktime_get_real_seconds())
- child_bits |= MDS_INODELOCK_UPDATE;
- }
+ if (ma->ma_valid & MA_INODE &&
+ ma->ma_attr.la_valid & LA_CTIME &&
+ info->mti_mdt->mdt_namespace->ns_ctime_age_limit +
+ ma->ma_attr.la_ctime < ktime_get_real_seconds())
+ child_bits |= MDS_INODELOCK_UPDATE;
+ }
/* layout lock must be granted in a best-effort way
* for IT operations */
GOTO(out_child, rc);
}
- mdt_pack_secctx_in_reply(info, child);
+ rc = mdt_pack_secctx_in_reply(info, child);
+ if (unlikely(rc)) {
+ mdt_object_unlock(info, child, lhc, 1);
+ GOTO(out_child, rc);
+ }
+
+ rc = mdt_pack_encctx_in_reply(info, child);
+ if (unlikely(rc)) {
+ mdt_object_unlock(info, child, lhc, 1);
+ GOTO(out_child, rc);
+ }
lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
if (lock) {
PLDLMRES(lock->l_resource),
PFID(mdt_object_fid(child)));
+ if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND))) {
+ if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
+ OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_ENQ_RESEND,
+ req->rq_deadline -
+ req->rq_arrival_time.tv_sec +
+ cfs_fail_val ?: 3);
+ /* Put the lock to the waiting list and force the cancel */
+ ldlm_set_ast_sent(lock);
+ }
+
if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
- mdt_object_exists(child) && !mdt_object_remote(child) &&
- child != parent) {
+ !mdt_object_remote(child) && child != parent) {
mdt_object_put(info->mti_env, child);
rc = mdt_pack_size2body(info, child_fid,
&lhc->mlh_reg_lh);
unlock_res_and_lock(lock);
}
LDLM_LOCK_PUT(lock);
- GOTO(out_parent, rc = 0);
+ GOTO(unlock_parent, rc = 0);
}
LDLM_LOCK_PUT(lock);
}
EXIT;
out_child:
- mdt_object_put(info->mti_env, child);
-out_parent:
+ if (child)
+ mdt_object_put(info->mti_env, child);
+unlock_parent:
if (lhp)
mdt_object_unlock(info, parent, lhp, 1);
return rc;
static int mdt_getattr_name(struct tgt_session_info *tsi)
{
struct mdt_thread_info *info = tsi2mdt_info(tsi);
- struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
- struct mdt_body *reqbody;
- struct mdt_body *repbody;
- int rc, rc2;
- ENTRY;
+ struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
+ struct mdt_body *reqbody;
+ struct mdt_body *repbody;
+ int rc, rc2;
- reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
- LASSERT(reqbody != NULL);
- repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
- LASSERT(repbody != NULL);
+ ENTRY;
+
+ reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
+ LASSERT(reqbody != NULL);
+ repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+ LASSERT(repbody != NULL);
info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
repbody->mbo_eadatasize = 0;
repbody->mbo_aclsize = 0;
- rc = mdt_init_ucred_intent_getattr(info, reqbody);
- if (unlikely(rc))
- GOTO(out_shrink, rc);
+ rc = mdt_init_ucred(info, reqbody);
+ if (unlikely(rc))
+ GOTO(out_shrink, rc);
- rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
- if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
- ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode);
- lhc->mlh_reg_lh.cookie = 0;
- }
- mdt_exit_ucred(info);
- EXIT;
+ rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
+ if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
+ ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode);
+ lhc->mlh_reg_lh.cookie = 0;
+ }
+ mdt_exit_ucred(info);
+ EXIT;
out_shrink:
- mdt_client_compatibility(info);
- rc2 = mdt_fix_reply(info);
- if (rc == 0)
- rc = rc2;
+ mdt_client_compatibility(info);
+ rc2 = mdt_fix_reply(info);
+ if (rc == 0)
+ rc = rc2;
mdt_thread_info_fini(info);
return rc;
}
if (la->la_flags & LUSTRE_IMMUTABLE_FL)
rc = -EACCES;
- if (md_capable(uc, CFS_CAP_DAC_OVERRIDE))
+ if (cap_raised(uc->uc_cap, CAP_DAC_OVERRIDE))
RETURN(0);
if (uc->uc_fsuid == la->la_uid) {
if ((la->la_mode & S_IWUSR) == 0)
tgt_name(tsi->tsi_tgt), vallen);
RETURN(-EINVAL);
}
- if (ptlrpc_req_need_swab(req)) {
+ if (req_capsule_req_need_swab(&req->rq_pill)) {
__swab64s(&cs->cs_recno);
__swab32s(&cs->cs_id);
}
+ if (!mdt_is_rootadmin(tsi2mdt_info(tsi)))
+ RETURN(-EACCES);
rc = mdt_iocontrol(OBD_IOC_CHANGELOG_CLEAR, req->rq_export,
vallen, val, NULL);
} else if (KEY_IS(KEY_EVICT_BY_NID)) {
exp_max_brw_size(tsi->tsi_exp));
rdpg->rp_npages = (rdpg->rp_count + PAGE_SIZE - 1) >>
PAGE_SHIFT;
- OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
- if (rdpg->rp_pages == NULL)
- RETURN(-ENOMEM);
+ OBD_ALLOC_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages);
+ if (rdpg->rp_pages == NULL)
+ RETURN(-ENOMEM);
- for (i = 0; i < rdpg->rp_npages; ++i) {
+ for (i = 0; i < rdpg->rp_npages; ++i) {
rdpg->rp_pages[i] = alloc_page(GFP_NOFS);
- if (rdpg->rp_pages[i] == NULL)
- GOTO(free_rdpg, rc = -ENOMEM);
- }
+ if (rdpg->rp_pages[i] == NULL)
+ GOTO(free_rdpg, rc = -ENOMEM);
+ }
- /* call lower layers to fill allocated pages with directory data */
+ /* call lower layers to fill allocated pages with directory data */
rc = mo_readpage(tsi->tsi_env, mdt_object_child(object), rdpg);
if (rc < 0)
GOTO(free_rdpg, rc);
for (i = 0; i < rdpg->rp_npages; i++)
if (rdpg->rp_pages[i] != NULL)
__free_page(rdpg->rp_pages[i]);
- OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
+ OBD_FREE_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages);
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
RETURN(0);
req_capsule_has_field(pill, &RMF_FILE_SECCTX_NAME,
RCL_CLIENT)) {
if (req_capsule_get_size(pill, &RMF_FILE_SECCTX_NAME,
- RCL_CLIENT) != 0) {
+ RCL_CLIENT) != 0)
/* pre-set size in server part with max size */
req_capsule_set_size(pill, &RMF_FILE_SECCTX,
RCL_SERVER,
- info->mti_mdt->mdt_max_ea_size);
- } else {
+ OBD_MAX_DEFAULT_EA_SIZE);
+ else
req_capsule_set_size(pill, &RMF_FILE_SECCTX,
RCL_SERVER, 0);
- }
}
+}
+/* Reserve room in the reply capsule for the file encryption context,
+ * sized to the MDT's max metadata size, so the reply buffer is large
+ * enough before it is packed (shrunk later to the actual size).
+ */
+static void mdt_preset_encctx_size(struct mdt_thread_info *info)
+{
+ struct req_capsule *pill = info->mti_pill;
+
+ if (req_capsule_has_field(pill, &RMF_FILE_ENCCTX,
+ RCL_SERVER))
+ /* pre-set size in server part with max size */
+ req_capsule_set_size(pill, &RMF_FILE_ENCCTX,
+ RCL_SERVER,
+ info->mti_mdt->mdt_max_mdsize);
+}
}
static int mdt_reint_internal(struct mdt_thread_info *info,
LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
mdt_preset_secctx_size(info);
+ mdt_preset_encctx_size(info);
rc = req_capsule_server_pack(pill);
if (rc != 0) {
/*
* Data-on-MDT optimization - read data along with OPEN and return it
- * in reply. Do that only if we have both DOM and LAYOUT locks.
+ * in reply when possible.
*/
- if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req) &&
- info->mti_attr.ma_lmm != NULL &&
- mdt_lmm_dom_entry(info->mti_attr.ma_lmm) == LMM_DOM_ONLY) {
+ if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req))
rc = mdt_dom_read_on_open(info, info->mti_mdt,
&lhc->mlh_reg_lh);
- }
return rc;
}
struct ptlrpc_request *req = tgt_ses_req(tsi);
struct req_capsule *pill = tsi->tsi_pill;
struct mdt_body *body;
+ ktime_t kstart = ktime_get();
int rc;
ENTRY;
mdt_thread_info_fini(info);
}
if (rc == 0)
- mdt_counter_incr(req, LPROC_MDT_SYNC);
+ mdt_counter_incr(req, LPROC_MDT_SYNC,
+ ktime_us_delta(ktime_get(), kstart));
RETURN(rc);
}
*/
static int mdt_quotactl(struct tgt_session_info *tsi)
{
- struct obd_export *exp = tsi->tsi_exp;
- struct req_capsule *pill = tsi->tsi_pill;
- struct obd_quotactl *oqctl, *repoqc;
- int id, rc;
- struct mdt_device *mdt = mdt_exp2dev(exp);
- struct lu_device *qmt = mdt->mdt_qmt_dev;
- struct lu_nodemap *nodemap;
+ struct obd_export *exp = tsi->tsi_exp;
+ struct req_capsule *pill = tsi->tsi_pill;
+ struct obd_quotactl *oqctl, *repoqc;
+ int id, rc;
+ struct mdt_device *mdt = mdt_exp2dev(exp);
+ struct lu_device *qmt = mdt->mdt_qmt_dev;
+ struct lu_nodemap *nodemap;
ENTRY;
oqctl = req_capsule_client_get(pill, &RMF_OBD_QUOTACTL);
- if (oqctl == NULL)
+ if (!oqctl)
RETURN(err_serious(-EPROTO));
rc = req_capsule_server_pack(pill);
case Q_SETINFO:
case Q_SETQUOTA:
case LUSTRE_Q_SETDEFAULT:
+ case LUSTRE_Q_SETQUOTAPOOL:
+ case LUSTRE_Q_SETINFOPOOL:
+ case LUSTRE_Q_SETDEFAULT_POOL:
if (!nodemap_can_setquota(nodemap))
GOTO(out_nodemap, rc = -EPERM);
/* fallthrough */
case Q_GETINFO:
case Q_GETQUOTA:
case LUSTRE_Q_GETDEFAULT:
+ case LUSTRE_Q_GETQUOTAPOOL:
+ case LUSTRE_Q_GETINFOPOOL:
+ case LUSTRE_Q_GETDEFAULT_POOL:
if (qmt == NULL)
GOTO(out_nodemap, rc = -EOPNOTSUPP);
/* slave quotactl */
case Q_GETOQUOTA:
break;
default:
- CERROR("Unsupported quotactl command: %d\n", oqctl->qc_cmd);
- GOTO(out_nodemap, rc = -EFAULT);
+ rc = -EFAULT;
+ CERROR("%s: unsupported quotactl command %d: rc = %d\n",
+ mdt_obd_name(mdt), oqctl->qc_cmd, rc);
+ GOTO(out_nodemap, rc);
}
id = oqctl->qc_id;
case Q_GETQUOTA:
case LUSTRE_Q_SETDEFAULT:
case LUSTRE_Q_GETDEFAULT:
+ case LUSTRE_Q_SETQUOTAPOOL:
+ case LUSTRE_Q_GETQUOTAPOOL:
+ case LUSTRE_Q_SETINFOPOOL:
+ case LUSTRE_Q_GETINFOPOOL:
+ case LUSTRE_Q_SETDEFAULT_POOL:
+ case LUSTRE_Q_GETDEFAULT_POOL:
/* forward quotactl request to QMT */
rc = qmt_hdls.qmth_quotactl(tsi->tsi_env, qmt, oqctl);
break;
if (oqctl->qc_id != id)
swap(oqctl->qc_id, id);
- *repoqc = *oqctl;
-
+ QCTL_COPY(repoqc, oqctl);
EXIT;
out_nodemap:
rc = lu_env_init(&env, LCT_MD_THREAD);
if (unlikely(rc != 0)) {
- CWARN("%s: lu_env initialization failed, object"
- "%p "DFID" is leaked!\n",
+ CWARN("%s: lu_env initialization failed, object %p "DFID" is leaked!: rc = %d\n",
obd->obd_name, mo,
- PFID(mdt_object_fid(mo)));
+ PFID(mdt_object_fid(mo)), rc);
RETURN(rc);
}
cache);
}
-static int mdt_object_local_lock(struct mdt_thread_info *info,
- struct mdt_object *o,
- struct mdt_lock_handle *lh, __u64 *ibits,
- __u64 trybits, bool cos_incompat)
+int mdt_object_local_lock(struct mdt_thread_info *info, struct mdt_object *o,
+ struct mdt_lock_handle *lh, __u64 *ibits,
+ __u64 trybits, bool cos_incompat)
{
struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
union ldlm_policy_data *policy = &info->mti_policy;
policy->l_inodebits.bits = *ibits;
policy->l_inodebits.try_bits = trybits;
+ policy->l_inodebits.li_gid = lh->mlh_gid;
/*
* Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
* object anyway XXX*/
if (lh->mlh_type == MDT_PDO_LOCK &&
lh->mlh_pdo_hash != 0) {
- CDEBUG(D_INFO, "%s: "DFID" convert PDO lock to"
- "EX lock.\n", mdt_obd_name(info->mti_mdt),
+ CDEBUG(D_INFO,
+ "%s: "DFID" convert PDO lock to EX lock.\n",
+ mdt_obd_name(info->mti_mdt),
PFID(mdt_object_fid(o)));
lh->mlh_pdo_hash = 0;
lh->mlh_rreg_mode = LCK_EX;
if (lustre_handle_is_used(h)) {
struct ldlm_lock *lock = ldlm_handle2lock(h);
+ struct ptlrpc_request *req = mdt_info_req(info);
if (o != NULL &&
(lock->l_policy_data.l_inodebits.bits &
(MDS_INODELOCK_XATTR | MDS_INODELOCK_UPDATE)))
mo_invalidate(info->mti_env, mdt_object_child(o));
- if (decref || !info->mti_has_trans ||
+ if (decref || !info->mti_has_trans || !req ||
!(mode & (LCK_PW | LCK_EX))) {
ldlm_lock_decref_and_cancel(h, mode);
LDLM_LOCK_PUT(lock);
} else {
- struct ptlrpc_request *req = mdt_info_req(info);
-
- LASSERT(req != NULL);
tgt_save_slc_lock(&info->mti_mdt->mdt_lut, lock,
req->rq_transno);
ldlm_lock_decref(h, mode);
LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
mdt_preset_secctx_size(info);
+ mdt_preset_encctx_size(info);
rc = req_capsule_server_pack(pill);
if (rc)
info->mti_spec.no_create = 0;
info->mti_spec.sp_rm_entry = 0;
info->mti_spec.sp_permitted = 0;
- info->mti_spec.sp_migrate_close = 0;
info->mti_spec.u.sp_ea.eadata = NULL;
info->mti_spec.u.sp_ea.eadatalen = 0;
info->mti_env = NULL;
info->mti_pill = NULL;
info->mti_exp = NULL;
+ info->mti_mdt = NULL;
if (unlikely(info->mti_big_buf.lb_buf != NULL))
lu_buf_free(&info->mti_big_buf);
{
if (OBD_FAIL_CHECK(OBD_FAIL_TGT_DELAY_CONDITIONAL) &&
cfs_fail_val ==
- tsi2mdt_info(tsi)->mti_mdt->mdt_seq_site.ss_node_id) {
- set_current_state(TASK_UNINTERRUPTIBLE);
- schedule_timeout(cfs_time_seconds(3));
- }
+ tsi2mdt_info(tsi)->mti_mdt->mdt_seq_site.ss_node_id)
+ schedule_timeout_uninterruptible(cfs_time_seconds(3));
return tgt_connect(tsi);
}
* If the xid matches, then we know this is a resent request, and allow
* it. (It's probably an OPEN, for which we don't send a lock.
*/
- if (req_can_reconstruct(req, NULL) == 1)
+ if (req_can_reconstruct(req, NULL) != 0)
return;
/*
GOTO(out_shrink, rc = -EINVAL);
}
- rc = mdt_init_ucred_intent_getattr(info, reqbody);
+ rc = mdt_init_ucred(info, reqbody);
if (rc)
GOTO(out_shrink, rc);
struct ldlm_lock **lockp,
__u64 flags)
{
- struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_LAYOUT];
+ struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
struct md_layout_change layout = { .mlc_opc = MD_LAYOUT_NOP };
struct layout_intent *intent;
+ struct ldlm_reply *ldlm_rep;
struct lu_fid *fid = &info->mti_tmp_fid2;
struct mdt_object *obj = NULL;
int layout_size = 0;
+ struct lu_buf *buf = &layout.mlc_buf;
int rc = 0;
+
ENTRY;
fid_extract_from_res_name(fid, &(*lockp)->l_resource->lr_name);
case LAYOUT_INTENT_RESTORE:
CERROR("%s: Unsupported layout intent opc %d\n",
mdt_obd_name(info->mti_mdt), intent->li_opc);
- rc = -ENOTSUPP;
- break;
+ RETURN(-ENOTSUPP);
default:
CERROR("%s: Unknown layout intent opc %d\n",
mdt_obd_name(info->mti_mdt), intent->li_opc);
- rc = -EINVAL;
- break;
+ RETURN(-EINVAL);
}
- if (rc < 0)
- RETURN(rc);
-
- /* Get lock from request for possible resent case. */
- mdt_intent_fixup_resent(info, *lockp, lhc, flags);
obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
if (IS_ERR(obj))
- GOTO(out, rc = PTR_ERR(obj));
-
+ RETURN(PTR_ERR(obj));
if (mdt_object_exists(obj) && !mdt_object_remote(obj)) {
/* if layout is going to be changed don't use the current EA
} else {
layout_size = mdt_attr_get_eabuf_size(info, obj);
if (layout_size < 0)
- GOTO(out_obj, rc = layout_size);
+ GOTO(out, rc = layout_size);
if (layout_size > info->mti_mdt->mdt_max_mdsize)
info->mti_mdt->mdt_max_mdsize = layout_size;
* set reply buffer size, so that ldlm_handle_enqueue0()->
* ldlm_lvbo_fill() will fill the reply buffer with lovea.
*/
- (*lockp)->l_lvb_type = LVB_T_LAYOUT;
req_capsule_set_size(info->mti_pill, &RMF_DLM_LVB, RCL_SERVER,
layout_size);
rc = req_capsule_server_pack(info->mti_pill);
if (rc)
- GOTO(out_obj, rc);
+ GOTO(out, rc);
+ ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
+ if (!ldlm_rep)
+ GOTO(out, rc = -EPROTO);
- if (layout.mlc_opc != MD_LAYOUT_NOP) {
- struct lu_buf *buf = &layout.mlc_buf;
+ mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
- /**
- * mdt_layout_change is a reint operation, when the request
- * is resent, layout write shouldn't reprocess it again.
- */
- rc = mdt_check_resent(info, mdt_reconstruct_generic, lhc);
- if (rc)
- GOTO(out_obj, rc = rc < 0 ? rc : 0);
+ /* take lock in ldlm_lock_enqueue() for LAYOUT_INTENT_ACCESS */
+ if (layout.mlc_opc == MD_LAYOUT_NOP)
+ GOTO(out, rc = 0);
- /**
- * There is another resent case: the client's job has been
- * done by another client, referring lod_declare_layout_change
- * -EALREADY case, and it became a operation w/o transaction,
- * so we should not do the layout change, otherwise
- * mdt_layout_change() will try to cancel the granted server
- * CR lock whose remote counterpart is still in hold on the
- * client, and a deadlock ensues.
- */
- rc = mdt_check_resent_lock(info, obj, lhc);
- if (rc <= 0)
- GOTO(out_obj, rc);
-
- buf->lb_buf = NULL;
- buf->lb_len = 0;
- if (unlikely(req_is_replay(mdt_info_req(info)))) {
- buf->lb_buf = req_capsule_client_get(info->mti_pill,
- &RMF_EADATA);
- buf->lb_len = req_capsule_get_size(info->mti_pill,
- &RMF_EADATA, RCL_CLIENT);
- /*
- * If it's a replay of layout write intent RPC, the
- * client has saved the extended lovea when
- * it get reply then.
- */
- if (buf->lb_len > 0)
- mdt_fix_lov_magic(info, buf->lb_buf);
- }
+ rc = mdt_check_resent(info, mdt_reconstruct_generic, lhc);
+ if (rc < 0)
+ GOTO(out, rc);
+ if (rc == 1) {
+ DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt.");
+ rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg);
+ GOTO(out, rc);
+ }
+
+ buf->lb_buf = NULL;
+ buf->lb_len = 0;
+ if (unlikely(req_is_replay(mdt_info_req(info)))) {
+ buf->lb_buf = req_capsule_client_get(info->mti_pill,
+ &RMF_EADATA);
+ buf->lb_len = req_capsule_get_size(info->mti_pill,
+ &RMF_EADATA, RCL_CLIENT);
/*
- * Instantiate some layout components, if @buf contains
- * lovea, then it's a replay of the layout intent write
- * RPC.
+ * If this is a replay of a layout intent write RPC, the client
+ * saved the extended lovea when it originally got the reply.
*/
- rc = mdt_layout_change(info, obj, &layout);
- if (rc)
- GOTO(out_obj, rc);
+ if (buf->lb_len > 0)
+ mdt_fix_lov_magic(info, buf->lb_buf);
}
-out_obj:
- mdt_object_put(info->mti_env, obj);
- if (rc == 0 && lustre_handle_is_used(&lhc->mlh_reg_lh))
+ /* Get lock from request for possible resent case. */
+ mdt_intent_fixup_resent(info, *lockp, lhc, flags);
+ (*lockp)->l_lvb_type = LVB_T_LAYOUT;
+
+ /*
+ * Instantiate some layout components. If @buf contains a lovea,
+ * this is a replay of the layout intent write RPC.
+ */
+ rc = mdt_layout_change(info, obj, lhc, &layout);
+ ldlm_rep->lock_policy_res2 = clear_serious(rc);
+
+ if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
rc = mdt_intent_lock_replace(info, lockp, lhc, flags, rc);
+ if (rc == ELDLM_LOCK_REPLACED &&
+ (*lockp)->l_granted_mode == LCK_EX)
+ ldlm_lock_mode_downgrade(*lockp, LCK_CR);
+ }
+ EXIT;
out:
- lhc->mlh_reg_lh.cookie = 0;
-
- RETURN(rc);
+ mdt_object_put(info->mti_env, obj);
+ return rc;
}
static int mdt_intent_open(enum ldlm_intent_flags it_opc,
struct ldlm_reply *rep = NULL;
long opc;
int rc;
+ struct ptlrpc_request *req = mdt_info_req(info);
static const struct req_format *intent_fmts[REINT_MAX] = {
[REINT_CREATE] = &RQF_LDLM_INTENT_CREATE,
rc = mdt_reint_internal(info, lhc, opc);
+ if (rc < 0 && lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
+ DEBUG_REQ(D_ERROR, req, "Replay open failed with %d", rc);
+
/* Check whether the reply has been packed successfully. */
if (mdt_info_req(info)->rq_repmsg != NULL)
rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
break;
case IT_GETATTR:
check_mdt_object = true;
+ /* fallthrough */
case IT_LOOKUP:
it_format = &RQF_LDLM_INTENT_GETATTR;
it_handler = &mdt_intent_getattr;
} else {
rc = err_serious(-EFAULT);
}
+ } else if (ldesc->l_resource.lr_type == LDLM_IBITS &&
+ ldesc->l_policy_data.l_inodebits.bits == MDS_INODELOCK_DOM) {
+ struct ldlm_reply *rep;
+
+ /* No intent was provided, but the INTENT flag is set along with
+ * the DOM bit, so this is treated as a GLIMPSE request. This
+ * logic is shared by the MDT and OST glimpse handling.
+ */
+ mdt_ptlrpc_stats_update(req, IT_GLIMPSE);
+ rc = mdt_glimpse_enqueue(info, ns, lockp, flags);
+ /* Check whether the reply has been packed successfully. */
+ if (req->rq_repmsg != NULL) {
+ rep = req_capsule_server_get(info->mti_pill,
+ &RMF_DLM_REP);
+ rep->lock_policy_res2 =
+ ptlrpc_status_hton(rep->lock_policy_res2);
+ }
} else {
/* No intent was provided */
req_capsule_set_size(pill, &RMF_DLM_LVB, RCL_SERVER, 0);
*/
static int mdt_seq_init_cli(const struct lu_env *env, struct mdt_device *mdt)
{
- struct seq_server_site *ss = mdt_seq_site(mdt);
- int rc;
- char *prefix;
+ struct seq_server_site *ss = mdt_seq_site(mdt);
+ char *prefix;
ENTRY;
/* check if this is adding the first MDC and controller is not yet
/* Note: seq_client_fini will be called in seq_site_fini */
snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s", mdt_obd_name(mdt));
- rc = seq_client_init(ss->ss_client_seq, NULL, LUSTRE_SEQ_METADATA,
- prefix, ss->ss_node_id == 0 ? ss->ss_control_seq :
+ seq_client_init(ss->ss_client_seq, NULL, LUSTRE_SEQ_METADATA,
+ prefix, ss->ss_node_id == 0 ? ss->ss_control_seq :
NULL);
OBD_FREE(prefix, MAX_OBD_NAME + 5);
- if (rc != 0) {
- OBD_FREE_PTR(ss->ss_client_seq);
- ss->ss_client_seq = NULL;
- RETURN(rc);
- }
- rc = seq_server_set_cli(env, ss->ss_server_seq, ss->ss_client_seq);
-
- RETURN(rc);
+ RETURN(seq_server_set_cli(env, ss->ss_server_seq, ss->ss_client_seq));
}
static int mdt_seq_init(const struct lu_env *env, struct mdt_device *mdt)
return rc;
}
+/* llog open handler for the MDT: reject clients that fail the
+ * mdt_is_rootadmin() check with -EACCES, otherwise delegate to the
+ * generic tgt_llog_open().
+ */
+static int mdt_llog_open(struct tgt_session_info *tsi)
+{
+ ENTRY;
+
+ if (!mdt_is_rootadmin(tsi2mdt_info(tsi)))
+ RETURN(err_serious(-EACCES));
+
+ RETURN(tgt_llog_open(tsi));
+}
+
#define OBD_FAIL_OST_READ_NET OBD_FAIL_OST_BRW_NET
#define OBD_FAIL_OST_WRITE_NET OBD_FAIL_OST_BRW_NET
#define OST_BRW_READ OST_READ
&RQF_MDS_DISCONNECT, LUSTRE_OBD_VERSION),
TGT_RPC_HANDLER(MDS_FIRST_OPC,
HAS_REPLY, MDS_SET_INFO, mdt_set_info,
- &RQF_OBD_SET_INFO, LUSTRE_MDS_VERSION),
+ &RQF_MDT_SET_INFO, LUSTRE_MDS_VERSION),
TGT_MDT_HDL(0, MDS_GET_INFO, mdt_get_info),
TGT_MDT_HDL(HAS_REPLY, MDS_GET_ROOT, mdt_get_root),
TGT_MDT_HDL(HAS_BODY, MDS_GETATTR, mdt_getattr),
OST_PUNCH, mdt_punch_hdl,
mdt_hp_punch),
TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_SYNC, mdt_data_sync),
+TGT_OST_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, OST_FALLOCATE,
+ mdt_fallocate_hdl),
+TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_SEEK, tgt_lseek),
};
static struct tgt_handler mdt_sec_ctx_ops[] = {
TGT_QUOTA_HDL(HAS_REPLY, QUOTA_DQACQ, mdt_quota_dqacq),
};
+/* llog RPC handlers served by the MDT. LLOG_ORIGIN_HANDLE_CREATE (open)
+ * goes through mdt_llog_open() so access can be restricted; the read-side
+ * operations use the generic target handlers directly.
+ */
+static struct tgt_handler mdt_llog_handlers[] = {
+ TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_CREATE, mdt_llog_open),
+ TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_NEXT_BLOCK, tgt_llog_next_block),
+ TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_READ_HEADER, tgt_llog_read_header),
+ TGT_LLOG_HDL(0, LLOG_ORIGIN_HANDLE_PREV_BLOCK, tgt_llog_prev_block),
+};
+
static struct tgt_opc_slice mdt_common_slice[] = {
{
.tos_opc_start = MDS_FIRST_OPC,
{
.tos_opc_start = LLOG_FIRST_OPC,
.tos_opc_end = LLOG_LAST_OPC,
- .tos_hs = tgt_llog_handlers
+ .tos_hs = mdt_llog_handlers
},
{
.tos_opc_start = LFSCK_FIRST_OPC,
next->md_ops->mdo_iocontrol(env, next, OBD_IOC_STOP_LFSCK, 0, &stop);
mdt_stack_pre_fini(env, m, md2lu_dev(m->mdt_child));
+
+ mdt_restriper_stop(m);
ping_evictor_stop();
/* Remove the HSM /proc entry so the coordinator cannot be
RETURN(-EFAULT);
} else {
lsi = s2lsi(lmi->lmi_sb);
+ LASSERT(lsi->lsi_lmd);
/* CMD is supported only in IAM mode */
LASSERT(num);
- node_id = simple_strtol(num, NULL, 10);
+ rc = kstrtol(num, 10, &node_id);
+ if (rc)
+ RETURN(rc);
+
obd->u.obt.obt_magic = OBT_MAGIC;
- if (lsi->lsi_lmd != NULL &&
- lsi->lsi_lmd->lmd_flags & LMD_FLG_SKIP_LFSCK)
+ if (lsi->lsi_lmd->lmd_flags & LMD_FLG_SKIP_LFSCK)
m->mdt_skip_lfsck = 1;
}
- /* DoM files get IO lock at open by default */
- m->mdt_opts.mo_dom_lock = ALWAYS_DOM_LOCK_ON_OPEN;
+ /* Only try (without waiting) to get a DoM lock on open by default;
+ * a blocking request could stall for a long time behind an already
+ * granted group lock. */
+ m->mdt_opts.mo_dom_lock = TRYLOCK_DOM_ON_OPEN;
/* DoM files are read at open and data is packed in the reply */
m->mdt_opts.mo_dom_read_open = 1;
m->mdt_enable_remote_dir = 1;
m->mdt_enable_striped_dir = 1;
m->mdt_enable_dir_migration = 1;
+ m->mdt_enable_dir_restripe = 0;
+ m->mdt_enable_dir_auto_split = 0;
m->mdt_enable_remote_dir_gid = 0;
+ m->mdt_enable_chprojid_gid = 0;
m->mdt_enable_remote_rename = 1;
+ m->mdt_dir_restripe_nsonly = 1;
+ m->mdt_enable_remote_subdir_mount = 1;
atomic_set(&m->mdt_mds_mds_conns, 0);
atomic_set(&m->mdt_async_commit_count, 0);
LDLM_NAMESPACE_SERVER,
LDLM_NAMESPACE_GREEDY,
LDLM_NS_TYPE_MDT);
- if (m->mdt_namespace == NULL)
- GOTO(err_fini_seq, rc = -ENOMEM);
+ if (IS_ERR(m->mdt_namespace)) {
+ rc = PTR_ERR(m->mdt_namespace);
+ CERROR("%s: unable to create server namespace: rc = %d\n",
+ obd->obd_name, rc);
+ m->mdt_namespace = NULL;
+ GOTO(err_fini_seq, rc);
+ }
m->mdt_namespace->ns_lvbp = m;
m->mdt_namespace->ns_lvbo = &mdt_lvbo;
GOTO(err_free_ns, rc);
/* Amount of available space excluded from granting and reserved
- * for metadata. It is in percentage and 50% is default value. */
- tgd->tgd_reserved_pcnt = 50;
+ * for metadata. It is a percentage of the total MDT size. */
+ tgd->tgd_reserved_pcnt = 10;
if (ONE_MB_BRW_SIZE < (1U << tgd->tgd_blockbits))
m->mdt_brw_size = 1U << tgd->tgd_blockbits;
if (ldlm_timeout == LDLM_TIMEOUT_DEFAULT)
ldlm_timeout = MDS_LDLM_TIMEOUT_DEFAULT;
+ if ((lsi->lsi_lmd->lmd_flags & LMD_FLG_LOCAL_RECOV))
+ m->mdt_lut.lut_local_recovery = 1;
+
+ rc = mdt_restriper_start(m);
+ if (rc)
+ GOTO(err_ping_evictor, rc);
+
RETURN(0);
+
+err_ping_evictor:
+ ping_evictor_stop();
err_procfs:
mdt_tunables_fini(m);
err_recovery:
init_rwsem(&mo->mot_dom_sem);
init_rwsem(&mo->mot_open_sem);
atomic_set(&mo->mot_open_count, 0);
+ mo->mot_restripe_offset = 0;
+ INIT_LIST_HEAD(&mo->mot_restripe_linkage);
RETURN(o);
}
RETURN(NULL);
RETURN(rc);
}
+/* RCU callback that returns an mdt_object to its slab cache.
+ * Queued by mdt_object_free() via call_rcu(), so the actual free is
+ * deferred until after an RCU grace period.
+ */
+static void mdt_object_free_rcu(struct rcu_head *head)
+{
+ struct mdt_object *mo = container_of(head, struct mdt_object,
+ mot_header.loh_rcu);
+
+ kmem_cache_free(mdt_object_kmem, mo);
+}
+
static void mdt_object_free(const struct lu_env *env, struct lu_object *o)
{
- struct mdt_object *mo = mdt_obj(o);
- struct lu_object_header *h;
- ENTRY;
+ struct mdt_object *mo = mdt_obj(o);
+ struct lu_object_header *h;
+ ENTRY;
- h = o->lo_header;
- CDEBUG(D_INFO, "object free, fid = "DFID"\n",
- PFID(lu_object_fid(o)));
+ h = o->lo_header;
+ CDEBUG(D_INFO, "object free, fid = "DFID"\n",
+ PFID(lu_object_fid(o)));
LASSERT(atomic_read(&mo->mot_open_count) == 0);
LASSERT(atomic_read(&mo->mot_lease_count) == 0);
lu_object_fini(o);
lu_object_header_fini(h);
- OBD_SLAB_FREE_PTR(mo, mdt_object_kmem);
+ OBD_FREE_PRE(mo, sizeof(*mo), "slab-freed");
+ call_rcu(&mo->mot_header.loh_rcu, mdt_object_free_rcu);
EXIT;
}
RETURN(0);
}
+/* Enable sync-lock-cancel (used for MDS-MDS connections, see
+ * mdt_connect_internal()): switch the target policy from
+ * SYNC_LOCK_CANCEL_NEVER to SYNC_LOCK_CANCEL_BLOCKING. A different
+ * current policy value is left untouched.
+ */
+static inline void mdt_enable_slc(struct mdt_device *mdt)
+{
+ if (mdt->mdt_lut.lut_sync_lock_cancel == SYNC_LOCK_CANCEL_NEVER)
+ mdt->mdt_lut.lut_sync_lock_cancel = SYNC_LOCK_CANCEL_BLOCKING;
+}
+
+/* Revert sync-lock-cancel to SYNC_LOCK_CANCEL_NEVER, but only when it
+ * is currently SYNC_LOCK_CANCEL_BLOCKING (i.e. the value set by
+ * mdt_enable_slc()); any other policy value is left untouched.
+ */
+static inline void mdt_disable_slc(struct mdt_device *mdt)
+{
+ if (mdt->mdt_lut.lut_sync_lock_cancel == SYNC_LOCK_CANCEL_BLOCKING)
+ mdt->mdt_lut.lut_sync_lock_cancel = SYNC_LOCK_CANCEL_NEVER;
+}
+
/**
* Match client and server connection feature flags.
*
if (OCD_HAS_FLAG(data, CKSUM)) {
__u32 cksum_types = data->ocd_cksum_types;
- /* The client set in ocd_cksum_types the checksum types it
- * supports. We have to mask off the algorithms that we don't
- * support */
- data->ocd_cksum_types &=
- obd_cksum_types_supported_server(obd_name);
+ tgt_mask_cksum_types(&mdt->mdt_lut, &data->ocd_cksum_types);
if (unlikely(data->ocd_cksum_types == 0)) {
CERROR("%s: Connect with checksum support but no "
exp->exp_obd->obd_name, obd_export_nid2str(exp));
}
+ if ((data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) &&
+ !(data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT)) {
+ atomic_inc(&mdt->mdt_mds_mds_conns);
+ mdt_enable_slc(mdt);
+ }
+
+ if (!mdt->mdt_lut.lut_dt_conf.ddp_has_lseek_data_hole)
+ data->ocd_connect_flags2 &= ~OBD_CONNECT2_LSEEK;
+
return 0;
}
static int mdt_export_cleanup(struct obd_export *exp)
{
- struct list_head closing_list;
+ LIST_HEAD(closing_list);
struct mdt_export_data *med = &exp->exp_mdt_data;
struct obd_device *obd = exp->exp_obd;
struct mdt_device *mdt;
int rc = 0;
ENTRY;
- INIT_LIST_HEAD(&closing_list);
spin_lock(&med->med_open_lock);
while (!list_empty(&med->med_open_head)) {
struct list_head *tmp = med->med_open_head.next;
rc = mdt_ctxt_add_dirty_flag(&env, info, mfd);
/* Don't unlink orphan on failover umount, LU-184 */
- if (exp->exp_flags & OBD_OPT_FAILOVER) {
+ if (exp->exp_flags & OBD_OPT_FAILOVER ||
+ exp->exp_obd->obd_stopping) {
ma->ma_valid = MA_FLAGS;
ma->ma_attr_flags |= MDS_KEEP_ORPHAN;
}
RETURN(rc);
}
-static inline void mdt_enable_slc(struct mdt_device *mdt)
-{
- if (mdt->mdt_lut.lut_sync_lock_cancel == NEVER_SYNC_ON_CANCEL)
- mdt->mdt_lut.lut_sync_lock_cancel = BLOCKING_SYNC_ON_CANCEL;
-}
-
-static inline void mdt_disable_slc(struct mdt_device *mdt)
-{
- if (mdt->mdt_lut.lut_sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL)
- mdt->mdt_lut.lut_sync_lock_cancel = NEVER_SYNC_ON_CANCEL;
-}
-
static int mdt_obd_disconnect(struct obd_export *exp)
{
int rc;
mdt = mdt_dev(obd->obd_lu_dev);
- if ((data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) &&
- !(data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT)) {
- atomic_inc(&mdt->mdt_mds_mds_conns);
- mdt_enable_slc(mdt);
- }
-
/*
* first, check whether the stack is ready to handle requests
* XXX: probably not very appropriate method is used now
struct getinfo_fid2path *fp,
struct lu_fid *root_fid)
{
- struct mdt_device *mdt = info->mti_mdt;
- struct mdt_object *mdt_obj;
- struct link_ea_header *leh;
- struct link_ea_entry *lee;
- struct lu_name *tmpname = &info->mti_name;
- struct lu_fid *tmpfid = &info->mti_tmp_fid1;
- struct lu_buf *buf = &info->mti_big_buf;
- char *ptr;
- int reclen;
- struct linkea_data ldata = { NULL };
- int rc = 0;
- bool first = true;
+ struct mdt_device *mdt = info->mti_mdt;
+ struct lu_name *tmpname = &info->mti_name;
+ struct lu_fid *tmpfid = &info->mti_tmp_fid1;
+ struct lu_buf *buf = &info->mti_big_buf;
+ struct linkea_data ldata = { NULL };
+ bool first = true;
+ struct mdt_object *mdt_obj;
+ struct link_ea_header *leh;
+ struct link_ea_entry *lee;
+ char *ptr;
+ int reclen;
+ int rc = 0;
+
ENTRY;
/* temp buffer for path element, the buffer will be finally freed
*tmpfid = fp->gf_fid = *mdt_object_fid(obj);
while (!lu_fid_eq(root_fid, &fp->gf_fid)) {
- struct lu_buf lmv_buf;
-
if (!lu_fid_eq(root_fid, &mdt->mdt_md_root_fid) &&
lu_fid_eq(&mdt->mdt_md_root_fid, &fp->gf_fid))
GOTO(out, rc = -ENOENT);
fp->gf_linkno++;
}
- lmv_buf.lb_buf = info->mti_xattr_buf;
- lmv_buf.lb_len = sizeof(info->mti_xattr_buf);
/* Check if it is slave stripes */
- rc = mo_xattr_get(info->mti_env, mdt_object_child(mdt_obj),
- &lmv_buf, XATTR_NAME_LMV);
+ rc = mdt_is_dir_stripe(info, mdt_obj);
mdt_object_put(info->mti_env, mdt_obj);
- if (rc > 0) {
- union lmv_mds_md *lmm = lmv_buf.lb_buf;
-
- /* For slave stripes, get its master */
- if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_STRIPE) {
- fp->gf_fid = *tmpfid;
- continue;
- }
- } else if (rc < 0 && rc != -ENODATA) {
+ if (rc < 0)
GOTO(out, rc);
+ if (rc == 1) {
+ fp->gf_fid = *tmpfid;
+ continue;
}
- rc = 0;
-
/* Pack the name in the end of the buffer */
ptr -= tmpname->ln_namelen;
if (ptr - 1 <= fp->gf_u.gf_path)
first = false;
}
+ /* non-zero will be treated as an error */
+ rc = 0;
+
remote_out:
ptr++; /* skip leading / */
memmove(fp->gf_u.gf_path, ptr,
fpin = key + cfs_size_round(sizeof(KEY_FID2PATH));
fpout = val;
- if (ptlrpc_req_need_swab(info->mti_pill->rc_req))
+ if (req_capsule_req_need_swab(info->mti_pill))
lustre_swab_fid2path(fpin);
memcpy(fpout, fpin, sizeof(*fpin));
sizeof(struct lu_fid)) {
/* client sent its root FID, which is normally fileset FID */
root_fid = fpin->gf_u.gf_root_fid;
- if (ptlrpc_req_need_swab(info->mti_pill->rc_req))
+ if (req_capsule_req_need_swab(info->mti_pill))
lustre_swab_lu_fid(root_fid);
if (root_fid != NULL && !fid_is_sane(root_fid))
if (rc == 0)
rc = dt_ro(&env, dt);
break;
- case OBD_IOC_ABORT_RECOVERY:
+ case OBD_IOC_ABORT_RECOVERY: {
+ struct obd_ioctl_data *data = karg;
+
CERROR("%s: Aborting recovery for device\n", mdt_obd_name(mdt));
- obd->obd_abort_recovery = 1;
- target_stop_recovery_thread(obd);
+ if (data->ioc_type & OBD_FLG_ABORT_RECOV_MDT) {
+ obd->obd_abort_recov_mdt = 1;
+ wake_up(&obd->obd_next_transno_waitq);
+ } else { /* if (data->ioc_type & OBD_FLG_ABORT_RECOV_OST) */
+ /* lctl didn't set OBD_FLG_ABORT_RECOV_OST < 2.13.57 */
+ obd->obd_abort_recovery = 1;
+ target_stop_recovery_thread(obd);
+ }
rc = 0;
break;
+ }
case OBD_IOC_CHANGELOG_REG:
case OBD_IOC_CHANGELOG_DEREG:
case OBD_IOC_CHANGELOG_CLEAR:
return rc;
}
-static struct obd_ops mdt_obd_device_ops = {
+static const struct obd_ops mdt_obd_device_ops = {
.o_owner = THIS_MODULE,
.o_set_info_async = mdt_obd_set_info_async,
.o_connect = mdt_obd_connect,
return mdt->mdt_opts.mo_cos != 0;
}
-static struct lu_device_type_operations mdt_device_type_ops = {
- .ldto_device_alloc = mdt_device_alloc,
- .ldto_device_free = mdt_device_free,
- .ldto_device_fini = mdt_device_fini
+static const struct lu_device_type_operations mdt_device_type_ops = {
+ .ldto_device_alloc = mdt_device_alloc,
+ .ldto_device_free = mdt_device_free,
+ .ldto_device_fini = mdt_device_fini
};
static struct lu_device_type mdt_device_type = {
if (rc)
GOTO(lu_fini, rc);
- rc = class_register_type(&mdt_obd_device_ops, NULL, true, NULL,
+ rc = class_register_type(&mdt_obd_device_ops, NULL, true,
LUSTRE_MDT_NAME, &mdt_device_type);
if (rc)
GOTO(mds_fini, rc);