#include "mdt_internal.h"
-
static unsigned int max_mod_rpcs_per_client = 8;
module_param(max_mod_rpcs_per_client, uint, 0644);
MODULE_PARM_DESC(max_mod_rpcs_per_client, "maximum number of modify RPCs in flight allowed per client");
};
static struct mdt_device *mdt_dev(struct lu_device *d);
-static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags);
static const struct lu_object_operations mdt_obj_ops;
{
struct mdt_device *mdt = info->mti_mdt;
struct lu_name *lname = &info->mti_name;
- char *name = NULL;
+ char *filename = info->mti_filename;
struct mdt_object *parent;
u32 mode;
int rc = 0;
LASSERT(!info->mti_cross_ref);
- OBD_ALLOC(name, NAME_MAX + 1);
- if (name == NULL)
- return -ENOMEM;
- lname->ln_name = name;
-
/*
* We may want to allow this to mount a completely separate
* fileset from the MDT in the future, but keeping it to
break;
}
- strncpy(name, s1, lname->ln_namelen);
- name[lname->ln_namelen] = '\0';
+ strncpy(filename, s1, lname->ln_namelen);
+ filename[lname->ln_namelen] = '\0';
+ lname->ln_name = filename;
parent = mdt_object_find(info->mti_env, mdt, fid);
if (IS_ERR(parent)) {
}
}
- OBD_FREE(name, NAME_MAX + 1);
-
return rc;
}
static int mdt_statfs(struct tgt_session_info *tsi)
{
- struct ptlrpc_request *req = tgt_ses_req(tsi);
- struct mdt_thread_info *info = tsi2mdt_info(tsi);
- struct mdt_device *mdt = info->mti_mdt;
- struct tg_grants_data *tgd = &mdt->mdt_lut.lut_tgd;
- struct ptlrpc_service_part *svcpt;
- struct obd_statfs *osfs;
- int rc;
+ struct ptlrpc_request *req = tgt_ses_req(tsi);
+ struct mdt_thread_info *info = tsi2mdt_info(tsi);
+ struct mdt_device *mdt = info->mti_mdt;
+ struct tg_grants_data *tgd = &mdt->mdt_lut.lut_tgd;
+ struct md_device *next = mdt->mdt_child;
+ struct ptlrpc_service_part *svcpt;
+ struct obd_statfs *osfs;
+ struct mdt_body *reqbody = NULL;
+ struct mdt_statfs_cache *msf;
+ int rc;
ENTRY;
if (!osfs)
GOTO(out, rc = -EPROTO);
- rc = tgt_statfs_internal(tsi->tsi_env, &mdt->mdt_lut, osfs,
- ktime_get_seconds() - OBD_STATFS_CACHE_SECONDS,
- NULL);
- if (unlikely(rc))
- GOTO(out, rc);
+ if (mdt_is_sum_statfs_client(req->rq_export) &&
+ lustre_packed_msg_size(req->rq_reqmsg) ==
+ req_capsule_fmt_size(req->rq_reqmsg->lm_magic,
+ &RQF_MDS_STATFS_NEW, RCL_CLIENT)) {
+ req_capsule_extend(info->mti_pill, &RQF_MDS_STATFS_NEW);
+ reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
+ }
+
+ if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS)
+ msf = &mdt->mdt_sum_osfs;
+ else
+ msf = &mdt->mdt_osfs;
+
+ if (msf->msf_age + OBD_STATFS_CACHE_SECONDS <= ktime_get_seconds()) {
+ /** statfs data is too old, get up-to-date one */
+ if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS)
+ rc = next->md_ops->mdo_statfs(info->mti_env,
+ next, osfs);
+ else
+ rc = dt_statfs(info->mti_env, mdt->mdt_bottom,
+ osfs);
+ if (rc)
+ GOTO(out, rc);
+ spin_lock(&mdt->mdt_lock);
+ msf->msf_osfs = *osfs;
+ msf->msf_age = ktime_get_seconds();
+ spin_unlock(&mdt->mdt_lock);
+ } else {
+ /** use cached statfs data */
+ spin_lock(&mdt->mdt_lock);
+ *osfs = msf->msf_osfs;
+ spin_unlock(&mdt->mdt_lock);
+ }
/* at least try to account for cached pages. its still racy and
* might be under-reporting if clients haven't announced their
* Pack size attributes into the reply.
*/
int mdt_pack_size2body(struct mdt_thread_info *info,
- const struct lu_fid *fid, bool dom_lock)
+ const struct lu_fid *fid, struct lustre_handle *lh)
{
struct mdt_body *b;
struct md_attr *ma = &info->mti_attr;
int dom_stripe;
+ bool dom_lock = false;
ENTRY;
if (dom_stripe == LMM_NO_DOM)
RETURN(-ENOENT);
+ if (lustre_handle_is_used(lh)) {
+ struct ldlm_lock *lock;
+
+ lock = ldlm_handle2lock(lh);
+ if (lock != NULL) {
+ dom_lock = ldlm_has_dom(lock);
+ LDLM_LOCK_PUT(lock);
+ }
+ }
+
/* no DoM lock, no size in reply */
if (!dom_lock)
RETURN(0);
exp_connect_large_acl(info->mti_exp) &&
buf->lb_buf != info->mti_big_acl) {
if (info->mti_big_acl == NULL) {
+ info->mti_big_aclsize =
+ MIN(mdt->mdt_max_ea_size,
+ XATTR_SIZE_MAX);
OBD_ALLOC_LARGE(info->mti_big_acl,
- mdt->mdt_max_ea_size);
+ info->mti_big_aclsize);
if (info->mti_big_acl == NULL) {
+ info->mti_big_aclsize = 0;
CERROR("%s: unable to grow "
DFID" ACL buffer\n",
mdt_obd_name(mdt),
PFID(mdt_object_fid(o)));
RETURN(-ENOMEM);
}
-
- info->mti_big_aclsize =
- mdt->mdt_max_ea_size;
}
CDEBUG(D_INODE, "%s: grow the "DFID
" ACL buffer to size %d\n",
mdt_obd_name(mdt),
PFID(mdt_object_fid(o)),
- mdt->mdt_max_ea_size);
+ info->mti_big_aclsize);
buf->lb_buf = info->mti_big_acl;
buf->lb_len = info->mti_big_aclsize;
return -EINVAL;
}
+ LASSERT(buf->lb_buf);
+
rc = mo_xattr_get(info->mti_env, next, buf, name);
if (rc > 0) {
return rc;
}
-static int mdt_attr_get_pfid(struct mdt_thread_info *info,
- struct mdt_object *o, struct lu_fid *pfid)
+int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
+ struct lu_fid *pfid)
{
struct lu_buf *buf = &info->mti_buf;
struct link_ea_header *leh;
GOTO(out, rc);
if (S_ISREG(mode))
- (void) mdt_get_som(info, o, &ma->ma_attr);
+ (void) mdt_get_som(info, o, ma);
ma->ma_valid |= MA_INODE;
}
GOTO(out, rc);
}
+ /*
+ * In the handle of MA_INODE, we may already get the SOM attr.
+ */
+ if (need & MA_SOM && S_ISREG(mode) && !(ma->ma_valid & MA_SOM)) {
+ rc = mdt_get_som(info, o, ma);
+ if (rc != 0)
+ GOTO(out, rc);
+ }
+
if (need & MA_HSM && S_ISREG(mode)) {
buf->lb_buf = info->mti_xattr_buf;
buf->lb_len = sizeof(info->mti_xattr_buf);
if (rc)
GOTO(out, rc);
+ mutex_lock(&obj->mot_som_mutex);
rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout);
-
+ mutex_unlock(&obj->mot_som_mutex);
mdt_object_unlock(info, obj, lh, 1);
out:
RETURN(rc);
/* permission check. Make sure the calling process having permission
* to write both files. */
rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL,
- MAY_WRITE);
+ MAY_WRITE);
if (rc < 0)
GOTO(put, rc);
rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL,
- MAY_WRITE);
+ MAY_WRITE);
if (rc < 0)
GOTO(put, rc);
LDLM_LOCK_PUT(lock);
mdt_object_put(info->mti_env, child);
/* NB: call the mdt_pack_size2body always after
- * mdt_object_put(), that is why this speacial
+ * mdt_object_put(), that is why this special
* exit path is used. */
rc = mdt_pack_size2body(info, child_fid,
- child_bits & MDS_INODELOCK_DOM);
+ &lhc->mlh_reg_lh);
if (rc != 0 && child_bits & MDS_INODELOCK_DOM) {
/* DOM lock was taken in advance but this is
* not DoM file. Drop the lock. */
GOTO(out_parent, rc = 0);
}
- }
- if (lock)
- LDLM_LOCK_PUT(lock);
+ }
+ if (lock)
+ LDLM_LOCK_PUT(lock);
- EXIT;
+ EXIT;
out_child:
- mdt_object_put(info->mti_env, child);
+ mdt_object_put(info->mti_env, child);
out_parent:
- if (lhp)
- mdt_object_unlock(info, parent, lhp, 1);
- return rc;
+ if (lhp)
+ mdt_object_unlock(info, parent, lhp, 1);
+ return rc;
}
/* normal handler: should release the child lock */
out_ucred:
mdt_exit_ucred(info);
out_shrink:
- mdt_client_compatibility(info);
- rc2 = mdt_fix_reply(info);
- if (rc == 0)
- rc = rc2;
- return rc;
+ mdt_client_compatibility(info);
+
+ rc2 = mdt_fix_reply(info);
+ if (rc == 0)
+ rc = rc2;
+
+ /*
+ * Data-on-MDT optimization - read data along with OPEN and return it
+ * in reply. Do that only if we have both DOM and LAYOUT locks.
+ */
+ if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req) &&
+ info->mti_attr.ma_lmm != NULL &&
+ mdt_lmm_dom_entry(info->mti_attr.ma_lmm) == LMM_DOM_ONLY) {
+ rc = mdt_dom_read_on_open(info, info->mti_mdt,
+ &lhc->mlh_reg_lh);
+ }
+
+ return rc;
}
static long mdt_reint_opcode(struct ptlrpc_request *req,
[REINT_OPEN] = &RQF_MDS_REINT_OPEN,
[REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR,
[REINT_RMENTRY] = &RQF_MDS_REINT_UNLINK,
- [REINT_MIGRATE] = &RQF_MDS_REINT_RENAME,
+ [REINT_MIGRATE] = &RQF_MDS_REINT_MIGRATE,
[REINT_RESYNC] = &RQF_MDS_REINT_RESYNC,
};
}
/* this should sync the whole device */
-static int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt)
+int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt)
{
struct dt_device *dt = mdt->mdt_bottom;
int rc;
/* master quotactl */
case Q_SETINFO:
case Q_SETQUOTA:
+ case LUSTRE_Q_SETDEFAULT:
if (!nodemap_can_setquota(nodemap))
GOTO(out_nodemap, rc = -EPERM);
case Q_GETINFO:
case Q_GETQUOTA:
+ case LUSTRE_Q_GETDEFAULT:
if (qmt == NULL)
GOTO(out_nodemap, rc = -EOPNOTSUPP);
/* slave quotactl */
case Q_SETINFO:
case Q_SETQUOTA:
case Q_GETQUOTA:
+ case LUSTRE_Q_SETDEFAULT:
+ case LUSTRE_Q_GETDEFAULT:
/* forward quotactl request to QMT */
rc = qmt_hdls.qmth_quotactl(tsi->tsi_env, qmt, oqctl);
break;
{
struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd;
struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+ struct ldlm_cb_set_arg *arg = data;
bool commit_async = false;
int rc;
ENTRY;
unlock_res_and_lock(lock);
RETURN(0);
}
- /* There is no lock conflict if l_blocking_lock == NULL,
- * it indicates a blocking ast sent from ldlm_lock_decref_internal
- * when the last reference to a local lock was released */
- if (lock->l_req_mode & (LCK_PW | LCK_EX) &&
- lock->l_blocking_lock != NULL) {
+
+ /* A blocking ast may be sent from ldlm_lock_decref_internal
+ * when the last reference to a local lock was released and
+ * during blocking event from ldlm_work_bl_ast_lock().
+ * The 'data' parameter is l_ast_data in the first case and
+ * callback arguments in the second one. Distinguish them by that.
+ */
+ if (!data || data == lock->l_ast_data || !arg->bl_desc)
+ goto skip_cos_checks;
+
+ if (lock->l_req_mode & (LCK_PW | LCK_EX)) {
if (mdt_cos_is_enabled(mdt)) {
- if (lock->l_client_cookie !=
- lock->l_blocking_lock->l_client_cookie)
+ if (!arg->bl_desc->bl_same_client)
mdt_set_lock_sync(lock);
} else if (mdt_slc_is_enabled(mdt) &&
- ldlm_is_cos_incompat(lock->l_blocking_lock)) {
+ arg->bl_desc->bl_cos_incompat) {
mdt_set_lock_sync(lock);
/*
* we may do extra commit here, but there is a small
*/
commit_async = true;
}
- } else if (lock->l_req_mode == LCK_COS &&
- lock->l_blocking_lock != NULL) {
+ } else if (lock->l_req_mode == LCK_COS) {
commit_async = true;
}
+skip_cos_checks:
rc = ldlm_blocking_ast_nocheck(lock);
if (commit_async) {
struct lustre_handle *lh, enum ldlm_mode mode,
__u64 *ibits, __u64 trybits, bool cache)
{
- struct ldlm_enqueue_info *einfo = &mti->mti_einfo;
+ struct ldlm_enqueue_info *einfo = &mti->mti_remote_einfo;
union ldlm_policy_data *policy = &mti->mti_policy;
struct ldlm_res_id *res_id = &mti->mti_res_id;
int rc = 0;
einfo->ei_cbdata = o;
}
-
memset(policy, 0, sizeof(*policy));
policy->l_inodebits.bits = *ibits;
policy->l_inodebits.try_bits = trybits;
rc = mo_object_lock(mti->mti_env, mdt_object_child(o), lh, einfo,
policy);
- if (rc < 0 && cache) {
+ if (rc < 0 && cache)
mdt_object_put(mti->mti_env, o);
- einfo->ei_cbdata = NULL;
- }
/* Return successfully acquired bits to a caller */
if (rc == 0) {
struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
union ldlm_policy_data *policy = &info->mti_policy;
struct ldlm_res_id *res_id = &info->mti_res_id;
- __u64 dlmflags = 0;
+ __u64 dlmflags = 0, *cookie = NULL;
int rc;
ENTRY;
}
}
-
fid_build_reg_res_name(mdt_object_fid(o), res_id);
dlmflags |= LDLM_FL_ATOMIC_CB;
+ if (info->mti_exp)
+ cookie = &info->mti_exp->exp_handle.h_cookie;
+
/*
* Take PDO lock on whole directory and build correct @res_id for lock
* on part of directory.
* is never going to be sent to client and we do not
* want it slowed down due to possible cancels.
*/
- policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
- policy->l_inodebits.try_bits = 0;
- rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode,
- policy, res_id, dlmflags,
- info->mti_exp == NULL ? NULL :
- &info->mti_exp->exp_handle.h_cookie);
+ policy->l_inodebits.bits =
+ *ibits & MDS_INODELOCK_UPDATE;
+ policy->l_inodebits.try_bits =
+ trybits & MDS_INODELOCK_UPDATE;
+ /* at least one of them should be set */
+ LASSERT(policy->l_inodebits.bits |
+ policy->l_inodebits.try_bits);
+ rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_pdo_lh,
+ lh->mlh_pdo_mode, policy, res_id,
+ dlmflags, cookie);
if (unlikely(rc != 0))
GOTO(out_unlock, rc);
}
* going to be sent to client. If it is - mdt_intent_policy() path will
* fix it up and turn FL_LOCAL flag off.
*/
- rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
- res_id, LDLM_FL_LOCAL_ONLY | dlmflags,
- info->mti_exp == NULL ? NULL :
- &info->mti_exp->exp_handle.h_cookie);
+ rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
+ policy, res_id, LDLM_FL_LOCAL_ONLY | dlmflags,
+ cookie);
out_unlock:
if (rc != 0)
mdt_object_unlock(info, o, lh, 1);
}
}
+ /* other components like LFSCK can use lockless access
+ * and populate cache, so we better invalidate it */
+ mo_invalidate(info->mti_env, mdt_object_child(o));
+
RETURN(0);
}
* actually exists on storage (lu_object_exists()).
*
*/
-static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags)
+static int mdt_body_unpack(struct mdt_thread_info *info,
+ enum tgt_handler_flags flags)
{
const struct mdt_body *body;
struct mdt_object *obj;
RETURN(rc);
}
-static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags)
+static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info,
+ enum tgt_handler_flags flags)
{
struct req_capsule *pill = info->mti_pill;
int rc;
return tgt_connect(tsi);
}
-enum mdt_it_code {
- MDT_IT_OPEN,
- MDT_IT_OCREAT,
- MDT_IT_CREATE,
- MDT_IT_GETATTR,
- MDT_IT_READDIR,
- MDT_IT_LOOKUP,
- MDT_IT_UNLINK,
- MDT_IT_TRUNC,
- MDT_IT_GETXATTR,
- MDT_IT_LAYOUT,
- MDT_IT_QUOTA,
- MDT_IT_GLIMPSE,
- MDT_IT_BRW,
- MDT_IT_NR
-};
-
-static int mdt_intent_getattr(enum mdt_it_code opcode,
- struct mdt_thread_info *info,
- struct ldlm_lock **, __u64);
-
-static int mdt_intent_getxattr(enum mdt_it_code opcode,
- struct mdt_thread_info *info,
- struct ldlm_lock **lockp,
- __u64 flags);
-
-static int mdt_intent_layout(enum mdt_it_code opcode,
- struct mdt_thread_info *info,
- struct ldlm_lock **,
- __u64);
-static int mdt_intent_reint(enum mdt_it_code opcode,
- struct mdt_thread_info *info,
- struct ldlm_lock **,
- __u64);
-static int mdt_intent_glimpse(enum mdt_it_code opcode,
+static int mdt_intent_glimpse(enum ldlm_intent_flags it_opc,
struct mdt_thread_info *info,
struct ldlm_lock **lockp, __u64 flags)
{
return mdt_glimpse_enqueue(info, info->mti_mdt->mdt_namespace,
lockp, flags);
}
-static int mdt_intent_brw(enum mdt_it_code opcode,
+static int mdt_intent_brw(enum ldlm_intent_flags it_opc,
struct mdt_thread_info *info,
struct ldlm_lock **lockp, __u64 flags)
{
lockp, flags);
}
-static struct mdt_it_flavor {
- const struct req_format *it_fmt;
- __u32 it_flags;
- int (*it_act)(enum mdt_it_code ,
- struct mdt_thread_info *,
- struct ldlm_lock **,
- __u64);
- long it_reint;
-} mdt_it_flavor[] = {
- [MDT_IT_OPEN] = {
- .it_fmt = &RQF_LDLM_INTENT,
- /*.it_flags = HABEO_REFERO,*/
- .it_flags = 0,
- .it_act = mdt_intent_reint,
- .it_reint = REINT_OPEN
- },
- [MDT_IT_OCREAT] = {
- .it_fmt = &RQF_LDLM_INTENT,
- /*
- * OCREAT is not a MUTABOR request as if the file
- * already exists.
- * We do the extra check of OBD_CONNECT_RDONLY in
- * mdt_reint_open() when we really need to create
- * the object.
- */
- .it_flags = 0,
- .it_act = mdt_intent_reint,
- .it_reint = REINT_OPEN
- },
- [MDT_IT_CREATE] = {
- .it_fmt = &RQF_LDLM_INTENT,
- .it_flags = MUTABOR,
- .it_act = mdt_intent_reint,
- .it_reint = REINT_CREATE
- },
- [MDT_IT_GETATTR] = {
- .it_fmt = &RQF_LDLM_INTENT_GETATTR,
- .it_flags = HABEO_REFERO,
- .it_act = mdt_intent_getattr
- },
- [MDT_IT_READDIR] = {
- .it_fmt = NULL,
- .it_flags = 0,
- .it_act = NULL
- },
- [MDT_IT_LOOKUP] = {
- .it_fmt = &RQF_LDLM_INTENT_GETATTR,
- .it_flags = HABEO_REFERO,
- .it_act = mdt_intent_getattr
- },
- [MDT_IT_UNLINK] = {
- .it_fmt = &RQF_LDLM_INTENT_UNLINK,
- .it_flags = MUTABOR,
- .it_act = NULL,
- .it_reint = REINT_UNLINK
- },
- [MDT_IT_TRUNC] = {
- .it_fmt = NULL,
- .it_flags = MUTABOR,
- .it_act = NULL
- },
- [MDT_IT_GETXATTR] = {
- .it_fmt = &RQF_LDLM_INTENT_GETXATTR,
- .it_flags = HABEO_CORPUS,
- .it_act = mdt_intent_getxattr
- },
- [MDT_IT_LAYOUT] = {
- .it_fmt = &RQF_LDLM_INTENT_LAYOUT,
- .it_flags = 0,
- .it_act = mdt_intent_layout
- },
- [MDT_IT_GLIMPSE] = {
- .it_fmt = &RQF_LDLM_INTENT,
- .it_flags = 0,
- .it_act = mdt_intent_glimpse,
- },
- [MDT_IT_BRW] = {
- .it_fmt = &RQF_LDLM_INTENT,
- .it_flags = 0,
- .it_act = mdt_intent_brw,
- },
-
-};
-
int mdt_intent_lock_replace(struct mdt_thread_info *info,
struct ldlm_lock **lockp,
struct mdt_lock_handle *lh,
dlmreq->lock_handle[0].cookie);
}
-static int mdt_intent_getxattr(enum mdt_it_code opcode,
- struct mdt_thread_info *info,
- struct ldlm_lock **lockp,
- __u64 flags)
+static int mdt_intent_getxattr(enum ldlm_intent_flags it_opc,
+ struct mdt_thread_info *info,
+ struct ldlm_lock **lockp,
+ __u64 flags)
{
struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
struct ldlm_reply *ldlm_rep = NULL;
RETURN(rc);
}
-static int mdt_intent_getattr(enum mdt_it_code opcode,
+static int mdt_intent_getattr(enum ldlm_intent_flags it_opc,
struct mdt_thread_info *info,
struct ldlm_lock **lockp,
__u64 flags)
repbody->mbo_eadatasize = 0;
repbody->mbo_aclsize = 0;
- switch (opcode) {
- case MDT_IT_LOOKUP:
+ switch (it_opc) {
+ case IT_LOOKUP:
child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM;
- break;
- case MDT_IT_GETATTR:
+ break;
+ case IT_GETATTR:
child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE |
MDS_INODELOCK_PERM;
- break;
- default:
- CERROR("Unsupported intent (%d)\n", opcode);
- GOTO(out_shrink, rc = -EINVAL);
- }
+ break;
+ default:
+ CERROR("%s: unsupported intent %#x\n",
+ mdt_obd_name(info->mti_mdt), (unsigned int)it_opc);
+ GOTO(out_shrink, rc = -EINVAL);
+ }
rc = mdt_init_ucred_intent_getattr(info, reqbody);
if (rc)
return rc;
}
-static int mdt_intent_layout(enum mdt_it_code opcode,
+static int mdt_intent_layout(enum ldlm_intent_flags it_opc,
struct mdt_thread_info *info,
struct ldlm_lock **lockp,
__u64 flags)
int rc = 0;
ENTRY;
- if (opcode != MDT_IT_LAYOUT) {
- CERROR("%s: Unknown intent (%d)\n", mdt_obd_name(info->mti_mdt),
- opcode);
- RETURN(-EINVAL);
- }
-
fid_extract_from_res_name(fid, &(*lockp)->l_resource->lr_name);
intent = req_capsule_client_get(info->mti_pill, &RMF_LAYOUT_INTENT);
if (layout_size > info->mti_mdt->mdt_max_mdsize)
info->mti_mdt->mdt_max_mdsize = layout_size;
}
+ CDEBUG(D_INFO, "%s: layout_size %d\n",
+ mdt_obd_name(info->mti_mdt), layout_size);
}
/*
out:
lhc->mlh_reg_lh.cookie = 0;
- return rc;
+ RETURN(rc);
}
-static int mdt_intent_reint(enum mdt_it_code opcode,
- struct mdt_thread_info *info,
- struct ldlm_lock **lockp,
- __u64 flags)
+static int mdt_intent_open(enum ldlm_intent_flags it_opc,
+ struct mdt_thread_info *info,
+ struct ldlm_lock **lockp,
+ __u64 flags)
{
struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
struct ldlm_reply *rep = NULL;
if (opc < 0)
RETURN(opc);
- if (mdt_it_flavor[opcode].it_reint != opc) {
- CERROR("Reint code %ld doesn't match intent: %d\n",
- opc, opcode);
- RETURN(err_serious(-EPROTO));
- }
-
/* Get lock from request for possible resent case. */
mdt_intent_fixup_resent(info, *lockp, lhc, flags);
RETURN(ELDLM_LOCK_ABORTED);
}
-static int mdt_intent_code(enum ldlm_intent_flags itcode)
+static int mdt_intent_opc(enum ldlm_intent_flags it_opc,
+ struct mdt_thread_info *info,
+ struct ldlm_lock **lockp,
+ u64 flags /* LDLM_FL_* */)
{
+ struct req_capsule *pill = info->mti_pill;
+ struct ptlrpc_request *req = mdt_info_req(info);
+ const struct req_format *it_format;
+ int (*it_handler)(enum ldlm_intent_flags,
+ struct mdt_thread_info *,
+ struct ldlm_lock **,
+ u64);
+ enum tgt_handler_flags it_handler_flags = 0;
+ struct ldlm_reply *rep;
int rc;
+ ENTRY;
- switch (itcode) {
+ switch (it_opc) {
case IT_OPEN:
- rc = MDT_IT_OPEN;
- break;
case IT_OPEN|IT_CREAT:
- rc = MDT_IT_OCREAT;
- break;
- case IT_CREAT:
- rc = MDT_IT_CREATE;
- break;
- case IT_READDIR:
- rc = MDT_IT_READDIR;
+ /*
+ * OCREAT is not a MUTABOR request since the file may
+ * already exist. We do the extra check of
+ * OBD_CONNECT_RDONLY in mdt_reint_open() when we
+ * really need to create the object.
+ */
+ it_format = &RQF_LDLM_INTENT;
+ it_handler = &mdt_intent_open;
break;
case IT_GETATTR:
- rc = MDT_IT_GETATTR;
- break;
case IT_LOOKUP:
- rc = MDT_IT_LOOKUP;
- break;
- case IT_UNLINK:
- rc = MDT_IT_UNLINK;
- break;
- case IT_TRUNC:
- rc = MDT_IT_TRUNC;
+ it_format = &RQF_LDLM_INTENT_GETATTR;
+ it_handler = &mdt_intent_getattr;
+ it_handler_flags = HABEO_REFERO;
break;
case IT_GETXATTR:
- rc = MDT_IT_GETXATTR;
+ it_format = &RQF_LDLM_INTENT_GETXATTR;
+ it_handler = &mdt_intent_getxattr;
+ it_handler_flags = HABEO_CORPUS;
break;
case IT_LAYOUT:
- rc = MDT_IT_LAYOUT;
- break;
- case IT_QUOTA_DQACQ:
- case IT_QUOTA_CONN:
- rc = MDT_IT_QUOTA;
+ it_format = &RQF_LDLM_INTENT_LAYOUT;
+ it_handler = &mdt_intent_layout;
break;
case IT_GLIMPSE:
- rc = MDT_IT_GLIMPSE;
+ it_format = &RQF_LDLM_INTENT;
+ it_handler = &mdt_intent_glimpse;
break;
case IT_BRW:
- rc = MDT_IT_BRW;
- break;
- default:
- CERROR("Unknown intent opcode: 0x%08x\n", itcode);
- rc = -EINVAL;
+ it_format = &RQF_LDLM_INTENT;
+ it_handler = &mdt_intent_brw;
break;
- }
- return rc;
-}
-
-static int mdt_intent_opc(enum ldlm_intent_flags itopc,
- struct mdt_thread_info *info,
- struct ldlm_lock **lockp, __u64 flags)
-{
- struct req_capsule *pill = info->mti_pill;
- struct ptlrpc_request *req = mdt_info_req(info);
- struct mdt_it_flavor *flv;
- int opc;
- int rc;
- ENTRY;
-
- opc = mdt_intent_code(itopc);
- if (opc < 0)
- RETURN(-EINVAL);
-
- if (opc == MDT_IT_QUOTA) {
+ case IT_QUOTA_DQACQ:
+ case IT_QUOTA_CONN: {
struct lu_device *qmt = info->mti_mdt->mdt_qmt_dev;
if (qmt == NULL)
flags);
RETURN(rc);
}
+ default:
+ CERROR("%s: unknown intent code %#x\n",
+ mdt_obd_name(info->mti_mdt), it_opc);
+ RETURN(-EPROTO);
+ }
- flv = &mdt_it_flavor[opc];
- if (flv->it_fmt != NULL)
- req_capsule_extend(pill, flv->it_fmt);
+ req_capsule_extend(pill, it_format);
- rc = mdt_unpack_req_pack_rep(info, flv->it_flags);
+ rc = mdt_unpack_req_pack_rep(info, it_handler_flags);
if (rc < 0)
RETURN(rc);
- if (flv->it_flags & MUTABOR && mdt_rdonly(req->rq_export))
+ if (it_handler_flags & MUTABOR && mdt_rdonly(req->rq_export))
RETURN(-EROFS);
- if (flv->it_act != NULL) {
- struct ldlm_reply *rep;
+ OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_INTENT_DELAY, 10);
- OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_INTENT_DELAY, 10);
+ /* execute policy */
+ rc = (*it_handler)(it_opc, info, lockp, flags);
- /* execute policy */
- rc = flv->it_act(opc, info, lockp, flags);
-
- /* Check whether the reply has been packed successfully. */
- if (req->rq_repmsg != NULL) {
- rep = req_capsule_server_get(info->mti_pill,
- &RMF_DLM_REP);
- rep->lock_policy_res2 =
- ptlrpc_status_hton(rep->lock_policy_res2);
- }
+ /* Check whether the reply has been packed successfully. */
+ if (req->rq_repmsg != NULL) {
+ rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
+ rep->lock_policy_res2 =
+ ptlrpc_status_hton(rep->lock_policy_res2);
}
RETURN(rc);
LDLM_GLIMPSE_ENQUEUE : LDLM_IBITS_ENQUEUE));
}
-static int mdt_intent_policy(struct ldlm_namespace *ns,
- struct ldlm_lock **lockp, void *req_cookie,
- enum ldlm_mode mode, __u64 flags, void *data)
+static int mdt_intent_policy(const struct lu_env *env,
+ struct ldlm_namespace *ns,
+ struct ldlm_lock **lockp,
+ void *req_cookie,
+ enum ldlm_mode mode,
+ __u64 flags, void *data)
{
struct tgt_session_info *tsi;
struct mdt_thread_info *info;
LASSERT(req != NULL);
- tsi = tgt_ses_info(req->rq_svc_thread->t_env);
+ tsi = tgt_ses_info(env);
info = tsi2mdt_info(tsi);
LASSERT(info != NULL);
mdt_hsm_cdt_fini(m);
+ if (m->mdt_los != NULL) {
+ local_oid_storage_fini(env, m->mdt_los);
+ m->mdt_los = NULL;
+ }
+
if (m->mdt_namespace != NULL) {
ldlm_namespace_free_post(m->mdt_namespace);
d->ld_obd->obd_namespace = m->mdt_namespace = NULL;
struct seq_server_site *ss_site;
const char *identity_upcall = "NONE";
struct md_device *next;
+ struct lu_fid fid;
int rc;
long node_id;
mntopt_t mntopts;
obd = class_name2obd(dev);
LASSERT(obd != NULL);
- m->mdt_max_mdsize = MAX_MD_SIZE; /* 4 stripes */
+ m->mdt_max_mdsize = MAX_MD_SIZE_OLD;
m->mdt_opts.mo_evict_tgt_nids = 1;
m->mdt_opts.mo_cos = MDT_COS_DEFAULT;
}
/* DoM files get IO lock at open by default */
- m->mdt_opts.mo_dom_lock = 1;
+ m->mdt_opts.mo_dom_lock = ALWAYS_DOM_LOCK_ON_OPEN;
+ /* DoM files are read at open and data is packed in the reply */
+ m->mdt_opts.mo_dom_read_open = 1;
m->mdt_squash.rsi_uid = 0;
m->mdt_squash.rsi_gid = 0;
INIT_LIST_HEAD(&m->mdt_squash.rsi_nosquash_nids);
init_rwsem(&m->mdt_squash.rsi_sem);
spin_lock_init(&m->mdt_lock);
- m->mdt_enable_remote_dir = 0;
+ m->mdt_enable_remote_dir = 1;
+ m->mdt_enable_striped_dir = 1;
+ m->mdt_enable_dir_migration = 1;
m->mdt_enable_remote_dir_gid = 0;
atomic_set(&m->mdt_mds_mds_conns, 0);
/* set obd_namespace for compatibility with old code */
obd->obd_namespace = m->mdt_namespace;
- rc = mdt_hsm_cdt_init(m);
- if (rc != 0) {
- CERROR("%s: error initializing coordinator, rc %d\n",
- mdt_obd_name(m), rc);
- GOTO(err_free_ns, rc);
- }
-
rc = tgt_init(env, &m->mdt_lut, obd, m->mdt_bottom, mdt_common_slice,
OBD_FAIL_MDS_ALL_REQUEST_NET,
OBD_FAIL_MDS_ALL_REPLY_NET);
if (rc)
- GOTO(err_free_hsm, rc);
+ GOTO(err_free_ns, rc);
/* Amount of available space excluded from granting and reserved
* for metadata. It is in percentage and 50% is default value. */
if (rc)
GOTO(err_tgt, rc);
+ fid.f_seq = FID_SEQ_LOCAL_NAME;
+ fid.f_oid = 1;
+ fid.f_ver = 0;
+ rc = local_oid_storage_init(env, m->mdt_bottom, &fid, &m->mdt_los);
+ if (rc != 0)
+ GOTO(err_fs_cleanup, rc);
+
+ rc = mdt_hsm_cdt_init(m);
+ if (rc != 0) {
+ CERROR("%s: error initializing coordinator, rc %d\n",
+ mdt_obd_name(m), rc);
+ GOTO(err_los_fini, rc);
+ }
+
tgt_adapt_sptlrpc_conf(&m->mdt_lut);
next = m->mdt_child;
if (IS_ERR(m->mdt_identity_cache)) {
rc = PTR_ERR(m->mdt_identity_cache);
m->mdt_identity_cache = NULL;
- GOTO(err_fs_cleanup, rc);
+ GOTO(err_free_hsm, rc);
}
rc = mdt_procfs_init(m, dev);
target_recovery_fini(obd);
upcall_cache_cleanup(m->mdt_identity_cache);
m->mdt_identity_cache = NULL;
+err_free_hsm:
+ mdt_hsm_cdt_fini(m);
+err_los_fini:
+ local_oid_storage_fini(env, m->mdt_los);
+ m->mdt_los = NULL;
err_fs_cleanup:
mdt_fs_cleanup(env, m);
err_tgt:
tgt_fini(env, &m->mdt_lut);
-err_free_hsm:
- mdt_hsm_cdt_fini(m);
err_free_ns:
ldlm_namespace_free(m->mdt_namespace, NULL, 0);
obd->obd_namespace = m->mdt_namespace = NULL;
lu_object_add_top(h, o);
o->lo_ops = &mdt_obj_ops;
spin_lock_init(&mo->mot_write_lock);
+ mutex_init(&mo->mot_som_mutex);
mutex_init(&mo->mot_lov_mutex);
init_rwsem(&mo->mot_dom_sem);
init_rwsem(&mo->mot_open_sem);
struct mdt_device *mdt,
struct obd_connect_data *data, bool reconnect)
{
+ const char *obd_name = mdt_obd_name(mdt);
LASSERT(data != NULL);
data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
"ocd_version: %x ocd_grant: %d ocd_index: %u "
"ocd_brw_size unexpectedly zero, network data "
"corruption? Refusing to connect this client\n",
- mdt_obd_name(mdt),
- exp->exp_client_uuid.uuid,
+ obd_name, exp->exp_client_uuid.uuid,
exp, data->ocd_connect_flags, data->ocd_version,
data->ocd_grant, data->ocd_index);
return -EPROTO;
if ((data->ocd_connect_flags & OBD_CONNECT_FID) == 0) {
CWARN("%s: MDS requires FID support, but client not\n",
- mdt_obd_name(mdt));
+ obd_name);
return -EBADE;
}
/* The client set in ocd_cksum_types the checksum types it
* supports. We have to mask off the algorithms that we don't
* support */
- data->ocd_cksum_types &= cksum_types_supported_server();
+ data->ocd_cksum_types &=
+ obd_cksum_types_supported_server(obd_name);
if (unlikely(data->ocd_cksum_types == 0)) {
CERROR("%s: Connect with checksum support but no "
/* Remove mfd handle so it can't be found again.
* We are consuming the mfd_list reference here. */
- class_handle_unhash(&mfd->mfd_handle);
+ class_handle_unhash(&mfd->mfd_open_handle);
list_move_tail(&mfd->mfd_list, &closing_list);
}
spin_unlock(&med->med_open_lock);
* archive request into a noop if it's not actually
* dirty.
*/
- if (mfd->mfd_mode & MDS_FMODE_WRITE)
+ if (mfd->mfd_open_flags & MDS_FMODE_WRITE)
rc = mdt_ctxt_add_dirty_flag(&env, info, mfd);
/* Don't unlink orphan on failover umount, LU-184 */