CDEBUG(D_OTHER, "forward to MDS #%u ("DFID3")\n",
mds, PFID3(&rpid));
- rpid = obj->lo_objs[mds].li_fid;
+ rpid = obj->lo_inodes[mds].li_fid;
lmv_obj_put(obj);
}
/* in fact, we need not this with current intent_lock(),
* but it may change some day */
if (!lu_fid_eq(pid, cid)){
- rpid = obj->lo_objs[mds].li_fid;
+ rpid = obj->lo_inodes[mds].li_fid;
mds = lmv_fld_lookup(obd, &rpid);
}
lmv_obj_put(obj);
/* directory is already splitted. calculate mds */
mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
(char *)name, len);
- rpid = obj->lo_objs[mds].li_fid;
+ rpid = obj->lo_inodes[mds].li_fid;
mds = lmv_fld_lookup(obd, &rpid);
lmv_obj_put(obj);
RETURN(rc);
}
-void lmv_update_body_from_obj(struct mdt_body *body, struct lmv_inode *obj)
+void lmv_update_body(struct mdt_body *body, struct lmv_inode *lino)
{
/* update size */
- body->size += obj->li_size;
+ body->size += lino->li_size;
}
/* this is not used currently */
lmv_obj_lock(obj);
for (i = 0; i < obj->lo_objcount; i++) {
- struct lu_fid fid = obj->lo_objs[i].li_fid;
+ struct lu_fid fid = obj->lo_inodes[i].li_fid;
struct md_op_data op_data = { { 0 } };
struct ptlrpc_request *req = NULL;
struct lookup_intent it;
mds = lmv_fld_lookup(obd, &fid);
rc = md_intent_lock(lmv->tgts[mds].ltd_exp, &op_data,
NULL, 0, &it, 0, &req,
- lmv_dirobj_blocking_ast, 0);
+ lmv_blocking_ast, 0);
lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle;
if (rc > 0 && req == NULL) {
body2 = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body2));
LASSERT(body2);
- obj->lo_objs[i].li_size = body2->size;
+ obj->lo_inodes[i].li_size = body2->size;
CDEBUG(D_OTHER, "fresh: %lu\n",
- (unsigned long)obj->lo_objs[i].li_size);
+ (unsigned long)obj->lo_inodes[i].li_size);
LDLM_LOCK_PUT(lock);
if (req)
ptlrpc_req_finished(req);
release_lock:
- lmv_update_body_from_obj(body, obj->lo_objs + i);
+ lmv_update_body(body, obj->lo_inodes + i);
if (it.d.lustre.it_lock_mode)
ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
if (obj) {
mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
(char *)name, len);
- rpid = obj->lo_objs[mds].li_fid;
+ rpid = obj->lo_inodes[mds].li_fid;
lmv_obj_put(obj);
}
mds = lmv_fld_lookup(obd, &rpid);
/* directory is already splitted. calculate mds */
mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
(char *)name, len);
- rpid = obj->lo_objs[mds].li_fid;
+ rpid = obj->lo_inodes[mds].li_fid;
mds = lmv_fld_lookup(obd, &rpid);
}
lmv_obj_put(obj);
lmv_obj_lock(obj);
for (i = 0; i < obj->lo_objcount; i++) {
- struct lu_fid fid = obj->lo_objs[i].li_fid;
+ struct lu_fid fid = obj->lo_inodes[i].li_fid;
struct md_op_data op_data = { { 0 } };
struct lustre_handle *lockh = NULL;
struct ptlrpc_request *req = NULL;
memset(&it, 0, sizeof(it));
it.it_op = IT_GETATTR;
- cb = lmv_dirobj_blocking_ast;
+ cb = lmv_blocking_ast;
if (lu_fid_eq(&fid, &obj->lo_fid)) {
if (master_valid) {
LASSERT(body);
update:
- obj->lo_objs[i].li_size = body->size;
+ obj->lo_inodes[i].li_size = body->size;
CDEBUG(D_OTHER, "fresh: %lu\n",
- (unsigned long)obj->lo_objs[i].li_size);
+ (unsigned long)obj->lo_inodes[i].li_size);
if (req)
ptlrpc_req_finished(req);
release_lock:
- size += obj->lo_objs[i].li_size;
+ size += obj->lo_inodes[i].li_size;
if (it.d.lustre.it_lock_mode)
ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
}
/* skip master obj. */
- if (lu_fid_eq(&obj->lo_fid, &obj->lo_objs[i].li_fid))
+ if (lu_fid_eq(&obj->lo_fid, &obj->lo_inodes[i].li_fid))
continue;
- body->size += obj->lo_objs[i].li_size;
+ body->size += obj->lo_inodes[i].li_size;
}
lmv_obj_unlock(obj);
if (obj) {
mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
op_data->name, op_data->namelen);
- op_data->fid1 = obj->lo_objs[mds].li_fid;
+ op_data->fid1 = obj->lo_inodes[mds].li_fid;
lmv_obj_put(obj);
}
* name */
mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
(char *)op_data->name, op_data->namelen);
- op_data->fid1 = obj->lo_objs[mds].li_fid;
+ op_data->fid1 = obj->lo_inodes[mds].li_fid;
lmv_obj_put(obj);
}
}
/* directory is splitted. look for right mds for this name */
mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
filename, namelen - 1);
- rid = obj->lo_objs[mds].li_fid;
+ rid = obj->lo_inodes[mds].li_fid;
lmv_obj_put(obj);
}
if (obj) {
rc = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
op_data->name, op_data->namelen);
- op_data->fid2 = obj->lo_objs[rc].li_fid;
+ op_data->fid2 = obj->lo_inodes[rc].li_fid;
lmv_obj_put(obj);
}
if (obj) {
mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
(char *)new, newlen);
- op_data->fid2 = obj->lo_objs[mds].li_fid;
+ op_data->fid2 = obj->lo_inodes[mds].li_fid;
CDEBUG(D_OTHER, "forward to MDS #%u ("DFID3")\n", mds,
PFID3(&op_data->fid2));
lmv_obj_put(obj);
*/
mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
(char *)old, oldlen);
- op_data->fid1 = obj->lo_objs[mds].li_fid;
+ op_data->fid1 = obj->lo_inodes[mds].li_fid;
CDEBUG(D_OTHER, "forward to MDS #%u ("DFID3")\n", mds,
PFID3(&op_data->fid1));
lmv_obj_put(obj);
mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
(char *)new, newlen);
- op_data->fid2 = obj->lo_objs[mds].li_fid;
+ op_data->fid2 = obj->lo_inodes[mds].li_fid;
CDEBUG(D_OTHER, "forward to MDS #%u ("DFID3")\n", mds,
PFID3(&op_data->fid2));
lmv_obj_put(obj);
if (obj) {
for (i = 0; i < obj->lo_objcount; i++) {
- op_data->fid1 = obj->lo_objs[i].li_fid;
+ op_data->fid1 = obj->lo_inodes[i].li_fid;
mds = lmv_fld_lookup(obd, &op_data->fid1);
rc = md_setattr(lmv->tgts[mds].ltd_exp,
op_data, iattr, ea, ealen, ea2,
ea2len, &req);
- if (lu_fid_eq(&obj->lo_fid, &obj->lo_objs[i].li_fid)) {
+ if (lu_fid_eq(&obj->lo_fid, &obj->lo_inodes[i].li_fid)) {
/*
* this is master object and this request should
* be returned back to llite.
RETURN(rc);
}
-int lmv_dirobj_blocking_ast(struct ldlm_lock *lock,
- struct ldlm_lock_desc *desc,
- void *data, int flag)
+/* main purpose of LMV blocking ast is to remove splitted directory
+ * LMV presentation object (struct lmv_obj) attached to the lock
+ * being revoked. */
+int lmv_blocking_ast(struct ldlm_lock *lock,
+ struct ldlm_lock_desc *desc,
+ void *data, int flag)
{
struct lustre_handle lockh;
struct lmv_obj *obj;
/* find dirobj containing page with requested offset. */
for (i = 0; i < obj->lo_objcount; i++) {
- if (offset < obj->lo_objs[i].li_size)
+ if (offset < obj->lo_inodes[i].li_size)
break;
- offset -= obj->lo_objs[i].li_size;
+ offset -= obj->lo_inodes[i].li_size;
}
- rid = obj->lo_objs[i].li_fid;
+ rid = obj->lo_inodes[i].li_fid;
lmv_obj_unlock(obj);
lmv_obj_put(obj);
if (obj) {
i = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
op_data->name, op_data->namelen);
- op_data->fid1 = obj->lo_objs[i].li_fid;
+ op_data->fid1 = obj->lo_inodes[i].li_fid;
lmv_obj_put(obj);
}
CDEBUG(D_OTHER, "unlink '%*s' in "DFID3" -> %u\n",
RETURN(rc);
}
-static int lmv_obd_create_single(struct obd_export *exp, struct obdo *oa,
- struct lov_stripe_md **ea,
- struct obd_trans_info *oti)
-{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lov_stripe_md obj_md;
- struct lov_stripe_md *obj_mdp = &obj_md;
- int rc = 0;
- ENTRY;
-
- LASSERT(ea == NULL);
- LASSERT(oa->o_mds < lmv->desc.ld_tgt_count);
-
- rc = obd_create(lmv->tgts[oa->o_mds].ltd_exp,
- oa, &obj_mdp, oti);
-
- RETURN(rc);
-}
-
-/*
- * to be called from MDS only. @oa should have correct store cookie and o_fid
- * values for "master" object, as it will be used.
- */
-int lmv_obd_create(struct obd_export *exp, struct obdo *oa,
- struct lov_stripe_md **ea, struct obd_trans_info *oti)
-{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_stripe_md *mea;
- struct lu_fid mid;
- int i, c, rc = 0;
- ENTRY;
-
- rc = lmv_check_connect(obd);
- if (rc)
- RETURN(rc);
-
- LASSERT(oa != NULL);
-
- if (ea == NULL) {
- rc = lmv_obd_create_single(exp, oa, NULL, oti);
- if (rc)
- CERROR("Can't create object, rc = %d\n", rc);
- RETURN(rc);
- }
-
- if (*ea == NULL) {
- rc = obd_alloc_diskmd(exp, (struct lov_mds_md **)ea);
- if (rc < 0) {
- CERROR("obd_alloc_diskmd() failed, error %d\n",
- rc);
- RETURN(rc);
- } else
- rc = 0;
-
- if (*ea == NULL)
- RETURN(-ENOMEM);
- }
-
- /* here we should take care about splitted dir, so store cookie and fid
- * for "master" object should already be allocated and passed in @oa. */
- LASSERT(oa->o_id != 0);
- LASSERT(oa->o_fid != 0);
-
- /* save "master" object fid */
- obdo2fid(oa, &mid);
-
- mea = (struct lmv_stripe_md *)*ea;
- mea->mea_master = -1;
- mea->mea_magic = MEA_MAGIC_ALL_CHARS;
-
- if (!mea->mea_count || mea->mea_count > lmv->desc.ld_tgt_count)
- mea->mea_count = lmv->desc.ld_tgt_count;
-
- for (i = 0, c = 0; c < mea->mea_count && i < lmv->desc.ld_tgt_count; i++) {
- struct lov_stripe_md obj_md;
- struct lov_stripe_md *obj_mdp = &obj_md;
-
- if (lmv->tgts[i].ltd_exp == NULL) {
- /* this is "master" MDS */
- mea->mea_master = i;
- mea->mea_ids[c] = mid;
- c++;
- continue;
- }
-
- /*
- * "master" MDS should always be part of stripped dir,
- * so scan for it.
- */
- if (mea->mea_master == -1 && c == mea->mea_count - 1)
- continue;
-
- oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLTYPE | OBD_MD_FLMODE |
- OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLID;
-
- rc = obd_create(lmv->tgts[c].ltd_exp, oa, &obj_mdp, oti);
- if (rc) {
- CERROR("obd_create() failed on MDT target %d, "
- "error %d\n", c, rc);
- RETURN(rc);
- }
-
- CDEBUG(D_OTHER, "dirobj at mds %d: "LPU64"/%u\n",
- i, oa->o_id, oa->o_generation);
-
-
- /*
- * here, when object is created (or it is master and was passed
- * from caller) on desired MDS we save its fid to local mea_ids.
- */
- LASSERT(oa->o_fid);
-
- /*
- * store cookie should be defined here for both cases (master
- * object and not master), because master is already created.
- */
- LASSERT(oa->o_id);
-
- /* fill mea by store cookie and fid */
- obdo2fid(oa, &mea->mea_ids[c]);
- c++;
- }
- LASSERT(c == mea->mea_count);
-
- CDEBUG(D_OTHER, "%d dirobjects created\n",
- (int)mea->mea_count);
-
- RETURN(rc);
-}
-
static int lmv_llog_init(struct obd_device *obd, struct obd_device *tgt,
int count, struct llog_catid *logid)
{
RETURN(mea_size);
}
+#if 0
+/* lmv_create() and lmv_brw() is needed anymore as they purely server stuff and
+ * lmv is going to use only on client. */
+static int lmv_obd_create_single(struct obd_export *exp, struct obdo *oa,
+ struct lov_stripe_md **ea,
+ struct obd_trans_info *oti)
+{
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lov_stripe_md obj_md;
+ struct lov_stripe_md *obj_mdp = &obj_md;
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(ea == NULL);
+ LASSERT(oa->o_mds < lmv->desc.ld_tgt_count);
+
+ rc = obd_create(lmv->tgts[oa->o_mds].ltd_exp,
+ oa, &obj_mdp, oti);
+
+ RETURN(rc);
+}
+
+/*
+ * to be called from MDS only. @oa should have correct store cookie and o_fid
+ * values for "master" object, as it will be used.
+ */
+int lmv_obd_create(struct obd_export *exp, struct obdo *oa,
+ struct lov_stripe_md **ea, struct obd_trans_info *oti)
+{
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_stripe_md *mea;
+ struct lu_fid mid;
+ int i, c, rc = 0;
+ ENTRY;
+
+ rc = lmv_check_connect(obd);
+ if (rc)
+ RETURN(rc);
+
+ LASSERT(oa != NULL);
+
+ if (ea == NULL) {
+ rc = lmv_obd_create_single(exp, oa, NULL, oti);
+ if (rc)
+ CERROR("Can't create object, rc = %d\n", rc);
+ RETURN(rc);
+ }
+
+ if (*ea == NULL) {
+ rc = obd_alloc_diskmd(exp, (struct lov_mds_md **)ea);
+ if (rc < 0) {
+ CERROR("obd_alloc_diskmd() failed, error %d\n",
+ rc);
+ RETURN(rc);
+ } else
+ rc = 0;
+
+ if (*ea == NULL)
+ RETURN(-ENOMEM);
+ }
+
+ /* here we should take care about splitted dir, so store cookie and fid
+ * for "master" object should already be allocated and passed in @oa. */
+ LASSERT(oa->o_id != 0);
+ LASSERT(oa->o_fid != 0);
+
+ /* save "master" object fid */
+ obdo2fid(oa, &mid);
+
+ mea = (struct lmv_stripe_md *)*ea;
+ mea->mea_master = -1;
+ mea->mea_magic = MEA_MAGIC_ALL_CHARS;
+
+ if (!mea->mea_count || mea->mea_count > lmv->desc.ld_tgt_count)
+ mea->mea_count = lmv->desc.ld_tgt_count;
+
+ for (i = 0, c = 0; c < mea->mea_count && i < lmv->desc.ld_tgt_count; i++) {
+ struct lov_stripe_md obj_md;
+ struct lov_stripe_md *obj_mdp = &obj_md;
+
+ if (lmv->tgts[i].ltd_exp == NULL) {
+ /* this is "master" MDS */
+ mea->mea_master = i;
+ mea->mea_ids[c] = mid;
+ c++;
+ continue;
+ }
+
+ /*
+ * "master" MDS should always be part of stripped dir,
+ * so scan for it.
+ */
+ if (mea->mea_master == -1 && c == mea->mea_count - 1)
+ continue;
+
+ oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLTYPE | OBD_MD_FLMODE |
+ OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLID;
+
+ rc = obd_create(lmv->tgts[c].ltd_exp, oa, &obj_mdp, oti);
+ if (rc) {
+ CERROR("obd_create() failed on MDT target %d, "
+ "error %d\n", c, rc);
+ RETURN(rc);
+ }
+
+ CDEBUG(D_OTHER, "dirobj at mds %d: "LPU64"/%u\n",
+ i, oa->o_id, oa->o_generation);
+
+
+ /*
+ * here, when object is created (or it is master and was passed
+ * from caller) on desired MDS we save its fid to local mea_ids.
+ */
+ LASSERT(oa->o_fid);
+
+ /*
+ * store cookie should be defined here for both cases (master
+ * object and not master), because master is already created.
+ */
+ LASSERT(oa->o_id);
+
+ /* fill mea by store cookie and fid */
+ obdo2fid(oa, &mea->mea_ids[c]);
+ c++;
+ }
+ LASSERT(c == mea->mea_count);
+
+ CDEBUG(D_OTHER, "%d dirobjects created\n",
+ (int)mea->mea_count);
+
+ RETURN(rc);
+}
+
int lmv_brw(int rw, struct obd_export *exp, struct obdo *oa,
struct lov_stripe_md *ea, obd_count oa_bufs,
struct brw_page *pgarr, struct obd_trans_info *oti)
{
/* splitting is not needed in lmv */
-#if 0
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
struct lmv_stripe_md *mea = (struct lmv_stripe_md *) ea;
err = obd_brw(rw, lmv->tgts[oa->o_mds].ltd_exp,
oa, NULL, oa_bufs, pgarr, oti);
RETURN(err);
-#endif
- return -EINVAL;
}
+#endif
static int lmv_cancel_unused(struct obd_export *exp,
struct lov_stripe_md *lsm,
.o_llog_finish = lmv_llog_finish,
.o_get_info = lmv_get_info,
.o_set_info = lmv_set_info,
- .o_create = lmv_obd_create,
.o_packmd = lmv_packmd,
.o_unpackmd = lmv_unpackmd,
- .o_brw = lmv_brw,
.o_notify = lmv_notify,
.o_iocontrol = lmv_iocontrol,
.o_cancel_unused = lmv_cancel_unused,