struct ll_fid rpfid = *pfid;
struct lmv_obj *obj;
struct mea *mea;
- int rc, mds;
+ int rc, mds, loop = 0;
ENTRY;
/* IT_OPEN is intended to open (and create, possible) an object. Parent
* (pfid) may be splitted dir */
repeat:
+ LASSERT(++loop <= 2);
mds = rpfid.mds;
obj = lmv_grab_obj(obd, &rpfid);
if (obj) {
/* directory is already splitted, so we have to forward
* request to the right MDS */
mds = raw_name2idx(obj->objcount, (char *)name, len);
- CDEBUG(D_OTHER, "forward to MDS #%u\n", mds);
+ CDEBUG(D_OTHER, "forward to MDS #%u (%lu/%lu/%lu)\n", mds,
+ (unsigned long) rpfid.mds, (unsigned long) rpfid.id,
+ (unsigned long) rpfid.generation);
rpfid = obj->objs[mds].fid;
lmv_put_obj(obj);
}
- rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name,
+ rc = md_intent_lock(lmv->tgts[rpfid.mds].ltd_exp, uctxt, &rpfid, name,
len, lmm, lmmsize, cfid, it, flags, reqp,
cb_blocking);
if (rc == -ERESTART) {
/* in fact, we need not this with current intent_lock(),
* but it may change some day */
rpfid = obj->objs[mds].fid;
+ mds = rpfid.mds;
lmv_put_obj(obj);
}
rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, &rpfid, name,
/* directory is already splitted. calculate mds */
mds = raw_name2idx(obj->objcount, (char *) name, len);
rpfid = obj->objs[mds].fid;
+ mds = rpfid.mds;
lmv_put_obj(obj);
CDEBUG(D_OTHER, "forward to MDS #%u (slave %lu/%lu/%lu)\n",
struct ll_fid rpfid = *pfid;
struct lmv_obj *obj;
struct mea *mea;
- int rc, mds;
+ int rc, mds, loop = 0;
ENTRY;
/* IT_LOOKUP is intended to produce name -> fid resolving (let's call
mds = pfid->mds;
repeat:
+ LASSERT(++loop <= 2);
/* this is lookup. during lookup we have to update all the attributes,
* because returned values will be put in struct inode */
/* directory is already splitted. calculate mds */
mds = raw_name2idx(obj->objcount, (char *)name, len);
rpfid = obj->objs[mds].fid;
+ mds = rpfid.mds;
}
lmv_put_obj(obj);
}
for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
int err;
- err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp,
- len, karg, uarg);
+ if (lmv->tgts[i].ltd_exp == NULL) {
+ CWARN("%s: NULL export for %d\n", obddev->obd_name, i);
+ continue;
+ }
+
+ err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp, len, karg, uarg);
if (err) {
if (lmv->tgts[i].active) {
CERROR("error: iocontrol MDC %s on MDT"
RETURN(rc);
for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+ if (lmv->tgts[i].ltd_exp == NULL) {
+ CWARN("%s: NULL export for %d\n", obd->obd_name, i);
+ continue;
+ }
+
rc = obd_statfs(lmv->tgts[i].ltd_exp->exp_obd, &temp, max_age);
if (rc) {
CERROR("can't stat MDS #%d (%s)\n", i,
for (i = 0; i < obj->objcount; i++) {
+ if (lmv->tgts[i].ltd_exp == NULL) {
+ CWARN("%s: NULL export for %d\n",
+ obd->obd_name, i);
+ continue;
+ }
+
/* skip master obj. */
if (fid_equal(&obj->fid, &obj->objs[i].fid))
continue;
if (obj) {
/* directory is splitted. look for right mds for this name. */
mds = raw_name2idx(obj->objcount, name, len);
+ mds = obj->objs[mds].fid.mds;
lmv_put_obj(obj);
}
rc = md_change_cbdata(lmv->tgts[mds].ltd_exp, cfid, it, data);
struct lmv_obd *lmv = &obd->u.lmv;
struct mds_body *body;
struct lmv_obj *obj;
- int rc, mds;
+ int rc, mds, loop = 0;
ENTRY;
rc = lmv_check_connect(obd);
if (!lmv->desc.ld_active_tgt_count)
RETURN(-EIO);
repeat:
+ LASSERT(++loop <= 2);
obj = lmv_grab_obj(obd, &op_data->fid1);
if (obj) {
mds = raw_name2idx(obj->objcount, op_data->name,
LASSERT(mea != NULL);
for (i = 0; i < mea->mea_count; i++) {
- if (lmv->tgts[i].ltd_exp == NULL)
- continue;
-
memset(&data2, 0, sizeof(data2));
data2.fid1 = mea->mea_fids[i];
mds = data2.fid1.mds;
+ if (lmv->tgts[mds].ltd_exp == NULL)
+ continue;
+
rc = md_enqueue(lmv->tgts[mds].ltd_exp, locktype, it, lockmode,
&data2, lockh + i, lmm, lmmsize, cb_completion,
cb_blocking, cb_data);
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
struct ll_fid rfid = *fid;
- int rc, mds = fid->mds;
+ int rc, mds = fid->mds, loop = 0;
struct mds_body *body;
struct lmv_obj *obj;
ENTRY;
if (rc)
RETURN(rc);
repeat:
+ LASSERT(++loop <= 2);
obj = lmv_grab_obj(obd, fid);
if (obj) {
/* directory is splitted. look for right mds for this name */
(unsigned long)rfid.mds, (unsigned long)rfid.id,
(unsigned long)rfid.generation);
- rc = md_getattr_name(lmv->tgts[mds].ltd_exp, &rfid, filename,
+ rc = md_getattr_name(lmv->tgts[rfid.mds].ltd_exp, &rfid, filename,
namelen, valid, ea_size, request);
if (rc == 0) {
/* this could be cross-node reference. in this case all we have
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
- int rc = 0, i = data->fid1.mds;
struct ptlrpc_request *req;
struct mds_body *body;
struct lmv_obj *obj;
+ int rc = 0, i;
ENTRY;
rc = lmv_check_connect(obd);
for (i = 0; i < obj->objcount; i++) {
data->fid1 = obj->objs[i].fid;
- rc = md_setattr(lmv->tgts[i].ltd_exp, data, iattr,
- ea, ealen, ea2, ea2len, &req);
+ rc = md_setattr(lmv->tgts[data->fid1.mds].ltd_exp, data,
+ iattr, ea, ealen, ea2, ea2len, &req);
if (fid_equal(&obj->fid, &obj->objs[i].fid)) {
/* this is master object and this request should
}
lmv_put_obj(obj);
} else {
- LASSERT(i < lmv->desc.ld_tgt_count);
- rc = md_setattr(lmv->tgts[i].ltd_exp, data, iattr, ea,
- ealen, ea2, ea2len, request);
+ LASSERT(data->fid1.mds < lmv->desc.ld_tgt_count);
+ rc = md_setattr(lmv->tgts[data->fid1.mds].ltd_exp, data,
+ iattr, ea, ealen, ea2, ea2len, request);
if (rc == 0) {
body = lustre_msg_buf((*request)->rq_repmsg, 0,
sizeof(*body));
LASSERT(body != NULL);
- LASSERT(body->mds == i);
+ LASSERT(body->mds == data->fid1.mds);
}
}
RETURN(rc);
if (rc)
RETURN(rc);
- rc = md_sync(lmv->tgts[0].ltd_exp, fid, request);
+ CWARN("%s: ->m_sync() isn't implemented yet\n", obd->obd_name);
+ rc = md_sync(lmv->tgts[fid->mds].ltd_exp, fid, request);
RETURN(rc);
}
LASSERT(mea != NULL);
for (i = 0; i < mea->mea_count; i++) {
- if (lmv->tgts[i].ltd_exp == NULL)
- continue;
-
memset(&data2, 0, sizeof(data2));
data2.fid1 = mea->mea_fids[i];
data2.create_mode = MDS_MODE_DONT_LOCK | S_IFDIR;
mds = data2.fid1.mds;
+
+ if (lmv->tgts[mds].ltd_exp == NULL)
+ continue;
+
rc = md_unlink(lmv->tgts[mds].ltd_exp, &data2, req);
CDEBUG(D_OTHER, "unlink slave %lu/%lu/%lu -> %d\n",
(unsigned long) mea->mea_fids[i].mds,
rc = lmv_check_connect(obd);
if (rc)
RETURN(ERR_PTR(rc));
+#warning "we need well-desgined readdir() implementation to remove this mess"
obd = lmv->tgts[0].ltd_exp->exp_obd;
EXIT;
return obd;
RETURN(0);
for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
+ if (lmv->tgts[i].ltd_exp == NULL) {
+ CWARN("%s: NULL export for %d\n", obd->obd_name, i);
+ continue;
+ }
+
rc = obd_init_ea_size(lmv->tgts[i].ltd_exp, easize, cookiesize);
if (rc) {
CERROR("obd_init_ea_size() failed on MDT target %d, "
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
- struct mea *mea;
int i, c, rc = 0;
+ struct mea *mea;
struct ll_fid mfid;
+ int lcount;
ENTRY;
rc = lmv_check_connect(obd);
mea->mea_count = lmv->desc.ld_tgt_count;
mea->mea_master = -1;
-
- for (i = 0, c = 0; c < mea->mea_count &&
- i < lmv->desc.ld_tgt_count; i++) {
+ lcount = lmv->desc.ld_tgt_count;
+ for (i = 0, c = 0; c < mea->mea_count && i < lcount; i++) {
struct lov_stripe_md obj_md;
struct lov_stripe_md *obj_mdp = &obj_md;
c++;
CDEBUG(D_OTHER, "dirobj at mds %d: "LPU64"/%u\n",
i, oa->o_id, oa->o_generation);
- CDEBUG(D_ERROR, "dirobj at mds %d: "LPU64"/%u\n",
- i, oa->o_id, oa->o_generation);
}
LASSERT(c == mea->mea_count);
CDEBUG(D_OTHER, "%d dirobjects created\n", (int) mea->mea_count);
if (!inode)
RETURN(ERR_PTR(-ENOENT));
+#warning "I think we need something another here -bzzz"
#if 0
/* here we disabled generation check, as root inode i_generation
* of cache mds and real mds are different. */
* should live at this MDS or at another one */
int i;
i = mea_name2idx(mea, name, namelen - 1);
- if (mea->mea_master != i) {
- CERROR("inapropriate MDS(%d) for %s. should be %d\n",
- mea->mea_master, name, i);
+ if (mea->mea_master != mea->mea_fids[i].mds) {
+ CERROR("inapropriate MDS(%d) for %s. should be %d(%d)\n",
+ mea->mea_master, name, mea->mea_fids[i].mds, i);
rc = -ERESTART;
}
}
rc = fsfilt_set_md(obd, new->d_inode, handle, mea, mealen);
up(&new->d_inode->i_sem);
OBD_FREE(mea, mealen);
+ CDEBUG(D_OTHER, "%s: mark non-splittable %lu/%u - %d\n",
+ obd->obd_name, new->d_inode->i_ino,
+ new->d_inode->i_generation, flags);
} else if (rc == 0 && body->oa.o_easize) {
- flags = mds_try_to_split_dir(obd, new, NULL, body->oa.o_easize);
- CERROR("%s: splitted %lu/%u - %d\n", obd->obd_name,
- new->d_inode->i_ino, new->d_inode->i_generation, flags);
+ mds_try_to_split_dir(obd, new, NULL, body->oa.o_easize);
}
cleanup: