From: yury
Date: Sat, 27 Sep 2008 11:20:20 +0000 (+0000)
Subject: b=16727
X-Git-Tag: v1_9_80~45
X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=54e6e2442374d11ba55518b552f7230a989c9b1c
b=16727
r=adilger,shadow
- various CMD related fixes.
---
diff --git a/lustre/cmm/cmm_split.c b/lustre/cmm/cmm_split.c
index d77e77a..1d105fb 100644
--- a/lustre/cmm/cmm_split.c
+++ b/lustre/cmm/cmm_split.c
@@ -368,8 +368,6 @@ static int cmm_split_slaves_create(const struct lu_env *env,
GOTO(cleanup, rc);
i++;
}
-
- ma->ma_valid |= MA_LMV;
EXIT;
cleanup:
return rc;
@@ -395,7 +393,7 @@ static inline struct lu_name *cmm_name(const struct lu_env *env,
cmi = cmm_env_info(env);
lname = &cmi->cti_name;
lname->ln_name = name;
- /* NOT count the terminating '\0' of name for length */
+ /* do NOT count the terminating '\0' of name for length */
lname->ln_namelen = buflen - 1;
return lname;
}
@@ -410,7 +408,7 @@ static int cmm_split_remove_entry(const struct lu_env *env,
{
struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
struct cmm_thread_info *cmi;
- struct md_attr *ma;
+ struct md_attr *ma;
struct cmm_object *obj;
int is_dir, rc;
char *name;
@@ -434,7 +432,7 @@ static int cmm_split_remove_entry(const struct lu_env *env,
/*
* XXX: These days only cross-ref dirs are possible, so for the
* sake of simplicity, in split, we suppose that all cross-ref
- * names pint to directory and do not do additional getattr to
+ * names point to directory and do not do additional getattr to
* remote MDT.
*/
is_dir = 1;
@@ -616,7 +614,7 @@ static int cmm_split_process_dir(const struct lu_env *env,
{
struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
struct lu_rdpg *rdpg = &cmm_env_info(env)->cmi_rdpg;
- __u64 hash_segement;
+ __u64 hash_segment;
int rc = 0, i;
ENTRY;
@@ -631,23 +629,23 @@ static int cmm_split_process_dir(const struct lu_env *env,
GOTO(cleanup, rc = -ENOMEM);
}
- LASSERT(ma->ma_valid & MA_LMV);
- hash_segement = MAX_HASH_SIZE / (cmm->cmm_tgt_count + 1);
+ hash_segment = MAX_HASH_SIZE;
+ do_div(hash_segment, cmm->cmm_tgt_count + 1);
for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
struct lu_fid *lf;
__u64 hash_end;
lf = &ma->ma_lmv->mea_ids[i];
- rdpg->rp_hash = i * hash_segement;
+ rdpg->rp_hash = i * hash_segment;
if (i == cmm->cmm_tgt_count)
hash_end = MAX_HASH_SIZE;
else
- hash_end = rdpg->rp_hash + hash_segement;
+ hash_end = rdpg->rp_hash + hash_segment;
rc = cmm_split_process_stripe(env, mo, rdpg, lf, hash_end);
if (rc) {
CERROR("Error (rc = %d) while splitting for %d: fid="
- DFID", %08x:%08x\n", rc, i, PFID(lf),
+ DFID", "LPX64":"LPX64"\n", rc, i, PFID(lf),
rdpg->rp_hash, hash_end);
GOTO(cleanup, rc);
}
@@ -718,7 +716,6 @@ int cmm_split_dir(const struct lu_env *env, struct md_object *mo)
}
/* Step5: Set mea to the master object. */
- LASSERT(ma->ma_valid & MA_LMV);
buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
rc = mo_xattr_set(env, md_object_next(mo), buf,
MDS_LMV_MD_NAME, 0);
diff --git a/lustre/cmm/mdc_object.c b/lustre/cmm/mdc_object.c
index edc2fb6..70a1c3e 100644
--- a/lustre/cmm/mdc_object.c
+++ b/lustre/cmm/mdc_object.c
@@ -161,8 +161,9 @@ static int mdc_req2attr_update(const struct lu_env *env,
struct mdc_thread_info *mci;
struct ptlrpc_request *req;
struct mdt_body *body;
- struct lov_mds_md *lov;
+ struct lov_mds_md *md;
struct llog_cookie *cookie;
+ void *acl;
ENTRY;
mci = mdc_info_get(env);
@@ -182,42 +183,68 @@ static int mdc_req2attr_update(const struct lu_env *env,
*ma->ma_capa = *capa;
}
- if (!(body->valid & OBD_MD_FLEASIZE))
- RETURN(0);
+ if ((body->valid & OBD_MD_FLEASIZE) || (body->valid & OBD_MD_FLDIREA)) {
+ if (body->eadatasize == 0) {
+ CERROR("No size defined for easize field\n");
+ RETURN(-EPROTO);
+ }
+
+ md = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD,
+ body->eadatasize);
+ if (md == NULL)
+ RETURN(-EPROTO);
- if (body->eadatasize == 0) {
- CERROR("OBD_MD_FLEASIZE is set but eadatasize is zero\n");
- RETURN(-EPROTO);
+ LASSERT(ma->ma_lmm != NULL);
+ LASSERT(ma->ma_lmm_size >= body->eadatasize);
+ ma->ma_lmm_size = body->eadatasize;
+ memcpy(ma->ma_lmm, md, ma->ma_lmm_size);
+ ma->ma_valid |= MA_LOV;
}
- lov = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD,
- body->eadatasize);
- if (lov == NULL)
- RETURN(-EPROTO);
+ if (body->valid & OBD_MD_FLCOOKIE) {
+ /*
+ * ACL and cookie share the same body->aclsize, we need
+ * to make sure that they both never come here.
+ */
+ LASSERT(!(body->valid & OBD_MD_FLACL));
- LASSERT(ma->ma_lmm != NULL);
- LASSERT(ma->ma_lmm_size >= body->eadatasize);
- ma->ma_lmm_size = body->eadatasize;
- memcpy(ma->ma_lmm, lov, ma->ma_lmm_size);
- ma->ma_valid |= MA_LOV;
+ if (body->aclsize == 0) {
+ CERROR("No size defined for cookie field\n");
+ RETURN(-EPROTO);
+ }
- if (!(body->valid & OBD_MD_FLCOOKIE))
- RETURN(0);
+ cookie = req_capsule_server_sized_get(&req->rq_pill,
+ &RMF_LOGCOOKIES,
+ body->aclsize);
+ if (cookie == NULL)
+ RETURN(-EPROTO);
- if (body->aclsize == 0) {
- CERROR("OBD_MD_FLCOOKIE is set but cookie size is zero\n");
- RETURN(-EPROTO);
+ LASSERT(ma->ma_cookie != NULL);
+ LASSERT(ma->ma_cookie_size == body->aclsize);
+ memcpy(ma->ma_cookie, cookie, ma->ma_cookie_size);
+ ma->ma_valid |= MA_COOKIE;
}
- cookie = req_capsule_server_sized_get(&req->rq_pill, &RMF_ACL,
- body->aclsize);
- if (cookie == NULL)
- RETURN(-EPROTO);
+#ifdef CONFIG_FS_POSIX_ACL
+ if (body->valid & OBD_MD_FLACL) {
+ if (body->aclsize == 0) {
+ CERROR("No size defined for acl field\n");
+ RETURN(-EPROTO);
+ }
+
+ acl = req_capsule_server_sized_get(&req->rq_pill,
+ &RMF_ACL,
+ body->aclsize);
+ if (acl == NULL)
+ RETURN(-EPROTO);
+
+ LASSERT(ma->ma_acl != NULL);
+ LASSERT(ma->ma_acl_size == body->aclsize);
+ memcpy(ma->ma_acl, acl, ma->ma_acl_size);
+ ma->ma_valid |= MA_ACL_DEF;
+ }
+#endif
- LASSERT(ma->ma_cookie != NULL);
- LASSERT(ma->ma_cookie_size == body->aclsize);
- memcpy(ma->ma_cookie, cookie, ma->ma_cookie_size);
- ma->ma_valid |= MA_COOKIE;
RETURN(0);
}
diff --git a/lustre/include/obd.h b/lustre/include/obd.h
index 81cd3d8..01d8c8f 100644
--- a/lustre/include/obd.h
+++ b/lustre/include/obd.h
@@ -742,10 +742,20 @@ struct lmv_tgt_desc {
struct semaphore ltd_fid_sem;
};
+enum placement_policy {
+ PLACEMENT_CHAR_POLICY = 0,
+ PLACEMENT_NID_POLICY = 1,
+ PLACEMENT_INVAL_POLICY = 2,
+ PLACEMENT_MAX_POLICY
+};
+
+typedef enum placement_policy placement_policy_t;
+
struct lmv_obd {
int refcount;
struct lu_client_fld lmv_fld;
spinlock_t lmv_lock;
+ placement_policy_t lmv_placement;
struct lmv_desc desc;
struct obd_uuid cluuid;
struct obd_export *exp;
@@ -1417,7 +1427,7 @@ enum {
#define MAX_HASH_SIZE_32 0x7fffffffUL
#define MAX_HASH_SIZE 0x7fffffffffffffffULL
-#define MAX_HASH_HIGHEST_BIT 0x1000000000000000
+#define MAX_HASH_HIGHEST_BIT 0x1000000000000000ULL
struct lustre_md {
struct mdt_body *body;
diff --git a/lustre/liblustre/super.c b/lustre/liblustre/super.c
index d7d37cf..f5b1b4e 100644
--- a/lustre/liblustre/super.c
+++ b/lustre/liblustre/super.c
@@ -2023,7 +2023,6 @@ llu_fsswop_mount(const char *source,
struct inode *root;
struct pnode_base *rootpb;
struct obd_device *obd;
- struct lu_fid rootfid;
struct llu_sb_info *sbi;
struct obd_statfs osfs;
static struct qstr noname = { NULL, 0, 0 };
@@ -2165,16 +2164,20 @@ llu_fsswop_mount(const char *source,
llu_init_ea_size(sbi->ll_md_exp, sbi->ll_dt_exp);
- err = md_getstatus(sbi->ll_md_exp, &rootfid, NULL);
+ fid_zero(&sbi->ll_root_fid);
+ err = md_getstatus(sbi->ll_md_exp, &sbi->ll_root_fid, NULL);
if (err) {
CERROR("cannot mds_connect: rc = %d\n", err);
GOTO(out_lock_cn_cb, err);
}
- CDEBUG(D_SUPER, "rootfid "DFID"\n", PFID(&rootfid));
- sbi->ll_root_fid = rootfid;
+ if (!fid_is_sane(&sbi->ll_root_fid)) {
+ CERROR("Invalid root fid during mount\n");
+ GOTO(out_lock_cn_cb, err = -EINVAL);
+ }
+ CDEBUG(D_SUPER, "rootfid "DFID"\n", PFID(&sbi->ll_root_fid));
/* fetch attr of root inode */
- err = md_getattr(sbi->ll_md_exp, &rootfid, NULL,
+ err = md_getattr(sbi->ll_md_exp, &sbi->ll_root_fid, NULL,
OBD_MD_FLGETATTR | OBD_MD_FLBLOCKS, 0, &request);
if (err) {
CERROR("md_getattr failed for root: rc = %d\n", err);
diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c
index 257516f..ca9a7f8 100644
--- a/lustre/llite/llite_lib.c
+++ b/lustre/llite/llite_lib.c
@@ -281,7 +281,6 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt)
struct inode *root = 0;
struct ll_sb_info *sbi = ll_s2sbi(sb);
struct obd_device *obd;
- struct lu_fid rootfid;
struct obd_capa *oc = NULL;
struct obd_statfs osfs;
struct ptlrpc_request *request = NULL;
@@ -563,13 +562,17 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt)
GOTO(out_lock_cn_cb, err = -ENOMEM);
}
- err = md_getstatus(sbi->ll_md_exp, &rootfid, &oc);
+ fid_zero(&sbi->ll_root_fid);
+ err = md_getstatus(sbi->ll_md_exp, &sbi->ll_root_fid, &oc);
if (err) {
CERROR("cannot mds_connect: rc = %d\n", err);
GOTO(out_lock_cn_cb, err);
}
- CDEBUG(D_SUPER, "rootfid "DFID"\n", PFID(&rootfid));
- sbi->ll_root_fid = rootfid;
+ if (!fid_is_sane(&sbi->ll_root_fid)) {
+ CERROR("Invalid root fid during mount\n");
+ GOTO(out_lock_cn_cb, err = -EINVAL);
+ }
+ CDEBUG(D_SUPER, "rootfid "DFID"\n", PFID(&sbi->ll_root_fid));
sb->s_op = &lustre_super_operations;
sb->s_export_op = &lustre_export_operations;
@@ -582,7 +585,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt)
else if (sbi->ll_flags & LL_SBI_ACL)
valid |= OBD_MD_FLACL;
- err = md_getattr(sbi->ll_md_exp, &rootfid, oc, valid, 0, &request);
+ err = md_getattr(sbi->ll_md_exp, &sbi->ll_root_fid, oc, valid, 0,
+ &request);
if (oc)
free_capa(oc);
if (err) {
@@ -1875,6 +1879,7 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md)
inode->i_mode = (inode->i_mode & S_IFMT)|(body->mode & ~S_IFMT);
if (body->valid & OBD_MD_FLTYPE)
inode->i_mode = (inode->i_mode & ~S_IFMT)|(body->mode & S_IFMT);
+ LASSERT(inode->i_mode != 0);
if (S_ISREG(inode->i_mode)) {
inode->i_blkbits = min(PTLRPC_MAX_BRW_BITS + 1, LL_MAX_BLKSIZE_BITS);
} else {
diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c
index e3f0662..e8e5572 100644
--- a/lustre/llite/statahead.c
+++ b/lustre/llite/statahead.c
@@ -370,6 +370,7 @@ static int do_statahead_interpret(struct ll_statahead_info *sai)
struct dentry *dentry;
struct lookup_intent *it;
int rc = 0;
+ struct mdt_body *body;
ENTRY;
spin_lock(&lli->lli_lock);
@@ -392,6 +393,10 @@ static int do_statahead_interpret(struct ll_statahead_info *sai)
if (entry->se_stat != SA_ENTRY_STATED)
GOTO(out, rc = entry->se_stat);
+ body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+ if (body == NULL)
+ GOTO(out, rc = -EFAULT);
+
if (dentry->d_inode == NULL) {
/*
* lookup.
@@ -404,6 +409,13 @@ static int do_statahead_interpret(struct ll_statahead_info *sai)
LASSERT(fid_is_zero(&minfo->mi_data.op_fid2));
+ /*
+ * XXX: No fid in reply, this is probaly cross-ref case.
+ * SA can't handle it yet.
+ */
+ if (body->valid & OBD_MD_MDS)
+ GOTO(out, rc = -EAGAIN);
+
rc = ll_lookup_it_finish(req, it, &icbd);
if (!rc)
/*
@@ -421,10 +433,6 @@ static int do_statahead_interpret(struct ll_statahead_info *sai)
/*
* revalidate.
*/
- struct mdt_body *body;
-
- body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
- sizeof(*body));
if (!lu_fid_eq(&minfo->mi_data.op_fid2, &body->fid1)) {
ll_unhash_aliases(dentry->d_inode);
GOTO(out, rc = -EAGAIN);
diff --git a/lustre/lmv/lmv_fld.c b/lustre/lmv/lmv_fld.c
index fbfe410..c2b9757 100644
--- a/lustre/lmv/lmv_fld.c
+++ b/lustre/lmv/lmv_fld.c
@@ -75,12 +75,13 @@ int lmv_fld_lookup(struct lmv_obd *lmv,
RETURN(rc);
}
- CDEBUG(D_INFO, "Got mds "LPU64" for sequence: "LPU64"\n",
- *mds, fid_seq(fid));
+ CDEBUG(D_INODE, "FLD lookup got mds #"LPU64" for fid="DFID"\n",
+ *mds, PFID(fid));
if (*mds >= lmv->desc.ld_tgt_count) {
- CERROR("Got invalid mds: "LPU64" (max: %d)\n",
- *mds, lmv->desc.ld_tgt_count);
+ CERROR("FLD lookup got invalid mds #"LPU64" (max: %d) "
+ "for fid="DFID"\n", *mds, lmv->desc.ld_tgt_count,
+ PFID(fid));
rc = -EINVAL;
}
RETURN(rc);
diff --git a/lustre/lmv/lmv_intent.c b/lustre/lmv/lmv_intent.c
index 285378e..7e35465 100644
--- a/lustre/lmv/lmv_intent.c
+++ b/lustre/lmv/lmv_intent.c
@@ -63,64 +63,55 @@
#include
#include "lmv_internal.h"
-static inline void lmv_drop_intent_lock(struct lookup_intent *it)
-{
- if (it->d.lustre.it_lock_mode != 0) {
- ldlm_lock_decref((void *)&it->d.lustre.it_lock_handle,
- it->d.lustre.it_lock_mode);
- it->d.lustre.it_lock_mode = 0;
- }
-}
-
int lmv_intent_remote(struct obd_export *exp, void *lmm,
int lmmsize, struct lookup_intent *it,
int flags, struct ptlrpc_request **reqp,
ldlm_blocking_callback cb_blocking,
int extra_lock_flags)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct ptlrpc_request *req = NULL;
- struct lustre_handle plock;
- struct md_op_data *op_data;
- struct obd_export *tgt_exp;
- struct mdt_body *body;
- int pmode, rc = 0;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct ptlrpc_request *req = NULL;
+ struct lustre_handle plock;
+ struct md_op_data *op_data;
+ struct lmv_tgt_desc *tgt;
+ struct mdt_body *body;
+ int pmode;
+ int rc = 0;
ENTRY;
- body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_DLM_REP);
- LASSERT(body != NULL);
+ body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
+ if (body == NULL)
+ RETURN(-EPROTO);
+ /*
+ * Not cross-ref case, just get out of here.
+ */
if (!(body->valid & OBD_MD_MDS))
RETURN(0);
/*
- * oh, MDS reports that this is remote inode case i.e. we have to ask
- * for real attrs on another MDS.
+ * Unfortunately, we have to lie to MDC/MDS to retrieve
+ * attributes llite needs and provideproper locking.
*/
- if (it->it_op & IT_LOOKUP) {
- /*
- * unfortunately, we have to lie to MDC/MDS to retrieve
- * attributes llite needs.
- */
+ if (it->it_op & IT_LOOKUP)
it->it_op = IT_GETATTR;
- }
- /* we got LOOKUP lock, but we really need attrs */
+ /*
+ * We got LOOKUP lock, but we really need attrs.
+ */
pmode = it->d.lustre.it_lock_mode;
if (pmode) {
plock.cookie = it->d.lustre.it_lock_handle;
it->d.lustre.it_lock_mode = 0;
- it->d.lustre.it_data = 0;
+ it->d.lustre.it_data = NULL;
}
LASSERT(fid_is_sane(&body->fid1));
- it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
-
- tgt_exp = lmv_find_export(lmv, &body->fid1);
- if (IS_ERR(tgt_exp))
- GOTO(out, rc = PTR_ERR(tgt_exp));
+ tgt = lmv_find_target(lmv, &body->fid1);
+ if (IS_ERR(tgt))
+ GOTO(out, rc = PTR_ERR(tgt));
OBD_ALLOC_PTR(op_data);
if (op_data == NULL)
@@ -128,23 +119,33 @@ int lmv_intent_remote(struct obd_export *exp, void *lmm,
op_data->op_fid1 = body->fid1;
op_data->op_bias = MDS_CROSS_REF;
+
+ CDEBUG(D_INODE,
+ "REMOTE_INTENT with fid="DFID" -> mds #%d\n",
+ PFID(&body->fid1), tgt->ltd_idx);
- rc = md_intent_lock(tgt_exp, op_data, lmm, lmmsize, it, flags,
- &req, cb_blocking, extra_lock_flags);
+ it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
+ rc = md_intent_lock(tgt->ltd_exp, op_data, lmm, lmmsize, it,
+ flags, &req, cb_blocking, extra_lock_flags);
+ if (rc)
+ GOTO(out_free_op_data, rc);
/*
- * llite needs LOOKUP lock to track dentry revocation in order to
+ * LLite needs LOOKUP lock to track dentry revocation in order to
* maintain dcache consistency. Thus drop UPDATE lock here and put
* LOOKUP in request.
*/
- if (rc == 0) {
- lmv_drop_intent_lock(it);
- it->d.lustre.it_lock_handle = plock.cookie;
- it->d.lustre.it_lock_mode = pmode;
+ if (it->d.lustre.it_lock_mode != 0) {
+ ldlm_lock_decref((void *)&it->d.lustre.it_lock_handle,
+ it->d.lustre.it_lock_mode);
+ it->d.lustre.it_lock_mode = 0;
}
+ it->d.lustre.it_lock_handle = plock.cookie;
+ it->d.lustre.it_lock_mode = pmode;
- OBD_FREE_PTR(op_data);
EXIT;
+out_free_op_data:
+ OBD_FREE_PTR(op_data);
out:
if (rc && pmode)
ldlm_lock_decref(&plock, pmode);
@@ -154,41 +155,6 @@ out:
return rc;
}
-int lmv_alloc_slave_fids(struct obd_device *obd, struct lu_fid *pid,
- struct md_op_data *op, struct lu_fid *fid)
-{
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_obj *obj;
- mdsno_t mds;
- int mea_idx;
- int rc;
- ENTRY;
-
- obj = lmv_obj_grab(obd, pid);
- if (!obj) {
- CERROR("Object "DFID" should be split\n",
- PFID(pid));
- RETURN(0);
- }
-
- mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
- (char *)op->op_name, op->op_namelen);
- mds = obj->lo_inodes[mea_idx].li_mds;
- lmv_obj_put(obj);
-
- rc = __lmv_fid_alloc(lmv, fid, mds);
- if (rc) {
- CERROR("Can't allocate new fid, rc %d\n",
- rc);
- RETURN(rc);
- }
-
- CDEBUG(D_INFO, "Allocate new fid "DFID" for split "
- "obj\n", PFID(fid));
-
- RETURN(rc);
-}
-
/*
* IT_OPEN is intended to open (and create, possible) an object. Parent (pid)
* may be split dir.
@@ -199,15 +165,17 @@ int lmv_intent_open(struct obd_export *exp, struct md_op_data *op_data,
ldlm_blocking_callback cb_blocking,
int extra_lock_flags)
{
- struct obd_device *obd = exp->exp_obd;
- struct lu_fid rpid = op_data->op_fid1;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct md_op_data *sop_data;
- struct obd_export *tgt_exp;
- struct lmv_stripe_md *mea;
- struct mdt_body *body;
- struct lmv_obj *obj;
- int rc, loop = 0;
+ struct obd_device *obd = exp->exp_obd;
+ struct lu_fid rpid = op_data->op_fid1;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct md_op_data *sop_data;
+ struct lmv_stripe_md *mea;
+ struct lmv_tgt_desc *tgt;
+ struct mdt_body *body;
+ struct lmv_object *obj;
+ int rc;
+ int loop = 0;
+ int sidx;
ENTRY;
OBD_ALLOC_PTR(sop_data);
@@ -221,35 +189,33 @@ repeat:
++loop;
LASSERT(loop <= 2);
- obj = lmv_obj_grab(obd, &rpid);
+ obj = lmv_object_find(obd, &rpid);
if (obj) {
- int mea_idx;
-
/*
* Directory is already split, so we have to forward request to
* the right MDS.
*/
- mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
+ sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
(char *)op_data->op_name,
op_data->op_namelen);
- rpid = obj->lo_inodes[mea_idx].li_fid;
+ rpid = obj->lo_stripes[sidx].ls_fid;
- sop_data->op_mds = obj->lo_inodes[mea_idx].li_mds;
- tgt_exp = lmv_get_export(lmv, sop_data->op_mds);
+ sop_data->op_mds = obj->lo_stripes[sidx].ls_mds;
+ tgt = lmv_get_target(lmv, sop_data->op_mds);
sop_data->op_bias &= ~MDS_CHECK_SPLIT;
- lmv_obj_put(obj);
- CDEBUG(D_OTHER, "Choose slave dir ("DFID")\n", PFID(&rpid));
- } else {
- struct lmv_tgt_desc *tgt;
+ lmv_object_put(obj);
+ CDEBUG(D_INODE,
+ "Choose slave dir ("DFID") -> mds #%d\n",
+ PFID(&rpid), tgt->ltd_idx);
+ } else {
sop_data->op_bias |= MDS_CHECK_SPLIT;
tgt = lmv_find_target(lmv, &rpid);
sop_data->op_mds = tgt->ltd_idx;
- tgt_exp = tgt->ltd_exp;
}
- if (IS_ERR(tgt_exp))
- GOTO(out_free_sop_data, rc = PTR_ERR(tgt_exp));
+ if (IS_ERR(tgt))
+ GOTO(out_free_sop_data, rc = PTR_ERR(tgt));
sop_data->op_fid1 = rpid;
@@ -258,7 +224,6 @@ repeat:
* For open with IT_CREATE and for IT_CREATE cases allocate new
* fid and setup FLD for it.
*/
- /* save old child fid for correctly check stale data*/
sop_data->op_fid3 = sop_data->op_fid2;
rc = lmv_fid_alloc(exp, &sop_data->op_fid2, sop_data);
if (rc)
@@ -270,7 +235,12 @@ repeat:
GOTO(out_free_sop_data, rc);
}
- rc = md_intent_lock(tgt_exp, sop_data, lmm, lmmsize, it, flags,
+ CDEBUG(D_INODE,
+ "OPEN_INTENT with fid1="DFID", fid2="DFID", name='%s' -> mds #%d\n",
+ PFID(&sop_data->op_fid1), PFID(&sop_data->op_fid2),
+ sop_data->op_name, tgt->ltd_idx);
+
+ rc = md_intent_lock(tgt->ltd_exp, sop_data, lmm, lmmsize, it, flags,
reqp, cb_blocking, extra_lock_flags);
if (rc == -ERESTART) {
@@ -279,7 +249,7 @@ repeat:
"Got -ERESTART during open!\n");
ptlrpc_req_finished(*reqp);
*reqp = NULL;
- it->d.lustre.it_data = 0;
+ it->d.lustre.it_data = NULL;
/*
* Directory got split. Time to update local object and repeat
@@ -289,8 +259,8 @@ repeat:
rc = lmv_handle_split(exp, &rpid);
if (rc == 0) {
/* We should reallocate child FID. */
- rc = lmv_alloc_slave_fids(obd, &rpid, op_data,
- &sop_data->op_fid2);
+ rc = lmv_allocate_slaves(obd, &rpid, op_data,
+ &sop_data->op_fid2);
if (rc == 0)
goto repeat;
}
@@ -300,6 +270,15 @@ repeat:
GOTO(out_free_sop_data, rc);
/*
+ * Nothing is found, do not access body->fid1 as it is zero and thus
+ * pointless.
+ */
+ if ((it->d.lustre.it_disposition & DISP_LOOKUP_NEG) &&
+ !(it->d.lustre.it_disposition & DISP_OPEN_CREATE) &&
+ !(it->d.lustre.it_disposition & DISP_OPEN_OPEN))
+ GOTO(out_free_sop_data, rc = 0);
+
+ /*
* Okay, MDS has returned success. Probably name has been resolved in
* remote inode.
*/
@@ -313,147 +292,148 @@ repeat:
* this is normal situation, we should not print error here,
* only debug info.
*/
- CDEBUG(D_OTHER, "can't handle remote %s: dir "DFID"("DFID"):"
+ CDEBUG(D_INODE, "Can't handle remote %s: dir "DFID"("DFID"):"
"%*s: %d\n", LL_IT2STR(it), PFID(&op_data->op_fid2),
PFID(&rpid), op_data->op_namelen, op_data->op_name, rc);
GOTO(out_free_sop_data, rc);
}
- /*
- * Nothing is found, do not access body->fid1 as it is zero and thus
- * pointless.
+ /*
+ * Caller may use attrs MDS returns on IT_OPEN lock request so, we have
+ * to update them for split dir.
*/
- if ((it->d.lustre.it_disposition & DISP_LOOKUP_NEG) &&
- !(it->d.lustre.it_disposition & DISP_OPEN_CREATE) &&
- !(it->d.lustre.it_disposition & DISP_OPEN_OPEN))
- GOTO(out_free_sop_data, rc = 0);
-
- /* caller may use attrs MDS returns on IT_OPEN lock request so, we have
- * to update them for split dir */
- body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_DLM_REP);
+ body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
LASSERT(body != NULL);
-
- /* could not find object, FID is not present in response. */
+
+ /*
+ * Could not find object, FID is not present in response.
+ */
if (!(body->valid & OBD_MD_FLID))
GOTO(out_free_sop_data, rc = 0);
- obj = lmv_obj_grab(obd, &body->fid1);
- if (!obj && (mea = lmv_get_mea(*reqp))) {
- /* FIXME: capability for remote! */
- /* wow! this is split dir, we'd like to handle it */
- obj = lmv_obj_create(exp, &body->fid1, mea);
- if (IS_ERR(obj))
- GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj));
+ obj = lmv_object_find(obd, &body->fid1);
+ if (obj == NULL) {
+ /*
+ * XXX: Capability for remote call!
+ */
+ mea = lmv_get_mea(*reqp);
+ if (mea != NULL) {
+ obj = lmv_object_create(exp, &body->fid1, mea);
+ if (IS_ERR(obj))
+ GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj));
+ }
}
if (obj) {
- /* This is split dir and we'd want to get attrs. */
- CDEBUG(D_OTHER, "attrs from slaves for "DFID"\n",
+ /*
+ * This is split dir and we'd want to get attrs.
+ */
+ CDEBUG(D_INODE, "Slave attributes for "DFID"\n",
PFID(&body->fid1));
rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1,
cb_blocking, extra_lock_flags);
- } else if (S_ISDIR(body->mode)) {
- CDEBUG(D_OTHER, "object "DFID" has not lmv obj?\n",
- PFID(&body->fid1));
+ lmv_object_put(obj);
}
-
- if (obj)
- lmv_obj_put(obj);
-
EXIT;
out_free_sop_data:
OBD_FREE_PTR(sop_data);
return rc;
}
-int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data,
- void *lmm, int lmmsize, struct lookup_intent *it,
- int flags, struct ptlrpc_request **reqp,
- ldlm_blocking_callback cb_blocking,
- int extra_lock_flags)
+/*
+ * Handler for: getattr, lookup and revalidate cases.
+ */
+int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
+ void *lmm, int lmmsize, struct lookup_intent *it,
+ int flags, struct ptlrpc_request **reqp,
+ ldlm_blocking_callback cb_blocking,
+ int extra_lock_flags)
{
- struct lmv_obj *obj = NULL, *obj2 = NULL;
- struct obd_device *obd = exp->exp_obd;
- struct lu_fid rpid = op_data->op_fid1;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct md_op_data *sop_data;
- struct lmv_stripe_md *mea;
- struct mdt_body *body;
- mdsno_t mds;
- int rc = 0;
+ struct obd_device *obd = exp->exp_obd;
+ struct lu_fid rpid = op_data->op_fid1;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_object *obj = NULL;
+ struct md_op_data *sop_data;
+ struct lmv_stripe_md *mea;
+ struct lmv_tgt_desc *tgt = NULL;
+ struct mdt_body *body;
+ int sidx;
+ int loop = 0;
+ int rc = 0;
ENTRY;
OBD_ALLOC_PTR(sop_data);
if (sop_data == NULL)
RETURN(-ENOMEM);
- /* save op_data fro repeat case */
*sop_data = *op_data;
- if (fid_is_sane(&op_data->op_fid2)) {
- /*
- * Caller wants to revalidate attrs of obj we have to revalidate
- * slaves if requested object is split directory.
- */
- CDEBUG(D_OTHER, "revalidate attrs for "DFID"\n",
- PFID(&op_data->op_fid2));
+repeat:
+ ++loop;
+ LASSERT(loop <= 2);
- rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds);
- if (rc)
- GOTO(out_free_sop_data, rc);
-#if 0
- /*
- * In fact, we do not need this with current intent_lock(), but
- * it may change some day.
- */
- obj = lmv_obj_grab(obd, &op_data->op_fid2);
- if (obj) {
- if (!lu_fid_eq(&op_data->op_fid1, &op_data->op_fid2)){
- rpid = obj->lo_inodes[mds].li_fid;
- mds = obj->lo_inodes[mds].li_mds;
- }
- lmv_obj_put(obj);
- }
-#endif
+ obj = lmv_object_find(obd, &op_data->op_fid1);
+ if (obj && op_data->op_namelen) {
+ sidx = raw_name2idx(obj->lo_hashtype,
+ obj->lo_objcount,
+ (char *)op_data->op_name,
+ op_data->op_namelen);
+ rpid = obj->lo_stripes[sidx].ls_fid;
+ tgt = lmv_get_target(lmv,
+ obj->lo_stripes[sidx].ls_mds);
+ CDEBUG(D_INODE,
+ "Choose slave dir ("DFID") -> mds #%d\n",
+ PFID(&rpid), tgt->ltd_idx);
+ sop_data->op_bias &= ~MDS_CHECK_SPLIT;
} else {
- CDEBUG(D_OTHER, "INTENT getattr for %*s on "DFID"\n",
- op_data->op_namelen, op_data->op_name,
- PFID(&op_data->op_fid1));
-
- rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds);
- if (rc)
- GOTO(out_free_sop_data, rc);
- obj = lmv_obj_grab(obd, &op_data->op_fid1);
- if (obj && op_data->op_namelen) {
- int mea_idx;
-
- /* directory is already split. calculate mds */
- mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
- (char *)op_data->op_name,
- op_data->op_namelen);
- rpid = obj->lo_inodes[mea_idx].li_fid;
- mds = obj->lo_inodes[mea_idx].li_mds;
- sop_data->op_bias &= ~MDS_CHECK_SPLIT;
- lmv_obj_put(obj);
-
- CDEBUG(D_OTHER, "forward to MDS #"LPU64" (slave "DFID")\n",
- mds, PFID(&rpid));
- } else {
- rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds);
- if (rc)
- GOTO(out_free_sop_data, rc);
- sop_data->op_bias |= MDS_CHECK_SPLIT;
- }
+ tgt = lmv_find_target(lmv, &op_data->op_fid1);
+ sop_data->op_bias |= MDS_CHECK_SPLIT;
}
+ if (obj)
+ lmv_object_put(obj);
+
+ if (IS_ERR(tgt))
+ GOTO(out_free_sop_data, rc = PTR_ERR(tgt));
+
+ if (!fid_is_sane(&sop_data->op_fid2))
+ fid_zero(&sop_data->op_fid2);
+
+ CDEBUG(D_INODE,
+ "LOOKUP_INTENT with fid1="DFID", fid2="DFID
+ ", name='%s' -> mds #%d\n",
+ PFID(&sop_data->op_fid1), PFID(&sop_data->op_fid2),
+ sop_data->op_name ? sop_data->op_name : "",
+ tgt->ltd_idx);
+ sop_data->op_bias &= ~MDS_CROSS_REF;
sop_data->op_fid1 = rpid;
- rc = md_intent_lock(lmv->tgts[mds].ltd_exp, sop_data, lmm,
- lmmsize, it, flags, reqp, cb_blocking,
- extra_lock_flags);
+ rc = md_intent_lock(tgt->ltd_exp, sop_data, lmm, lmmsize, it,
+ flags, reqp, cb_blocking, extra_lock_flags);
- LASSERTF(rc != -ERESTART, "GETATTR: Got unhandled -ERESTART!\n");
+ if (rc == -ERESTART) {
+ LASSERT(*reqp != NULL);
+ DEBUG_REQ(D_WARNING|D_RPCTRACE, *reqp,
+ "Got -ERESTART during lookup!\n");
+ ptlrpc_req_finished(*reqp);
+ *reqp = NULL;
+ it->d.lustre.it_data = 0;
+
+ /*
+ * Directory got split since last update. This shouldn't be
+ * because splitting causes lock revocation, so revalidate had
+ * to fail and lookup on dir had to return mea.
+ */
+ LASSERT(obj == NULL);
+
+ obj = lmv_object_create(exp, &rpid, NULL);
+ if (IS_ERR(obj))
+ GOTO(out_free_sop_data, rc = PTR_ERR(obj));
+ lmv_object_put(obj);
+ goto repeat;
+ }
+
if (rc < 0)
GOTO(out_free_sop_data, rc);
@@ -461,17 +441,13 @@ int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data,
/*
* This is split dir. In order to optimize things a bit, we
* consider obj valid updating missing parts.
-
- * FIXME: do we need to return any lock here? It would be fine
- * if we don't. This means that nobody should use UPDATE lock to
- * notify about object * removal.
*/
- CDEBUG(D_OTHER,
- "revalidate slaves for "DFID", rc %d\n",
- PFID(&op_data->op_fid2), rc);
+ CDEBUG(D_INODE,
+ "Revalidate slaves for "DFID", rc %d\n",
+ PFID(&op_data->op_fid1), rc);
LASSERT(fid_is_sane(&op_data->op_fid2));
- rc = lmv_revalidate_slaves(exp, reqp, &op_data->op_fid2, it, rc,
+ rc = lmv_revalidate_slaves(exp, reqp, &op_data->op_fid1, it, rc,
cb_blocking, extra_lock_flags);
GOTO(out_free_sop_data, rc);
}
@@ -480,8 +456,8 @@ int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data,
GOTO(out_free_sop_data, rc);
/*
- * okay, MDS has returned success. Probably name has been resolved in
- * remote inode.
+ * MDS has returned success. Probably name has been resolved in
+ * remote inode. Let's check this.
*/
rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags,
reqp, cb_blocking, extra_lock_flags);
@@ -495,319 +471,35 @@ int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data,
if (it->d.lustre.it_disposition & DISP_LOOKUP_NEG)
GOTO(out_free_sop_data, rc = 0);
- LASSERT(*reqp);
- LASSERT((*reqp)->rq_repmsg);
+ LASSERT(*reqp != NULL);
+ LASSERT((*reqp)->rq_repmsg != NULL);
body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
LASSERT(body != NULL);
- /* could not find object, FID is not present in response. */
+ /*
+ * Could not find object, FID is not present in response.
+ */
if (!(body->valid & OBD_MD_FLID))
GOTO(out_free_sop_data, rc = 0);
- obj2 = lmv_obj_grab(obd, &body->fid1);
-
- if (!obj2 && (mea = lmv_get_mea(*reqp))) {
-
- /* FIXME remote capability! */
- /* wow! this is split dir, we'd like to handle it. */
- obj2 = lmv_obj_create(exp, &body->fid1, mea);
- if (IS_ERR(obj2))
- GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj2));
- }
-
- if (obj2) {
- /* this is split dir and we'd want to get attrs */
- CDEBUG(D_OTHER, "attrs from slaves for "DFID", rc %d\n",
- PFID(&body->fid1), rc);
-
- rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1,
- cb_blocking, extra_lock_flags);
- lmv_obj_put(obj2);
- }
-
- EXIT;
-out_free_sop_data:
- OBD_FREE_PTR(sop_data);
- return rc;
-}
-
-/* this is not used currently */
-int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
-{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lustre_handle *lockh;
- struct md_op_data *op_data;
- struct ldlm_lock *lock;
- struct mdt_body *body2;
- struct mdt_body *body;
- struct lmv_obj *obj;
- int i, rc = 0;
- ENTRY;
-
- LASSERT(reqp);
- LASSERT(*reqp);
-
- /*
- * Master is locked. we'd like to take locks on slaves and update
- * attributes to be returned from the slaves it's important that lookup
- * is called in two cases:
-
- * - for first time (dcache has no such a resolving yet). -
- * ->d_revalidate() returned false.
-
- * Last case possible only if all the objs (master and all slaves aren't
- * valid.
- */
-
- OBD_ALLOC_PTR(op_data);
- if (op_data == NULL)
- RETURN(-ENOMEM);
-
- body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
- LASSERT(body != NULL);
-
- LASSERT((body->valid & OBD_MD_FLID) != 0);
- obj = lmv_obj_grab(obd, &body->fid1);
- LASSERT(obj != NULL);
-
- CDEBUG(D_OTHER, "lookup slaves for "DFID"\n",
- PFID(&body->fid1));
-
- lmv_obj_lock(obj);
-
- for (i = 0; i < obj->lo_objcount; i++) {
- struct lu_fid fid = obj->lo_inodes[i].li_fid;
- struct ptlrpc_request *req = NULL;
- struct obd_export *tgt_exp;
- struct lookup_intent it;
-
- if (lu_fid_eq(&fid, &obj->lo_fid))
- /* skip master obj */
- continue;
-
- CDEBUG(D_OTHER, "lookup slave "DFID"\n", PFID(&fid));
-
- /* is obj valid? */
- memset(&it, 0, sizeof(it));
- it.it_op = IT_GETATTR;
-
- memset(op_data, 0, sizeof(*op_data));
- op_data->op_fid1 = fid;
- op_data->op_fid2 = fid;
- op_data->op_bias = MDS_CROSS_REF;
-
- tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds);
- if (IS_ERR(tgt_exp))
- GOTO(cleanup, rc = PTR_ERR(tgt_exp));
-
- rc = md_intent_lock(tgt_exp, op_data, NULL, 0, &it, 0,
- &req, lmv_blocking_ast, 0);
-
- lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle;
- if (rc > 0 && req == NULL) {
- /* nice, this slave is valid */
- LASSERT(req == NULL);
- CDEBUG(D_OTHER, "cached\n");
- goto release_lock;
- }
-
- if (rc < 0) {
- /* error during lookup */
- GOTO(cleanup, rc);
- }
- lock = ldlm_handle2lock(lockh);
- LASSERT(lock);
-
- lock->l_ast_data = lmv_obj_get(obj);
-
- body2 = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
- LASSERT(body2 != NULL);
-
- obj->lo_inodes[i].li_size = body2->size;
-
- CDEBUG(D_OTHER, "fresh: %lu\n",
- (unsigned long)obj->lo_inodes[i].li_size);
-
- LDLM_LOCK_PUT(lock);
-
- if (req)
- ptlrpc_req_finished(req);
-release_lock:
- lmv_update_body(body, obj->lo_inodes + i);
-
- if (it.d.lustre.it_lock_mode) {
- ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
- it.d.lustre.it_lock_mode = 0;
- }
- }
-
- EXIT;
-cleanup:
- lmv_obj_unlock(obj);
- lmv_obj_put(obj);
- OBD_FREE_PTR(op_data);
- return rc;
-}
-
-int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
- void *lmm, int lmmsize, struct lookup_intent *it,
- int flags, struct ptlrpc_request **reqp,
- ldlm_blocking_callback cb_blocking,
- int extra_lock_flags)
-{
- struct obd_device *obd = exp->exp_obd;
- struct lu_fid rpid = op_data->op_fid1;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct md_op_data *sop_data;
- struct lmv_stripe_md *mea;
- struct mdt_body *body;
- struct lmv_obj *obj;
- int rc, loop = 0;
- int mea_idx;
- mdsno_t mds;
- ENTRY;
-
- OBD_ALLOC_PTR(sop_data);
- if (sop_data == NULL)
- RETURN(-ENOMEM);
-
- /* save op_data fro repeat case */
- *sop_data = *op_data;
-
- /*
- * IT_LOOKUP is intended to produce name -> fid resolving (let's call
- * this lookup below) or to confirm requested resolving is still valid
- * (let's call this revalidation) fid_is_sane(&sop_data->op_fid2) specifies
- * revalidation.
- */
- if (fid_is_sane(&op_data->op_fid2)) {
- /*
- * This is revalidate: we have to check is LOOKUP lock still
- * valid for given fid. Very important part is that we have to
- * choose right mds because namespace is per mds.
- */
- rpid = op_data->op_fid1;
- obj = lmv_obj_grab(obd, &rpid);
- if (obj) {
- mea_idx = raw_name2idx(obj->lo_hashtype,
- obj->lo_objcount,
- (char *)op_data->op_name,
- op_data->op_namelen);
- rpid = obj->lo_inodes[mea_idx].li_fid;
- mds = obj->lo_inodes[mea_idx].li_mds;
- sop_data->op_bias &= ~MDS_CHECK_SPLIT;
- lmv_obj_put(obj);
- } else {
- rc = lmv_fld_lookup(lmv, &rpid, &mds);
- if (rc)
- GOTO(out_free_sop_data, rc);
- sop_data->op_bias |= MDS_CHECK_SPLIT;
- }
-
- CDEBUG(D_OTHER, "revalidate lookup for "DFID" to #"LPU64" MDS\n",
- PFID(&op_data->op_fid2), mds);
- } else {
-repeat:
- ++loop;
- LASSERT(loop <= 2);
-
- /*
- * This is lookup. During lookup we have to update all the
- * attributes, because returned values will be put in struct
- * inode.
+ obj = lmv_object_find(obd, &body->fid1);
+ if (obj == NULL) {
+ /*
+ * XXX: Remote capability is not handled.
*/
- obj = lmv_obj_grab(obd, &op_data->op_fid1);
- if (obj) {
- if (op_data->op_namelen) {
- /* directory is already split. calculate mds */
- mea_idx = raw_name2idx(obj->lo_hashtype,
- obj->lo_objcount,
- (char *)op_data->op_name,
- op_data->op_namelen);
- rpid = obj->lo_inodes[mea_idx].li_fid;
- mds = obj->lo_inodes[mea_idx].li_mds;
- }
- sop_data->op_bias &= ~MDS_CHECK_SPLIT;
- lmv_obj_put(obj);
- } else {
- rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds);
- if (rc)
- GOTO(out_free_sop_data, rc);
- sop_data->op_bias |= MDS_CHECK_SPLIT;
- }
- fid_zero(&sop_data->op_fid2);
- }
-
- sop_data->op_bias &= ~MDS_CROSS_REF;
- sop_data->op_fid1 = rpid;
-
- rc = md_intent_lock(lmv->tgts[mds].ltd_exp, sop_data, lmm, lmmsize,
- it, flags, reqp, cb_blocking, extra_lock_flags);
- if (rc > 0) {
- LASSERT(fid_is_sane(&op_data->op_fid2));
- /*
- * Very interesting. it seems object is still valid but for some
- * reason llite calls lookup, not revalidate.
- */
- CDEBUG(D_OTHER, "lookup for "DFID" and data should be uptodate\n",
- PFID(&rpid));
- LASSERT(*reqp == NULL);
- GOTO(out_free_sop_data, rc);
- }
-
- if (rc == 0 && *reqp == NULL) {
- /* once again, we're asked for lookup, not revalidate */
- CDEBUG(D_OTHER, "lookup for "DFID" and data should be uptodate\n",
- PFID(&rpid));
- GOTO(out_free_sop_data, rc);
- }
-
- if (rc == -ERESTART) {
- LASSERT(*reqp != NULL);
- DEBUG_REQ(D_WARNING|D_RPCTRACE, *reqp,
- "Got -ERESTART during lookup!\n");
- ptlrpc_req_finished(*reqp);
- *reqp = NULL;
- it->d.lustre.it_data = 0;
- /*
- * Directory got split since last update. This shouldn't be
- * because splitting causes lock revocation, so revalidate had
- * to fail and lookup on dir had to return mea.
- */
- CWARN("we haven't knew about directory splitting!\n");
- LASSERT(obj == NULL);
-
- obj = lmv_obj_create(exp, &rpid, NULL);
- if (IS_ERR(obj))
- GOTO(out_free_sop_data, rc = PTR_ERR(obj));
- lmv_obj_put(obj);
- goto repeat;
- }
-
- if (rc < 0)
- GOTO(out_free_sop_data, rc);
-
- /*
- * Okay, MDS has returned success. Probably name has been resolved in
- * remote inode.
- */
- rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp,
- cb_blocking, extra_lock_flags);
-
- if (rc == 0 && (mea = lmv_get_mea(*reqp))) {
- /* Wow! This is split dir, we'd like to handle it. */
- body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
- LASSERT(body != NULL);
- LASSERT((body->valid & OBD_MD_FLID) != 0);
-
- obj = lmv_obj_grab(obd, &body->fid1);
- if (!obj) {
- obj = lmv_obj_create(exp, &body->fid1, mea);
+ mea = lmv_get_mea(*reqp);
+ if (mea != NULL) {
+ obj = lmv_object_create(exp, &body->fid1, mea);
if (IS_ERR(obj))
GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj));
}
- lmv_obj_put(obj);
+ } else {
+ CDEBUG(D_INODE, "Slave attributes for "DFID", rc %d\n",
+ PFID(&body->fid1), rc);
+
+ rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1,
+ cb_blocking, extra_lock_flags);
+ lmv_object_put(obj);
}
EXIT;
@@ -823,13 +515,13 @@ int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
int extra_lock_flags)
{
struct obd_device *obd = exp->exp_obd;
- int rc;
+ int rc;
ENTRY;
LASSERT(it != NULL);
LASSERT(fid_is_sane(&op_data->op_fid1));
- CDEBUG(D_OTHER, "INTENT LOCK '%s' for '%*s' on "DFID"\n",
+ CDEBUG(D_INODE, "INTENT LOCK '%s' for '%*s' on "DFID"\n",
LL_IT2STR(it), op_data->op_namelen, op_data->op_name,
PFID(&op_data->op_fid1));
@@ -837,7 +529,7 @@ int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
if (rc)
RETURN(rc);
- if (it->it_op & IT_LOOKUP)
+ if (it->it_op & (IT_LOOKUP | IT_GETATTR))
rc = lmv_intent_lookup(exp, op_data, lmm, lmmsize, it,
flags, reqp, cb_blocking,
extra_lock_flags);
@@ -845,10 +537,6 @@ int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
rc = lmv_intent_open(exp, op_data, lmm, lmmsize, it,
flags, reqp, cb_blocking,
extra_lock_flags);
- else if (it->it_op & IT_GETATTR)
- rc = lmv_intent_getattr(exp, op_data,lmm, lmmsize, it,
- flags, reqp, cb_blocking,
- extra_lock_flags);
else
LBUG();
RETURN(rc);
@@ -859,98 +547,103 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
int master_valid, ldlm_blocking_callback cb_blocking,
int extra_lock_flags)
{
- struct obd_device *obd = exp->exp_obd;
- struct ptlrpc_request *mreq = *reqp;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lustre_handle master_lockh;
- struct obd_export *tgt_exp;
- struct md_op_data *op_data;
- struct ldlm_lock *lock;
- unsigned long size = 0;
- struct mdt_body *body;
- struct lmv_obj *obj;
- int master_lock_mode;
- int i, rc = 0;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ int master_lockm = 0;
+ struct lustre_handle *lockh = NULL;
+ struct ptlrpc_request *mreq = *reqp;
+ struct lustre_handle master_lockh;
+ struct md_op_data *op_data;
+ struct ldlm_lock *lock;
+ unsigned long size = 0;
+ struct mdt_body *body;
+ struct lmv_object *obj;
+ int i;
+ int rc = 0;
+ struct lu_fid fid;
+ struct ptlrpc_request *req;
+ ldlm_blocking_callback cb;
+ struct lookup_intent it;
+ struct lmv_tgt_desc *tgt;
+ int master;
ENTRY;
+ CDEBUG(D_INODE, "Revalidate master obj "DFID"\n", PFID(mid));
+
OBD_ALLOC_PTR(op_data);
if (op_data == NULL)
RETURN(-ENOMEM);
/*
* We have to loop over the subobjects, check validity and update them
- * from MDSs if needed. it's very useful that we need not to update all
- * the fields. say, common fields (that are equal on all the subojects
+ * from MDS if needed. It's very useful that we need not to update all
+ * the fields. Say, common fields (that are equal on all the subojects
* need not to be update, another fields (i_size, for example) are
* cached all the time.
*/
- obj = lmv_obj_grab(obd, mid);
- LASSERT(obj != NULL);
-
- master_lock_mode = 0;
-
- lmv_obj_lock(obj);
+ obj = lmv_object_find_lock(obd, mid);
+ if (obj == NULL)
+ RETURN(-EALREADY);
for (i = 0; i < obj->lo_objcount; i++) {
- struct lu_fid fid = obj->lo_inodes[i].li_fid;
- struct lustre_handle *lockh = NULL;
- struct ptlrpc_request *req = NULL;
- ldlm_blocking_callback cb;
- struct lookup_intent it;
- int master = 0;
-
- CDEBUG(D_OTHER, "revalidate subobj "DFID"\n",
- PFID(&fid));
+ fid = obj->lo_stripes[i].ls_fid;
+ master = lu_fid_eq(&fid, &obj->lo_fid);
+ cb = master ? cb_blocking : lmv_blocking_ast;
- memset(op_data, 0, sizeof(*op_data));
+ /*
+ * We need i_size and we would like to check possible cached locks,
+ * so this is is IT_GETATTR intent.
+ */
memset(&it, 0, sizeof(it));
it.it_op = IT_GETATTR;
- cb = lmv_blocking_ast;
-
- if (lu_fid_eq(&fid, &obj->lo_fid)) {
- if (master_valid) {
- /*
- * lmv_intent_getattr() already checked
- * validness and took the lock.
- */
- if (mreq) {
- /*
- * It even got the reply refresh attrs
- * from that reply.
- */
- body = req_capsule_server_get(
- &mreq->rq_pill,
- &RMF_MDT_BODY);
- LASSERT(body != NULL);
- goto update;
- }
- /* take already cached attrs into account */
- CDEBUG(D_OTHER,
- "master is locked and cached\n");
- goto release_lock;
+ if (master && master_valid) {
+ /*
+ * lmv_intent_lookup() already checked
+ * validness and took the lock.
+ */
+ if (mreq != NULL) {
+ body = req_capsule_server_get(&mreq->rq_pill,
+ &RMF_MDT_BODY);
+ LASSERT(body != NULL);
+ goto update;
}
- master = 1;
- cb = cb_blocking;
+ /*
+ * Take already cached attrs into account.
+ */
+ CDEBUG(D_INODE,
+ "Master "DFID"is locked and cached\n",
+ PFID(mid));
+ goto release_lock;
}
+ /*
+ * Prepare op_data for revalidating. Note that @fid2 shuld be
+ * defined otherwise it will go to server and take new lock
+ * which is what we reall not need here.
+ */
+ memset(op_data, 0, sizeof(*op_data));
+ op_data->op_bias = MDS_CROSS_REF;
op_data->op_fid1 = fid;
op_data->op_fid2 = fid;
- op_data->op_bias = MDS_CROSS_REF;
+ req = NULL;
- /* Is obj valid? */
- tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds);
- if (IS_ERR(tgt_exp))
- GOTO(cleanup, rc = PTR_ERR(tgt_exp));
+ tgt = lmv_get_target(lmv, obj->lo_stripes[i].ls_mds);
+ if (IS_ERR(tgt))
+ GOTO(cleanup, rc = PTR_ERR(tgt));
- rc = md_intent_lock(tgt_exp, op_data, NULL, 0, &it, 0, &req, cb,
- extra_lock_flags);
+ CDEBUG(D_INODE, "Revalidate slave obj "DFID" -> mds #%d\n",
+ PFID(&fid), tgt->ltd_idx);
+
+ rc = md_intent_lock(tgt->ltd_exp, op_data, NULL, 0, &it, 0,
+ &req, cb, extra_lock_flags);
lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle;
if (rc > 0 && req == NULL) {
- /* Nice, this slave is valid */
- LASSERT(req == NULL);
- CDEBUG(D_OTHER, "cached\n");
+ /*
+ * Nice, this slave is valid.
+ */
+ CDEBUG(D_INODE, "Cached slave "DFID"\n", PFID(&fid));
goto release_lock;
}
@@ -958,17 +651,21 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
GOTO(cleanup, rc);
if (master) {
- LASSERT(master_valid == 0);
- /* Save lock on master to be returned to the caller. */
- CDEBUG(D_OTHER, "no lock on master yet\n");
+ /*
+ * Save lock on master to be returned to the caller.
+ */
+ CDEBUG(D_INODE, "No lock on master "DFID" yet\n",
+ PFID(mid));
memcpy(&master_lockh, lockh, sizeof(master_lockh));
- master_lock_mode = it.d.lustre.it_lock_mode;
+ master_lockm = it.d.lustre.it_lock_mode;
it.d.lustre.it_lock_mode = 0;
} else {
- /* This is slave. We want to control it. */
+ /*
+ * This is slave. We want to control it.
+ */
lock = ldlm_handle2lock(lockh);
LASSERT(lock != NULL);
- lock->l_ast_data = lmv_obj_get(obj);
+ lock->l_ast_data = lmv_object_get(obj);
LDLM_LOCK_PUT(lock);
}
@@ -977,7 +674,7 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
* This is first reply, we'll use it to return updated
* data back to the caller.
*/
- LASSERT(req);
+ LASSERT(req != NULL);
ptlrpc_request_addref(req);
*reqp = req;
}
@@ -986,17 +683,17 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
LASSERT(body != NULL);
update:
- obj->lo_inodes[i].li_size = body->size;
+ obj->lo_stripes[i].ls_size = body->size;
- CDEBUG(D_OTHER, "fresh: %lu\n",
- (unsigned long)obj->lo_inodes[i].li_size);
+ CDEBUG(D_INODE, "Fresh size %lu from "DFID"\n",
+ (unsigned long)obj->lo_stripes[i].ls_size, PFID(&fid));
if (req)
ptlrpc_req_finished(req);
release_lock:
- size += obj->lo_inodes[i].li_size;
+ size += obj->lo_stripes[i].ls_size;
- if (it.d.lustre.it_lock_mode) {
+ if (it.d.lustre.it_lock_mode && lockh) {
ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
it.d.lustre.it_lock_mode = 0;
}
@@ -1007,12 +704,11 @@ release_lock:
* Some attrs got refreshed, we have reply and it's time to put
* fresh attrs to it.
*/
- CDEBUG(D_OTHER, "return refreshed attrs: size = %lu\n",
- (unsigned long)size);
+ CDEBUG(D_INODE, "Return refreshed attrs: size = %lu for "DFID"\n",
+ (unsigned long)size, PFID(mid));
body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
LASSERT(body != NULL);
-
body->size = size;
if (mreq == NULL) {
@@ -1025,21 +721,54 @@ release_lock:
}
if (master_valid == 0) {
oit->d.lustre.it_lock_handle = master_lockh.cookie;
- oit->d.lustre.it_lock_mode = master_lock_mode;
+ oit->d.lustre.it_lock_mode = master_lockm;
}
rc = 0;
} else {
- /* It seems all the attrs are fresh and we did no request */
- CDEBUG(D_OTHER, "all the attrs were fresh\n");
+ /*
+ * It seems all the attrs are fresh and we did no request.
+ */
+ CDEBUG(D_INODE, "All the attrs were fresh on "DFID"\n",
+ PFID(mid));
if (master_valid == 0)
- oit->d.lustre.it_lock_mode = master_lock_mode;
+ oit->d.lustre.it_lock_mode = master_lockm;
rc = 1;
}
EXIT;
cleanup:
OBD_FREE_PTR(op_data);
- lmv_obj_unlock(obj);
- lmv_obj_put(obj);
+ lmv_object_put_unlock(obj);
return rc;
}
+
+int lmv_allocate_slaves(struct obd_device *obd, struct lu_fid *pid,
+ struct md_op_data *op, struct lu_fid *fid)
+{
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_object *obj;
+ mdsno_t mds;
+ int sidx;
+ int rc;
+ ENTRY;
+
+ obj = lmv_object_find(obd, pid);
+ if (obj == NULL)
+ RETURN(-EALREADY);
+
+ sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
+ (char *)op->op_name, op->op_namelen);
+ mds = obj->lo_stripes[sidx].ls_mds;
+ lmv_object_put(obj);
+
+ rc = __lmv_fid_alloc(lmv, fid, mds);
+ if (rc) {
+ CERROR("Can't allocate fid, rc %d\n", rc);
+ RETURN(rc);
+ }
+
+ CDEBUG(D_INODE, "Allocate new fid "DFID" for slave "
+ "obj -> mds #"LPU64"\n", PFID(fid), mds);
+
+ RETURN(rc);
+}
diff --git a/lustre/lmv/lmv_internal.h b/lustre/lmv/lmv_internal.h
index c898acf..818d53d 100644
--- a/lustre/lmv/lmv_internal.h
+++ b/lustre/lmv/lmv_internal.h
@@ -40,15 +40,6 @@
#include
#include
-#ifndef __KERNEL__
-/* XXX: dirty hack, needs to be fixed more clever way. */
-struct qstr {
- const char *name;
- size_t len;
- unsigned hashval;
-};
-#endif
-
#define LMV_MAX_TGT_COUNT 128
#define lmv_init_lock(lmv) down(&lmv->init_sem);
@@ -57,66 +48,108 @@ struct qstr {
#define LL_IT2STR(it) \
((it) ? ldlm_it2str((it)->it_op) : "0")
-struct lmv_inode {
- struct lu_fid li_fid; /* id of dirobj */
- mdsno_t li_mds; /* cached mdsno where @li_fid lives */
- unsigned long li_size; /* slave size value */
- int li_flags;
+struct lmv_stripe {
+ /**
+ * Dir stripe fid.
+ */
+ struct lu_fid ls_fid;
+ /**
+ * Cached home mds number for @li_fid.
+ */
+ mdsno_t ls_mds;
+ /**
+ * Stripe object size.
+ */
+ unsigned long ls_size;
+ /**
+ * Stripe flags.
+ */
+ int ls_flags;
};
-#define O_FREEING (1 << 0)
-
-struct lmv_obj {
- struct list_head lo_list;
- struct semaphore lo_guard;
- int lo_state; /* object state. */
- atomic_t lo_count; /* ref counter. */
- struct lu_fid lo_fid; /* master id of dir */
- void *lo_update; /* bitmap of status (up-to-date) */
- __u32 lo_hashtype;
- int lo_objcount; /* number of slaves */
- struct lmv_inode *lo_inodes; /* array of sub-objs */
- struct obd_device *lo_obd; /* pointer to LMV itself */
+#define O_FREEING (1 << 0)
+
+struct lmv_object {
+ /**
+ * Link to global objects list.
+ */
+ struct list_head lo_list;
+ /**
+ * Sema for protecting fields.
+ */
+ struct semaphore lo_guard;
+ /**
+ * Object state like O_FREEING.
+ */
+ int lo_state;
+ /**
+ * Object ref counter.
+ */
+ atomic_t lo_count;
+ /**
+ * Object master fid.
+ */
+ struct lu_fid lo_fid;
+ /**
+ * Object hash type to find stripe by name.
+ */
+ __u32 lo_hashtype;
+ /**
+ * Number of stripes.
+ */
+ int lo_objcount;
+ /**
+ * Array of sub-objs.
+ */
+ struct lmv_stripe *lo_stripes;
+ /**
+ * Pointer to LMV obd.
+ */
+ struct obd_device *lo_obd;
};
-int lmv_obj_setup(struct obd_device *obd);
-void lmv_obj_cleanup(struct obd_device *obd);
+int lmv_object_setup(struct obd_device *obd);
+void lmv_object_cleanup(struct obd_device *obd);
static inline void
-lmv_obj_lock(struct lmv_obj *obj)
+lmv_object_lock(struct lmv_object *obj)
{
LASSERT(obj);
down(&obj->lo_guard);
}
static inline void
-lmv_obj_unlock(struct lmv_obj *obj)
+lmv_object_unlock(struct lmv_object *obj)
{
LASSERT(obj);
up(&obj->lo_guard);
}
-void lmv_obj_add(struct lmv_obj *obj);
-void lmv_obj_del(struct lmv_obj *obj);
+void lmv_object_add(struct lmv_object *obj);
+void lmv_object_del(struct lmv_object *obj);
+
+void lmv_object_put(struct lmv_object *obj);
+void lmv_object_put_unlock(struct lmv_object *obj);
+void lmv_object_free(struct lmv_object *obj);
-void lmv_obj_put(struct lmv_obj *obj);
-void lmv_obj_free(struct lmv_obj *obj);
+struct lmv_object *lmv_object_get(struct lmv_object *obj);
-struct lmv_obj *lmv_obj_get(struct lmv_obj *obj);
+struct lmv_object *lmv_object_find(struct obd_device *obd,
+ const struct lu_fid *fid);
-struct lmv_obj *lmv_obj_grab(struct obd_device *obd,
- const struct lu_fid *fid);
+struct lmv_object *lmv_object_find_lock(struct obd_device *obd,
+ const struct lu_fid *fid);
-struct lmv_obj *lmv_obj_alloc(struct obd_device *obd,
- const struct lu_fid *fid,
- struct lmv_stripe_md *mea);
+struct lmv_object *lmv_object_alloc(struct obd_device *obd,
+ const struct lu_fid *fid,
+ struct lmv_stripe_md *mea);
-struct lmv_obj *lmv_obj_create(struct obd_export *exp,
- const struct lu_fid *fid,
- struct lmv_stripe_md *mea);
+struct lmv_object *lmv_object_create(struct obd_export *exp,
+ const struct lu_fid *fid,
+ struct lmv_stripe_md *mea);
-int lmv_obj_delete(struct obd_export *exp,
- const struct lu_fid *fid);
+int lmv_object_delete(struct obd_export *exp,
+ const struct lu_fid *fid);
int lmv_check_connect(struct obd_device *obd);
@@ -138,11 +171,8 @@ int lmv_intent_open(struct obd_export *exp, struct md_op_data *op_data,
ldlm_blocking_callback cb_blocking,
int extra_lock_flags);
-int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data,
- void *lmm, int lmmsize, struct lookup_intent *it,
- int flags, struct ptlrpc_request **reqp,
- ldlm_blocking_callback cb_blocking,
- int extra_lock_flags);
+int lmv_allocate_slaves(struct obd_device *obd, struct lu_fid *pid,
+ struct md_op_data *op, struct lu_fid *fid);
int lmv_revalidate_slaves(struct obd_export *, struct ptlrpc_request **,
const struct lu_fid *, struct lookup_intent *, int,
@@ -158,34 +188,31 @@ int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
mdsno_t mds);
int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
struct md_op_data *op_data);
-int lmv_alloc_slave_fids(struct obd_device *obd, struct lu_fid *pid,
- struct md_op_data *op, struct lu_fid *fid);
-static inline struct lmv_stripe_md *
-lmv_get_mea(struct ptlrpc_request *req)
+static inline struct lmv_stripe_md *lmv_get_mea(struct ptlrpc_request *req)
{
- struct mdt_body *body;
- struct lmv_stripe_md *mea;
+ struct mdt_body *body;
+ struct lmv_stripe_md *mea;
- LASSERT(req);
+ LASSERT(req != NULL);
body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
- if (!body || !S_ISDIR(body->mode) || !body->eadatasize)
- return NULL;
+ if (!body || !S_ISDIR(body->mode) || !body->eadatasize)
+ return NULL;
mea = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD,
body->eadatasize);
- LASSERT(mea != NULL);
+ LASSERT(mea != NULL);
- if (mea->mea_count == 0)
- return NULL;
+ if (mea->mea_count == 0)
+ return NULL;
if( mea->mea_magic != MEA_MAGIC_LAST_CHAR &&
mea->mea_magic != MEA_MAGIC_ALL_CHARS &&
mea->mea_magic != MEA_MAGIC_HASH_SEGMENT)
return NULL;
-
- return mea;
+
+ return mea;
}
static inline int lmv_get_easize(struct lmv_obd *lmv)
@@ -201,12 +228,6 @@ lmv_get_target(struct lmv_obd *lmv, mdsno_t mds)
return &lmv->tgts[mds];
}
-static inline struct obd_export *
-lmv_get_export(struct lmv_obd *lmv, mdsno_t mds)
-{
- return lmv_get_target(lmv, mds)->ltd_exp;
-}
-
static inline struct lmv_tgt_desc *
lmv_find_target(struct lmv_obd *lmv, const struct lu_fid *fid)
{
@@ -220,22 +241,6 @@ lmv_find_target(struct lmv_obd *lmv, const struct lu_fid *fid)
return lmv_get_target(lmv, mds);
}
-static inline struct obd_export *
-lmv_find_export(struct lmv_obd *lmv, const struct lu_fid *fid)
-{
- struct lmv_tgt_desc *tgt = lmv_find_target(lmv, fid);
- if (IS_ERR(tgt))
- return (struct obd_export *)tgt;
- return tgt->ltd_exp;
-}
-
-static inline void lmv_update_body(struct mdt_body *body,
- struct lmv_inode *lino)
-{
- /* update object size */
- body->size += lino->li_size;
-}
-
/* lproc_lmv.c */
#ifdef LPROCFS
void lprocfs_lmv_init_vars(struct lprocfs_static_vars *lvars);
diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c
index 4a30926..5417c67 100644
--- a/lustre/lmv/lmv_obd.c
+++ b/lustre/lmv/lmv_obd.c
@@ -64,8 +64,8 @@
#include "lmv_internal.h"
/* object cache. */
-cfs_mem_cache_t *obj_cache;
-atomic_t obj_cache_count = ATOMIC_INIT(0);
+cfs_mem_cache_t *lmv_object_cache;
+atomic_t lmv_object_count = ATOMIC_INIT(0);
static void lmv_activate_target(struct lmv_obd *lmv,
struct lmv_tgt_desc *tgt,
@@ -78,7 +78,8 @@ static void lmv_activate_target(struct lmv_obd *lmv,
lmv->desc.ld_active_tgt_count += (activate ? 1 : -1);
}
-/* Error codes:
+/**
+ * Error codes:
*
* -EINVAL : UUID can't be found in the LMV's target list
* -ENOTCONN: The UUID is found, but the target connection is bad (!)
@@ -87,9 +88,10 @@ static void lmv_activate_target(struct lmv_obd *lmv,
static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
int activate)
{
- struct lmv_tgt_desc *tgt;
- struct obd_device *obd;
- int i, rc = 0;
+ struct lmv_tgt_desc *tgt;
+ struct obd_device *obd;
+ int i;
+ int rc = 0;
ENTRY;
CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
@@ -100,7 +102,7 @@ static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
if (tgt->ltd_exp == NULL)
continue;
- CDEBUG(D_INFO, "lmv idx %d is %s conn "LPX64"\n",
+ CDEBUG(D_INFO, "Target idx %d is %s conn "LPX64"\n",
i, tgt->ltd_uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
@@ -125,11 +127,9 @@ static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
GOTO(out_lmv_lock, rc);
}
- CDEBUG(D_INFO, "Marking OBD %p %sactive\n",
- obd, activate ? "" : "in");
-
+ CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd,
+ activate ? "" : "in");
lmv_activate_target(lmv, tgt, activate);
-
EXIT;
out_lmv_lock:
@@ -140,8 +140,8 @@ static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
static int lmv_set_mdc_data(struct lmv_obd *lmv, struct obd_uuid *uuid,
struct obd_connect_data *data)
{
- struct lmv_tgt_desc *tgt;
- int i;
+ struct lmv_tgt_desc *tgt;
+ int i;
ENTRY;
LASSERT(data != NULL);
@@ -169,9 +169,10 @@ struct obd_uuid *lmv_get_uuid(struct obd_export *exp) {
static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
enum obd_notify_event ev, void *data)
{
- struct lmv_obd *lmv = &obd->u.lmv;
- struct obd_uuid *uuid;
- int rc = 0;
+ struct obd_connect_data *conn_data;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct obd_uuid *uuid;
+ int rc = 0;
ENTRY;
if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) {
@@ -196,11 +197,11 @@ static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
RETURN(rc);
}
} else if (ev == OBD_NOTIFY_OCD) {
- struct obd_connect_data *conn_data =
- &watched->u.cli.cl_import->imp_connect_data;
+ conn_data = &watched->u.cli.cl_import->imp_connect_data;
- /* Set connect data to desired target, update
- * exp_connect_flags. */
+ /*
+ * Set connect data to desired target, update exp_connect_flags.
+ */
rc = lmv_set_mdc_data(lmv, uuid, conn_data);
if (rc) {
CERROR("can't set connect data to target %s, rc %d\n",
@@ -218,19 +219,25 @@ static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
}
#if 0
else if (ev == OBD_NOTIFY_DISCON) {
- /* For disconnect event, flush fld cache for failout MDS case. */
+ /*
+ * For disconnect event, flush fld cache for failout MDS case.
+ */
fld_client_flush(&lmv->lmv_fld);
}
#endif
- /* Pass the notification up the chain. */
+ /*
+ * Pass the notification up the chain.
+ */
if (obd->obd_observer)
rc = obd_notify(obd->obd_observer, watched, ev, data);
RETURN(rc);
}
-/* this is fake connect function. Its purpose is to initialize lmv and say
- * caller that everything is okay. Real connection will be performed later. */
+/**
+ * This is fake connect function. Its purpose is to initialize lmv and say
+ * caller that everything is okay. Real connection will be performed later.
+ */
static int lmv_connect(const struct lu_env *env,
struct lustre_handle *conn, struct obd_device *obd,
struct obd_uuid *cluuid, struct obd_connect_data *data,
@@ -239,9 +246,9 @@ static int lmv_connect(const struct lu_env *env,
#ifdef __KERNEL__
struct proc_dir_entry *lmv_proc_dir;
#endif
- struct lmv_obd *lmv = &obd->u.lmv;
- struct obd_export *exp;
- int rc = 0;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct obd_export *exp;
+ int rc = 0;
ENTRY;
rc = class_connect(conn, obd, cluuid);
@@ -252,8 +259,10 @@ static int lmv_connect(const struct lu_env *env,
exp = class_conn2export(conn);
- /* we don't want to actually do the underlying connections more than
- * once, so keep track. */
+ /*
+ * We don't want to actually do the underlying connections more than
+ * once, so keep track.
+ */
lmv->refcount++;
if (lmv->refcount > 1) {
class_export_put(exp);
@@ -277,10 +286,12 @@ static int lmv_connect(const struct lu_env *env,
}
#endif
- /* all real clients should perform actual connection right away, because
+ /*
+ * All real clients should perform actual connection right away, because
* it is possible, that LMV will not have opportunity to connect targets
* and MDC stuff will be called directly, for instance while reading
- * ../mdc/../kbytesfree procfs file, etc. */
+ * ../mdc/../kbytesfree procfs file, etc.
+ */
if (data->ocd_connect_flags & OBD_CONNECT_REAL)
rc = lmv_check_connect(obd);
@@ -296,9 +307,9 @@ static int lmv_connect(const struct lu_env *env,
static void lmv_set_timeouts(struct obd_device *obd)
{
- struct lmv_tgt_desc *tgts;
- struct lmv_obd *lmv;
- int i;
+ struct lmv_tgt_desc *tgts;
+ struct lmv_obd *lmv;
+ int i;
lmv = &obd->u.lmv;
if (lmv->server_timeout == 0)
@@ -319,9 +330,11 @@ static void lmv_set_timeouts(struct obd_device *obd)
static int lmv_init_ea_size(struct obd_export *exp, int easize,
int def_easize, int cookiesize)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- int i, rc = 0, change = 0;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ int i;
+ int rc = 0;
+ int change = 0;
ENTRY;
if (lmv->max_easize < easize) {
@@ -363,28 +376,20 @@ static int lmv_init_ea_size(struct obd_export *exp, int easize,
int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
{
- struct lmv_obd *lmv = &obd->u.lmv;
- struct obd_uuid *cluuid = &lmv->cluuid;
- struct obd_connect_data *mdc_data = NULL;
- struct obd_uuid lmv_mdc_uuid = { "LMV_MDC_UUID" };
- struct lustre_handle conn = {0, };
- struct obd_device *mdc_obd;
- struct obd_export *mdc_exp;
- struct lu_fld_target target;
- int rc;
#ifdef __KERNEL__
- struct proc_dir_entry *lmv_proc_dir;
+ struct proc_dir_entry *lmv_proc_dir;
#endif
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct obd_uuid *cluuid = &lmv->cluuid;
+ struct obd_connect_data *mdc_data = NULL;
+ struct obd_uuid lmv_mdc_uuid = { "LMV_MDC_UUID" };
+ struct lustre_handle conn = {0, };
+ struct obd_device *mdc_obd;
+ struct obd_export *mdc_exp;
+ struct lu_fld_target target;
+ int rc;
ENTRY;
- /* for MDS: don't connect to yourself */
- if (obd_uuid_equals(&tgt->ltd_uuid, cluuid)) {
- CDEBUG(D_CONFIG, "don't connect back to %s\n", cluuid->uuid);
- /* XXX - the old code didn't increment active tgt count.
- * should we ? */
- RETURN(0);
- }
-
mdc_obd = class_find_client_obd(&tgt->ltd_uuid, LUSTRE_MDC_NAME,
&obd->obd_uuid);
if (!mdc_obd) {
@@ -411,12 +416,13 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
mdc_exp = class_conn2export(&conn);
- /* Init fid sequence client for this mdc. */
+ /*
+ * Init fid sequence client for this mdc and add new fld target.
+ */
rc = obd_fid_init(mdc_exp);
if (rc)
RETURN(rc);
- /* Add new FLD target. */
target.ft_srv = NULL;
target.ft_exp = mdc_exp;
target.ft_idx = tgt->ltd_idx;
@@ -434,7 +440,9 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
}
if (obd->obd_observer) {
- /* tell the mds_lmv about the new target */
+ /*
+ * Tell the observer about the new target.
+ */
rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
OBD_NOTIFY_ACTIVE, (void *)(tgt - lmv->tgts));
if (rc) {
@@ -447,13 +455,15 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
tgt->ltd_exp = mdc_exp;
lmv->desc.ld_active_tgt_count++;
- /* copy connect data, it may be used later */
+ /*
+ * Copy connect data, it may be used later.
+ */
lmv->datas[tgt->ltd_idx] = *mdc_data;
md_init_ea_size(tgt->ltd_exp, lmv->max_easize,
lmv->max_def_easize, lmv->max_cookiesize);
- CDEBUG(D_CONFIG, "connected to %s(%s) successfully (%d)\n",
+ CDEBUG(D_CONFIG, "Connected to %s(%s) successfully (%d)\n",
mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
atomic_read(&obd->obd_refcount));
@@ -472,7 +482,7 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
mdc_symlink = proc_symlink(mdc_obd->obd_name,
lmv_proc_dir, name);
if (mdc_symlink == NULL) {
- CERROR("could not register LMV target "
+ CERROR("Could not register LMV target "
"/proc/fs/lustre/%s/%s/target_obds/%s.",
obd->obd_type->typ_name, obd->obd_name,
mdc_obd->obd_name);
@@ -486,18 +496,18 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
int lmv_add_target(struct obd_device *obd, struct obd_uuid *tgt_uuid)
{
- struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_obd *lmv = &obd->u.lmv;
struct lmv_tgt_desc *tgt;
- int rc = 0;
+ int rc = 0;
ENTRY;
- CDEBUG(D_CONFIG, "tgt_uuid: %s.\n", tgt_uuid->uuid);
+ CDEBUG(D_CONFIG, "Target uuid: %s.\n", tgt_uuid->uuid);
lmv_init_lock(lmv);
if (lmv->desc.ld_active_tgt_count >= LMV_MAX_TGT_COUNT) {
lmv_init_unlock(lmv);
- CERROR("can't add %s, LMV module compiled for %d MDCs. "
+ CERROR("Can't add %s, LMV module compiled for %d MDCs. "
"That many MDCs already configured.\n",
tgt_uuid->uuid, LMV_MAX_TGT_COUNT);
RETURN(-EINVAL);
@@ -543,12 +553,13 @@ int lmv_add_target(struct obd_device *obd, struct obd_uuid *tgt_uuid)
RETURN(rc);
}
-/* performs a check if passed obd is connected. If no - connect it. */
int lmv_check_connect(struct obd_device *obd)
{
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_tgt_desc *tgt;
- int i, rc, easize;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ int i;
+ int rc;
+ int easize;
ENTRY;
if (lmv->connected)
@@ -565,7 +576,7 @@ int lmv_check_connect(struct obd_device *obd)
RETURN(-EINVAL);
}
- CDEBUG(D_CONFIG, "time to connect %s to %s\n",
+ CDEBUG(D_CONFIG, "Time to connect %s to %s\n",
lmv->cluuid.uuid, obd->obd_name);
LASSERT(lmv->tgts != NULL);
@@ -593,7 +604,7 @@ int lmv_check_connect(struct obd_device *obd)
--lmv->desc.ld_active_tgt_count;
rc2 = obd_disconnect(tgt->ltd_exp);
if (rc2) {
- CERROR("error: LMV target %s disconnect on "
+ CERROR("LMV target %s disconnect on "
"MDC idx %d: error %d\n",
tgt->ltd_uuid.uuid, i, rc2);
}
@@ -607,11 +618,11 @@ int lmv_check_connect(struct obd_device *obd)
static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
{
#ifdef __KERNEL__
- struct proc_dir_entry *lmv_proc_dir;
+ struct proc_dir_entry *lmv_proc_dir;
#endif
- struct lmv_obd *lmv = &obd->u.lmv;
- struct obd_device *mdc_obd;
- int rc;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct obd_device *mdc_obd;
+ int rc;
ENTRY;
LASSERT(tgt != NULL);
@@ -641,7 +652,7 @@ static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
if (rc)
CERROR("Can't finanize fids factory\n");
- CDEBUG(D_OTHER, "Disconnected from %s(%s) successfully\n",
+ CDEBUG(D_INFO, "Disconnected from %s(%s) successfully\n",
tgt->ltd_exp->exp_obd->obd_name,
tgt->ltd_exp->exp_obd->obd_uuid.uuid);
@@ -661,18 +672,21 @@ static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
static int lmv_disconnect(struct obd_export *exp)
{
- struct obd_device *obd = class_exp2obd(exp);
+ struct obd_device *obd = class_exp2obd(exp);
#ifdef __KERNEL__
struct proc_dir_entry *lmv_proc_dir;
#endif
- struct lmv_obd *lmv = &obd->u.lmv;
- int rc, i;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ int rc;
+ int i;
ENTRY;
if (!lmv->tgts)
goto out_local;
- /* Only disconnect the underlying layers on the final disconnect. */
+ /*
+ * Only disconnect the underlying layers on the final disconnect.
+ */
lmv->refcount--;
if (lmv->refcount != 0)
goto out_local;
@@ -709,9 +723,11 @@ out_local:
static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
int len, void *karg, void *uarg)
{
- struct obd_device *obddev = class_exp2obd(exp);
- struct lmv_obd *lmv = &obddev->u.lmv;
- int i, rc = 0, set = 0;
+ struct obd_device *obddev = class_exp2obd(exp);
+ struct lmv_obd *lmv = &obddev->u.lmv;
+ int i;
+ int rc = 0;
+ int set = 0;
ENTRY;
if (lmv->desc.ld_tgt_count == 0)
@@ -737,14 +753,12 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
if (!mdc_obd)
RETURN(-EINVAL);
- /* got statfs data */
rc = obd_statfs(mdc_obd, &stat_buf,
cfs_time_current_64() - HZ, 0);
if (rc)
RETURN(rc);
if (copy_to_user(data->ioc_pbuf1, &stat_buf, data->ioc_plen1))
RETURN(rc);
- /* copy UUID */
rc = copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
data->ioc_plen2);
break;
@@ -777,11 +791,6 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
RETURN(rc);
}
-enum MDS_POLICY {
- CHAR_TYPE,
- NID_TYPE
-};
-
static int lmv_all_chars_policy(int count, const char *name,
int len)
{
@@ -795,51 +804,60 @@ static int lmv_all_chars_policy(int count, const char *name,
static int lmv_nid_policy(struct lmv_obd *lmv)
{
- struct obd_import *imp = class_exp2cliimp(lmv->tgts[0].ltd_exp);
- __u32 id;
+ struct obd_import *imp;
+ __u32 id;
+
/*
- * XXX Hack: to get nid we assume that underlying obd device is mdc.
+ * XXX: To get nid we assume that underlying obd device is mdc.
*/
+ imp = class_exp2cliimp(lmv->tgts[0].ltd_exp);
id = imp->imp_connection->c_self ^ (imp->imp_connection->c_self >> 32);
return id % lmv->desc.ld_tgt_count;
}
static int lmv_choose_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
- int type)
+ placement_policy_t placement)
{
- switch (type) {
- case CHAR_TYPE:
+ switch (placement) {
+ case PLACEMENT_CHAR_POLICY:
return lmv_all_chars_policy(lmv->desc.ld_tgt_count,
op_data->op_name,
op_data->op_namelen);
- case NID_TYPE:
+ case PLACEMENT_NID_POLICY:
return lmv_nid_policy(lmv);
default:
break;
}
- CERROR("unsupport type %d \n", type);
+ CERROR("Unsupported placement policy %x\n", placement);
return -EINVAL;
}
-/* This is _inode_ placement policy function (not name). */
+/**
+ * This is _inode_ placement policy function (not name).
+ */
static int lmv_placement_policy(struct obd_device *obd,
struct md_op_data *op_data,
mdsno_t *mds)
{
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_obj *obj;
- int rc;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_object *obj;
+ int rc;
ENTRY;
LASSERT(mds != NULL);
+ if (lmv->desc.ld_tgt_count == 1) {
+ *mds = 0;
+ RETURN(0);
+ }
+
/*
* Allocate new fid on target according to operation type and parent
* home mds.
*/
- obj = lmv_obj_grab(obd, &op_data->op_fid1);
+ obj = lmv_object_find(obd, &op_data->op_fid1);
if (obj != NULL || op_data->op_name == NULL ||
op_data->op_opc != LUSTRE_OPC_MKDIR) {
/*
@@ -847,7 +865,7 @@ static int lmv_placement_policy(struct obd_device *obd,
* dir is split.
*/
if (obj) {
- lmv_obj_put(obj);
+ lmv_object_put(obj);
/*
* If we have this flag turned on, and we see that
@@ -867,71 +885,78 @@ static int lmv_placement_policy(struct obd_device *obd,
*/
*mds = op_data->op_mds;
rc = 0;
-
-#if 0
- /* XXX: This should be removed later wehn we sure it is not
- * needed. */
- rc = lmv_fld_lookup(lmv, &op_data->op_fid1, mds);
- if (rc)
- GOTO(out, rc);
-#endif
} else {
/*
* Parent directory is not split and we want to create a
* directory in it. Let's calculate where to place it according
- * to name.
+ * to operation data @op_data.
*/
- *mds = lmv_choose_mds(lmv, op_data, NID_TYPE);
+ *mds = lmv_choose_mds(lmv, op_data, lmv->lmv_placement);
rc = 0;
}
- EXIT;
-#if 0
-out:
-#endif
+
if (rc) {
CERROR("Can't choose MDS, err = %d\n", rc);
} else {
LASSERT(*mds < lmv->desc.ld_tgt_count);
}
- return rc;
+ RETURN(rc);
}
int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
mdsno_t mds)
{
- struct lmv_tgt_desc *tgt = &lmv->tgts[mds];
- int rc;
+ struct lmv_tgt_desc *tgt;
+ int rc;
ENTRY;
- /* New seq alloc and FLD setup should be atomic. */
+ tgt = lmv_get_target(lmv, mds);
+
+ /*
+ * New seq alloc and FLD setup should be atomic. Otherwise we may find
+ * on server that seq in new allocated fid is not yet known.
+ */
down(&tgt->ltd_fid_sem);
- /* Asking underlaying tgt layer to allocate new fid. */
+ if (!tgt->ltd_active)
+ GOTO(out, rc = -ENODEV);
+
+ /*
+ * Asking underlaying tgt layer to allocate new fid.
+ */
rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL);
if (rc > 0) {
LASSERT(fid_is_sane(fid));
- /* Client switches to new sequence, setup FLD. */
+ /*
+ * Client switches to new sequence, setup FLD.
+ */
rc = fld_client_create(&lmv->lmv_fld, fid_seq(fid),
mds, NULL);
if (rc) {
+ /*
+ * Delete just allocated fid sequence in case
+ * of fail back.
+ */
CERROR("Can't create fld entry, rc %d\n", rc);
- /* Delete just allocated fid sequence */
obd_fid_delete(tgt->ltd_exp, NULL);
}
}
+
+ EXIT;
+out:
up(&tgt->ltd_fid_sem);
- RETURN(rc);
+ return rc;
}
int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
struct md_op_data *op_data)
{
- struct obd_device *obd = class_exp2obd(exp);
- struct lmv_obd *lmv = &obd->u.lmv;
- mdsno_t mds;
- int rc;
+ struct obd_device *obd = class_exp2obd(exp);
+ struct lmv_obd *lmv = &obd->u.lmv;
+ mdsno_t mds;
+ int rc;
ENTRY;
LASSERT(op_data != NULL);
@@ -956,10 +981,9 @@ int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
static int lmv_fid_delete(struct obd_export *exp, const struct lu_fid *fid)
{
ENTRY;
-
- LASSERT(exp && fid);
- if (lmv_obj_delete(exp, fid)) {
- CDEBUG(D_OTHER, "lmv object "DFID" is destroyed.\n",
+ LASSERT(exp != NULL && fid != NULL);
+ if (lmv_object_delete(exp, fid)) {
+ CDEBUG(D_INODE, "Object "DFID" is destroyed.\n",
PFID(fid));
}
RETURN(0);
@@ -967,10 +991,11 @@ static int lmv_fid_delete(struct obd_export *exp, const struct lu_fid *fid)
static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lprocfs_static_vars lvars;
- struct lmv_desc *desc;
- int rc, i = 0;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lprocfs_static_vars lvars;
+ struct lmv_desc *desc;
+ int rc;
+ int i = 0;
ENTRY;
if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
@@ -980,7 +1005,7 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
- CERROR("descriptor size wrong: %d > %d\n",
+ CERROR("Lmv descriptor size wrong: %d > %d\n",
(int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
RETURN(-EINVAL);
}
@@ -1008,14 +1033,14 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
lmv->max_cookiesize = 0;
lmv->max_def_easize = 0;
lmv->max_easize = 0;
+ lmv->lmv_placement = PLACEMENT_CHAR_POLICY;
spin_lock_init(&lmv->lmv_lock);
sema_init(&lmv->init_sem, 1);
- rc = lmv_obj_setup(obd);
+ rc = lmv_object_setup(obd);
if (rc) {
- CERROR("Can't setup LMV object manager, "
- "error %d.\n", rc);
+ CERROR("Can't setup LMV object manager, error %d.\n", rc);
GOTO(out_free_datas, rc);
}
@@ -1026,14 +1051,13 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
rc = lprocfs_seq_create(obd->obd_proc_entry, "target_obd_status",
0444, &lmv_proc_target_fops, obd);
if (rc)
- CWARN("Error adding the target_obd_status file\n");
+ CWARN("Error adding target_obd_stats file (%d)\n", rc);
}
#endif
rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
LUSTRE_CLI_FLD_HASH_DHT);
if (rc) {
- CERROR("can't init FLD, err %d\n",
- rc);
+ CERROR("Can't init FLD, err %d\n", rc);
GOTO(out_free_datas, rc);
}
@@ -1050,12 +1074,12 @@ out_free_tgts:
static int lmv_cleanup(struct obd_device *obd)
{
- struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_obd *lmv = &obd->u.lmv;
ENTRY;
fld_client_fini(&lmv->lmv_fld);
lprocfs_obd_cleanup(obd);
- lmv_obj_cleanup(obd);
+ lmv_object_cleanup(obd);
OBD_FREE(lmv->datas, lmv->datas_size);
OBD_FREE(lmv->tgts, lmv->tgts_size);
@@ -1064,9 +1088,9 @@ static int lmv_cleanup(struct obd_device *obd)
static int lmv_process_config(struct obd_device *obd, obd_count len, void *buf)
{
- struct lustre_cfg *lcfg = buf;
- struct obd_uuid tgt_uuid;
- int rc;
+ struct lustre_cfg *lcfg = buf;
+ struct obd_uuid tgt_uuid;
+ int rc;
ENTRY;
switch(lcfg->lcfg_command) {
@@ -1089,9 +1113,10 @@ out:
static int lmv_statfs(struct obd_device *obd, struct obd_statfs *osfs,
__u64 max_age, __u32 flags)
{
- struct lmv_obd *lmv = &obd->u.lmv;
- struct obd_statfs *temp;
- int rc = 0, i;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct obd_statfs *temp;
+ int rc = 0;
+ int i;
ENTRY;
rc = lmv_check_connect(obd);
@@ -1134,9 +1159,9 @@ static int lmv_getstatus(struct obd_export *exp,
struct lu_fid *fid,
struct obd_capa **pc)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- int rc;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ int rc;
ENTRY;
rc = lmv_check_connect(obd);
@@ -1144,7 +1169,6 @@ static int lmv_getstatus(struct obd_export *exp,
RETURN(rc);
rc = md_getstatus(lmv->tgts[0].ltd_exp, fid, pc);
-
RETURN(rc);
}
@@ -1153,22 +1177,22 @@ static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid,
const char *input, int input_size, int output_size,
int flags, struct ptlrpc_request **request)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct obd_export *tgt_exp;
- int rc;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ int rc;
ENTRY;
rc = lmv_check_connect(obd);
if (rc)
RETURN(rc);
- tgt_exp = lmv_find_export(lmv, fid);
- if (IS_ERR(tgt_exp))
- RETURN(PTR_ERR(tgt_exp));
+ tgt = lmv_find_target(lmv, fid);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- rc = md_getxattr(tgt_exp, fid, oc, valid, name, input, input_size,
- output_size, flags, request);
+ rc = md_getxattr(tgt->ltd_exp, fid, oc, valid, name, input,
+ input_size, output_size, flags, request);
RETURN(rc);
}
@@ -1179,22 +1203,22 @@ static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
int flags, __u32 suppgid,
struct ptlrpc_request **request)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct obd_export *tgt_exp;
- int rc;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ int rc;
ENTRY;
rc = lmv_check_connect(obd);
if (rc)
RETURN(rc);
- tgt_exp = lmv_find_export(lmv, fid);
- if (IS_ERR(tgt_exp))
- RETURN(PTR_ERR(tgt_exp));
+ tgt = lmv_find_target(lmv, fid);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- rc = md_setxattr(tgt_exp, fid, oc, valid, name,
- input, input_size, output_size, flags, suppgid,
+ rc = md_setxattr(tgt->ltd_exp, fid, oc, valid, name, input,
+ input_size, output_size, flags, suppgid,
request);
RETURN(rc);
@@ -1204,28 +1228,29 @@ static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid,
struct obd_capa *oc, obd_valid valid, int ea_size,
struct ptlrpc_request **request)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct obd_export *tgt_exp;
- struct lmv_obj *obj;
- int rc, i;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ struct lmv_object *obj;
+ int rc;
+ int i;
ENTRY;
rc = lmv_check_connect(obd);
if (rc)
RETURN(rc);
- tgt_exp = lmv_find_export(lmv, fid);
- if (IS_ERR(tgt_exp))
- RETURN(PTR_ERR(tgt_exp));
+ tgt = lmv_find_target(lmv, fid);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- rc = md_getattr(tgt_exp, fid, oc, valid, ea_size, request);
+ rc = md_getattr(tgt->ltd_exp, fid, oc, valid, ea_size, request);
if (rc)
RETURN(rc);
- obj = lmv_obj_grab(obd, fid);
+ obj = lmv_object_find_lock(obd, fid);
- CDEBUG(D_OTHER, "GETATTR for "DFID" %s\n", PFID(fid),
+ CDEBUG(D_INODE, "GETATTR for "DFID" %s\n", PFID(fid),
obj ? "(split)" : "");
/*
@@ -1239,7 +1264,7 @@ static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid,
struct mdt_body *body;
if (*request == NULL) {
- lmv_obj_put(obj);
+ lmv_object_put(obj);
RETURN(rc);
}
@@ -1247,8 +1272,6 @@ static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid,
&RMF_MDT_BODY);
LASSERT(body != NULL);
- lmv_obj_lock(obj);
-
for (i = 0; i < obj->lo_objcount; i++) {
if (lmv->tgts[i].ltd_exp == NULL) {
CWARN("%s: NULL export for %d\n",
@@ -1256,15 +1279,16 @@ static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid,
continue;
}
- /* skip master obj. */
- if (lu_fid_eq(&obj->lo_fid, &obj->lo_inodes[i].li_fid))
+ /*
+ * Skip master object.
+ */
+ if (lu_fid_eq(&obj->lo_fid, &obj->lo_stripes[i].ls_fid))
continue;
- lmv_update_body(body, &obj->lo_inodes[i]);
+ body->size += obj->lo_stripes[i].ls_size;
}
- lmv_obj_unlock(obj);
- lmv_obj_put(obj);
+ lmv_object_put_unlock(obj);
}
RETURN(rc);
@@ -1273,16 +1297,17 @@ static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid,
static int lmv_change_cbdata(struct obd_export *exp, const struct lu_fid *fid,
ldlm_iterator_t it, void *data)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- int i, rc;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ int i;
+ int rc;
ENTRY;
rc = lmv_check_connect(obd);
if (rc)
RETURN(rc);
- CDEBUG(D_OTHER, "CBDATA for "DFID"\n", PFID(fid));
+ CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
/*
* With CMD every object can have two locks in different namespaces:
@@ -1295,44 +1320,43 @@ static int lmv_change_cbdata(struct obd_export *exp, const struct lu_fid *fid,
RETURN(0);
}
-static int lmv_close(struct obd_export *exp,
- struct md_op_data *op_data,
- struct md_open_data *mod,
- struct ptlrpc_request **request)
+static int lmv_close(struct obd_export *exp, struct md_op_data *op_data,
+ struct md_open_data *mod, struct ptlrpc_request **request)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct obd_export *tgt_exp;
- int rc;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ int rc;
ENTRY;
rc = lmv_check_connect(obd);
if (rc)
RETURN(rc);
- tgt_exp = lmv_find_export(lmv, &op_data->op_fid1);
- if (IS_ERR(tgt_exp))
- RETURN(PTR_ERR(tgt_exp));
+ tgt = lmv_find_target(lmv, &op_data->op_fid1);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- CDEBUG(D_OTHER, "CLOSE "DFID"\n", PFID(&op_data->op_fid1));
- rc = md_close(tgt_exp, op_data, mod, request);
+ CDEBUG(D_INODE, "CLOSE "DFID"\n", PFID(&op_data->op_fid1));
+ rc = md_close(tgt->ltd_exp, op_data, mod, request);
RETURN(rc);
}
-/*
+/**
* Called in the case MDS returns -ERESTART on create on open, what means that
* directory is split and its LMV presentation object has to be updated.
*/
int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct ptlrpc_request *req = NULL;
- struct obd_export *tgt_exp;
- struct lmv_obj *obj;
- struct lustre_md md;
- int mealen, rc;
- __u64 valid;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct ptlrpc_request *req = NULL;
+ struct lmv_tgt_desc *tgt;
+ struct lmv_object *obj;
+ struct lustre_md md;
+ int mealen;
+ int rc;
+ __u64 valid;
ENTRY;
md.mea = NULL;
@@ -1340,35 +1364,35 @@ int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid)
valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA;
- tgt_exp = lmv_find_export(lmv, fid);
- if (IS_ERR(tgt_exp))
- RETURN(PTR_ERR(tgt_exp));
+ tgt = lmv_find_target(lmv, fid);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- /* time to update mea of parent fid */
- rc = md_getattr(tgt_exp, fid, NULL, valid, mealen, &req);
+ /*
+ * Time to update mea of parent fid.
+ */
+ rc = md_getattr(tgt->ltd_exp, fid, NULL, valid, mealen, &req);
if (rc) {
CERROR("md_getattr() failed, error %d\n", rc);
GOTO(cleanup, rc);
}
- rc = md_get_lustre_md(tgt_exp, req, NULL, exp, &md);
+ rc = md_get_lustre_md(tgt->ltd_exp, req, NULL, exp, &md);
if (rc) {
- CERROR("mdc_get_lustre_md() failed, error %d\n", rc);
+ CERROR("md_get_lustre_md() failed, error %d\n", rc);
GOTO(cleanup, rc);
}
if (md.mea == NULL)
GOTO(cleanup, rc = -ENODATA);
- obj = lmv_obj_create(exp, fid, md.mea);
+ obj = lmv_object_create(exp, fid, md.mea);
if (IS_ERR(obj))
rc = PTR_ERR(obj);
else
- lmv_obj_put(obj);
+ lmv_object_put(obj);
- /* XXX LOV STACKING */
obd_free_memmd(exp, (void *)&md.mea);
-
EXIT;
cleanup:
if (req)
@@ -1381,11 +1405,13 @@ int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
__u32 gid, cfs_cap_t cap_effective, __u64 rdev,
struct ptlrpc_request **request)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct obd_export *tgt_exp;
- struct lmv_obj *obj;
- int rc, loop = 0;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ struct lmv_object *obj;
+ int rc;
+ int loop = 0;
+ int sidx;
ENTRY;
rc = lmv_check_connect(obd);
@@ -1397,28 +1423,24 @@ int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
repeat:
++loop;
LASSERT(loop <= 2);
- obj = lmv_obj_grab(obd, &op_data->op_fid1);
- if (obj) {
- int mea_idx;
- mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
+ obj = lmv_object_find(obd, &op_data->op_fid1);
+ if (obj) {
+ sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
op_data->op_name, op_data->op_namelen);
- op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid;
+ op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid;
op_data->op_bias &= ~MDS_CHECK_SPLIT;
- op_data->op_mds = obj->lo_inodes[mea_idx].li_mds;
- tgt_exp = lmv_get_export(lmv, op_data->op_mds);
- lmv_obj_put(obj);
+ op_data->op_mds = obj->lo_stripes[sidx].ls_mds;
+ tgt = lmv_get_target(lmv, op_data->op_mds);
+ lmv_object_put(obj);
} else {
- struct lmv_tgt_desc *tgt;
-
tgt = lmv_find_target(lmv, &op_data->op_fid1);
op_data->op_bias |= MDS_CHECK_SPLIT;
op_data->op_mds = tgt->ltd_idx;
- tgt_exp = tgt->ltd_exp;
}
- if (IS_ERR(tgt_exp))
- RETURN(PTR_ERR(tgt_exp));
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data);
if (rc == -ERESTART)
@@ -1426,16 +1448,17 @@ repeat:
else if (rc)
RETURN(rc);
- CDEBUG(D_OTHER, "CREATE '%*s' on "DFID"\n", op_data->op_namelen,
- op_data->op_name, PFID(&op_data->op_fid1));
+ CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #"LPU64"\n",
+ op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
+ op_data->op_mds);
op_data->op_flags |= MF_MDC_CANCEL_FID1;
- rc = md_create(tgt_exp, op_data, data, datalen, mode, uid, gid,
+ rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid,
cap_effective, rdev, request);
if (rc == 0) {
if (*request == NULL)
RETURN(rc);
- CDEBUG(D_OTHER, "created - "DFID"\n", PFID(&op_data->op_fid1));
+ CDEBUG(D_INODE, "Created - "DFID"\n", PFID(&op_data->op_fid2));
} else if (rc == -ERESTART) {
LASSERT(*request != NULL);
DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
@@ -1449,8 +1472,8 @@ repeat:
*/
rc = lmv_handle_split(exp, &op_data->op_fid1);
if (rc == 0) {
- rc = lmv_alloc_slave_fids(obd, &op_data->op_fid1,
- op_data, &op_data->op_fid2);
+ rc = lmv_allocate_slaves(obd, &op_data->op_fid1,
+ op_data, &op_data->op_fid2);
if (rc)
RETURN(rc);
goto repeat;
@@ -1463,21 +1486,21 @@ static int lmv_done_writing(struct obd_export *exp,
struct md_op_data *op_data,
struct md_open_data *mod)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct obd_export *tgt_exp;
- int rc;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ int rc;
ENTRY;
rc = lmv_check_connect(obd);
if (rc)
RETURN(rc);
- tgt_exp = lmv_find_export(lmv, &op_data->op_fid1);
- if (IS_ERR(tgt_exp))
- RETURN(PTR_ERR(tgt_exp));
+ tgt = lmv_find_target(lmv, &op_data->op_fid1);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- rc = md_done_writing(tgt_exp, op_data, mod);
+ rc = md_done_writing(tgt->ltd_exp, op_data, mod);
RETURN(rc);
}
@@ -1486,12 +1509,13 @@ lmv_enqueue_slaves(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
struct lookup_intent *it, struct md_op_data *op_data,
struct lustre_handle *lockh, void *lmm, int lmmsize)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_stripe_md *mea = op_data->op_mea1;
- struct md_op_data *op_data2;
- struct obd_export *tgt_exp;
- int i, rc = 0;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_stripe_md *mea = op_data->op_mea1;
+ struct md_op_data *op_data2;
+ struct lmv_tgt_desc *tgt;
+ int i;
+ int rc = 0;
ENTRY;
OBD_ALLOC_PTR(op_data2);
@@ -1504,17 +1528,17 @@ lmv_enqueue_slaves(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
op_data2->op_fid1 = mea->mea_ids[i];
op_data2->op_bias = 0;
- tgt_exp = lmv_find_export(lmv, &op_data2->op_fid1);
- if (IS_ERR(tgt_exp))
- GOTO(cleanup, rc = PTR_ERR(tgt_exp));
+ tgt = lmv_find_target(lmv, &op_data2->op_fid1);
+ if (IS_ERR(tgt))
+ GOTO(cleanup, rc = PTR_ERR(tgt));
- if (tgt_exp == NULL)
+ if (tgt->ltd_exp == NULL)
continue;
- rc = md_enqueue(tgt_exp, einfo, it, op_data2,
+ rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data2,
lockh + i, lmm, lmmsize, NULL, 0);
- CDEBUG(D_OTHER, "take lock on slave "DFID" -> %d/%d\n",
+ CDEBUG(D_INODE, "Take lock on slave "DFID" -> %d/%d\n",
PFID(&mea->mea_ids[i]), rc, it->d.lustre.it_status);
if (rc)
@@ -1535,7 +1559,9 @@ cleanup:
OBD_FREE_PTR(op_data2);
if (rc != 0) {
- /* drop all taken locks */
+ /*
+ * Drop all taken locks.
+ */
while (--i >= 0) {
if (lockh[i].cookie)
ldlm_lock_decref(lockh + i, einfo->ei_mode);
@@ -1551,15 +1577,16 @@ lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
struct lustre_handle *lockh, void *lmm, int lmmsize,
int extra_lock_flags)
{
- struct ptlrpc_request *req = it->d.lustre.it_data;
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lustre_handle plock;
- struct obd_export *tgt_exp;
- struct md_op_data *rdata;
- struct lu_fid fid_copy;
- struct mdt_body *body;
- int rc = 0, pmode;
+ struct ptlrpc_request *req = it->d.lustre.it_data;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lustre_handle plock;
+ struct lmv_tgt_desc *tgt;
+ struct md_op_data *rdata;
+ struct lu_fid fid1;
+ struct mdt_body *body;
+ int rc = 0;
+ int pmode;
ENTRY;
body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
@@ -1568,32 +1595,34 @@ lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
if (!(body->valid & OBD_MD_MDS))
RETURN(0);
- CDEBUG(D_OTHER, "ENQUEUE '%s' on "DFID" -> "DFID"\n",
+ CDEBUG(D_INODE, "REMOTE_ENQUEUE '%s' on "DFID" -> "DFID"\n",
LL_IT2STR(it), PFID(&op_data->op_fid1), PFID(&body->fid1));
- /* We got LOOKUP lock, but we really need attrs */
+ /*
+ * We got LOOKUP lock, but we really need attrs.
+ */
pmode = it->d.lustre.it_lock_mode;
LASSERT(pmode != 0);
memcpy(&plock, lockh, sizeof(plock));
it->d.lustre.it_lock_mode = 0;
it->d.lustre.it_data = NULL;
- fid_copy = body->fid1;
+ fid1 = body->fid1;
it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
ptlrpc_req_finished(req);
- tgt_exp = lmv_find_export(lmv, &fid_copy);
- if (IS_ERR(tgt_exp))
- GOTO(out, rc = PTR_ERR(tgt_exp));
+ tgt = lmv_find_target(lmv, &fid1);
+ if (IS_ERR(tgt))
+ GOTO(out, rc = PTR_ERR(tgt));
OBD_ALLOC_PTR(rdata);
if (rdata == NULL)
GOTO(out, rc = -ENOMEM);
- rdata->op_fid1 = fid_copy;
+ rdata->op_fid1 = fid1;
rdata->op_bias = MDS_CROSS_REF;
- rc = md_enqueue(tgt_exp, einfo, it, rdata, lockh,
+ rc = md_enqueue(tgt->ltd_exp, einfo, it, rdata, lockh,
lmm, lmmsize, NULL, extra_lock_flags);
OBD_FREE_PTR(rdata);
EXIT;
@@ -1608,70 +1637,73 @@ lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
struct lustre_handle *lockh, void *lmm, int lmmsize,
struct ptlrpc_request **req, int extra_lock_flags)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct obd_export *tgt_exp = NULL;
- struct lmv_obj *obj;
- int rc;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ struct lmv_object *obj;
+ int sidx;
+ int rc;
ENTRY;
rc = lmv_check_connect(obd);
if (rc)
RETURN(rc);
+ CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID"\n",
+ LL_IT2STR(it), PFID(&op_data->op_fid1));
+
if (op_data->op_mea1 && it && it->it_op == IT_UNLINK) {
rc = lmv_enqueue_slaves(exp, einfo, it, op_data,
lockh, lmm, lmmsize);
RETURN(rc);
}
- if (op_data->op_namelen) {
- obj = lmv_obj_grab(obd, &op_data->op_fid1);
- if (obj) {
- int mea_idx;
-
- /* directory is split. look for right mds for this
- * name */
- mea_idx = raw_name2idx(obj->lo_hashtype,
- obj->lo_objcount,
- (char *)op_data->op_name,
- op_data->op_namelen);
- op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid;
- tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds);
- lmv_obj_put(obj);
- }
+ obj = lmv_object_find(obd, &op_data->op_fid1);
+ if (obj && op_data->op_namelen) {
+ sidx = raw_name2idx(obj->lo_hashtype,
+ obj->lo_objcount,
+ (char *)op_data->op_name,
+ op_data->op_namelen);
+ op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid;
+ tgt = lmv_get_target(lmv, obj->lo_stripes[sidx].ls_mds);
+ } else {
+ tgt = lmv_find_target(lmv, &op_data->op_fid1);
}
+ if (obj)
+ lmv_object_put(obj);
- if (tgt_exp == NULL)
- tgt_exp = lmv_find_export(lmv, &op_data->op_fid1);
- if (IS_ERR(tgt_exp))
- RETURN(PTR_ERR(tgt_exp));
-
- CDEBUG(D_OTHER, "ENQUEUE '%s' on "DFID"\n", LL_IT2STR(it),
- PFID(&op_data->op_fid1));
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- rc = md_enqueue(tgt_exp, einfo, it, op_data, lockh,
+ CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID" -> mds #%d\n",
+ LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx);
+
+ rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data, lockh,
lmm, lmmsize, req, extra_lock_flags);
- if (rc == 0 && it && it->it_op == IT_OPEN)
+ if (rc == 0 && it && it->it_op == IT_OPEN) {
rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh,
lmm, lmmsize, extra_lock_flags);
+ }
RETURN(rc);
}
static int
lmv_getattr_name(struct obd_export *exp, const struct lu_fid *fid,
- struct obd_capa *oc, const char *filename, int namelen,
+ struct obd_capa *oc, const char *name, int namelen,
obd_valid valid, int ea_size, __u32 suppgid,
struct ptlrpc_request **request)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lu_fid rid = *fid;
- struct obd_export *tgt_exp;
- struct mdt_body *body;
- struct lmv_obj *obj;
- int rc, loop = 0;
+ struct ptlrpc_request *req = NULL;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lu_fid rid = *fid;
+ struct lmv_tgt_desc *tgt;
+ struct mdt_body *body;
+ struct lmv_object *obj;
+ int rc;
+ int loop = 0;
+ int sidx;
ENTRY;
rc = lmv_check_connect(obd);
@@ -1681,28 +1713,25 @@ lmv_getattr_name(struct obd_export *exp, const struct lu_fid *fid,
repeat:
++loop;
LASSERT(loop <= 2);
- obj = lmv_obj_grab(obd, &rid);
+ obj = lmv_object_find(obd, &rid);
if (obj) {
- int mea_idx;
-
- /* Directory is split. Look for right mds for this name */
- mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
- filename, namelen - 1);
- rid = obj->lo_inodes[mea_idx].li_fid;
- tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds);
- lmv_obj_put(obj);
+ sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
+ name, namelen - 1);
+ rid = obj->lo_stripes[sidx].ls_fid;
+ tgt = lmv_get_target(lmv, obj->lo_stripes[sidx].ls_mds);
valid &= ~OBD_MD_FLCKSPLIT;
+ lmv_object_put(obj);
} else {
- tgt_exp = lmv_find_export(lmv, &rid);
+ tgt = lmv_find_target(lmv, &rid);
valid |= OBD_MD_FLCKSPLIT;
}
- if (IS_ERR(tgt_exp))
- RETURN(PTR_ERR(tgt_exp));
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- CDEBUG(D_OTHER, "getattr_name for %*s on "DFID" -> "DFID"\n",
- namelen, filename, PFID(fid), PFID(&rid));
+ CDEBUG(D_INODE, "GETATTR_NAME for %*s on "DFID" - "DFID" -> mds #%d\n",
+ namelen, name, PFID(fid), PFID(&rid), tgt->ltd_idx);
- rc = md_getattr_name(tgt_exp, &rid, oc, filename, namelen, valid,
+ rc = md_getattr_name(tgt->ltd_exp, &rid, oc, name, namelen, valid,
ea_size, suppgid, request);
if (rc == 0) {
body = req_capsule_server_get(&(*request)->rq_pill,
@@ -1710,20 +1739,18 @@ repeat:
LASSERT(body != NULL);
if (body->valid & OBD_MD_MDS) {
- struct ptlrpc_request *req = NULL;
-
rid = body->fid1;
- CDEBUG(D_OTHER, "request attrs for "DFID"\n",
+ CDEBUG(D_INODE, "Request attrs for "DFID"\n",
PFID(&rid));
- tgt_exp = lmv_find_export(lmv, &rid);
- if (IS_ERR(tgt_exp)) {
+ tgt = lmv_find_target(lmv, &rid);
+ if (IS_ERR(tgt)) {
ptlrpc_req_finished(*request);
- RETURN(PTR_ERR(tgt_exp));
+ RETURN(PTR_ERR(tgt));
}
- rc = md_getattr_name(tgt_exp, &rid, NULL, NULL, 1,
- valid | OBD_MD_FLCROSSREF,
+ rc = md_getattr_name(tgt->ltd_exp, &rid, NULL, NULL,
+ 1, valid | OBD_MD_FLCROSSREF,
ea_size, suppgid, &req);
ptlrpc_req_finished(*request);
*request = req;
@@ -1753,90 +1780,100 @@ repeat:
fl == MF_MDC_CANCEL_FID4 ? &op_data->op_fid4 : \
NULL)
-/* @tgt_exp is the export the metadata request is sent.
- * @fid_exp is the export the cancel should be sent for the current fid.
- * if @fid_exp is NULL, the export is found for the current fid.
- * @op_data keeps the current fid, which is pointed through @flag.
- * @mode, @bits -- lock match parameters. */
-static int lmv_early_cancel(struct lmv_obd *lmv, struct obd_export *tgt_exp,
- struct obd_export *fid_exp,
- struct md_op_data *op_data,
- ldlm_mode_t mode, int bits, int flag)
+static int lmv_early_cancel_slaves(struct obd_export *exp,
+ struct md_op_data *op_data, int op_tgt,
+ ldlm_mode_t mode, int bits, int flag)
{
- struct lu_fid *fid = md_op_data_fid(op_data, flag);
- ldlm_policy_data_t policy = {{0}};
- int rc = 0;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ ldlm_policy_data_t policy = {{0}};
+ struct lu_fid *op_fid;
+ struct lu_fid *st_fid;
+ struct lmv_tgt_desc *tgt;
+ struct lmv_object *obj;
+ int rc = 0;
+ int i;
ENTRY;
- if (!fid_is_sane(fid))
+ op_fid = md_op_data_fid(op_data, flag);
+ if (!fid_is_sane(op_fid))
RETURN(0);
- if (fid_exp == NULL)
- fid_exp = lmv_find_export(lmv, fid);
-
- if (tgt_exp == fid_exp) {
- /* The export is the same as on the target server, cancel
- * will be sent along with the main metadata operation. */
- op_data->op_flags |= flag;
- RETURN(0);
- }
-
+ obj = lmv_object_find(obd, op_fid);
+ if (obj == NULL)
+ RETURN(-EALREADY);
+
policy.l_inodebits.bits = bits;
- rc = md_cancel_unused(fid_exp, fid, &policy, mode, LDLM_FL_ASYNC, NULL);
- RETURN(rc);
+ for (i = 0; i < obj->lo_objcount; i++) {
+ tgt = lmv_get_target(lmv, obj->lo_stripes[i].ls_mds);
+ st_fid = &obj->lo_stripes[i].ls_fid;
+ if (op_tgt != tgt->ltd_idx) {
+ CDEBUG(D_INODE, "EARLY_CANCEL slave "DFID" -> mds #%d\n",
+ PFID(st_fid), tgt->ltd_idx);
+ rc = md_cancel_unused(tgt->ltd_exp, st_fid, &policy,
+ mode, LDLM_FL_ASYNC, NULL);
+ if (rc)
+ GOTO(out_put_obj, rc);
+ } else {
+ CDEBUG(D_INODE,
+ "EARLY_CANCEL skip operation target %d on "DFID"\n",
+ op_tgt, PFID(st_fid));
+ /*
+ * Do not cancel locks for operation target, they will
+ * be handled later in underlaying layer when calling
+ * function we run on behalf of.
+ */
+ *op_fid = *st_fid;
+ op_data->op_flags |= flag;
+ }
+ }
+ EXIT;
+out_put_obj:
+ lmv_object_put(obj);
+ return rc;
}
-#ifdef EARLY_CANCEL_FOR_STRIPED_DIR_IS_READY
-/* Check if the fid in @op_data pointed to by flag is of the same export(s)
- * as @tgt_exp. Early cancels will be sent later by mdc code, otherwise, call
- * md_cancel_unused for child export(s). */
-static int lmv_early_cancel_stripes(struct obd_export *exp,
- struct obd_export *tgt_exp,
- struct md_op_data *op_data,
- ldlm_mode_t mode, int bits, int flag)
+static int lmv_early_cancel(struct obd_export *exp, struct md_op_data *op_data,
+ int op_tgt, ldlm_mode_t mode, int bits, int flag)
{
- struct lu_fid *fid = md_op_data_fid(op_data, flag);
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct obd_export *st_exp;
- struct lmv_obj *obj;
- int rc = 0;
+ struct lu_fid *fid = md_op_data_fid(op_data, flag);
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ ldlm_policy_data_t policy = {{0}};
+ struct lmv_object *obj;
+ int rc = 0;
ENTRY;
if (!fid_is_sane(fid))
RETURN(0);
- obj = lmv_obj_grab(obd, fid);
+ obj = lmv_object_find(obd, fid);
if (obj) {
- ldlm_policy_data_t policy = {{0}};
- struct lu_fid *st_fid;
- int i;
-
- policy.l_inodebits.bits = bits;
- for (i = 0; i < obj->lo_objcount; i++) {
- st_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds);
- st_fid = &obj->lo_inodes[i].li_fid;
- if (tgt_exp != st_exp) {
- rc = md_cancel_unused(st_exp, st_fid, &policy,
- mode, LDLM_FL_ASYNC,
- NULL);
- if (rc)
- break;
- } else {
- /* Some export matches to @tgt_exp, do cancel
- * for its fid in mdc */
- *fid = *st_fid;
- op_data->op_flags |= flag;
- }
- }
- lmv_obj_put(obj);
+ rc = lmv_early_cancel_slaves(exp, op_data, op_tgt, mode,
+ bits, flag);
+ lmv_object_put(obj);
} else {
- rc = lmv_early_cancel(lmv, tgt_exp, NULL, op_data,
- mode, bits, flag);
+ tgt = lmv_find_target(lmv, fid);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
+
+ if (tgt->ltd_idx != op_tgt) {
+ CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid));
+ policy.l_inodebits.bits = bits;
+ rc = md_cancel_unused(tgt->ltd_exp, fid, &policy,
+ mode, LDLM_FL_ASYNC, NULL);
+ } else {
+ CDEBUG(D_INODE,
+ "EARLY_CANCEL skip operation target %d on "DFID"\n",
+ op_tgt, PFID(fid));
+ op_data->op_flags |= flag;
+ rc = 0;
+ }
+
}
RETURN(rc);
}
-#endif
/*
* llite passes fid of an target inode in op_data->op_fid1 and id of directory in
@@ -1845,12 +1882,14 @@ static int lmv_early_cancel_stripes(struct obd_export *exp,
static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
struct ptlrpc_request **request)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct obd_export *tgt_exp;
- struct lmv_obj *obj;
- int rc, loop = 0;
- mdsno_t mds;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ struct lmv_object *obj;
+ int rc;
+ int loop = 0;
+ mdsno_t mds;
+ int sidx;
ENTRY;
rc = lmv_check_connect(obd);
@@ -1861,25 +1900,22 @@ repeat:
++loop;
LASSERT(loop <= 2);
if (op_data->op_namelen != 0) {
- int mea_idx;
-
- /* Usual link request */
- obj = lmv_obj_grab(obd, &op_data->op_fid2);
+ obj = lmv_object_find(obd, &op_data->op_fid2);
if (obj) {
- mea_idx = raw_name2idx(obj->lo_hashtype,
+ sidx = raw_name2idx(obj->lo_hashtype,
obj->lo_objcount,
op_data->op_name,
op_data->op_namelen);
- op_data->op_fid2 = obj->lo_inodes[mea_idx].li_fid;
- mds = obj->lo_inodes[mea_idx].li_mds;
- lmv_obj_put(obj);
+ op_data->op_fid2 = obj->lo_stripes[sidx].ls_fid;
+ mds = obj->lo_stripes[sidx].ls_mds;
+ lmv_object_put(obj);
} else {
rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds);
if (rc)
RETURN(rc);
}
- CDEBUG(D_OTHER,"link "DFID":%*s to "DFID"\n",
+ CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n",
PFID(&op_data->op_fid2), op_data->op_namelen,
op_data->op_name, PFID(&op_data->op_fid1));
} else {
@@ -1887,27 +1923,31 @@ repeat:
if (rc)
RETURN(rc);
- /* request from MDS to acquire i_links for inode by fid1 */
- CDEBUG(D_OTHER, "inc i_nlinks for "DFID"\n",
+ /*
+ * Request from MDS to acquire i_links for inode by fid1.
+ */
+ CDEBUG(D_INODE, "Inc i_nlinks for "DFID"\n",
PFID(&op_data->op_fid1));
}
- CDEBUG(D_OTHER, "forward to MDS #"LPU64" ("DFID")\n",
+ CDEBUG(D_INODE, "Forward to mds #"LPU64" ("DFID")\n",
mds, PFID(&op_data->op_fid1));
op_data->op_fsuid = current->fsuid;
op_data->op_fsgid = current->fsgid;
op_data->op_cap = cfs_curproc_cap_pack();
+ tgt = lmv_get_target(lmv, mds);
- tgt_exp = lmv->tgts[mds].ltd_exp;
if (op_data->op_namelen) {
+ /*
+ * Cancel UPDATE lock on child (fid1).
+ */
op_data->op_flags |= MF_MDC_CANCEL_FID2;
- /* Cancel UPDATE lock on child (fid1). */
- rc = lmv_early_cancel(lmv, tgt_exp, NULL, op_data, LCK_EX,
+ rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
}
if (rc == 0)
- rc = md_link(tgt_exp, op_data, request);
+ rc = md_link(tgt->ltd_exp, op_data, request);
if (rc == -ERESTART) {
LASSERT(*request != NULL);
DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
@@ -1931,15 +1971,19 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
const char *old, int oldlen, const char *new, int newlen,
struct ptlrpc_request **request)
{
- struct obd_export *tgt_exp = NULL, *src_exp;
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- int rc, mea_idx, loop = 0;
- struct lmv_obj *obj;
- mdsno_t mds1, mds2;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *src_tgt;
+ struct lmv_tgt_desc *tgt_tgt;
+ int rc;
+ int sidx;
+ int loop = 0;
+ struct lmv_object *obj;
+ mdsno_t mds1;
+ mdsno_t mds2;
ENTRY;
- CDEBUG(D_OTHER, "rename %*s in "DFID" to %*s in "DFID"\n",
+ CDEBUG(D_INODE, "RENAME %*s in "DFID" to %*s in "DFID"\n",
oldlen, old, PFID(&op_data->op_fid1),
newlen, new, PFID(&op_data->op_fid2));
@@ -1952,8 +1996,8 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
* MDS with old dir entry is asking another MDS to create name
* there.
*/
- CDEBUG(D_OTHER,
- "create %*s(%d/%d) in "DFID" pointing "
+ CDEBUG(D_INODE,
+ "Create %*s(%d/%d) in "DFID" pointing "
"to "DFID"\n", newlen, new, oldlen, newlen,
PFID(&op_data->op_fid2), PFID(&op_data->op_fid1));
@@ -1965,15 +2009,15 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
* Target directory can be split, sowe should forward request to
* the right MDS.
*/
- obj = lmv_obj_grab(obd, &op_data->op_fid2);
+ obj = lmv_object_find(obd, &op_data->op_fid2);
if (obj) {
- mea_idx = raw_name2idx(obj->lo_hashtype,
- obj->lo_objcount,
- (char *)new, newlen);
- op_data->op_fid2 = obj->lo_inodes[mea_idx].li_fid;
- CDEBUG(D_OTHER, "Parent obj "DFID"\n",
+ sidx = raw_name2idx(obj->lo_hashtype,
+ obj->lo_objcount,
+ (char *)new, newlen);
+ op_data->op_fid2 = obj->lo_stripes[sidx].ls_fid;
+ CDEBUG(D_INODE, "Parent obj "DFID"\n",
PFID(&op_data->op_fid2));
- lmv_obj_put(obj);
+ lmv_object_put(obj);
}
goto request;
}
@@ -1981,37 +2025,33 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
repeat:
++loop;
LASSERT(loop <= 2);
- obj = lmv_obj_grab(obd, &op_data->op_fid1);
+ obj = lmv_object_find(obd, &op_data->op_fid1);
if (obj) {
- /*
- * directory is already split, so we have to forward request to
- * the right MDS.
- */
- mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
- (char *)old, oldlen);
- op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid;
- mds1 = obj->lo_inodes[mea_idx].li_mds;
- CDEBUG(D_OTHER, "Parent obj "DFID"\n", PFID(&op_data->op_fid1));
- lmv_obj_put(obj);
+ sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
+ (char *)old, oldlen);
+ op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid;
+ mds1 = obj->lo_stripes[sidx].ls_mds;
+ CDEBUG(D_INODE, "Parent obj "DFID"\n", PFID(&op_data->op_fid1));
+ lmv_object_put(obj);
} else {
rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds1);
if (rc)
RETURN(rc);
}
- obj = lmv_obj_grab(obd, &op_data->op_fid2);
+ obj = lmv_object_find(obd, &op_data->op_fid2);
if (obj) {
/*
* Directory is already split, so we have to forward request to
* the right MDS.
*/
- mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
+ sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
(char *)new, newlen);
- mds2 = obj->lo_inodes[mea_idx].li_mds;
- op_data->op_fid2 = obj->lo_inodes[mea_idx].li_fid;
- CDEBUG(D_OTHER, "Parent obj "DFID"\n", PFID(&op_data->op_fid2));
- lmv_obj_put(obj);
+ mds2 = obj->lo_stripes[sidx].ls_mds;
+ op_data->op_fid2 = obj->lo_stripes[sidx].ls_fid;
+ CDEBUG(D_INODE, "Parent obj "DFID"\n", PFID(&op_data->op_fid2));
+ lmv_object_put(obj);
} else {
rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds2);
if (rc)
@@ -2023,39 +2063,47 @@ request:
op_data->op_fsgid = current->fsgid;
op_data->op_cap = cfs_curproc_cap_pack();
- src_exp = lmv_get_export(lmv, mds1);
- tgt_exp = lmv_get_export(lmv, mds2);
+ src_tgt = lmv_get_target(lmv, mds1);
+ tgt_tgt = lmv_get_target(lmv, mds2);
if (oldlen) {
- /* LOOKUP lock on src child (fid3) should also be cancelled for
- * src_exp in mdc_rename. */
+ /*
+ * LOOKUP lock on src child (fid3) should also be cancelled for
+ * src_tgt in mdc_rename.
+ */
op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
- /* Cancel UPDATE locks on tgt parent (fid2), tgt_exp is its
- * own export. */
- rc = lmv_early_cancel(lmv, src_exp, tgt_exp, op_data, LCK_EX,
- MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID2);
+ /*
+ * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
+ * own target.
+ */
+ rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
+ LCK_EX, MDS_INODELOCK_UPDATE,
+ MF_MDC_CANCEL_FID2);
- /* Cancel LOOKUP locks on tgt child (fid4) for parent tgt_exp.*/
- if (rc == 0)
- rc = lmv_early_cancel(lmv, src_exp, tgt_exp, op_data,
+ /*
+ * Cancel LOOKUP locks on tgt child (fid4) for parent tgt_tgt.
+ */
+ if (rc == 0) {
+ rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
LCK_EX, MDS_INODELOCK_LOOKUP,
MF_MDC_CANCEL_FID4);
+ }
- /* XXX: the case when child is a striped dir is not supported.
- * Only the master stripe has all locks cancelled early. */
- /* Cancel all the locks on tgt child (fid4). */
+ /*
+ * Cancel all the locks on tgt child (fid4).
+ */
if (rc == 0)
- rc = lmv_early_cancel(lmv, src_exp, NULL, op_data,
+ rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
LCK_EX, MDS_INODELOCK_FULL,
MF_MDC_CANCEL_FID4);
}
if (rc == 0)
- rc = md_rename(src_exp, op_data, old, oldlen,
+ rc = md_rename(src_tgt->ltd_exp, op_data, old, oldlen,
new, newlen, request);
if (rc == -ERESTART) {
LASSERT(*request != NULL);
- DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
+ DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
"Got -ERESTART during rename!\n");
ptlrpc_req_finished(*request);
*request = NULL;
@@ -2076,41 +2124,42 @@ static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
struct ptlrpc_request **request,
struct md_open_data **mod)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct ptlrpc_request *req;
- struct obd_export *tgt_exp;
- struct lmv_obj *obj;
- int rc = 0, i;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct ptlrpc_request *req;
+ struct lmv_tgt_desc *tgt;
+ struct lmv_object *obj;
+ int rc = 0;
+ int i;
ENTRY;
rc = lmv_check_connect(obd);
if (rc)
RETURN(rc);
- obj = lmv_obj_grab(obd, &op_data->op_fid1);
+ obj = lmv_object_find(obd, &op_data->op_fid1);
- CDEBUG(D_OTHER, "SETATTR for "DFID", valid 0x%x%s\n",
+ CDEBUG(D_INODE, "SETATTR for "DFID", valid 0x%x%s\n",
PFID(&op_data->op_fid1), op_data->op_attr.ia_valid,
obj ? ", split" : "");
op_data->op_flags |= MF_MDC_CANCEL_FID1;
if (obj) {
for (i = 0; i < obj->lo_objcount; i++) {
- op_data->op_fid1 = obj->lo_inodes[i].li_fid;
+ op_data->op_fid1 = obj->lo_stripes[i].ls_fid;
- tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds);
- if (IS_ERR(tgt_exp)) {
- rc = PTR_ERR(tgt_exp);
+ tgt = lmv_get_target(lmv, obj->lo_stripes[i].ls_mds);
+ if (IS_ERR(tgt)) {
+ rc = PTR_ERR(tgt);
break;
}
- rc = md_setattr(tgt_exp, op_data, ea, ealen,
+ rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen,
ea2, ea2len, &req, mod);
- if (lu_fid_eq(&obj->lo_fid, &obj->lo_inodes[i].li_fid)) {
+ if (lu_fid_eq(&obj->lo_fid, &obj->lo_stripes[i].ls_fid)) {
/*
- * this is master object and this request should
+ * This is master object and this request should
* be returned back to llite.
*/
*request = req;
@@ -2121,13 +2170,13 @@ static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
if (rc)
break;
}
- lmv_obj_put(obj);
+ lmv_object_put(obj);
} else {
- tgt_exp = lmv_find_export(lmv, &op_data->op_fid1);
- if (IS_ERR(tgt_exp))
- RETURN(PTR_ERR(tgt_exp));
+ tgt = lmv_find_target(lmv, &op_data->op_fid1);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- rc = md_setattr(tgt_exp, op_data, ea, ealen, ea2,
+ rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen, ea2,
ea2len, request, mod);
}
RETURN(rc);
@@ -2136,33 +2185,34 @@ static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
struct obd_capa *oc, struct ptlrpc_request **request)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct obd_export *tgt_exp;
- int rc;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ int rc;
ENTRY;
rc = lmv_check_connect(obd);
if (rc)
RETURN(rc);
- tgt_exp = lmv_find_export(lmv, fid);
- if (IS_ERR(tgt_exp))
- RETURN(PTR_ERR(tgt_exp));
+ tgt = lmv_find_target(lmv, fid);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- rc = md_sync(tgt_exp, fid, oc, request);
+ rc = md_sync(tgt->ltd_exp, fid, oc, request);
RETURN(rc);
}
-/* main purpose of LMV blocking ast is to remove split directory LMV
- * presentation object (struct lmv_obj) attached to the lock being revoked. */
-int lmv_blocking_ast(struct ldlm_lock *lock,
- struct ldlm_lock_desc *desc,
+/**
+ * Main purpose of LMV blocking ast is to remove split directory LMV
+ * presentation object (struct lmv_object) attached to the lock being revoked.
+ */
+int lmv_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
void *data, int flag)
{
- struct lustre_handle lockh;
- struct lmv_obj *obj;
- int rc;
+ struct lustre_handle lockh;
+ struct lmv_object *obj;
+ int rc;
ENTRY;
switch (flag) {
@@ -2175,17 +2225,19 @@ int lmv_blocking_ast(struct ldlm_lock *lock,
}
break;
case LDLM_CB_CANCELING:
- /* time to drop cached attrs for dirobj */
+ /*
+ * Time to drop cached attrs for split directory object
+ */
obj = lock->l_ast_data;
if (obj) {
- CDEBUG(D_OTHER, "cancel %s on "LPU64"/"LPU64
+ CDEBUG(D_INODE, "Cancel %s on "LPU64"/"LPU64
", master "DFID"\n",
lock->l_resource->lr_name.name[3] == 1 ?
"LOOKUP" : "UPDATE",
lock->l_resource->lr_name.name[0],
lock->l_resource->lr_name.name[1],
PFID(&obj->lo_fid));
- lmv_obj_put(obj);
+ lmv_object_put(obj);
}
break;
default:
@@ -2196,7 +2248,7 @@ int lmv_blocking_ast(struct ldlm_lock *lock,
static void lmv_hash_adjust(__u64 *hash, __u64 hash_adj)
{
- __u64 val;
+ __u64 val;
val = le64_to_cpu(*hash);
if (val < hash_adj)
@@ -2207,16 +2259,16 @@ static void lmv_hash_adjust(__u64 *hash, __u64 hash_adj)
static __u32 lmv_node_rank(struct obd_export *exp, const struct lu_fid *fid)
{
- __u64 id;
+ __u64 id;
struct obd_import *imp;
/*
- * XXX Hack: to get nid we assume that underlying obd device is mdc.
+ * XXX: to get nid we assume that underlying obd device is mdc.
*/
imp = class_exp2cliimp(exp);
id = imp->imp_connection->c_self + fid_flatten(fid);
- CDEBUG(D_INFO, "node rank: "LPX64" "DFID" "LPX64" "LPX64"\n",
+ CDEBUG(D_INODE, "Readpage node rank: "LPX64" "DFID" "LPX64" "LPX64"\n",
imp->imp_connection->c_self, PFID(fid), id, id ^ (id >> 32));
return id ^ (id >> 32);
@@ -2226,20 +2278,23 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
struct obd_capa *oc, __u64 offset64, struct page *page,
struct ptlrpc_request **request)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct obd_export *tgt_exp;
- struct lu_fid rid = *fid;
- struct lmv_obj *obj;
- __u64 offset;
- __u64 hash_adj = 0;
- __u32 rank = 0;
- __u64 seg_size = 0;
- __u64 tgt_tmp = 0;
- int tgt = 0;
- int tgt0 = 0;
- int rc;
- int nr = 0;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lu_fid rid = *fid;
+ struct lmv_object *obj;
+ __u64 offset;
+ __u64 hash_adj = 0;
+ __u32 rank = 0;
+ __u64 seg_size = 0;
+ __u64 tgt_tmp = 0;
+ int tgt_idx = 0;
+ int tgt0_idx = 0;
+ int rc;
+ int nr = 0;
+ struct lmv_stripe *los;
+ struct lmv_tgt_desc *tgt;
+ struct lu_dirpage *dp;
+ struct lu_dirent *ent;
ENTRY;
offset = offset64;
@@ -2248,10 +2303,7 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
if (rc)
RETURN(rc);
- CDEBUG(D_INFO, "READPAGE at "LPX64" from "DFID"\n", offset, PFID(&rid));
-
- obj = lmv_obj_grab(obd, fid);
- if (obj) {
+ CDEBUG(D_INODE, "READPAGE at "LPX64" from "DFID"\n", offset, PFID(&rid));
/*
* This case handle directory lookup in clustered metadata case (i.e.
@@ -2273,24 +2325,21 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
* [R*MAX_HASH/N ... (R + 1)*MAX_HASH/N] there for we do hash_adj
* on hash values that we get.
*/
-
- struct lmv_inode *loi;
-
- lmv_obj_lock(obj);
-
+ obj = lmv_object_find_lock(obd, fid);
+ if (obj) {
nr = obj->lo_objcount;
LASSERT(nr > 0);
seg_size = MAX_HASH_SIZE;
do_div(seg_size, nr);
- loi = obj->lo_inodes;
- rank = lmv_node_rank(lmv_get_export(lmv, loi[0].li_mds),
- fid) % nr;
- tgt_tmp = offset;
+ los = obj->lo_stripes;
+ tgt = lmv_get_target(lmv, los[0].ls_mds);
+ rank = lmv_node_rank(tgt->ltd_exp, fid) % nr;
+ tgt_tmp = offset;
do_div(tgt_tmp, seg_size);
- tgt0 = do_div(tgt_tmp, nr);
- tgt = (tgt0 + rank) % nr;
+ tgt0_idx = do_div(tgt_tmp, nr);
+ tgt_idx = (tgt0_idx + rank) % nr;
- if (tgt < tgt0)
+ if (tgt_idx < tgt0_idx)
/*
* Wrap around.
*
@@ -2303,28 +2352,26 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
hash_adj += rank * seg_size;
- CDEBUG(D_INFO, "hash_adj: %x "LPX64" "LPX64"/%x -> "LPX64"/%x\n",
- rank, hash_adj, offset, tgt0, offset + hash_adj, tgt);
+ CDEBUG(D_INODE, "Readpage hash adjustment: %x "LPX64" "
+ LPX64"/%x -> "LPX64"/%x\n", rank, hash_adj,
+ offset, tgt0_idx, offset + hash_adj, tgt_idx);
offset = (offset + hash_adj) & MAX_HASH_SIZE;
- rid = obj->lo_inodes[tgt].li_fid;
- tgt_exp = lmv_get_export(lmv, loi[tgt].li_mds);
+ rid = obj->lo_stripes[tgt_idx].ls_fid;
+ tgt = lmv_get_target(lmv, los[tgt_idx].ls_mds);
- CDEBUG(D_INFO, "forward to "DFID" with offset %lu i %d\n",
- PFID(&rid), (unsigned long)offset, tgt);
+ CDEBUG(D_INODE, "Forward to "DFID" with offset %lu i %d\n",
+ PFID(&rid), (unsigned long)offset, tgt_idx);
} else
- tgt_exp = lmv_find_export(lmv, &rid);
+ tgt = lmv_find_target(lmv, &rid);
- if (IS_ERR(tgt_exp))
- GOTO(cleanup, rc = PTR_ERR(tgt_exp));
+ if (IS_ERR(tgt))
+ GOTO(cleanup, rc = PTR_ERR(tgt));
- rc = md_readpage(tgt_exp, &rid, oc, offset, page, request);
+ rc = md_readpage(tgt->ltd_exp, &rid, oc, offset, page, request);
if (rc)
GOTO(cleanup, rc);
if (obj) {
- struct lu_dirpage *dp;
- struct lu_dirent *ent;
-
dp = cfs_kmap(page);
lmv_hash_adjust(&dp->ldp_hash_start, hash_adj);
@@ -2335,134 +2382,72 @@ static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
ent = lu_dirent_next(ent))
lmv_hash_adjust(&ent->lde_hash, hash_adj);
- if (tgt0 != nr - 1) {
+ if (tgt0_idx != nr - 1) {
__u64 end;
end = le64_to_cpu(dp->ldp_hash_end);
if (end == DIR_END_OFF) {
dp->ldp_hash_end = cpu_to_le32(seg_size *
- (tgt0 + 1));
- CDEBUG(D_INFO,
+ (tgt0_idx + 1));
+ CDEBUG(D_INODE,
""DFID" reset end "LPX64" tgt %d\n",
PFID(&rid),
- le64_to_cpu(dp->ldp_hash_end), tgt);
+ le64_to_cpu(dp->ldp_hash_end), tgt_idx);
}
}
cfs_kunmap(page);
}
- /*
- * Here we could remove "." and ".." from all pages which at not from
- * master. But MDS has only "." and ".." for master dir.
- */
EXIT;
cleanup:
- if (obj) {
- lmv_obj_unlock(obj);
- lmv_obj_put(obj);
- }
- return rc;
-}
-
-static int lmv_unlink_slaves(struct obd_export *exp,
- struct md_op_data *op_data,
- struct ptlrpc_request **req)
-{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_stripe_md *mea = op_data->op_mea1;
- struct md_op_data *op_data2;
- struct obd_export *tgt_exp;
- int i, rc = 0;
- ENTRY;
-
- OBD_ALLOC_PTR(op_data2);
- if (op_data2 == NULL)
- RETURN(-ENOMEM);
-
- op_data2->op_mode = S_IFDIR;
- op_data2->op_fsuid = current->fsuid;
- op_data2->op_fsgid = current->fsgid;
- op_data2->op_bias = 0;
-
- LASSERT(mea != NULL);
- for (i = 0; i < mea->mea_count; i++) {
- memset(op_data2, 0, sizeof(*op_data2));
- op_data2->op_fid1 = mea->mea_ids[i];
- tgt_exp = lmv_find_export(lmv, &op_data2->op_fid1);
- if (IS_ERR(tgt_exp))
- GOTO(out_free_op_data2, rc = PTR_ERR(tgt_exp));
-
- if (tgt_exp == NULL)
- continue;
-
- rc = md_unlink(tgt_exp, op_data2, req);
-
- CDEBUG(D_OTHER, "unlink slave "DFID" -> %d\n",
- PFID(&mea->mea_ids[i]), rc);
-
- if (*req) {
- ptlrpc_req_finished(*req);
- *req = NULL;
- }
- if (rc)
- GOTO(out_free_op_data2, rc);
- }
-
- EXIT;
-out_free_op_data2:
- OBD_FREE_PTR(op_data2);
+ if (obj)
+ lmv_object_put_unlock(obj);
return rc;
}
static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
struct ptlrpc_request **request)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct obd_export *tgt_exp = NULL;
- struct lmv_obj *obj;
- int rc, loop = 0;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt = NULL;
+ struct lmv_object *obj;
+ int rc;
+ int loop = 0;
ENTRY;
rc = lmv_check_connect(obd);
if (rc)
RETURN(rc);
- if (op_data->op_namelen == 0 && op_data->op_mea1 != NULL) {
- /* mds asks to remove slave objects */
- rc = lmv_unlink_slaves(exp, op_data, request);
- RETURN(rc);
- }
-
repeat:
++loop;
LASSERT(loop <= 2);
if (op_data->op_namelen != 0) {
- int mea_idx;
+ int sidx;
- obj = lmv_obj_grab(obd, &op_data->op_fid1);
+ obj = lmv_object_find(obd, &op_data->op_fid1);
if (obj) {
- mea_idx = raw_name2idx(obj->lo_hashtype,
- obj->lo_objcount,
- op_data->op_name,
- op_data->op_namelen);
+ sidx = raw_name2idx(obj->lo_hashtype,
+ obj->lo_objcount,
+ op_data->op_name,
+ op_data->op_namelen);
op_data->op_bias &= ~MDS_CHECK_SPLIT;
- op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid;
- tgt_exp = lmv_get_export(lmv,
- obj->lo_inodes[mea_idx].li_mds);
- lmv_obj_put(obj);
- CDEBUG(D_OTHER, "unlink '%*s' in "DFID" -> %u\n",
+ op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid;
+ tgt = lmv_get_target(lmv,
+ obj->lo_stripes[sidx].ls_mds);
+ lmv_object_put(obj);
+ CDEBUG(D_INODE, "UNLINK '%*s' in "DFID" -> %u\n",
op_data->op_namelen, op_data->op_name,
- PFID(&op_data->op_fid1), mea_idx);
+ PFID(&op_data->op_fid1), sidx);
}
} else {
- CDEBUG(D_OTHER, "drop i_nlink on "DFID"\n",
+ CDEBUG(D_INODE, "Drop i_nlink on "DFID"\n",
PFID(&op_data->op_fid1));
}
- if (tgt_exp == NULL) {
- tgt_exp = lmv_find_export(lmv, &op_data->op_fid1);
- if (IS_ERR(tgt_exp))
- RETURN(PTR_ERR(tgt_exp));
+ if (tgt == NULL) {
+ tgt = lmv_find_target(lmv, &op_data->op_fid1);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
op_data->op_bias |= MDS_CHECK_SPLIT;
}
@@ -2470,21 +2455,25 @@ repeat:
op_data->op_fsgid = current->fsgid;
op_data->op_cap = cfs_curproc_cap_pack();
- /* If child's fid is given, cancel unused locks for it if it is from
- * another export than parent. */
+ /*
+ * If child's fid is given, cancel unused locks for it if it is from
+ * another export than parent.
+ */
if (op_data->op_namelen) {
- /* LOOKUP lock for child (fid3) should also be cancelled on
- * parent tgt_exp in mdc_unlink(). */
+ /*
+ * LOOKUP lock for child (fid3) should also be cancelled on
+ * parent tgt_tgt in mdc_unlink().
+ */
op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
- /* XXX: the case when child is a striped dir is not supported.
- * Only the master stripe has all locks cancelled early. */
- /* Cancel FULL locks on child (fid3). */
- rc = lmv_early_cancel(lmv, tgt_exp, NULL, op_data, LCK_EX,
+ /*
+ * Cancel FULL locks on child (fid3).
+ */
+ rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
}
if (rc == 0)
- rc = md_unlink(tgt_exp, op_data, request);
+ rc = md_unlink(tgt->ltd_exp, op_data, request);
if (rc == -ERESTART) {
LASSERT(*request != NULL);
DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
@@ -2503,45 +2492,9 @@ repeat:
RETURN(rc);
}
-static int lmv_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
- struct obd_device *tgt, int count,
- struct llog_catid *logid, struct obd_uuid *uuid)
-{
-#if 0
- struct llog_ctxt *ctxt;
- int rc;
- ENTRY;
-
- LASSERT(group == OBD_LLOG_GROUP);
- rc = llog_setup(obd, &obd->obd_olg, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
- &llog_client_ops);
- if (rc == 0) {
- ctxt = llog_group_get_ctxt(&obd->obd_olg, LLOG_CONFIG_REPL_CTXT);
- llog_initiator_connect(ctxt, tgt);
- llog_ctxt_put(ctxt);
- }
- RETURN(rc);
-#else
- return 0;
-#endif
-}
-
-static int lmv_llog_finish(struct obd_device *obd, int count)
-{
- struct llog_ctxt *ctxt;
- int rc = 0;
- ENTRY;
-
- ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
- if (ctxt)
- rc = llog_cleanup(ctxt);
-
- RETURN(rc);
-}
-
static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
- int rc = 0;
+ int rc = 0;
switch (stage) {
case OBD_CLEANUP_EARLY:
@@ -2560,16 +2513,17 @@ static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
}
static int lmv_get_info(struct obd_export *exp, __u32 keylen,
- void *key, __u32 *vallen, void *val, struct lov_stripe_md *lsm)
+ void *key, __u32 *vallen, void *val,
+ struct lov_stripe_md *lsm)
{
- struct obd_device *obd;
- struct lmv_obd *lmv;
- int rc = 0;
+ struct obd_device *obd;
+ struct lmv_obd *lmv;
+ int rc = 0;
ENTRY;
obd = class_exp2obd(exp);
if (obd == NULL) {
- CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
+ CDEBUG(D_IOCTL, "Invalid client cookie "LPX64"\n",
exp->exp_handle.h_cookie);
RETURN(-EINVAL);
}
@@ -2587,7 +2541,9 @@ static int lmv_get_info(struct obd_export *exp, __u32 keylen,
for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count;
i++, tgts++) {
- /* all tgts should be connected when this get called. */
+ /*
+ * All tgts should be connected when this gets called.
+ */
if (!tgts || !tgts->ltd_exp) {
CERROR("target not setup?\n");
continue;
@@ -2603,8 +2559,10 @@ static int lmv_get_info(struct obd_export *exp, __u32 keylen,
if (rc)
RETURN(rc);
- /* forwarding this request to first MDS, it should know LOV
- * desc. */
+ /*
+ * Forwarding this request to first MDS, it should know LOV
+ * desc.
+ */
rc = obd_get_info(lmv->tgts[0].ltd_exp, keylen, key,
vallen, val, NULL);
if (!rc && KEY_IS(KEY_CONN_DATA)) {
@@ -2614,7 +2572,7 @@ static int lmv_get_info(struct obd_export *exp, __u32 keylen,
RETURN(rc);
}
- CDEBUG(D_IOCTL, "invalid key\n");
+ CDEBUG(D_IOCTL, "Invalid key\n");
RETURN(-EINVAL);
}
@@ -2630,7 +2588,7 @@ int lmv_set_info_async(struct obd_export *exp, obd_count keylen,
obd = class_exp2obd(exp);
if (obd == NULL) {
- CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
+ CDEBUG(D_IOCTL, "Invalid client cookie "LPX64"\n",
exp->exp_handle.h_cookie);
RETURN(-EINVAL);
}
@@ -2661,10 +2619,12 @@ int lmv_set_info_async(struct obd_export *exp, obd_count keylen,
int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
struct lov_stripe_md *lsm)
{
- struct obd_device *obd = class_exp2obd(exp);
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_stripe_md *meap, *lsmp;
- int mea_size, i;
+ struct obd_device *obd = class_exp2obd(exp);
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_stripe_md *meap;
+ struct lmv_stripe_md *lsmp;
+ int mea_size;
+ int i;
ENTRY;
mea_size = lmv_get_easize(lmv);
@@ -2708,12 +2668,13 @@ int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
struct lov_mds_md *lmm, int lmm_size)
{
- struct obd_device *obd = class_exp2obd(exp);
- struct lmv_stripe_md **tmea = (struct lmv_stripe_md **)lsmp;
- struct lmv_stripe_md *mea = (struct lmv_stripe_md *)lmm;
- struct lmv_obd *lmv = &obd->u.lmv;
- int mea_size, i;
- __u32 magic;
+ struct obd_device *obd = class_exp2obd(exp);
+ struct lmv_stripe_md **tmea = (struct lmv_stripe_md **)lsmp;
+ struct lmv_stripe_md *mea = (struct lmv_stripe_md *)lmm;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ int mea_size;
+ int i;
+ __u32 magic;
ENTRY;
mea_size = lmv_get_easize(lmv);
@@ -2741,7 +2702,10 @@ int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
{
magic = le32_to_cpu(mea->mea_magic);
} else {
- /* old mea is not handled here */
+ /*
+ * Old mea is not handled here.
+ */
+ CERROR("Old not supportable EA is found\n");
LBUG();
}
@@ -2756,14 +2720,15 @@ int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
RETURN(mea_size);
}
-static int lmv_cancel_unused(struct obd_export *exp,
- const struct lu_fid *fid,
- ldlm_policy_data_t *policy,
- ldlm_mode_t mode, int flags, void *opaque)
+static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
+ ldlm_policy_data_t *policy, ldlm_mode_t mode,
+ int flags, void *opaque)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- int rc = 0, err, i;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ int rc = 0;
+ int err;
+ int i;
ENTRY;
LASSERT(fid != NULL);
@@ -2782,11 +2747,12 @@ static int lmv_cancel_unused(struct obd_export *exp,
int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
-
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ int rc;
ENTRY;
- RETURN(md_set_lock_data(lmv->tgts[0].ltd_exp, lockh, data));
+ rc = md_set_lock_data(lmv->tgts[0].ltd_exp, lockh, data);
+ RETURN(rc);
}
ldlm_mode_t lmv_lock_match(struct obd_export *exp, int flags,
@@ -2794,18 +2760,20 @@ ldlm_mode_t lmv_lock_match(struct obd_export *exp, int flags,
ldlm_policy_data_t *policy, ldlm_mode_t mode,
struct lustre_handle *lockh)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- ldlm_mode_t rc;
- int i;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ ldlm_mode_t rc;
+ int i;
ENTRY;
- CDEBUG(D_OTHER, "lock match for "DFID"\n", PFID(fid));
+ CDEBUG(D_INODE, "Lock match for "DFID"\n", PFID(fid));
- /* with CMD every object can have two locks in different namespaces:
+ /*
+ * With CMD every object can have two locks in different namespaces:
* lookup lock in space of mds storing direntry and update/open lock in
* space of mds storing inode. Thus we check all targets, not only that
- * one fid was created in. */
+ * one fid was created in.
+ */
for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
rc = md_lock_match(lmv->tgts[i].ltd_exp, flags, fid,
type, policy, mode, lockh);
@@ -2820,10 +2788,9 @@ int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
struct obd_export *dt_exp, struct obd_export *md_exp,
struct lustre_md *md)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- int rc;
-
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ int rc;
ENTRY;
rc = md_get_lustre_md(lmv->tgts[0].ltd_exp, req, dt_exp, md_exp, md);
RETURN(rc);
@@ -2831,12 +2798,10 @@ int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
-
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
ENTRY;
- /* XXX LOV STACKING */
if (md->mea)
obd_free_memmd(exp, (void *)&md->mea);
RETURN(md_free_lustre_md(lmv->tgts[0].ltd_exp, md));
@@ -2846,32 +2811,31 @@ int lmv_set_open_replay_data(struct obd_export *exp,
struct obd_client_handle *och,
struct ptlrpc_request *open_req)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct obd_export *tgt_exp;
-
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
ENTRY;
- tgt_exp = lmv_find_export(lmv, &och->och_fid);
- if (IS_ERR(tgt_exp))
- RETURN(PTR_ERR(tgt_exp));
+ tgt = lmv_find_target(lmv, &och->och_fid);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- RETURN(md_set_open_replay_data(tgt_exp, och, open_req));
+ RETURN(md_set_open_replay_data(tgt->ltd_exp, och, open_req));
}
int lmv_clear_open_replay_data(struct obd_export *exp,
struct obd_client_handle *och)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct obd_export *tgt_exp;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
ENTRY;
- tgt_exp = lmv_find_export(lmv, &och->och_fid);
- if (IS_ERR(tgt_exp))
- RETURN(PTR_ERR(tgt_exp));
+ tgt = lmv_find_target(lmv, &och->och_fid);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- RETURN(md_clear_open_replay_data(tgt_exp, och));
+ RETURN(md_clear_open_replay_data(tgt->ltd_exp, och));
}
static int lmv_get_remote_perm(struct obd_export *exp,
@@ -2879,44 +2843,42 @@ static int lmv_get_remote_perm(struct obd_export *exp,
struct obd_capa *oc, __u32 suppgid,
struct ptlrpc_request **request)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct obd_export *tgt_exp;
- int rc;
-
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ int rc;
ENTRY;
rc = lmv_check_connect(obd);
if (rc)
RETURN(rc);
- tgt_exp = lmv_find_export(lmv, fid);
- if (IS_ERR(tgt_exp))
- RETURN(PTR_ERR(tgt_exp));
-
- rc = md_get_remote_perm(tgt_exp, fid, oc, suppgid, request);
+ tgt = lmv_find_target(lmv, fid);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
+ rc = md_get_remote_perm(tgt->ltd_exp, fid, oc, suppgid, request);
RETURN(rc);
}
static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *oc,
renew_capa_cb_t cb)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct obd_export *tgt_exp;
- int rc;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ int rc;
ENTRY;
rc = lmv_check_connect(obd);
if (rc)
RETURN(rc);
- tgt_exp = lmv_find_export(lmv, &oc->c_capa.lc_fid);
- if (IS_ERR(tgt_exp))
- RETURN(PTR_ERR(tgt_exp));
+ tgt = lmv_find_target(lmv, &oc->c_capa.lc_fid);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- rc = md_renew_capa(tgt_exp, oc, cb);
+ rc = md_renew_capa(tgt->ltd_exp, oc, cb);
RETURN(rc);
}
@@ -2924,24 +2886,53 @@ int lmv_intent_getattr_async(struct obd_export *exp,
struct md_enqueue_info *minfo,
struct ldlm_enqueue_info *einfo)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct obd_export *tgt_exp;
- int rc;
+ struct md_op_data *op_data = &minfo->mi_data;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_object *obj;
+ struct lmv_tgt_desc *tgt;
+ int rc;
+ int sidx;
ENTRY;
rc = lmv_check_connect(obd);
if (rc)
RETURN(rc);
- if (fid_is_zero(&minfo->mi_data.op_fid2))
- tgt_exp = lmv_find_export(lmv, &minfo->mi_data.op_fid1);
- else
- tgt_exp = lmv_find_export(lmv, &minfo->mi_data.op_fid2);
- if (IS_ERR(tgt_exp))
- RETURN(PTR_ERR(tgt_exp));
+ if (!fid_is_sane(&op_data->op_fid2)) {
+ obj = lmv_object_find(obd, &op_data->op_fid1);
+ if (obj && op_data->op_namelen) {
+ sidx = raw_name2idx(obj->lo_hashtype,
+ obj->lo_objcount,
+ (char *)op_data->op_name,
+ op_data->op_namelen);
+ op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid;
+ tgt = lmv_get_target(lmv,
+ obj->lo_stripes[sidx].ls_mds);
+ CDEBUG(D_INODE,
+ "Choose slave dir ("DFID") -> mds #%d\n",
+ PFID(&op_data->op_fid1), tgt->ltd_idx);
+ } else {
+ tgt = lmv_find_target(lmv, &op_data->op_fid1);
+ }
+ if (obj)
+ lmv_object_put(obj);
+ } else {
+ op_data->op_fid1 = op_data->op_fid2;
+ tgt = lmv_find_target(lmv, &op_data->op_fid2);
+ op_data->op_bias = MDS_CROSS_REF;
+ /*
+ * Unfortunately, we have to lie to MDC/MDS to retrieve
+ * attributes llite needs.
+ */
+ if (minfo->mi_it.it_op & IT_LOOKUP)
+ minfo->mi_it.it_op = IT_GETATTR;
+ }
+
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- rc = md_intent_getattr_async(tgt_exp, minfo, einfo);
+ rc = md_intent_getattr_async(tgt->ltd_exp, minfo, einfo);
RETURN(rc);
}
@@ -2949,21 +2940,21 @@ int lmv_revalidate_lock(struct obd_export *exp,
struct lookup_intent *it,
struct lu_fid *fid)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct obd_export *tgt_exp;
- int rc;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ int rc;
ENTRY;
rc = lmv_check_connect(obd);
if (rc)
RETURN(rc);
- tgt_exp = lmv_find_export(lmv, fid);
- if (IS_ERR(tgt_exp))
- RETURN(PTR_ERR(tgt_exp));
+ tgt = lmv_find_target(lmv, fid);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- rc = md_revalidate_lock(tgt_exp, it, fid);
+ rc = md_revalidate_lock(tgt->ltd_exp, it, fid);
RETURN(rc);
}
@@ -2977,8 +2968,6 @@ struct obd_ops lmv_obd_ops = {
.o_connect = lmv_connect,
.o_disconnect = lmv_disconnect,
.o_statfs = lmv_statfs,
- .o_llog_init = lmv_llog_init,
- .o_llog_finish = lmv_llog_finish,
.o_get_info = lmv_get_info,
.o_set_info_async = lmv_set_info_async,
.o_packmd = lmv_packmd,
@@ -3024,13 +3013,13 @@ struct md_ops lmv_md_ops = {
int __init lmv_init(void)
{
struct lprocfs_static_vars lvars;
- int rc;
+ int rc;
- obj_cache = cfs_mem_cache_create("lmv_objects",
- sizeof(struct lmv_obj),
- 0, 0);
- if (!obj_cache) {
- CERROR("error allocating lmv objects cache\n");
+ lmv_object_cache = cfs_mem_cache_create("lmv_objects",
+ sizeof(struct lmv_object),
+ 0, 0);
+ if (!lmv_object_cache) {
+ CERROR("Error allocating lmv objects cache\n");
return -ENOMEM;
}
@@ -3038,7 +3027,7 @@ int __init lmv_init(void)
rc = class_register_type(&lmv_obd_ops, &lmv_md_ops,
lvars.module_vars, LUSTRE_LMV_NAME, NULL);
if (rc)
- cfs_mem_cache_destroy(obj_cache);
+ cfs_mem_cache_destroy(lmv_object_cache);
return rc;
}
@@ -3046,14 +3035,12 @@ int __init lmv_init(void)
#ifdef __KERNEL__
static void lmv_exit(void)
{
- int rc;
-
class_unregister_type(LUSTRE_LMV_NAME);
- rc = cfs_mem_cache_destroy(obj_cache);
- LASSERTF(rc == 0,
- "can't free lmv objects cache, %d object(s)"
- "still in use\n", atomic_read(&obj_cache_count));
+ LASSERTF(atomic_read(&lmv_object_count) == 0,
+ "Can't free lmv objects cache, %d object(s) busy\n",
+ atomic_read(&lmv_object_count));
+ cfs_mem_cache_destroy(lmv_object_cache);
}
MODULE_AUTHOR("Sun Microsystems, Inc. ");
diff --git a/lustre/lmv/lmv_object.c b/lustre/lmv/lmv_object.c
index f567fc9..f10c23c 100644
--- a/lustre/lmv/lmv_object.c
+++ b/lustre/lmv/lmv_object.c
@@ -59,35 +59,31 @@
#include
#include "lmv_internal.h"
-/* objects cache. */
-extern cfs_mem_cache_t *obj_cache;
-extern atomic_t obj_cache_count;
+extern cfs_mem_cache_t *lmv_object_cache;
+extern atomic_t lmv_object_count;
-/* object list and its guard. */
static CFS_LIST_HEAD(obj_list);
static spinlock_t obj_list_lock = SPIN_LOCK_UNLOCKED;
-/* creates new obj on passed @fid and @mea. */
-struct lmv_obj *
-lmv_obj_alloc(struct obd_device *obd,
- const struct lu_fid *fid,
- struct lmv_stripe_md *mea)
+struct lmv_object *lmv_object_alloc(struct obd_device *obd,
+ const struct lu_fid *fid,
+ struct lmv_stripe_md *mea)
{
- int i;
- struct lmv_obj *obj;
- unsigned int obj_size;
- struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ unsigned int obj_size;
+ struct lmv_object *obj;
+ int i;
LASSERT(mea->mea_magic == MEA_MAGIC_LAST_CHAR
|| mea->mea_magic == MEA_MAGIC_ALL_CHARS
|| mea->mea_magic == MEA_MAGIC_HASH_SEGMENT);
- OBD_SLAB_ALLOC(obj, obj_cache, CFS_ALLOC_STD,
+ OBD_SLAB_ALLOC(obj, lmv_object_cache, CFS_ALLOC_STD,
sizeof(*obj));
if (!obj)
return NULL;
- atomic_inc(&obj_cache_count);
+ atomic_inc(&lmv_object_count);
obj->lo_fid = *fid;
obj->lo_obd = obd;
@@ -98,141 +94,138 @@ lmv_obj_alloc(struct obd_device *obd,
atomic_set(&obj->lo_count, 0);
obj->lo_objcount = mea->mea_count;
- obj_size = sizeof(struct lmv_inode) *
+ obj_size = sizeof(struct lmv_stripe) *
lmv->desc.ld_tgt_count;
- OBD_ALLOC(obj->lo_inodes, obj_size);
- if (!obj->lo_inodes)
+ OBD_ALLOC(obj->lo_stripes, obj_size);
+ if (!obj->lo_stripes)
goto err_obj;
- memset(obj->lo_inodes, 0, obj_size);
+ memset(obj->lo_stripes, 0, obj_size);
- /* put all ids in */
+ CDEBUG(D_INODE, "Allocate object for "DFID"\n",
+ PFID(fid));
for (i = 0; i < mea->mea_count; i++) {
int rc;
- CDEBUG(D_OTHER, "subobj "DFID"\n",
+ CDEBUG(D_INODE, "Process subobject "DFID"\n",
PFID(&mea->mea_ids[i]));
- obj->lo_inodes[i].li_fid = mea->mea_ids[i];
- LASSERT(fid_is_sane(&obj->lo_inodes[i].li_fid));
+ obj->lo_stripes[i].ls_fid = mea->mea_ids[i];
+ LASSERT(fid_is_sane(&obj->lo_stripes[i].ls_fid));
/*
* Cache slave mds number to use it in all cases it is needed
* instead of constant lookup.
*/
- rc = lmv_fld_lookup(lmv, &obj->lo_inodes[i].li_fid,
- &obj->lo_inodes[i].li_mds);
+ rc = lmv_fld_lookup(lmv, &obj->lo_stripes[i].ls_fid,
+ &obj->lo_stripes[i].ls_mds);
if (rc)
goto err_obj;
}
return obj;
-
err_obj:
OBD_FREE(obj, sizeof(*obj));
return NULL;
}
-/* destroy passed @obj. */
-void
-lmv_obj_free(struct lmv_obj *obj)
+void lmv_object_free(struct lmv_object *obj)
{
- struct lmv_obd *lmv = &obj->lo_obd->u.lmv;
- unsigned int obj_size;
+ struct lmv_obd *lmv = &obj->lo_obd->u.lmv;
+ unsigned int obj_size;
LASSERT(!atomic_read(&obj->lo_count));
- obj_size = sizeof(struct lmv_inode) *
+ obj_size = sizeof(struct lmv_stripe) *
lmv->desc.ld_tgt_count;
- OBD_FREE(obj->lo_inodes, obj_size);
- OBD_SLAB_FREE(obj, obj_cache, sizeof(*obj));
- atomic_dec(&obj_cache_count);
+ OBD_FREE(obj->lo_stripes, obj_size);
+ OBD_SLAB_FREE(obj, lmv_object_cache, sizeof(*obj));
+ atomic_dec(&lmv_object_count);
}
-static void
-__lmv_obj_add(struct lmv_obj *obj)
+static void __lmv_object_add(struct lmv_object *obj)
{
atomic_inc(&obj->lo_count);
list_add(&obj->lo_list, &obj_list);
}
-void
-lmv_obj_add(struct lmv_obj *obj)
+void lmv_object_add(struct lmv_object *obj)
{
spin_lock(&obj_list_lock);
- __lmv_obj_add(obj);
+ __lmv_object_add(obj);
spin_unlock(&obj_list_lock);
}
-static void
-__lmv_obj_del(struct lmv_obj *obj)
+static void __lmv_object_del(struct lmv_object *obj)
{
list_del(&obj->lo_list);
- lmv_obj_free(obj);
+ lmv_object_free(obj);
}
-void
-lmv_obj_del(struct lmv_obj *obj)
+void lmv_object_del(struct lmv_object *obj)
{
spin_lock(&obj_list_lock);
- __lmv_obj_del(obj);
+ __lmv_object_del(obj);
spin_unlock(&obj_list_lock);
}
-static struct lmv_obj *
-__lmv_obj_get(struct lmv_obj *obj)
+static struct lmv_object *__lmv_object_get(struct lmv_object *obj)
{
LASSERT(obj != NULL);
atomic_inc(&obj->lo_count);
return obj;
}
-struct lmv_obj *
-lmv_obj_get(struct lmv_obj *obj)
+struct lmv_object *lmv_object_get(struct lmv_object *obj)
{
spin_lock(&obj_list_lock);
- __lmv_obj_get(obj);
+ __lmv_object_get(obj);
spin_unlock(&obj_list_lock);
return obj;
}
-static void
-__lmv_obj_put(struct lmv_obj *obj)
+static void __lmv_object_put(struct lmv_object *obj)
{
LASSERT(obj);
if (atomic_dec_and_test(&obj->lo_count)) {
- CDEBUG(D_OTHER, "last reference to "DFID" - "
+ CDEBUG(D_INODE, "Last reference to "DFID" - "
"destroying\n", PFID(&obj->lo_fid));
- __lmv_obj_del(obj);
+ __lmv_object_del(obj);
}
}
-void
-lmv_obj_put(struct lmv_obj *obj)
+void lmv_object_put(struct lmv_object *obj)
{
spin_lock(&obj_list_lock);
- __lmv_obj_put(obj);
+ __lmv_object_put(obj);
spin_unlock(&obj_list_lock);
}
-static struct lmv_obj *
-__lmv_obj_grab(struct obd_device *obd, const struct lu_fid *fid)
+void lmv_object_put_unlock(struct lmv_object *obj)
+{
+ lmv_object_unlock(obj);
+ lmv_object_put(obj);
+}
+
+static struct lmv_object *__lmv_object_find(struct obd_device *obd, const struct lu_fid *fid)
{
- struct lmv_obj *obj;
- struct list_head *cur;
+ struct lmv_object *obj;
+ struct list_head *cur;
list_for_each(cur, &obj_list) {
- obj = list_entry(cur, struct lmv_obj, lo_list);
+ obj = list_entry(cur, struct lmv_object, lo_list);
- /* check if object is in progress of destroying. If so - skip
- * it. */
+ /*
+ * Check if object is in destroying phase. If so - skip
+ * it.
+ */
if (obj->lo_state & O_FREEING)
continue;
/*
- * we should make sure, that we have found object belong to
+ * We should make sure, that we have found object belong to
* passed obd. It is possible that, object manager will have two
* objects with the same fid belong to different obds, if client
* and mds runs on the same host. May be it is good idea to have
@@ -241,84 +234,99 @@ __lmv_obj_grab(struct obd_device *obd, const struct lu_fid *fid)
if (obj->lo_obd != obd)
continue;
- /* check if this is what we're looking for. */
+ /*
+ * Check if this is what we're looking for.
+ */
if (lu_fid_eq(&obj->lo_fid, fid))
- return __lmv_obj_get(obj);
+ return __lmv_object_get(obj);
}
return NULL;
}
-struct lmv_obj *
-lmv_obj_grab(struct obd_device *obd, const struct lu_fid *fid)
+struct lmv_object *lmv_object_find(struct obd_device *obd,
+ const struct lu_fid *fid)
{
- struct lmv_obj *obj;
+ struct lmv_object *obj;
ENTRY;
spin_lock(&obj_list_lock);
- obj = __lmv_obj_grab(obd, fid);
+ obj = __lmv_object_find(obd, fid);
spin_unlock(&obj_list_lock);
RETURN(obj);
}
-/* looks in objects list for an object that matches passed @fid. If it is not
- * found -- creates it using passed @mea and puts onto list. */
-static struct lmv_obj *
-__lmv_obj_create(struct obd_device *obd, const struct lu_fid *fid,
- struct lmv_stripe_md *mea)
+struct lmv_object *lmv_object_find_lock(struct obd_device *obd,
+ const struct lu_fid *fid)
{
- struct lmv_obj *new, *obj;
+ struct lmv_object *obj;
ENTRY;
- obj = lmv_obj_grab(obd, fid);
+ obj = lmv_object_find(obd, fid);
+ if (obj)
+ lmv_object_lock(obj);
+
+ RETURN(obj);
+}
+
+static struct lmv_object *__lmv_object_create(struct obd_device *obd,
+ const struct lu_fid *fid,
+ struct lmv_stripe_md *mea)
+{
+ struct lmv_object *new;
+ struct lmv_object *obj;
+ ENTRY;
+
+ obj = lmv_object_find(obd, fid);
if (obj)
RETURN(obj);
- /* no such object yet, allocate and initialize it. */
- new = lmv_obj_alloc(obd, fid, mea);
+ new = lmv_object_alloc(obd, fid, mea);
if (!new)
RETURN(NULL);
- /* check if someone create it already while we were dealing with
- * allocating @obj. */
+ /*
+ * Check if someone created it already while we were dealing with
+ * allocating @obj.
+ */
spin_lock(&obj_list_lock);
- obj = __lmv_obj_grab(obd, fid);
+ obj = __lmv_object_find(obd, fid);
if (obj) {
- /* someone created it already - put @obj and getting out. */
+ /*
+ * Someone created it already - put @obj and getting out.
+ */
spin_unlock(&obj_list_lock);
- lmv_obj_free(new);
+ lmv_object_free(new);
RETURN(obj);
}
- __lmv_obj_add(new);
- __lmv_obj_get(new);
+ __lmv_object_add(new);
+ __lmv_object_get(new);
spin_unlock(&obj_list_lock);
- CDEBUG(D_OTHER, "new obj in lmv cache: "DFID"\n",
+ CDEBUG(D_INODE, "New obj in lmv cache: "DFID"\n",
PFID(fid));
RETURN(new);
-
}
-/* creates object from passed @fid and @mea. If @mea is NULL, it will be
- * obtained from correct MDT and used for constructing the object. */
-struct lmv_obj *
-lmv_obj_create(struct obd_export *exp, const struct lu_fid *fid,
- struct lmv_stripe_md *mea)
+struct lmv_object *lmv_object_create(struct obd_export *exp,
+ const struct lu_fid *fid,
+ struct lmv_stripe_md *mea)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct ptlrpc_request *req = NULL;
- struct obd_export *tgt_exp;
- struct lmv_obj *obj;
- struct lustre_md md;
- int mealen, rc;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct ptlrpc_request *req = NULL;
+ struct lmv_tgt_desc *tgt;
+ struct lmv_object *obj;
+ struct lustre_md md;
+ int mealen;
+ int rc;
ENTRY;
- CDEBUG(D_OTHER, "get mea for "DFID" and create lmv obj\n",
+ CDEBUG(D_INODE, "Get mea for "DFID" and create lmv obj\n",
PFID(fid));
md.mea = NULL;
@@ -326,18 +334,20 @@ lmv_obj_create(struct obd_export *exp, const struct lu_fid *fid,
if (mea == NULL) {
__u64 valid;
- CDEBUG(D_OTHER, "mea isn't passed in, get it now\n");
+ CDEBUG(D_INODE, "Mea isn't passed in, get it now\n");
mealen = lmv_get_easize(lmv);
- /* time to update mea of parent fid */
+ /*
+ * Time to update mea of parent fid.
+ */
md.mea = NULL;
valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA;
- tgt_exp = lmv_find_export(lmv, fid);
- if (IS_ERR(tgt_exp))
- GOTO(cleanup, obj = (void *)tgt_exp);
+ tgt = lmv_find_target(lmv, fid);
+ if (IS_ERR(tgt))
+ GOTO(cleanup, obj = (void *)tgt);
- rc = md_getattr(tgt_exp, fid, NULL, valid, mealen, &req);
+ rc = md_getattr(tgt->ltd_exp, fid, NULL, valid, mealen, &req);
if (rc) {
CERROR("md_getattr() failed, error %d\n", rc);
GOTO(cleanup, obj = ERR_PTR(rc));
@@ -345,7 +355,7 @@ lmv_obj_create(struct obd_export *exp, const struct lu_fid *fid,
rc = md_get_lustre_md(exp, req, NULL, exp, &md);
if (rc) {
- CERROR("mdc_get_lustre_md() failed, error %d\n", rc);
+ CERROR("md_get_lustre_md() failed, error %d\n", rc);
GOTO(cleanup, obj = ERR_PTR(rc));
}
@@ -355,15 +365,16 @@ lmv_obj_create(struct obd_export *exp, const struct lu_fid *fid,
mea = md.mea;
}
- /* got mea, now create obj for it. */
- obj = __lmv_obj_create(obd, fid, mea);
+ /*
+ * Got mea, now create obj for it.
+ */
+ obj = __lmv_object_create(obd, fid, mea);
if (!obj) {
CERROR("Can't create new object "DFID"\n",
PFID(fid));
GOTO(cleanup, obj = ERR_PTR(-ENOMEM));
}
- /* XXX LOV STACKING */
if (md.mea != NULL)
obd_free_memmd(exp, (void *)&md.mea);
@@ -374,35 +385,26 @@ cleanup:
return obj;
}
-/*
- * looks for object with @fid and orders to destroy it. It is possible the object
- * will not be destroyed right now, because it is still using by someone. In
- * this case it will be marked as "freeing" and will not be accessible anymore
- * for subsequent callers of lmv_obj_grab().
- */
-int
-lmv_obj_delete(struct obd_export *exp, const struct lu_fid *fid)
+int lmv_object_delete(struct obd_export *exp, const struct lu_fid *fid)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obj *obj;
- int rc = 0;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_object *obj;
+ int rc = 0;
ENTRY;
spin_lock(&obj_list_lock);
- obj = __lmv_obj_grab(obd, fid);
+ obj = __lmv_object_find(obd, fid);
if (obj) {
obj->lo_state |= O_FREEING;
- __lmv_obj_put(obj);
- __lmv_obj_put(obj);
+ __lmv_object_put(obj);
+ __lmv_object_put(obj);
rc = 1;
}
spin_unlock(&obj_list_lock);
-
RETURN(rc);
}
-int
-lmv_obj_setup(struct obd_device *obd)
+int lmv_object_setup(struct obd_device *obd)
{
ENTRY;
LASSERT(obd != NULL);
@@ -413,11 +415,11 @@ lmv_obj_setup(struct obd_device *obd)
RETURN(0);
}
-void
-lmv_obj_cleanup(struct obd_device *obd)
+void lmv_object_cleanup(struct obd_device *obd)
{
- struct list_head *cur, *tmp;
- struct lmv_obj *obj;
+ struct list_head *cur;
+ struct list_head *tmp;
+ struct lmv_object *obj;
ENTRY;
CDEBUG(D_INFO, "LMV object manager cleanup (%s)\n",
@@ -425,17 +427,17 @@ lmv_obj_cleanup(struct obd_device *obd)
spin_lock(&obj_list_lock);
list_for_each_safe(cur, tmp, &obj_list) {
- obj = list_entry(cur, struct lmv_obj, lo_list);
+ obj = list_entry(cur, struct lmv_object, lo_list);
if (obj->lo_obd != obd)
continue;
obj->lo_state |= O_FREEING;
if (atomic_read(&obj->lo_count) > 1) {
- CERROR("obj "DFID" has count > 1 (%d)\n",
+ CERROR("Object "DFID" has count (%d)\n",
PFID(&obj->lo_fid), atomic_read(&obj->lo_count));
}
- __lmv_obj_put(obj);
+ __lmv_object_put(obj);
}
spin_unlock(&obj_list_lock);
EXIT;
diff --git a/lustre/lmv/lproc_lmv.c b/lustre/lmv/lproc_lmv.c
index 364943c..e880d23 100644
--- a/lustre/lmv/lproc_lmv.c
+++ b/lustre/lmv/lproc_lmv.c
@@ -49,8 +49,8 @@ static struct lprocfs_vars lprocfs_obd_vars[] = { {0} };
static int lmv_rd_numobd(char *page, char **start, off_t off, int count,
int *eof, void *data)
{
- struct obd_device *dev = (struct obd_device*)data;
- struct lmv_desc *desc;
+ struct obd_device *dev = (struct obd_device*)data;
+ struct lmv_desc *desc;
LASSERT(dev != NULL);
desc = &dev->u.lmv.desc;
@@ -59,11 +59,83 @@ static int lmv_rd_numobd(char *page, char **start, off_t off, int count,
}
+static const char *placement_name[] = {
+ [PLACEMENT_CHAR_POLICY] = "CHAR",
+ [PLACEMENT_NID_POLICY] = "NID"
+};
+
+static placement_policy_t placement_name2policy(char *name, int len)
+{
+ int i;
+
+ for (i = 0; i < PLACEMENT_MAX_POLICY; i++) {
+ if (!strncmp(placement_name[i], name, len))
+ return i;
+ }
+ return PLACEMENT_INVAL_POLICY;
+}
+
+static const char *placement_policy2name(placement_policy_t placement)
+{
+ LASSERT(placement < PLACEMENT_MAX_POLICY);
+ return placement_name[placement];
+}
+
+static int lmv_rd_placement(char *page, char **start, off_t off, int count,
+ int *eof, void *data)
+{
+ struct obd_device *dev = (struct obd_device*)data;
+ struct lmv_obd *lmv;
+
+ LASSERT(dev != NULL);
+ lmv = &dev->u.lmv;
+ *eof = 1;
+ return snprintf(page, count, "%s\n",
+ placement_policy2name(lmv->lmv_placement));
+
+}
+
+#define MAX_POLICY_STRING_SIZE 64
+
+static int lmv_wr_placement(struct file *file, const char *buffer,
+ unsigned long count, void *data)
+{
+ struct obd_device *dev = (struct obd_device *)data;
+ char dummy[MAX_POLICY_STRING_SIZE + 1];
+ int len = count;
+ placement_policy_t policy;
+ struct lmv_obd *lmv;
+
+ if (copy_from_user(dummy, buffer, MAX_POLICY_STRING_SIZE))
+ return -EFAULT;
+
+ LASSERT(dev != NULL);
+ lmv = &dev->u.lmv;
+
+ if (len > MAX_POLICY_STRING_SIZE)
+ len = MAX_POLICY_STRING_SIZE;
+
+ if (dummy[len - 1] == '\n')
+ len--;
+ dummy[len] = '\0';
+
+ policy = placement_name2policy(dummy, len);
+ if (policy != PLACEMENT_INVAL_POLICY) {
+ spin_lock(&lmv->lmv_lock);
+ lmv->lmv_placement = policy;
+ spin_unlock(&lmv->lmv_lock);
+ } else {
+ CERROR("Invalid placement policy \"%s\"!\n", dummy);
+ return -EINVAL;
+ }
+ return count;
+}
+
static int lmv_rd_activeobd(char *page, char **start, off_t off, int count,
int *eof, void *data)
{
- struct obd_device* dev = (struct obd_device*)data;
- struct lmv_desc *desc;
+ struct obd_device *dev = (struct obd_device*)data;
+ struct lmv_desc *desc;
LASSERT(dev != NULL);
desc = &dev->u.lmv.desc;
@@ -74,8 +146,8 @@ static int lmv_rd_activeobd(char *page, char **start, off_t off, int count,
static int lmv_rd_desc_uuid(char *page, char **start, off_t off, int count,
int *eof, void *data)
{
- struct obd_device *dev = (struct obd_device*) data;
- struct lmv_obd *lmv;
+ struct obd_device *dev = (struct obd_device*) data;
+ struct lmv_obd *lmv;
LASSERT(dev != NULL);
lmv = &dev->u.lmv;
@@ -85,9 +157,8 @@ static int lmv_rd_desc_uuid(char *page, char **start, off_t off, int count,
static void *lmv_tgt_seq_start(struct seq_file *p, loff_t *pos)
{
- struct obd_device *dev = p->private;
- struct lmv_obd *lmv = &dev->u.lmv;
-
+ struct obd_device *dev = p->private;
+ struct lmv_obd *lmv = &dev->u.lmv;
return (*pos >= lmv->desc.ld_tgt_count) ? NULL : &(lmv->tgts[*pos]);
}
@@ -99,37 +170,37 @@ static void lmv_tgt_seq_stop(struct seq_file *p, void *v)
static void *lmv_tgt_seq_next(struct seq_file *p, void *v, loff_t *pos)
{
- struct obd_device *dev = p->private;
- struct lmv_obd *lmv = &dev->u.lmv;
-
+ struct obd_device *dev = p->private;
+ struct lmv_obd *lmv = &dev->u.lmv;
++*pos;
return (*pos >=lmv->desc.ld_tgt_count) ? NULL : &(lmv->tgts[*pos]);
}
static int lmv_tgt_seq_show(struct seq_file *p, void *v)
{
- struct lmv_tgt_desc *tgt = v;
- struct obd_device *dev = p->private;
- struct lmv_obd *lmv = &dev->u.lmv;
- int idx = tgt - &(lmv->tgts[0]);
+ struct lmv_tgt_desc *tgt = v;
+ struct obd_device *dev = p->private;
+ struct lmv_obd *lmv = &dev->u.lmv;
+ int idx = tgt - &(lmv->tgts[0]);
return seq_printf(p, "%d: %s %sACTIVE\n", idx, tgt->ltd_uuid.uuid,
tgt->ltd_active ? "" : "IN");
}
struct seq_operations lmv_tgt_sops = {
- .start = lmv_tgt_seq_start,
- .stop = lmv_tgt_seq_stop,
- .next = lmv_tgt_seq_next,
- .show = lmv_tgt_seq_show,
+ .start = lmv_tgt_seq_start,
+ .stop = lmv_tgt_seq_stop,
+ .next = lmv_tgt_seq_next,
+ .show = lmv_tgt_seq_show,
};
static int lmv_target_seq_open(struct inode *inode, struct file *file)
{
- struct proc_dir_entry *dp = PDE(inode);
- struct seq_file *seq;
- int rc = seq_open(file, &lmv_tgt_sops);
-
+ struct proc_dir_entry *dp = PDE(inode);
+ struct seq_file *seq;
+ int rc;
+
+ rc = seq_open(file, &lmv_tgt_sops);
if (rc)
return rc;
@@ -140,29 +211,30 @@ static int lmv_target_seq_open(struct inode *inode, struct file *file)
}
struct lprocfs_vars lprocfs_lmv_obd_vars[] = {
- { "numobd", lmv_rd_numobd, 0, 0 },
- { "activeobd", lmv_rd_activeobd, 0, 0 },
- { "uuid", lprocfs_rd_uuid, 0, 0 },
- { "desc_uuid", lmv_rd_desc_uuid, 0, 0 },
+ { "numobd", lmv_rd_numobd, 0, 0 },
+ { "placement", lmv_rd_placement, lmv_wr_placement, 0 },
+ { "activeobd", lmv_rd_activeobd, 0, 0 },
+ { "uuid", lprocfs_rd_uuid, 0, 0 },
+ { "desc_uuid", lmv_rd_desc_uuid, 0, 0 },
{ 0 }
};
static struct lprocfs_vars lprocfs_lmv_module_vars[] = {
- { "num_refs", lprocfs_rd_numrefs, 0, 0 },
+ { "num_refs", lprocfs_rd_numrefs, 0, 0 },
{ 0 }
};
struct file_operations lmv_proc_target_fops = {
- .owner = THIS_MODULE,
- .open = lmv_target_seq_open,
- .read = seq_read,
- .llseek = seq_lseek,
- .release = seq_release,
+ .owner = THIS_MODULE,
+ .open = lmv_target_seq_open,
+ .read = seq_read,
+ .llseek = seq_lseek,
+ .release = seq_release,
};
#endif /* LPROCFS */
void lprocfs_lmv_init_vars(struct lprocfs_static_vars *lvars)
{
- lvars->module_vars = lprocfs_lmv_module_vars;
- lvars->obd_vars = lprocfs_lmv_obd_vars;
+ lvars->module_vars = lprocfs_lmv_module_vars;
+ lvars->obd_vars = lprocfs_lmv_obd_vars;
}
diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c
index 545ec84..5eecc29 100644
--- a/lustre/mdt/mdt_handler.c
+++ b/lustre/mdt/mdt_handler.c
@@ -777,7 +777,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
struct md_object *next = mdt_object_child(parent);
struct lu_fid *child_fid = &info->mti_tmp_fid1;
struct lu_name *lname = NULL;
- const char *name;
+ const char *name = NULL;
int namelen = 0;
struct mdt_lock_handle *lhp;
struct ldlm_lock *lock;
@@ -798,27 +798,30 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
namelen = req_capsule_get_size(info->mti_pill, &RMF_NAME,
RCL_CLIENT) - 1;
- LASSERT(namelen >= 0);
-
- /* XXX: "namelen == 0" is for getattr by fid (OBD_CONNECT_ATTRFID),
- * otherwise do not allow empty name, that is the name must contain
- * at least one character and the terminating '\0'*/
- if (namelen == 0) {
- reqbody =req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
- LASSERT(fid_is_sane(&reqbody->fid2));
- name = NULL;
-
- CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
- "ldlm_rep = %p\n",
- PFID(mdt_object_fid(parent)), PFID(&reqbody->fid2),
- ldlm_rep);
- } else {
- lname = mdt_name(info->mti_env, (char *)name, namelen);
- CDEBUG(D_INODE, "getattr with lock for "DFID"/%s, "
- "ldlm_rep = %p\n",
- PFID(mdt_object_fid(parent)), name, ldlm_rep);
- }
+ if (!info->mti_cross_ref) {
+ /*
+ * XXX: Check for "namelen == 0" is for getattr by fid
+ * (OBD_CONNECT_ATTRFID), otherwise do not allow empty name,
+ * that is the name must contain at least one character and
+ * the terminating '\0'
+ */
+ if (namelen == 0) {
+ reqbody = req_capsule_client_get(info->mti_pill,
+ &RMF_MDT_BODY);
+ LASSERT(fid_is_sane(&reqbody->fid2));
+ name = NULL;
+ CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
+ "ldlm_rep = %p\n",
+ PFID(mdt_object_fid(parent)), PFID(&reqbody->fid2),
+ ldlm_rep);
+ } else {
+ lname = mdt_name(info->mti_env, (char *)name, namelen);
+ CDEBUG(D_INODE, "getattr with lock for "DFID"/%s, "
+ "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)),
+ name, ldlm_rep);
+ }
+ }
mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
rc = mdt_object_exists(parent);
@@ -827,10 +830,10 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
&parent->mot_obj.mo_lu,
"Parent doesn't exist!\n");
RETURN(-ESTALE);
- } else
+ } else if (!info->mti_cross_ref) {
LASSERTF(rc > 0, "Parent "DFID" is on remote server\n",
PFID(mdt_object_fid(parent)));
-
+ }
if (lname) {
rc = mdt_raw_lookup(info, parent, lname, ldlm_rep);
if (rc != 0) {
@@ -1244,7 +1247,7 @@ static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page,
continue;
fid_le_to_cpu(lf, &ent->lde_fid);
- if (le32_to_cpu(ent->lde_hash) & MAX_HASH_HIGHEST_BIT)
+ if (le64_to_cpu(ent->lde_hash) & MAX_HASH_HIGHEST_BIT)
ma->ma_attr.la_mode = S_IFDIR;
else
ma->ma_attr.la_mode = 0;
@@ -1254,7 +1257,7 @@ static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page,
memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen));
lname = mdt_name(info->mti_env, name,
- le16_to_cpu(ent->lde_namelen) + 1);
+ le16_to_cpu(ent->lde_namelen));
ma->ma_attr_flags |= MDS_PERM_BYPASS;
rc = mdo_name_insert(info->mti_env,
md_object_next(&object->mot_obj),
@@ -1392,9 +1395,9 @@ static int mdt_readpage(struct mdt_thread_info *info)
* reqbody->nlink contains number bytes to read.
*/
rdpg->rp_hash = reqbody->size;
- if ((__u64)rdpg->rp_hash != reqbody->size) {
- CERROR("Invalid hash: %#llx != %#llx\n",
- (__u64)rdpg->rp_hash, reqbody->size);
+ if (rdpg->rp_hash != reqbody->size) {
+ CERROR("Invalid hash: "LPX64" != "LPX64"\n",
+ rdpg->rp_hash, reqbody->size);
RETURN(-EFAULT);
}
rdpg->rp_count = reqbody->nlink;
@@ -1800,6 +1803,17 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
LASSERT(lh->mlh_type != MDT_PDO_LOCK);
}
+ if (lh->mlh_type == MDT_PDO_LOCK) {
+ /* check for exists after object is locked */
+ if (mdt_object_exists(o) == 0) {
+ /* Non-existent object shouldn't have PDO lock */
+ RETURN(-ESTALE);
+ } else {
+ /* Non-dir object shouldn't have PDO lock */
+ LASSERT(S_ISDIR(lu_object_attr(&o->mot_obj.mo_lu)));
+ }
+ }
+
memset(policy, 0, sizeof(*policy));
fid_build_reg_res_name(mdt_object_fid(o), res_id);
@@ -1835,7 +1849,7 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
/*
* Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
* going to be sent to client. If it is - mdt_intent_policy() path will
- * fix it up and turns FL_LOCAL flag off.
+ * fix it up and turn FL_LOCAL flag off.
*/
rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB);
@@ -1843,16 +1857,6 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
if (rc)
GOTO(out, rc);
- if (lh->mlh_type == MDT_PDO_LOCK) {
- /* check for exists after object is locked */
- if (mdt_object_exists(o) == 0) {
- /* Non-existent object shouldn't have PDO lock */
- rc = -ESTALE;
- } else {
- /* Non-dir object shouldn't have PDO lock */
- LASSERT(S_ISDIR(lu_object_attr(&o->mot_obj.mo_lu)));
- }
- }
out:
if (rc)
mdt_object_unlock(info, o, lh, 1);
diff --git a/lustre/mdt/mdt_lib.c b/lustre/mdt/mdt_lib.c
index b3dc462..c2abdf1 100644
--- a/lustre/mdt/mdt_lib.c
+++ b/lustre/mdt/mdt_lib.c
@@ -690,7 +690,8 @@ static __u64 mdt_attr_valid_xlate(__u64 in, struct mdt_reint_record *rr,
in &= ~(ATTR_MODE|ATTR_UID|ATTR_GID|ATTR_SIZE|ATTR_BLOCKS|
ATTR_ATIME|ATTR_MTIME|ATTR_CTIME|ATTR_FROM_OPEN|
ATTR_ATIME_SET|ATTR_CTIME_SET|ATTR_MTIME_SET|
- ATTR_ATTR_FLAG|ATTR_RAW|MDS_OPEN_OWNEROVERRIDE);
+ ATTR_ATTR_FLAG|ATTR_RAW|MDS_OPEN_OWNEROVERRIDE|
+ ATTR_FORCE|ATTR_KILL_SUID);
if (in != 0)
CERROR("Unknown attr bits: %#llx\n", in);
return out;
@@ -848,9 +849,14 @@ static int mdt_create_unpack(struct mdt_thread_info *info)
req_capsule_client_get(pill, &RMF_CAPA1));
mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA);
- rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
- rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1;
- LASSERT(rr->rr_name && rr->rr_namelen > 0);
+ if (!info->mti_cross_ref) {
+ rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
+ rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1;
+ LASSERT(rr->rr_name && rr->rr_namelen > 0);
+ } else {
+ rr->rr_name = NULL;
+ rr->rr_namelen = 0;
+ }
#ifdef CONFIG_FS_POSIX_ACL
if (sp->sp_cr_flags & MDS_CREATE_RMT_ACL) {
@@ -938,13 +944,14 @@ static int mdt_link_unpack(struct mdt_thread_info *info)
mdt_set_capainfo(info, 1, rr->rr_fid2,
req_capsule_client_get(pill, &RMF_CAPA2));
+ info->mti_spec.sp_ck_split = !!(rec->lk_bias & MDS_CHECK_SPLIT);
+ info->mti_cross_ref = !!(rec->lk_bias & MDS_CROSS_REF);
rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
if (rr->rr_name == NULL)
RETURN(-EFAULT);
rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1;
- LASSERT(rr->rr_namelen > 0);
- info->mti_spec.sp_ck_split = !!(rec->lk_bias & MDS_CHECK_SPLIT);
- info->mti_cross_ref = !!(rec->lk_bias & MDS_CROSS_REF);
+ if (!info->mti_cross_ref)
+ LASSERT(rr->rr_namelen > 0);
rc = mdt_dlmreq_unpack(info);
RETURN(rc);
@@ -985,13 +992,18 @@ static int mdt_unlink_unpack(struct mdt_thread_info *info)
mdt_set_capainfo(info, 0, rr->rr_fid1,
req_capsule_client_get(pill, &RMF_CAPA1));
- rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
- if (rr->rr_name == NULL)
- RETURN(-EFAULT);
- rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1;
- LASSERT(rr->rr_namelen > 0);
- info->mti_spec.sp_ck_split = !!(rec->ul_bias & MDS_CHECK_SPLIT);
info->mti_cross_ref = !!(rec->ul_bias & MDS_CROSS_REF);
+ if (!info->mti_cross_ref) {
+ rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
+ rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1;
+ if (rr->rr_name == NULL || rr->rr_namelen == 0)
+ RETURN(-EFAULT);
+ } else {
+ rr->rr_name = NULL;
+ rr->rr_namelen = 0;
+
+ }
+ info->mti_spec.sp_ck_split = !!(rec->ul_bias & MDS_CHECK_SPLIT);
if (rec->ul_bias & MDS_VTX_BYPASS)
ma->ma_attr_flags |= MDS_VTX_BYPASS;
else
@@ -1040,16 +1052,16 @@ static int mdt_rename_unpack(struct mdt_thread_info *info)
mdt_set_capainfo(info, 1, rr->rr_fid2,
req_capsule_client_get(pill, &RMF_CAPA2));
+ info->mti_spec.sp_ck_split = !!(rec->rn_bias & MDS_CHECK_SPLIT);
+ info->mti_cross_ref = !!(rec->rn_bias & MDS_CROSS_REF);
rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
rr->rr_tgt = req_capsule_client_get(pill, &RMF_SYMTGT);
if (rr->rr_name == NULL || rr->rr_tgt == NULL)
RETURN(-EFAULT);
rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1;
- LASSERT(rr->rr_namelen > 0);
rr->rr_tgtlen = req_capsule_get_size(pill, &RMF_SYMTGT, RCL_CLIENT) - 1;
- LASSERT(rr->rr_tgtlen > 0);
- info->mti_spec.sp_ck_split = !!(rec->rn_bias & MDS_CHECK_SPLIT);
- info->mti_cross_ref = !!(rec->rn_bias & MDS_CROSS_REF);
+ if (!info->mti_cross_ref)
+ LASSERT(rr->rr_namelen > 0 && rr->rr_tgtlen > 0);
if (rec->rn_bias & MDS_VTX_BYPASS)
ma->ma_attr_flags |= MDS_VTX_BYPASS;
else
diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c
index 51f87c2..ea232c8 100644
--- a/lustre/mdt/mdt_reint.c
+++ b/lustre/mdt/mdt_reint.c
@@ -470,11 +470,21 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
RETURN(err_serious(-ENOENT));
- /* step 1: lock the parent */
+ /*
+ * step 1: lock the parent. Note, this may be child in case of
+ * remote operation denoted by ->mti_cross_ref flag.
+ */
parent_lh = &info->mti_lh[MDT_LH_PARENT];
- mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name,
- rr->rr_namelen);
-
+ if (info->mti_cross_ref) {
+ /*
+ * Init reg lock for cross ref case when we need to do only
+ * ref del locally.
+ */
+ mdt_lock_reg_init(parent_lh, LCK_PW);
+ } else {
+ mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name,
+ rr->rr_namelen);
+ }
mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh,
MDS_INODELOCK_UPDATE);
if (IS_ERR(mp)) {
@@ -500,7 +510,8 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
rc = mo_ref_del(info->mti_env,
mdt_object_child(mp), ma);
- mdt_handle_last_unlink(info, mp, ma);
+ if (rc == 0)
+ mdt_handle_last_unlink(info, mp, ma);
} else
rc = 0;
GOTO(out_unlock_parent, rc);
diff --git a/lustre/ptlrpc/layout.c b/lustre/ptlrpc/layout.c
index e0e11b0..9fd874a 100644
--- a/lustre/ptlrpc/layout.c
+++ b/lustre/ptlrpc/layout.c
@@ -1373,8 +1373,11 @@ static void *__req_capsule_get(struct req_capsule *pill,
[RCL_SERVER] = "server"
};
+ LASSERT(pill != NULL);
+ LASSERT(pill != LP_POISON);
fmt = pill->rc_fmt;
LASSERT(fmt != NULL);
+ LASSERT(fmt != LP_POISON);
LASSERT(__req_format_is_sane(fmt));
offset = __req_capsule_offset(pill, field, loc);
diff --git a/lustre/tests/acceptance-small.sh b/lustre/tests/acceptance-small.sh
index d7955da..7e4a9b5 100755
--- a/lustre/tests/acceptance-small.sh
+++ b/lustre/tests/acceptance-small.sh
@@ -87,7 +87,7 @@ for NAME in $CONFIGS; do
export CLIENTMODSONLY=true
fi
- assert_env mds_HOST MDS_MKFS_OPTS MDSDEV
+ assert_env mds_HOST MDS_MKFS_OPTS
assert_env ost_HOST OST_MKFS_OPTS OSTCOUNT
assert_env FSNAME MOUNT MOUNT2
diff --git a/lustre/tests/cfg/lmv.sh b/lustre/tests/cfg/lmv.sh
index e964af7..10ba95f 100644
--- a/lustre/tests/cfg/lmv.sh
+++ b/lustre/tests/cfg/lmv.sh
@@ -47,6 +47,15 @@ DIR=${DIR:-$MOUNT}
DIR1=${DIR:-$MOUNT1}
DIR2=${DIR2:-$MOUNT2}
+if [ $UID -ne 0 ]; then
+ log "running as non-root uid $UID"
+ RUNAS_ID="$UID"
+ RUNAS=""
+else
+ RUNAS_ID=${RUNAS_ID:-500}
+ RUNAS=${RUNAS:-"runas -u $RUNAS_ID"}
+fi
+
PDSH=${PDSH:-no_dsh}
FAILURE_MODE=${FAILURE_MODE:-SOFT} # or HARD
POWER_DOWN=${POWER_DOWN:-"powerman --off"}
diff --git a/lustre/tests/cfg/local.sh b/lustre/tests/cfg/local.sh
index 5305b2d..aa8bc2a 100644
--- a/lustre/tests/cfg/local.sh
+++ b/lustre/tests/cfg/local.sh
@@ -13,8 +13,12 @@ CLIENTS=""
TMP=${TMP:-/tmp}
DAEMONSIZE=${DAEMONSIZE:-500}
-MDSDEV=${MDSDEV:-$TMP/${FSNAME}-mdt1}
MDSCOUNT=${MDSCOUNT:-1}
+[ $MDSCOUNT -gt 4 ] && MDSCOUNT=4
+for num in $(seq $MDSCOUNT); do
+ eval mds${num}_HOST=\$\{mds${num}_HOST:-$mds_HOST\}
+ eval mds${num}failover_HOST=\$\{mds${num}failover_HOST:-$mdsfailover_HOST\}
+done
MDSDEVBASE=${MDSDEVBASE:-$TMP/${FSNAME}-mdt}
MDSSIZE=${MDSSIZE:-100000}
MDSOPT=${MDSOPT:-"--mountfsoptions=acl"}
@@ -56,7 +60,8 @@ MKFSOPT=""
MDSOPT=$MDSOPT" --param lov.stripecount=$STRIPES_PER_OBJ"
[ "x$L_GETIDENTITY" != "x" ] &&
MDSOPT=$MDSOPT" --param mdt.identity_upcall=$L_GETIDENTITY"
-MDS_MKFS_OPTS="--mgs --mdt --fsname=$FSNAME --device-size=$MDSSIZE --param sys.timeout=$TIMEOUT $MKFSOPT $MDSOPT $MDS_MKFS_OPTS"
+MDS_MKFS_OPTS="--mgs --mdt --fsname=$FSNAME --device-size=$MDSSIZE --param sys.timeout=$TIMEOUT $MKFSOPT $MDSOPT"
+MDSn_MKFS_OPTS="--mgsnode=$MGSNID --mdt --fsname=$FSNAME --device-size=$MDSSIZE --param sys.timeout=$TIMEOUT $MKFSOPT $MDSOPT"
MKFSOPT=""
[ "x$OSTJOURNALSIZE" != "x" ] &&
diff --git a/lustre/tests/conf-sanity.sh b/lustre/tests/conf-sanity.sh
index 6fec4c5..5f4ce6e 100644
--- a/lustre/tests/conf-sanity.sh
+++ b/lustre/tests/conf-sanity.sh
@@ -11,13 +11,11 @@ set -e
ONLY=${ONLY:-"$*"}
-# These tests don't apply to mountconf
-MOUNTCONFSKIP="10 11 12 13 13b 14 15"
# bug number for skipped test: 13739
HEAD_EXCEPT=" 32a 32b "
# bug number for skipped test:
-ALWAYS_EXCEPT=" $CONF_SANITY_EXCEPT $MOUNTCONFSKIP $HEAD_EXCEPT"
+ALWAYS_EXCEPT=" $CONF_SANITY_EXCEPT $HEAD_EXCEPT"
# UPDATE THE COMMENT ABOVE WITH BUG NUMBERS WHEN CHANGING ALWAYS_EXCEPT!
SRCDIR=`dirname $0`
@@ -27,9 +25,6 @@ PTLDEBUG=${PTLDEBUG:--1}
SAVE_PWD=$PWD
LUSTRE=${LUSTRE:-`dirname $0`/..}
RLUSTRE=${RLUSTRE:-$LUSTRE}
-MOUNTLUSTRE=${MOUNTLUSTRE:-/sbin/mount.lustre}
-MKFSLUSTRE=${MKFSLUSTRE:-/usr/sbin/mkfs.lustre}
-HOSTNAME=`hostname`
. $LUSTRE/tests/test-framework.sh
init_test_env $@
@@ -40,7 +35,7 @@ fi
# use small MDS + OST size to speed formatting time
MDSSIZE=40000
OSTSIZE=40000
-. ${CONFIG:=$LUSTRE/tests/cfg/local.sh}
+. ${CONFIG:=$LUSTRE/tests/cfg/$NAME.sh}
#
[ "$SLOW" = "no" ] && EXCEPT_SLOW="0 1 2 3 6 7 15 18 24b 25 30 31 32 33 34a "
@@ -52,12 +47,13 @@ reformat() {
}
writeconf() {
- local facet=mds
+ local facet=$SINGLEMDS
+ local dev=${facet}_dev
shift
stop ${facet} -f
rm -f ${facet}active
# who knows if/where $TUNEFS is installed? Better reformat if it fails...
- do_facet ${facet} "$TUNEFS --writeconf $MDSDEV" || echo "tunefs failed, reformatting instead" && reformat
+ do_facet ${facet} "$TUNEFS --writeconf ${!dev}" || echo "tunefs failed, reformatting instead" && reformat
}
gen_config() {
@@ -72,14 +68,16 @@ gen_config() {
}
start_mds() {
- echo "start mds service on `facet_active_host mds`"
- start mds $MDSDEV $MDS_MOUNT_OPTS || return 94
+ local facet=$SINGLEMDS
+ local dev=${facet}_dev
+ echo "start mds service on `facet_active_host $facet`"
+ start $facet ${!dev} $MDS_MOUNT_OPTS || return 94
}
stop_mds() {
- echo "stop mds service on `facet_active_host mds`"
+ echo "stop mds service on `facet_active_host $SINGLEMDS`"
# These tests all use non-failover stop
- stop mds -f || return 97
+ stop $SINGLEMDS -f || return 97
}
start_ost() {
@@ -385,263 +383,11 @@ test_9() {
run_test 9 "test ptldebug and subsystem for mkfs"
-test_10() {
- echo "generate configuration with the same name for node and mds"
- OLDXMLCONFIG=$XMLCONFIG
- XMLCONFIG="broken.xml"
- [ -f "$XMLCONFIG" ] && rm -f $XMLCONFIG
- facet="mds"
- rm -f ${facet}active
- add_facet $facet
- echo "the name for node and mds is the same"
- do_lmc --add mds --node ${facet}_facet --mds ${facet}_facet \
- --dev $MDSDEV --size $MDSSIZE || return $?
- do_lmc --add lov --mds ${facet}_facet --lov lov1 --stripe_sz \
- $STRIPE_BYTES --stripe_cnt $STRIPES_PER_OBJ \
- --stripe_pattern 0 || return $?
- add_ost ost --lov lov1 --dev $OSTDEV --size $OSTSIZE
- facet="client"
- add_facet $facet --lustre_upcall $UPCALL
- do_lmc --add mtpt --node ${facet}_facet --mds mds_facet \
- --lov lov1 --path $MOUNT
-
- echo "mount lustre"
- start_ost
- start_mds
- mount_client $MOUNT
- check_mount || return 41
- cleanup || return $?
-
- echo "Success!"
- XMLCONFIG=$OLDXMLCONFIG
-}
-run_test 10 "mount lustre with the same name for node and mds"
-
-test_11() {
- OLDXMLCONFIG=$XMLCONFIG
- XMLCONFIG="conf11.xml"
-
- [ -f "$XMLCONFIG" ] && rm -f $XMLCONFIG
- add_mds mds --dev $MDSDEV --size $MDSSIZE
- add_ost ost --dev $OSTDEV --size $OSTSIZE
- add_client client mds --path $MOUNT --ost ost_svc || return $?
- echo "Default lov config success!"
-
- [ -f "$XMLCONFIG" ] && rm -f $XMLCONFIG
- add_mds mds --dev $MDSDEV --size $MDSSIZE
- add_ost ost --dev $OSTDEV --size $OSTSIZE
- add_client client mds --path $MOUNT && return $?
- echo "--add mtpt with neither --lov nor --ost will return error"
-
- echo ""
- echo "Success!"
- XMLCONFIG=$OLDXMLCONFIG
-}
-run_test 11 "use default lov configuration (should return error)"
-
-test_12() {
- OLDXMLCONFIG=$XMLCONFIG
- XMLCONFIG="batch.xml"
- BATCHFILE="batchfile"
-
- # test double quote
- [ -f "$XMLCONFIG" ] && rm -f $XMLCONFIG
- [ -f "$BATCHFILE" ] && rm -f $BATCHFILE
- echo "--add net --node $HOSTNAME --nid $HOSTNAME --nettype tcp" > $BATCHFILE
- echo "--add mds --node $HOSTNAME --mds mds1 --mkfsoptions \"-I 128\"" >> $BATCHFILE
- # --mkfsoptions "-I 128"
- do_lmc -m $XMLCONFIG --batch $BATCHFILE || return $?
- if [ `sed -n '/>-I 128
$BATCHFILE
- echo "--add mds --node $HOSTNAME --mds mds1 --mkfsoptions \"-I 128" >> $BATCHFILE
- # --mkfsoptions "-I 128
- do_lmc -m $XMLCONFIG --batch $BATCHFILE && return $?
- echo "unmatched double quote should return error"
-
- # test single quote
- rm -f $BATCHFILE
- echo "--add net --node $HOSTNAME --nid $HOSTNAME --nettype tcp" > $BATCHFILE
- echo "--add mds --node $HOSTNAME --mds mds1 --mkfsoptions '-I 128'" >> $BATCHFILE
- # --mkfsoptions '-I 128'
- do_lmc -m $XMLCONFIG --batch $BATCHFILE || return $?
- if [ `sed -n '/>-I 128 $BATCHFILE
- echo "--add mds --node $HOSTNAME --mds mds1 --mkfsoptions '-I 128" >> $BATCHFILE
- # --mkfsoptions '-I 128
- do_lmc -m $XMLCONFIG --batch $BATCHFILE && return $?
- echo "unmatched single quote should return error"
-
- # test backslash
- rm -f $BATCHFILE
- echo "--add net --node $HOSTNAME --nid $HOSTNAME --nettype tcp" > $BATCHFILE
- echo "--add mds --node $HOSTNAME --mds mds1 --mkfsoptions \-\I\ \128" >> $BATCHFILE
- # --mkfsoptions \-\I\ \128
- do_lmc -m $XMLCONFIG --batch $BATCHFILE || return $?
- if [ `sed -n '/>-I 128 $BATCHFILE
- echo "--add mds --node $HOSTNAME --mds mds1 --mkfsoptions -I\ 128\\" >> $BATCHFILE
- # --mkfsoptions -I\ 128\
- do_lmc -m $XMLCONFIG --batch $BATCHFILE && return $?
- echo "backslash followed by nothing should return error"
-
- rm -f $BATCHFILE
- XMLCONFIG=$OLDXMLCONFIG
-}
-run_test 12 "lmc --batch, with single/double quote, backslash in batchfile"
-
-test_13a() { # was test_13
- OLDXMLCONFIG=$XMLCONFIG
- XMLCONFIG="conf13-1.xml"
-
- # check long uuid will be truncated properly and uniquely
- echo "To generate XML configuration file(with long ost name): $XMLCONFIG"
- [ -f "$XMLCONFIG" ] && rm -f $XMLCONFIG
- do_lmc --add net --node $HOSTNAME --nid $HOSTNAME --nettype tcp
- do_lmc --add mds --node $HOSTNAME --mds mds1_name_longer_than_31characters
- do_lmc --add mds --node $HOSTNAME --mds mds2_name_longer_than_31characters
- if [ ! -f "$XMLCONFIG" ]; then
- echo "Error:no file $XMLCONFIG created!"
- return 1
- fi
- EXPECTEDMDS1UUID="e_longer_than_31characters_UUID"
- EXPECTEDMDS2UUID="longer_than_31characters_UUID_2"
- FOUNDMDS1UUID=`awk -F"'" '/ $SECONDXMLCONFIG || return $?
- echo "Generate the second XML configuration file"
- gen_config
- # don't compare .xml mtime, it will always be different
- if [ `sed -e "s/mtime[^ ]*//" $XMLCONFIG | diff - $SECONDXMLCONFIG | wc -l` -eq 0 ]; then
- echo "Success:multiple invocations for lmc generate same XML file"
- else
- echo "Error: multiple invocations for lmc generate different XML file"
- return 1
- fi
-
- rm -f $XMLCONFIG $SECONDXMLCONFIG
- XMLCONFIG=$OLDXMLCONFIG
-}
-run_test 13b "check lmc generates consistent .xml file"
-
-test_14() {
- rm -f $XMLCONFIG
-
- # create xml file with --mkfsoptions for ost
- echo "create xml file with --mkfsoptions for ost"
- add_mds mds --dev $MDSDEV --size $MDSSIZE
- add_lov lov1 mds --stripe_sz $STRIPE_BYTES\
- --stripe_cnt $STRIPES_PER_OBJ --stripe_pattern 0
- add_ost ost --lov lov1 --dev $OSTDEV --size $OSTSIZE \
- --mkfsoptions "-Llabel_conf_14"
- add_client client mds --lov lov1 --path $MOUNT
-
- FOUNDSTRING=`awk -F"<" '//{print $2}' $XMLCONFIG`
- EXPECTEDSTRING="mkfsoptions>-Llabel_conf_14"
- if [ "$EXPECTEDSTRING" != "$FOUNDSTRING" ]; then
- echo "Error: expected: $EXPECTEDSTRING; found: $FOUNDSTRING"
- return 1
- fi
- echo "Success:mkfsoptions for ost written to xml file correctly."
-
- # mount lustre to test lconf mkfsoptions-parsing
- echo "mount lustre"
- start_ost
- start_mds
- mount_client $MOUNT || return $?
- if [ -z "`do_facet ost1 dumpe2fs -h $OSTDEV | grep label_conf_14`" ]; then
- echo "Error: the mkoptions not applied to mke2fs of ost."
- return 1
- fi
- cleanup
- echo "lconf mkfsoptions for ost success"
-
- gen_config
-}
-run_test 14 "test mkfsoptions of ost for lmc and lconf"
-
-cleanup_15() {
- trap 0
- [ -f $MOUNTLUSTRE ] && echo "remove $MOUNTLUSTRE" && rm -f $MOUNTLUSTRE
- if [ -f $MOUNTLUSTRE.sav ]; then
- echo "return original $MOUNTLUSTRE.sav to $MOUNTLUSTRE"
- mv $MOUNTLUSTRE.sav $MOUNTLUSTRE
- fi
-}
-
-# this only tests the kernel mount command, not anything about lustre.
-test_15() {
- MOUNTLUSTRE=${MOUNTLUSTRE:-/sbin/mount.lustre}
- start_ost
- start_mds
-
- echo "mount lustre on ${MOUNT} without $MOUNTLUSTRE....."
- if [ -f "$MOUNTLUSTRE" ]; then
- echo "save $MOUNTLUSTRE to $MOUNTLUSTRE.sav"
- mv $MOUNTLUSTRE $MOUNTLUSTRE.sav && trap cleanup_15 EXIT INT
- if [ -f $MOUNTLUSTRE ]; then
- skip "$MOUNTLUSTRE cannot be moved, skipping test"
- return 0
- fi
- fi
-
- mount_client $MOUNT && error "mount succeeded" && return 1
- echo "mount lustre on $MOUNT without $MOUNTLUSTRE failed as expected"
- cleanup_15
- cleanup || return $?
-}
-run_test 15 "zconf-mount without /sbin/mount.lustre (should return error)"
-
# LOGS/PENDING do not exist anymore since CMD3
test_16() {
- TMPMTPT="${TMP}/conf16"
-
+ local TMPMTPT="${TMP}/conf16"
+ local dev=${SINGLEMDS}_dev
+ local MDSDEV=${!dev}
if [ ! -e "$MDSDEV" ]; then
log "no $MDSDEV existing, so mount Lustre to create one"
setup
@@ -652,7 +398,7 @@ test_16() {
[ -f "$MDSDEV" ] && LOOPOPT="-o loop"
log "change the mode of $MDSDEV/OBJECTS to 555"
- do_facet mds "mkdir -p $TMPMTPT &&
+ do_facet $SINGLEMDS "mkdir -p $TMPMTPT &&
mount $LOOPOPT -t $FSTYPE $MDSDEV $TMPMTPT &&
chmod 555 $TMPMTPT/OBJECTS &&
umount $TMPMTPT" || return $?
@@ -663,7 +409,7 @@ test_16() {
cleanup || return $?
log "read the mode of OBJECTS and check if they has been changed properly"
- EXPECTEDOBJECTSMODE=`do_facet mds "debugfs -R 'stat OBJECTS' $MDSDEV 2> /dev/null" | grep 'Mode: ' | sed -e "s/.*Mode: *//" -e "s/ *Flags:.*//"`
+ EXPECTEDOBJECTSMODE=`do_facet $SINGLEMDS "debugfs -R 'stat OBJECTS' $MDSDEV 2> /dev/null" | grep 'Mode: ' | sed -e "s/.*Mode: *//" -e "s/ *Flags:.*//"`
if [ "$EXPECTEDOBJECTSMODE" = "0777" ]; then
log "Success:Lustre change the mode of OBJECTS correctly"
@@ -674,6 +420,9 @@ test_16() {
run_test 16 "verify that lustre will correct the mode of OBJECTS"
test_17() {
+ local dev=${SINGLEMDS}_dev
+ local MDSDEV=${!dev}
+
if [ ! -e "$MDSDEV" ]; then
echo "no $MDSDEV existing, so mount Lustre to create one"
setup
@@ -682,7 +431,7 @@ test_17() {
fi
echo "Remove mds config log"
- do_facet mds "debugfs -w -R 'unlink CONFIGS/$FSNAME-MDT0000' $MDSDEV || return \$?" || return $?
+ do_facet $SINGLEMDS "debugfs -w -R 'unlink CONFIGS/$FSNAME-MDT0000' $MDSDEV || return \$?" || return $?
start_ost
start_mds && return 42
@@ -693,6 +442,9 @@ run_test 17 "Verify failed mds_postsetup won't fail assertion (2936) (should ret
test_18() {
[ "$FSTYPE" != "ldiskfs" ] && skip "not needed for FSTYPE=$FSTYPE" && return
+ local dev=${SINGLEMDS}_dev
+ local MDSDEV=${!dev}
+
local MIN=2000000
local OK=
@@ -843,7 +595,7 @@ run_test 22 "start a client before osts (should return errs)"
test_23a() { # was test_23
setup
# fail mds
- stop mds
+ stop $SINGLEMDS
# force down client so that recovering mds waits for reconnect
local running=$(grep -c $MOUNT /proc/mounts) || true
if [ $running -ne 0 ]; then
@@ -958,7 +710,7 @@ test_24a() {
umount_client $MOUNT
# the MDS must remain up until last MDT
stop_mds
- MDS=$(do_facet $SINGLEMDS "lctl get_param -n devices" | awk '($3 ~ "mdt" && $4 ~ "MDT") { print $4 }')
+ MDS=$(do_facet $SINGLEMDS "lctl get_param -n devices" | awk '($3 ~ "mdt" && $4 ~ "MDT") { print $4 }' | head -1)
[ -z "$MDS" ] && error "No MDT" && return 8
cleanup_24a
cleanup_nocli || return 6
@@ -992,9 +744,9 @@ run_test 25 "Verify modules are referenced"
test_26() {
load_modules
# we need modules before mount for sysctl, so make sure...
- do_facet mds "lsmod | grep -q lustre || modprobe lustre"
+ do_facet $SINGLEMDS "lsmod | grep -q lustre || modprobe lustre"
#define OBD_FAIL_MDS_FS_SETUP 0x135
- do_facet mds "lctl set_param fail_loc=0x80000135"
+ do_facet $SINGLEMDS "lctl set_param fail_loc=0x80000135"
start_mds && echo MDS started && return 1
lctl get_param -n devices
DEVS=$(lctl get_param -n devices | wc -l)
@@ -1015,7 +767,7 @@ set_and_check() {
FINAL=$(($ORIG + 5))
fi
echo "Setting $PARAM from $ORIG to $FINAL"
- do_facet mds "$LCTL conf_param $PARAM=$FINAL" || error conf_param failed
+ do_facet $SINGLEMDS "$LCTL conf_param $PARAM=$FINAL" || error conf_param failed
local RESULT
local MAX=90
local WAIT=0
@@ -1046,10 +798,13 @@ test_27a() {
run_test 27a "Reacquire MGS lock if OST started first"
test_27b() {
+ # FIXME. ~grev
setup
- facet_failover mds
- set_and_check mds "lctl get_param -n mdt.$FSNAME-MDT0000.identity_acquire_expire" "$FSNAME-MDT0000.mdt.identity_acquire_expire" || return 3
- set_and_check client "lctl get_param -n mdc.$FSNAME-MDT0000-mdc-*.max_rpcs_in_flight" "$FSNAME-MDT0000.mdc.max_rpcs_in_flight" || return 4
+ local device=$(do_facet $SINGLEMDS "lctl get_param -n devices" | awk '($3 ~ "mdt" && $4 ~ "MDT") { print $4 }')
+
+ facet_failover $SINGLEMDS
+ set_and_check $SINGLEMDS "lctl get_param -n mdt.$device.identity_acquire_expire" "$device.mdt.identity_acquire_expire" || return 3
+ set_and_check client "lctl get_param -n mdc.$device-mdc-*.max_rpcs_in_flight" "$device.mdc.max_rpcs_in_flight" || return 4
check_mount
cleanup
}
@@ -1106,7 +861,7 @@ test_29() {
local WAIT=0
while [ 1 ]; do
sleep 5
- RESULT=`do_facet mds " lctl get_param -n $MPROC"`
+ RESULT=`do_facet $SINGLEMDS " lctl get_param -n $MPROC"`
[ ${PIPESTATUS[0]} = 0 ] || error "Can't read $MPROC"
if [ $RESULT -eq $DEAC ]; then
echo "MDT deactivated also after $WAIT sec (got $RESULT)"
@@ -1313,7 +1068,7 @@ test_33a() { # bug 12333, was test_33
start fs2mds $fs2mdsdev $MDS_MOUNT_OPTS && trap cleanup_24a EXIT INT
start fs2ost $fs2ostdev $OST_MOUNT_OPTS
- do_facet mds "$LCTL conf_param $FSNAME2.sys.timeout=200" || rc=1
+ do_facet $SINGLEMDS "$LCTL conf_param $FSNAME2.sys.timeout=200" || rc=1
mkdir -p $MOUNT2
mount -t lustre $MGSNID:/${FSNAME2} $MOUNT2 || rc=2
echo "ok."
@@ -1395,7 +1150,8 @@ test_35() { # bug 12459
log "Set up a fake failnode for the MDS"
FAKENID="127.0.0.2"
- do_facet mds $LCTL conf_param ${FSNAME}-MDT0000.failover.node=$FAKENID || return 4
+ local device=$(do_facet $SINGLEMDS "lctl get_param -n devices" | awk '($3 ~ "mdt" && $4 ~ "MDT") { print $4 }' | head -1)
+ do_facet $SINGLEMDS $LCTL conf_param ${device}.failover.node=$FAKENID || return 4
log "Wait for RECONNECT_INTERVAL seconds (10s)"
sleep 10
@@ -1419,7 +1175,7 @@ test_35() { # bug 12459
# contact after the connection loss
$LCTL dk $TMP/lustre-log-$TESTNAME.log
NEXTCONN=`awk "/${MSG}/ {start = 1;}
- /import_select_connection.*${FSNAME}-MDT0000-mdc.* using connection/ {
+ /import_select_connection.$device-mdc.* using connection/ {
if (start) {
if (\\\$NF ~ /$FAKENID/)
print \\\$NF;
@@ -1548,10 +1304,13 @@ test_38() { # bug 14222
stop_mds
log "rename lov_objid file on MDS"
rm -f $TMP/lov_objid.orig
- do_facet mds "debugfs -c -R \\\"dump lov_objid $TMP/lov_objid.orig\\\" $MDSDEV"
- do_facet mds "debugfs -w -R \\\"rm lov_objid\\\" $MDSDEV"
- do_facet mds "od -Ax -td8 $TMP/lov_objid.orig"
+ local dev=${SINGLEMDS}_dev
+ local MDSDEV=${!dev}
+ do_facet $SINGLEMDS "debugfs -c -R \\\"dump lov_objid $TMP/lov_objid.orig\\\" $MDSDEV"
+ do_facet $SINGLEMDS "debugfs -w -R \\\"rm lov_objid\\\" $MDSDEV"
+
+ do_facet $SINGLEMDS "od -Ax -td8 $TMP/lov_objid.orig"
# check create in mds_lov_connect
start_mds
mount_client $MOUNT
@@ -1559,17 +1318,17 @@ test_38() { # bug 14222
[ $V ] && log "verifying $DIR/$tdir/$f"
diff -q $f $DIR/$tdir/$f || ERROR=y
done
- do_facet mds "debugfs -c -R \\\"dump lov_objid $TMP/lov_objid.new\\\" $MDSDEV"
- do_facet mds "od -Ax -td8 $TMP/lov_objid.new"
+ do_facet $SINGLEMDS "debugfs -c -R \\\"dump lov_objid $TMP/lov_objid.new\\\" $MDSDEV"
+ do_facet $SINGLEMDS "od -Ax -td8 $TMP/lov_objid.new"
[ "$ERROR" = "y" ] && error "old and new files are different after connect" || true
# check it's updates in sync
umount_client $MOUNT
stop_mds
- do_facet mds dd if=/dev/zero of=$TMP/lov_objid.clear bs=4096 count=1
- do_facet mds "debugfs -w -R \\\"rm lov_objid\\\" $MDSDEV"
- do_facet mds "debugfs -w -R \\\"write $TMP/lov_objid.clear lov_objid\\\" $MDSDEV "
+ do_facet $SINGLEMDS dd if=/dev/zero of=$TMP/lov_objid.clear bs=4096 count=1
+ do_facet $SINGLEMDS "debugfs -w -R \\\"rm lov_objid\\\" $MDSDEV"
+ do_facet $SINGLEMDS "debugfs -w -R \\\"write $TMP/lov_objid.clear lov_objid\\\" $MDSDEV "
start_mds
mount_client $MOUNT
@@ -1577,8 +1336,8 @@ test_38() { # bug 14222
[ $V ] && log "verifying $DIR/$tdir/$f"
diff -q $f $DIR/$tdir/$f || ERROR=y
done
- do_facet mds "debugfs -c -R \\\"dump lov_objid $TMP/lov_objid.new1\\\" $MDSDEV"
- do_facet mds "od -Ax -td8 $TMP/lov_objid.new1"
+ do_facet $SINGLEMDS "debugfs -c -R \\\"dump lov_objid $TMP/lov_objid.new1\\\" $MDSDEV"
+ do_facet $SINGLEMDS "od -Ax -td8 $TMP/lov_objid.new1"
umount_client $MOUNT
stop_mds
[ "$ERROR" = "y" ] && error "old and new files are different after sync" || true
@@ -1600,7 +1359,7 @@ run_test 39 "leak_finder recognizes both LUSTRE and LNET malloc messages"
test_40() { # bug 15759
start_ost
#define OBD_FAIL_TGT_TOOMANY_THREADS 0x706
- do_facet mds "sysctl -w lustre.fail_loc=0x80000706"
+ do_facet $SINGLEMDS "sysctl -w lustre.fail_loc=0x80000706"
start_mds
cleanup
}
@@ -1608,9 +1367,12 @@ run_test 40 "race during service thread startup"
test_41() { #bug 14134
local rc
- start mds $MDSDEV $MDS_MOUNT_OPTS -o nosvc -n
+ local dev=${SINGLEMDS}_dev
+ local MDSDEV=${!dev}
+
+ start $SINGLEMDS $MDSDEV $MDS_MOUNT_OPTS -o nosvc -n
start ost1 `ostdevname 1` $OST_MOUNT_OPTS
- start mds $MDSDEV $MDS_MOUNT_OPTS -o nomgs
+ start $SINGLEMDS $MDSDEV $MDS_MOUNT_OPTS -o nomgs
mkdir -p $MOUNT
mount_client $MOUNT || return 1
sleep 5
@@ -1620,8 +1382,8 @@ test_41() { #bug 14134
umount_client $MOUNT
stop ost1 -f || return 201
- stop mds -f || return 202
- stop mds -f || return 203
+ stop_mds -f || return 202
+ stop_mds -f || return 203
unload_modules || return 204
return $rc
}
diff --git a/lustre/tests/replay-single.sh b/lustre/tests/replay-single.sh
index 9d95405..3f03f38 100755
--- a/lustre/tests/replay-single.sh
+++ b/lustre/tests/replay-single.sh
@@ -879,11 +879,11 @@ test_41() {
do_facet client dd if=/dev/zero of=$f bs=4k count=1 || return 3
cancel_lru_locks osc
# fail ost2 and read from ost1
- local osc2dev=`do_facet mds "lctl get_param -n devices | grep ${ost2_svc}-osc-MDT0000" | awk '{print $1}'`
+ local osc2dev=`do_facet $SINGLEMDS "lctl get_param -n devices | grep ${ost2_svc}-osc-MDT0000" | awk '{print $1}'`
[ -z "$osc2dev" ] && echo "OST: $ost2_svc" && lctl get_param -n devices && return 4
- do_facet mds $LCTL --device $osc2dev deactivate || return 1
+ do_facet $SINGLEMDS $LCTL --device $osc2dev deactivate || return 1
do_facet client dd if=$f of=/dev/null bs=4k count=1 || return 3
- do_facet mds $LCTL --device $osc2dev activate || return 2
+ do_facet $SINGLEMDS $LCTL --device $osc2dev activate || return 2
return 0
}
run_test 41 "read from a valid osc while other oscs are invalid"
@@ -1765,6 +1765,71 @@ test_70b () {
run_test 70b "mds recovery; $CLIENTCOUNT clients"
# end multi-client tests
+test_80a() {
+ [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
+
+ mkdir -p $DIR/$tdir
+ replay_barrier mds2
+ $CHECKSTAT -t dir $DIR/$tdir || error "$CHECKSTAT -t dir $DIR/$tdir failed"
+ rmdir $DIR/$tdir || error "rmdir $DIR/$tdir failed"
+ fail mds2
+ stat $DIR/$tdir
+}
+run_test 80a "CMD: unlink cross-node dir (fail mds with inode)"
+
+test_80b() {
+ [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
+
+ mkdir -p $DIR/$tdir
+ replay_barrier mds1
+ $CHECKSTAT -t dir $DIR/$tdir || error "$CHECKSTAT -t dir $DIR/$tdir failed"
+ rmdir $DIR/$tdir || error "rmdir $DIR/$tdir failed"
+ fail mds1
+ stat $DIR/$tdir
+}
+run_test 80b "CMD: unlink cross-node dir (fail mds with name)"
+
+test_81a() {
+ [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
+
+ mkdir -p $DIR/$tdir
+ createmany -o $DIR/$tdir/f 3000 || error "createmany failed"
+ sleep 10
+ $CHECKSTAT -t dir $DIR/$tdir || error "$CHECKSTAT -t dir failed"
+ $CHECKSTAT -t file $DIR/$tdir/f1002 || error "$CHECKSTAT -t file failed"
+ replay_barrier mds1
+ rm $DIR/$tdir/f1002 || error "rm $DIR/$tdir/f1002 failed"
+ fail mds1
+ stat $DIR/$tdir/f1002
+}
+run_test 81a "CMD: unlink cross-node file (fail mds with name)"
+
+test_82a() {
+ [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
+
+ local dir=$DIR/d82a
+ replay_barrier mds2
+ mkdir $dir || error "mkdir $dir failed"
+ log "FAILOVER mds2"
+ fail mds2
+ stat $DIR
+ $CHECKSTAT -t dir $dir || error "$CHECKSTAT -t dir $dir failed"
+}
+run_test 82a "CMD: mkdir cross-node dir (fail mds with inode)"
+
+test_82b() {
+ [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
+
+ local dir=$DIR/d82b
+ replay_barrier mds1
+ mkdir $dir || error "mkdir $dir failed"
+ log "FAILOVER mds1"
+ fail mds1
+ stat $DIR
+ $CHECKSTAT -t dir $dir || error "$CHECKSTAT -t dir $dir failed"
+}
+run_test 82b "CMD: mkdir cross-node dir (fail mds with name)"
+
equals_msg `basename $0`: test complete, cleaning up
check_and_cleanup_lustre
[ -f "$TESTSUITELOG" ] && cat $TESTSUITELOG || true
diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh
index e9ec28b..aeeb8c8 100644
--- a/lustre/tests/sanity.sh
+++ b/lustre/tests/sanity.sh
@@ -1126,14 +1126,32 @@ test_29() {
touch $DIR/d29/foo
log 'first d29'
ls -l $DIR/d29
- LOCKCOUNTORIG=`lctl get_param -n ldlm.namespaces.*mdc*.lock_count`
- LOCKUNUSEDCOUNTORIG=`lctl get_param -n ldlm.namespaces.*mdc*.lock_unused_count`
- [ -z $"LOCKCOUNTORIG" ] && echo "No mdc lock count" && return 1
+
+ declare -i LOCKCOUNTORIG=0
+ for lock_count in $(lctl get_param -n ldlm.namespaces.*mdc*.lock_count); do
+ let LOCKCOUNTORIG=$LOCKCOUNTORIG+$lock_count
+ done
+ [ $LOCKCOUNTORIG -eq 0 ] && echo "No mdc lock count" && return 1
+
+ declare -i LOCKUNUSEDCOUNTORIG=0
+ for unused_count in $(lctl get_param -n ldlm.namespaces.*mdc*.lock_unused_count); do
+ let LOCKUNUSEDCOUNTORIG=$LOCKUNUSEDCOUNTORIG+$unused_count
+ done
+
log 'second d29'
ls -l $DIR/d29
log 'done'
- LOCKCOUNTCURRENT=`lctl get_param -n ldlm.namespaces.*mdc*.lock_count`
- LOCKUNUSEDCOUNTCURRENT=`lctl get_param -n ldlm.namespaces.*mdc*.lock_unused_count`
+
+ declare -i LOCKCOUNTCURRENT=0
+ for lock_count in $(lctl get_param -n ldlm.namespaces.*mdc*.lock_count); do
+ let LOCKCOUNTCURRENT=$LOCKCOUNTCURRENT+$lock_count
+ done
+
+ declare -i LOCKUNUSEDCOUNTCURRENT=0
+ for unused_count in $(lctl get_param -n ldlm.namespaces.*mdc*.lock_unused_count); do
+ let LOCKUNUSEDCOUNTCURRENT=$LOCKUNUSEDCOUNTCURRENT+$unused_count
+ done
+
if [ "$LOCKCOUNTCURRENT" -gt "$LOCKCOUNTORIG" ]; then
lctl set_param -n ldlm.dump_namespaces ""
error "CURRENT: $LOCKCOUNTCURRENT > $LOCKCOUNTORIG"
@@ -2180,6 +2198,42 @@ test_51b() {
}
run_test 51b "mkdir .../t-0 --- .../t-$NUMTEST ===================="
+test_51bb() {
+ [ -z "$CLIENTS" ] && skip "needs >= 2 CLIENTS" && return
+ [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
+
+ NUMFREE=`df -i -P $DIR | tail -n 1 | awk '{ print $4 }'`
+ [ $NUMFREE -lt 21000 ] && \
+ skip "not enough free inodes ($NUMFREE)" && \
+ return
+
+ check_kernel_version 40 || NUMTEST=31000
+ [ $NUMFREE -lt $NUMTEST ] && NUMTEST=$(($NUMFREE - 50))
+
+ mkdir -p $DIR/d51bb
+
+ IUSED=$(lfs df -i $DIR | grep MDT | awk '{print $3}')
+ OLDUSED=($IUSED)
+
+ do_nodes $CLIENTS "mkdir -p $DIR/\$(hostname)"
+
+ ls $DIR
+
+ do_nodes $CLIENTS "createmany -d $DIR/\$(hostname)/t- $NUMTEST"
+ IUSED=$(lfs df -i $DIR | grep MDT | awk '{print $3}')
+ NEWUSED=($IUSED)
+
+ local rc=0
+ for ((i=0; i<${#NEWUSED[@]}; i++)); do
+ echo "mds $i: inodes count OLD ${OLDUSED[$i]} NEW ${NEWUSED[$i]}"
+ [ ${OLDUSED[$i]} -lt ${NEWUSED[$i]} ] || rc=1
+ done
+
+ [ $rc -ne 0 ] && error "no CMD functionality!"
+}
+run_test 51bb "mkdir .../t-0 --- .../t-$NUMTEST (CMD) ===================="
+
+
test_51c() {
[ ! -d $DIR/d51b ] && skip "$DIR/51b missing" && \
return
@@ -2261,7 +2315,7 @@ test_53() {
for value in `lctl get_param osc.*-osc-MDT0000.prealloc_last_id` ; do
param=`echo ${value[0]} | cut -d "=" -f1`
ostname=`echo $param | cut -d "." -f2 | cut -d - -f 1-2`
- ost_last=`lctl get_param -n obdfilter.$ostname.last_id`
+ ost_last=`lctl get_param -n obdfilter.$ostname.last_id | head -n 1`
mds_last=`lctl get_param -n $param`
echo "$ostname.last_id=$ost_last ; MDS.last_id=$mds_last"
if [ $ost_last != $mds_last ]; then
@@ -2639,7 +2693,7 @@ test_57b() {
$GETSTRIPE $FILE1 2>&1 | grep -q "no stripe" || error "$FILE1 has an EA"
$GETSTRIPE $FILEN 2>&1 | grep -q "no stripe" || error "$FILEN has an EA"
- MDSFREE="`lctl get_param -n osd.*MDT*.kbytesfree 2> /dev/null`"
+ MDSFREE="`lctl get_param -n osd.*MDT0000.kbytesfree 2> /dev/null`"
MDCFREE="`lctl get_param -n mdc.*.kbytesfree | head -n 1`"
echo "opening files to create objects/EAs"
for FILE in `seq -f $DIR/d57b/f%g 1 $FILECOUNT`; do
@@ -2906,19 +2960,21 @@ test_65k() { # bug11679
remote_mds_nodsh && skip "remote MDS" && return
echo "Check OST status: "
- MDS_OSCS=`do_facet mds lctl dl | awk '/[oO][sS][cC].*md[ts]/ { print $4 }'`
+ MDS_OSCS=`do_facet $SINGLEMDS lctl dl | awk '/[oO][sS][cC].*md[ts]/ { print $4 }'`
for OSC in $MDS_OSCS; do
echo $OSC "is activate"
- do_facet mds lctl --device %$OSC activate
+ do_facet $SINGLEMDS lctl --device %$OSC activate
done
do_facet client mkdir -p $DIR/$tdir
for INACTIVE_OSC in $MDS_OSCS; do
echo $INACTIVE_OSC "is Deactivate:"
- do_facet mds lctl --device %$INACTIVE_OSC deactivate
+ do_facet $SINGLEMDS lctl --device %$INACTIVE_OSC deactivate
for STRIPE_OSC in $MDS_OSCS; do
STRIPE_OST=`osc_to_ost $STRIPE_OSC`
- STRIPE_INDEX=`do_facet mds lctl get_param -n lov.*md*.target_obd |
- grep $STRIPE_OST | awk -F: '{print $1}'`
+ STRIPE_INDEX=`do_facet $SINGLEMDS lctl get_param -n lov.*md*.target_obd |
+ grep $STRIPE_OST | awk -F: '{print $1}' | head -n 1`
+
+ [ -f $DIR/$tdir/${STRIPE_INDEX} ] && continue
echo "$SETSTRIPE $DIR/$tdir/${STRIPE_INDEX} -i ${STRIPE_INDEX} -c 1"
do_facet client $SETSTRIPE $DIR/$tdir/${STRIPE_INDEX} -i ${STRIPE_INDEX} -c 1
RC=$?
@@ -2926,7 +2982,7 @@ test_65k() { # bug11679
done
do_facet client rm -f $DIR/$tdir/*
echo $INACTIVE_OSC "is Activate."
- do_facet mds lctl --device %$INACTIVE_OSC activate
+ do_facet $SINGLEMDS lctl --device %$INACTIVE_OSC activate
done
}
run_test 65k "validate manual striping works properly with deactivated OSCs"
@@ -5111,16 +5167,27 @@ test_124a() {
}
run_test 124a "lru resize ======================================="
+get_max_pool_limit()
+{
+ local limit=`lctl get_param -n ldlm.namespaces.*-MDT0000-mdc-*.pool.limit`
+ local max=0
+ for l in $limit; do
+ if test $l -gt $max; then
+ max=$l
+ fi
+ done
+ echo $max
+}
+
test_124b() {
[ -z "`lctl get_param -n mdc.*.connect_flags | grep lru_resize`" ] && \
skip "no lru resize on server" && return 0
- # even for cmd no matter what metadata namespace to use for getting
- # the limit, we use appropriate.
- LIMIT=`lctl get_param -n ldlm.namespaces.*mdc*.pool.limit`
+ LIMIT=`get_max_pool_limit`
NR=$(($(default_lru_size)*20))
if [ $NR -gt $LIMIT ]; then
+ log "Limit lock number by $LIMIT locks"
NR=$LIMIT
fi
lru_resize_disable mdc
@@ -5252,42 +5319,52 @@ test_128() { # bug 15212
}
run_test 128 "interactive lfs for 2 consecutive find's"
+set_dir_limits () {
+ local mntdev
+ local node
+
+ local LDPROC=/proc/fs/ldiskfs
+
+ for node in $(mdts_nodes); do
+ devs=$(do_node $node "lctl get_param -n devices" | awk '($3 ~ "mdt" && $4 ~ "MDT") { print $4 }')
+ for dev in $devs; do
+ mntdev=$(do_node $node "lctl get_param -n osd.$dev.mntdev")
+ do_node $node "echo $1 >$LDPROC/\\\$(basename $mntdev)/max_dir_size"
+ done
+ done
+}
test_129() {
[ "$FSTYPE" != "ldiskfs" ] && skip "not needed for FSTYPE=$FSTYPE" && return 0
- DEV=$(basename $(do_facet mds lctl get_param -n osd.*MDT*.mntdev))
- [ -z "$DEV" ] && error "can't access mds mntdev"
EFBIG=27
- LDPROC=/proc/fs/ldiskfs/$DEV/max_dir_size
MAX=16384
- do_facet mds "echo $MAX > $LDPROC"
+ set_dir_limits $MAX
mkdir -p $DIR/$tdir
I=0
J=0
- while [ ! $I -gt $MAX ]; do
+ while [ ! $I -gt $((MAX * MDSCOUNT)) ]; do
multiop $DIR/$tdir/$J Oc
rc=$?
if [ $rc -eq $EFBIG ]; then
- do_facet mds "echo 0 >$LDPROC"
+ set_dir_limits 0
echo "return code $rc received as expected"
return 0
elif [ $rc -ne 0 ]; then
- do_facet mds "echo 0 >$LDPROC"
+ set_dir_limits 0
error_exit "return code $rc received instead of expected $EFBIG"
fi
J=$((J+1))
I=$(stat -c%s "$DIR/$tdir")
done
- error "exceeded dir size limit: $I bytes"
- do_facet mds "echo 0 >$LDPROC"
+ error "exceeded dir size limit $MAX x $MDSCOUNT $((MAX * MDSCOUNT)) : $I bytes"
+ do_facet $SINGLEMDS "echo 0 >$LDPROC"
}
run_test 129 "test directory size limit ========================"
-
test_130a() {
filefrag_op=$(filefrag -e 2>&1 | grep "invalid option")
[ -n "$filefrag_op" ] && skip "filefrag does not support FIEMAP" && return
diff --git a/lustre/tests/test-framework.sh b/lustre/tests/test-framework.sh
index 3b289e2..9480fff 100644
--- a/lustre/tests/test-framework.sh
+++ b/lustre/tests/test-framework.sh
@@ -461,9 +461,9 @@ mount_facet() {
# start facet device options
start() {
- facet=$1
+ local facet=$1
shift
- device=$1
+ local device=$1
shift
eval export ${facet}_dev=${device}
eval export ${facet}_opt=\"$@\"
@@ -475,7 +475,7 @@ start() {
stop() {
local running
- facet=$1
+ local facet=$1
shift
HOST=`facet_active_host $facet`
[ -z $HOST ] && echo stop: no host for $facet && return 0
@@ -642,14 +642,14 @@ wait_for() {
}
wait_mds_recovery_done () {
- local timeout=`do_facet mds lctl get_param -n timeout`
+ local timeout=`do_facet $SINGLEMDS lctl get_param -n timeout`
#define OBD_RECOVERY_TIMEOUT (obd_timeout * 5 / 2)
# as we are in process of changing obd_timeout in different ways
# let's set MAX longer than that
MAX=$(( timeout * 4 ))
WAIT=0
while [ $WAIT -lt $MAX ]; do
- STATUS=`do_facet $SINGLEMDS "lctl get_param -n mdt.*-MDT*.recovery_status | grep status"`
+ STATUS=`do_facet $SINGLEMDS "lctl get_param -n mdt.*-MDT0000.recovery_status | grep status"`
echo $STATUS | grep COMPLETE && return 0
sleep 5
WAIT=$((WAIT + 5))
@@ -1048,6 +1048,7 @@ formatall() {
if [ ! -z $SEC ]; then
MDS_MKFS_OPTS="$MDS_MKFS_OPTS --param srpc.flavor.default=$SEC"
+ MDSn_MKFS_OPTS="$MDSn_MKFS_OPTS --param srpc.flavor.default=$SEC"
OST_MKFS_OPTS="$OST_MKFS_OPTS --param srpc.flavor.default=$SEC"
fi
@@ -1275,7 +1276,7 @@ absolute_path() {
at_is_valid() {
if [ -z "$AT_MAX_PATH" ]; then
- AT_MAX_PATH=$(do_facet mds "find /sys/ -name at_max")
+ AT_MAX_PATH=$(do_facet $SINGLEMDS "find /sys/ -name at_max")
[ -z "$AT_MAX_PATH" ] && echo "missing /sys/.../at_max " && return 1
fi
return 0
@@ -1285,7 +1286,7 @@ at_is_enabled() {
at_is_valid || error "invalid call"
# only check mds, we assume at_max is the same on all nodes
- local at_max=$(do_facet mds "cat $AT_MAX_PATH")
+ local at_max=$(do_facet $SINGLEMDS "cat $AT_MAX_PATH")
if [ $at_max -eq 0 ]; then
return 1
else
@@ -1334,27 +1335,27 @@ at_max_set() {
drop_request() {
# OBD_FAIL_MDS_ALL_REQUEST_NET
RC=0
- do_facet mds lctl set_param fail_loc=0x123
+ do_facet $SINGLEMDS lctl set_param fail_loc=0x123
do_facet client "$1" || RC=$?
- do_facet mds lctl set_param fail_loc=0
+ do_facet $SINGLEMDS lctl set_param fail_loc=0
return $RC
}
drop_reply() {
# OBD_FAIL_MDS_ALL_REPLY_NET
RC=0
- do_facet mds lctl set_param fail_loc=0x122
+ do_facet $SINGLEMDS lctl set_param fail_loc=0x122
do_facet client "$@" || RC=$?
- do_facet mds lctl set_param fail_loc=0
+ do_facet $SINGLEMDS lctl set_param fail_loc=0
return $RC
}
drop_reint_reply() {
# OBD_FAIL_MDS_REINT_NET_REP
RC=0
- do_facet mds lctl set_param fail_loc=0x119
+ do_facet $SINGLEMDS lctl set_param fail_loc=0x119
do_facet client "$@" || RC=$?
- do_facet mds lctl set_param fail_loc=0
+ do_facet $SINGLEMDS lctl set_param fail_loc=0
return $RC
}
@@ -1389,9 +1390,9 @@ drop_bl_callback() {
drop_ldlm_reply() {
#define OBD_FAIL_LDLM_REPLY 0x30c
RC=0
- do_facet mds lctl set_param fail_loc=0x30c
+ do_facet $SINGLEMDS lctl set_param fail_loc=0x30c
do_facet client "$@" || RC=$?
- do_facet mds lctl set_param fail_loc=0
+ do_facet $SINGLEMDS lctl set_param fail_loc=0
return $RC
}
@@ -1627,8 +1628,8 @@ pass() {
}
check_mds() {
- FFREE=`lctl get_param -n osd.*MDT*.filesfree`
- FTOTAL=`lctl get_param -n osd.*MDT*.filestotal`
+ FFREE=$(do_node $SINGLEMDS lctl get_param -n osd.*MDT*.filesfree | awk 'BEGIN{avail=0}; {avail+=$1}; END{print avail}')
+ FTOTAL=$(do_node $SINGLEMDS lctl get_param -n osd.*MDT*.filestotal | awk 'BEGIN{avail=0}; {avail+=$1}; END{print avail}')
[ $FFREE -ge $FTOTAL ] && error "files free $FFREE > total $FTOTAL" || true
}
@@ -1754,17 +1755,10 @@ remote_ost_nodsh()
}
mdts_nodes () {
- local MDSNODES=$(facet_host $SINGLEMDS)
+ local MDSNODES
local NODES_sort
-
- # FIXME: Currenly we use only $SINGLEMDS,
- # should be fixed when we will start to test cmd.
- echo $MDSNODES
- return
-
for num in `seq $MDSCOUNT`; do
- local myMDS=$(facet_host mds$num)
- MDSNODES="$MDSNODES $myMDS"
+ MDSNODES="$MDSNODES $(facet_host mds$num)"
done
NODES_sort=$(for i in $MDSNODES; do echo $i; done | sort -u)