RETURN(0);
}
- /* MA_INODE is needed to check inode size. */
- memset(ma, 0, sizeof(*ma));
+ /*
+ * MA_INODE is needed to check inode size.
+ * Memory is prepared by caller.
+ */
ma->ma_need = MA_INODE | MA_LMV;
rc = mo_attr_get(env, mo, ma);
if (rc)
RETURN(0);
}
-#define cmm_md_size(stripes) \
- (sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid))
-
struct cmm_object *cmm_object_find(const struct lu_env *env,
struct cmm_device *d,
const struct lu_fid *f)
struct md_object *mo,
struct md_attr *ma)
{
- struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
- struct lmv_stripe_md *lmv = NULL, *slave_lmv = NULL;
- struct lu_fid *lf = cmm2fid(md2cmm_obj(mo));
- struct mdc_device *mc, *tmp;
- int lmv_size, i = 1, rc = 0;
+ struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
+ struct lu_fid *lf = cmm2fid(md2cmm_obj(mo));
+ struct lmv_stripe_md *lmv;
+ struct lmv_stripe_md *slave_lmv = NULL;
+ struct mdc_device *mc, *tmp;
+ int i = 1, rc = 0;
ENTRY;
- lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
-
- /* This lmv will free after finish splitting. */
- OBD_ALLOC(lmv, lmv_size);
- if (!lmv)
- RETURN(-ENOMEM);
-
+ lmv = ma->ma_lmv;
lmv->mea_master = cmm->cmm_local_num;
lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
lmv->mea_count = cmm->cmm_tgt_count + 1;
lmv->mea_ids[0] = *lf;
OBD_ALLOC_PTR(slave_lmv);
- if (!slave_lmv)
- GOTO(cleanup, rc = -ENOMEM);
+ if (slave_lmv == NULL)
+ RETURN(-ENOMEM);
slave_lmv->mea_master = cmm->cmm_local_num;
slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
i++;
}
- ma->ma_lmv_size = lmv_size;
- ma->ma_lmv = lmv;
EXIT;
cleanup:
- if (slave_lmv)
- OBD_FREE_PTR(slave_lmv);
- if (rc && lmv) {
- OBD_FREE(lmv, lmv_size);
- ma->ma_lmv = NULL;
- ma->ma_lmv_size = 0;
- }
+ OBD_FREE_PTR(slave_lmv);
return rc;
}
kunmap(rdpg->rp_pages[0]);
rc = mo_readpage(env, md_object_next(mo), rdpg);
- if (rc)
+ if (rc) {
+ CERROR("Error in readpage: %d\n", rc);
RETURN(rc);
+ }
/* Remove the old entries */
rc = cmm_remove_entries(env, mo, rdpg, end, &len);
- if (rc)
+ if (rc) {
+ CERROR("Error in remove entry: %d\n", rc);
RETURN(rc);
+ }
/* Send page to slave object */
if (len > 0) {
rc = cmm_send_split_pages(env, mo, rdpg, lf, len);
- if (rc)
+ if (rc) {
+ CERROR("Error in sending pages: %d\n", rc);
RETURN(rc);
+ }
}
kmap(rdpg->rp_pages[0]);
rdpg->rp_hash = i * hash_segement;
hash_end = rdpg->rp_hash + hash_segement;
rc = cmm_split_entries(env, mo, rdpg, lf, hash_end);
- if (rc)
+ if (rc) {
+ CERROR("Error (rc=%d) while splitting for %d: fid="
+ DFID", %08x:%08x\n", rc, i, PFID(lf),
+ rdpg->rp_hash, hash_end);
GOTO(cleanup, rc);
+ }
}
EXIT;
cleanup:
return rc;
}
+#define cmm_md_size(stripes) \
+ (sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid))
+
int cmm_try_to_split(const struct lu_env *env, struct md_object *mo)
{
struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
- struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
- struct lu_buf *buf;
- int rc = 0, split;
+ struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
+ struct lu_buf *buf;
+ int rc = 0, split, lmv_size;
ENTRY;
LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
+ memset(ma, 0, sizeof(*ma));
+ lmv_size = ma->ma_lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
+
+ /*
+ * Preparing memory for LMV. This will be freed after finish splitting.
+ */
+ OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
+ if (ma->ma_lmv == NULL)
+ RETURN(-ENOMEM);
+
/* Step1: Checking whether the dir needs to be split. */
rc = cmm_expect_splitting(env, mo, ma, &split);
if (rc)
- RETURN(rc);
+ GOTO(cleanup, rc);
if (split != CMM_EXPECT_SPLIT)
- RETURN(0);
+ GOTO(cleanup, rc = 0);
LASSERTF(mo->mo_pdo_mode == MDL_EX, "Split is only valid if "
"dir is protected by MDL_EX lock. Lock mode 0x%x\n",
rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS);
if (rc) {
CERROR("Can't disable trans for split, rc %d\n", rc);
- RETURN(rc);
+ GOTO(cleanup, rc);
}
/* Step2: Create slave objects (on slave MDTs) */
+ ma->ma_valid = 0;
+ ma->ma_lmv_size = lmv_size;
rc = cmm_slaves_create(env, mo, ma);
if (rc) {
CERROR("Can't create slaves for split, rc %d\n", rc);
}
EXIT;
cleanup:
- if (ma->ma_lmv_size && ma->ma_lmv)
- OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
-
+ OBD_FREE(ma->ma_lmv, lmv_size);
return rc;
}
int mdd_lov_setattr_async(const struct lu_env *env, struct mdd_object *obj,
struct lov_mds_md *lmm, int lmm_size)
{
- struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
- struct obd_device *obd = mdd2obd_dev(mdd);
- struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
- struct dt_object *next = mdd_object_child(obj);
- __u32 seq = lu_object_fid(mdd2lu_obj(obj))->f_seq;
- __u32 oid = lu_object_fid(mdd2lu_obj(obj))->f_oid;
- struct obd_capa *oc;
+ struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
+ struct obd_device *obd = mdd2obd_dev(mdd);
+ struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
+ struct dt_object *next = mdd_object_child(obj);
+ const struct lu_fid *fid = lu_object_fid(mdd2lu_obj(obj));
+ struct obd_capa *oc;
int rc = 0;
ENTRY;
+ mdd_read_lock(env, obj);
rc = next->do_ops->do_attr_get(env, next, tmp_la,
mdd_object_capa(env, obj));
+ mdd_read_unlock(env, obj);
if (rc)
RETURN(rc);
oc = next->do_ops->do_capa_get(env, next, NULL, CAPA_OPC_MDS_DEFAULT);
if (IS_ERR(oc))
oc = NULL;
+
+ /*
+ * Wangdi: please fix this. OST will oops if this is called.
+ */
+/*
rc = mds_osc_setattr_async(obd, tmp_la->la_uid, tmp_la->la_gid, lmm,
- lmm_size, NULL, seq, oid, oc);
+ lmm_size, NULL, fid_seq(fid), fid_oid(fid), oc);
+*/
capa_put(oc);
RETURN(rc);
if (ma->ma_valid & MA_LOV)
RETURN(0);
- LASSERT(ma->ma_lmm != NULL && ma->ma_lmm_size > 0);
lmm_size = ma->ma_lmm_size;
rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &lmm_size,
MDS_LOV_MD_NAME);
struct mdd_device *mdd = mdo2mdd(obj);
struct thandle *handle;
struct lov_mds_md *lmm = NULL;
- int rc = 0, lmm_size = 0, max_size = 0;
+ int rc, lmm_size = 0, max_size = 0;
struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
ENTRY;
ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
max_size = mdd_lov_mdsize(env, mdd);
OBD_ALLOC(lmm, max_size);
+ lmm_size = max_size;
if (lmm == NULL)
GOTO(cleanup, rc = -ENOMEM);
}
cleanup:
mdd_trans_stop(env, mdd, rc, handle);
- if (rc == 0 && lmm_size) {
+ if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
/*set obd attr, if needed*/
rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size);
}