/* IT_OPEN is intended to open (and create, possible) an object.
* parent (pfid) may be splitted dir */
- mds = pfid->mds;
- obj = lmv_grab_obj(obd, pfid, 0);
+repeat:
+ mds = rpfid.mds;
+ obj = lmv_grab_obj(obd, &rpfid, 0);
if (obj) {
/* directory is already splitted, so we have to forward
* request to the right MDS */
CDEBUG(D_OTHER, "forward to MDS #%u\n", mds);
}
- rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name, len,
- lmm, lmmsize, cfid, it, flags, reqp, cb_blocking);
-
+ rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name,
+ len, lmm, lmmsize, cfid, it, flags, reqp,
+ cb_blocking);
lmv_put_obj(obj);
+ if (rc == -ERESTART) {
+ /* the directory has been split: update the local object
+ * and repeat the request against the proper MDS */
+ LASSERT(fid_equal(pfid, &rpfid));
+ rc = lmv_get_mea_and_update_object(exp, &rpfid);
+ if (rc == 0) {
+ ptlrpc_req_finished(*reqp);
+ goto repeat;
+ }
+ }
if (rc != 0)
RETURN(rc);
RETURN(rc);
}
- if (rc == -ESTALE) {
+ if (rc == -ERESTART) {
/* directory got splitted since last update. this shouldn't
* be becasue splitting causes lock revocation, so revalidate
* had to fail and lookup on dir had to return mea */
struct ll_fid *, struct lookup_intent *, int,
ldlm_blocking_callback cb_blocking);
void lmv_cleanup_objs(struct obd_device *obd);
+int lmv_get_mea_and_update_object(struct obd_export *, struct ll_fid *);
static inline struct mea *
is_body_of_splitted_dir(struct ptlrpc_request *req, int offset)
RETURN(rc);
}
+/*
+ * Fetch the MEA of directory @fid from its MDS and update the local LMV
+ * object from it.  The -ERESTART retry paths call this before repeating
+ * a request.
+ *
+ * @exp: LMV export; its obd supplies the table of MDS targets.
+ * @fid: fid of the directory; fid->mds selects the target to query.
+ *
+ * Returns the result of lmv_create_obj_from_attrs() on success,
+ * -ENODATA if the reply carries no MEA, or the error returned by
+ * md_getattr() / mdc_req2lustre_md().
+ */
+int lmv_get_mea_and_update_object(struct obd_export *exp, struct ll_fid *fid)
+{
+        struct obd_device *obd = exp->exp_obd;
+        struct lmv_obd *lmv = &obd->u.lmv;
+        struct ptlrpc_request *req = NULL;
+        struct lustre_md md;
+        int mealen, rc;
+        ENTRY;
+
+        /* only md.mea is inspected below, so make sure it starts as NULL */
+        md.mea = NULL;
+        mealen = MEA_SIZE_LMV(lmv);
+
+        /* time to update mea of parent fid */
+        rc = md_getattr(lmv->tgts[fid->mds].exp, fid,
+                        OBD_MD_FLEASIZE, mealen, &req);
+        if (rc)
+                GOTO(cleanup, rc);
+
+        rc = mdc_req2lustre_md(req, 0, NULL, exp, &md);
+        if (rc)
+                GOTO(cleanup, rc);
+
+        /* no MEA in the reply: nothing to build the object from */
+        if (md.mea == NULL)
+                GOTO(cleanup, rc = -ENODATA);
+
+        rc = lmv_create_obj_from_attrs(exp, fid, md.mea);
+        /* the unpacked MEA is released here whatever rc is */
+        obd_free_memmd(exp, (struct lov_stripe_md **) &md.mea);
+
+cleanup:
+        /* req stays NULL unless md_getattr() handed back a request */
+        if (req)
+                ptlrpc_req_finished(req);
+        RETURN(rc);
+}
+
int lmv_create(struct obd_export *exp, struct mdc_op_data *op_data,
const void *data, int datalen, int mode, __u32 uid,
__u32 gid, __u64 rdev, struct ptlrpc_request **request)
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
- struct mea *mea = op_data->mea1;
struct mds_body *mds_body;
- int rc, i, mds, free_mea = 0;
struct lmv_obj *obj;
+ int rc, mds;
ENTRY;
+
lmv_connect(obd);
- /* TODO: where to create new directories?
- * current design don't support directory on a slave MDS,
- * but we lookup by name may forward any request in slave
- */
repeat:
obj = lmv_grab_obj(obd, &op_data->fid1, 0);
if (obj) {
mds = raw_name2idx(obj->objcount, op_data->name,
- op_data->namelen - 1);
+ op_data->namelen);
op_data->fid1 = obj->objs[mds].fid;
lmv_put_obj(obj);
}
- CDEBUG(D_OTHER, "CREATE '%*s' on %lu/%lu/%lu (mea 0x%p)\n",
+ CDEBUG(D_OTHER, "CREATE '%*s' on %lu/%lu/%lu\n",
op_data->namelen, op_data->name,
(unsigned long) op_data->fid1.mds,
(unsigned long) op_data->fid1.id,
- (unsigned long) op_data->fid1.generation, mea);
+ (unsigned long) op_data->fid1.generation);
rc = md_create(lmv->tgts[op_data->fid1.mds].exp, op_data, data,
datalen, mode, uid, gid, rdev, request);
if (rc == 0) {
op_data->fid1.mds);
LASSERT(mds_body->valid & OBD_MD_MDS ||
mds_body->mds == op_data->fid1.mds);
- } else if (rc == -ESTALE) {
- struct ptlrpc_request *req = NULL;
- struct lustre_md md;
- int mealen;
-
- LBUG(); /* FIXME ASAP */
- CDEBUG(D_OTHER, "it seems MDS splitted dir\n");
- LASSERT(mea == NULL);
-
- mealen = sizeof(struct ll_fid)*lmv->count + sizeof(struct mea);
- /* time to update mea of parent fid */
- i = op_data->fid1.mds;
- rc = md_getattr(lmv->tgts[i].exp, &op_data->fid1,
- OBD_MD_FLEASIZE, mealen, &req);
- LASSERT(rc == 0);
- md.mea = NULL;
- rc = mdc_req2lustre_md(req, 0, NULL, exp, &md);
- LASSERT(rc == 0);
- LASSERT(md.mea != NULL);
- mea = md.mea;
- ptlrpc_req_finished(req);
- free_mea = 1;
-
- goto repeat;
+ } else if (rc == -ERESTART) {
+ /* the directory has been split: update the local object
+ * and repeat the request against the proper MDS */
+ rc = lmv_get_mea_and_update_object(exp, &op_data->fid1);
+ if (rc == 0) {
+ ptlrpc_req_finished(*request);
+ goto repeat;
+ }
}
- if (free_mea)
- obd_free_memmd(exp, (struct lov_stripe_md**) &mea);
RETURN(rc);
}
ENTRY;
lmv_connect(obd);
CDEBUG(D_OTHER, "getattr_name for %*s on %lu/%lu/%lu\n",
- namelen - 1, filename, (unsigned long) fid->mds,
+ namelen, filename, (unsigned long) fid->mds,
(unsigned long) fid->id, (unsigned long) fid->generation);
obj = lmv_grab_obj(obd, fid, 0);
if (obj) {
/* directory is splitted. look for right mds for this name */
- mds = raw_name2idx(obj->objcount, filename, namelen - 1);
+ mds = raw_name2idx(obj->objcount, filename, namelen);
rfid = obj->objs[mds].fid;
lmv_put_obj(obj);
}
continue;
oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLTYPE | OBD_MD_FLMODE
- | OBD_MD_FLUID | OBD_MD_FLGID;
+ | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLID;
rc = obd_create(lmv->tgts[c].exp, oa, &obj_mdp, oti);
/* FIXME: error handling here */
static int mdt_obj_create(struct ptlrpc_request *req)
{
+ unsigned int tmpname = ll_insecure_random_int();
+ struct ldlm_res_id res_id = { .name = {0} };
struct obd_export *exp = req->rq_export;
struct obd_device *obd = exp->exp_obd;
struct mds_obd *mds = &obd->u.mds;
int rc, size = sizeof(*repbody);
char fidname[LL_FID_NAMELEN];
struct inode *parent_inode;
+ struct lustre_handle lockh;
struct obd_run_ctxt saved;
- int err, namelen, mealen;
+ ldlm_policy_data_t policy;
+ int mealen, flags = 0;
struct obd_ucred uc;
struct dentry *new;
struct mea *mea;
repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
- if (!(body->oa.o_valid & OBD_MD_FLID)) {
- /* this is request from another MDS to create remove dir inode */
- unsigned int tmpname = ll_insecure_random_int();
+ handle = fsfilt_start(obd, parent_inode, FSFILT_OP_MKDIR, NULL);
+ LASSERT(!IS_ERR(handle));
- handle = fsfilt_start(obd, parent_inode, FSFILT_OP_MKDIR, NULL);
+ sprintf(fidname, "%u", tmpname);
+ new = simple_mkdir(mds->mds_objects_dir, fidname,
+ body->oa.o_mode, 1);
+ LASSERT(!IS_ERR(new));
+ LASSERT(new->d_inode != NULL);
+
+ if (body->oa.o_valid & OBD_MD_FLID) {
+ /* this is a new object for a split dir. we have to
+ * prevent it from being split recursively -bzzz */
+ mealen = obd_size_diskmd(mds->mds_lmv_exp, NULL);
+ OBD_ALLOC(mea, mealen);
+ LASSERT(mea != NULL);
+ mea->mea_count = 0;
+ down(&new->d_inode->i_sem);
+ handle = fsfilt_start(obd, new->d_inode, FSFILT_OP_SETATTR, NULL);
LASSERT(!IS_ERR(handle));
-
- sprintf(fidname, "%u", tmpname);
- new = simple_mkdir(mds->mds_objects_dir, fidname,
- body->oa.o_mode, 1);
- LASSERT(!IS_ERR(new));
- LASSERT(new->d_inode != NULL);
-
- obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS);
- repbody->oa.o_id = new->d_inode->i_ino;
- repbody->oa.o_generation = new->d_inode->i_generation;
- repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
-
- rc = fsfilt_del_dir_entry(obd, new);
+ rc = fsfilt_set_md(obd, new->d_inode, handle, mea, mealen);
LASSERT(rc == 0);
-
- rc = fsfilt_commit(obd, parent_inode, handle, 0);
+ fsfilt_commit(obd, new->d_inode, handle, 0);
LASSERT(rc == 0);
-
- CDEBUG(D_OTHER, "created dirobj: %lu/%lu mode %o\n",
- (unsigned long) new->d_inode->i_ino,
- (unsigned long) new->d_inode->i_generation,
- (unsigned) new->d_inode->i_mode);
-
- l_dput(new);
- pop_ctxt(&saved, &obd->obd_ctxt, &uc);
- RETURN(0);
+ up(&new->d_inode->i_sem);
+ OBD_FREE(mea, mealen);
}
+ obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS);
+ repbody->oa.o_id = new->d_inode->i_ino;
+ repbody->oa.o_generation = new->d_inode->i_generation;
+ repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
- repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
- memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
-
- namelen = ll_fid2str(fidname, body->oa.o_id, body->oa.o_generation);
-
down(&parent_inode->i_sem);
- new = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
- if (new->d_inode != NULL) {
- CERROR("impossible non-negative obj dentry " LPU64":%u!\n",
- repbody->oa.o_id, repbody->oa.o_generation);
- LBUG();
- }
- handle = fsfilt_start(exp->exp_obd, mds->mds_objects_dir->d_inode,
- FSFILT_OP_MKDIR, NULL);
- /* FIXME: error handling here */
- LASSERT(!IS_ERR(handle));
-
- rc = vfs_mkdir(parent_inode, new, body->oa.o_mode);
+ rc = fsfilt_del_dir_entry(obd, new);
up(&parent_inode->i_sem);
- /* FIXME: error handling here */
- if (rc)
- CERROR("vfs_mkdir() returned %d\n", rc);
LASSERT(rc == 0);
-
- /* mark this object non-splittable */
- mealen = obd_size_diskmd(mds->mds_lmv_exp, NULL);
- OBD_ALLOC(mea, mealen);
- LASSERT(mea != NULL);
- mea->mea_count = 0;
- down(&new->d_inode->i_sem);
- handle = fsfilt_start(obd, new->d_inode, FSFILT_OP_SETATTR, NULL);
- LASSERT(!IS_ERR(handle));
- rc = fsfilt_set_md(obd, new->d_inode, handle, mea, mealen);
- LASSERT(rc == 0);
- fsfilt_commit(obd, new->d_inode, handle, 0);
- LASSERT(rc == 0);
- up(&new->d_inode->i_sem);
- OBD_FREE(mea, mealen);
- err = fsfilt_commit(exp->exp_obd, mds->mds_objects_dir->d_inode,
- handle, 0);
- /* FIXME: error handling here */
- LASSERT(err == 0);
+ rc = mds_finish_transno(mds, parent_inode, handle, req, rc, 0);
+ LASSERT(rc == 0);
- obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS);
- repbody->oa.o_id = new->d_inode->i_ino;
- repbody->oa.o_generation = new->d_inode->i_generation;
- CDEBUG(D_OTHER, "created dirobj: %lu, %lu mode %o, uid %u, gid %u\n",
- (unsigned long) repbody->oa.o_id,
+ res_id.name[0] = new->d_inode->i_ino;
+ res_id.name[1] = new->d_inode->i_generation;
+ policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
+ res_id, LDLM_IBITS, &policy,
+ LCK_EX, &flags, mds_blocking_ast,
+ ldlm_completion_ast, NULL, NULL,
+ NULL, 0, NULL, &lockh);
+ LASSERT(rc == ELDLM_OK);
+
+ CDEBUG(D_OTHER, "created dirobj: %lu/%lu mode %o\n",
(unsigned long) new->d_inode->i_ino,
- (unsigned) new->d_inode->i_mode,
- (unsigned) new->d_inode->i_uid,
- (unsigned) new->d_inode->i_gid);
- dput(new);
+ (unsigned long) new->d_inode->i_generation,
+ (unsigned) new->d_inode->i_mode);
+
+ l_dput(new);
pop_ctxt(&saved, &obd->obd_ctxt, &uc);
+ ptlrpc_save_lock(req, &lockh, LCK_EX);
RETURN(0);
}
if (rc <= 0) {
OBD_FREE(*mea, *mea_size);
*mea = NULL;
- *mea_size = 0;
}
if (rc > 0)
rc = 0;
OBD_ALLOC(file_name, nlen);
if (!file_name)
RETURN(-ENOMEM);
- i = sprintf(file_name, "__iopen__/%u",
- (unsigned) dentry->d_inode->i_ino);
+ i = sprintf(file_name, "__iopen__/0x%lx", dentry->d_inode->i_ino);
file = filp_open(file_name, O_RDONLY, 0);
if (IS_ERR(file)) {
if (dentry->d_inode->i_ino == mds->mds_rootfid.id)
RETURN(0);
-#if 1
+ /* we want to split only large dirs. this may be an already
+ * split dir or a slave dir created during splitting */
if (dir->i_size < MAX_DIR_SIZE)
RETURN(0);
-#endif
/* check is directory marked non-splittable */
if (mea && *mea)
RETURN(0);
- CDEBUG(D_OTHER, "%s: split directory %lu/%lu (mea 0x%p)\n",
- obd->obd_name, dir->i_ino,
- (unsigned long) dir->i_generation, mea);
+ CDEBUG(D_OTHER, "%s: split directory %lu/%lu\n",
+ obd->obd_name, dir->i_ino, (unsigned long) dir->i_generation);
if (mea == NULL)
mea = &tmea;
mea_size = obd_size_diskmd(mds->mds_lmv_exp, NULL);
/* FIXME: Actually we may only want to allocate enough space for
- necessary amount of stripes, but on the other hand with this approach
- of allocating maximal possible amount of MDS slots, it would be
- easier to split the dir over more MDSes */
+ * necessary amount of stripes, but on the other hand with this
+ * approach of allocating maximal possible amount of MDS slots,
+ * it would be easier to split the dir over more MDSes */
rc = obd_alloc_diskmd(mds->mds_lmv_exp, (void *) mea);
if (!(*mea))
RETURN(-ENOMEM);
OBD_MD_FLMTIME | OBD_MD_FLCTIME |
OBD_MD_FLUID | OBD_MD_FLGID);
oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
- oa->o_valid |= OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
+ oa->o_valid |= OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
oa->o_mode = dir->i_mode;
CDEBUG(D_OTHER, "%s: create subdirs with mode %o, uid %u, gid %u\n",
obd->obd_name, dir->i_mode, dir->i_uid, dir->i_gid);
if (mea->mea_master != i) {
CERROR("inapropriate MDS(%d) for %s. should be %d\n",
mea->mea_master, rec->ur_name, i);
- GOTO(cleanup, rc = -ESTALE);
+ GOTO(cleanup, rc = -ERESTART);
}
}
if ((rc = mds_try_to_split_dir(obd, dparent, &mea, 0))) {
if (rc > 0) {
/* dir got splitted */
- GOTO(cleanup, rc = -ESTALE);
+ GOTO(cleanup, rc = -ERESTART);
} else {
/* error happened during spitting */
GOTO(cleanup, rc);
if (mea->mea_master != i) {
CERROR("inapropriate MDS(%d) for %s. should be %d\n",
mea->mea_master, rec->ur_name, i);
- GOTO(cleanup, rc = -ESTALE);
+ GOTO(cleanup, rc = -ERESTART);
}
}
if ((rc = mds_try_to_split_dir(obd, dparent, &mea, 0))) {
if (rc > 0) {
/* dir got splitted */
- GOTO(cleanup, rc = -ESTALE);
+ GOTO(cleanup, rc = -ERESTART);
} else {
/* error happened during spitting */
GOTO(cleanup, rc);
if (rec->ur_eadata)
nstripes = *(u16 *)rec->ur_eadata;
-#if 1
- /* this is for current testing yet. after the testing
- * directory will split if size reaches some limite -bzzz */
- if (rc == 0) {
-#else
if (rc == 0 && nstripes) {
-#endif
/* FIXME: error handling here */
- mds_try_to_split_dir(obd, dchild, NULL, nstripes);
+ mds_try_to_split_dir(obd, dchild,
+ NULL, nstripes);
}
} else if (!DENTRY_VALID(dchild)) {
/* inode will be created on another MDS */