From b5f041057359892d0a77d72639882c0caecc5b24 Mon Sep 17 00:00:00 2001 From: alex Date: Thu, 20 May 2004 13:23:46 +0000 Subject: [PATCH] - new routine lmv_get_mea_and_update_object(), to be called when an MDS replies with -ERESTART, which signals that the directory has been split - lmv_intent_open() recognizes that a dir was split during the request, retrieves the mea and repeats the request using the proper MDS - lmv_create() recognizes that a dir was split during the request, retrieves the mea and repeats the request using the proper MDS - bug fixed in lmv_getattr_name(): it passed the wrong namelen to raw_name2idx() - lmv_obd_create() sets OBD_MD_FLID to tell MDSs to mark the created object unsplittable. we have to distinguish two possible requests: 1) to create a remote inode for cross-node mkdir(); 2) to create slave objects. the latter must not be split recursively - mdt_obj_create() has been rewritten to comply with the rules just described. also, it takes a lock on the newly created inode. this is needed for recovery - bug fixed in scan_and_distribute(): it tried to open an inode using a decimal number and this caused iopen_lookup() to find alias dentries - mds_get_lmv_attr() should return the right mea size for the given configuration in any case - minor cleanups --- lustre/lmv/lmv_intent.c | 23 ++++++--- lustre/lmv/lmv_internal.h | 1 + lustre/lmv/lmv_obd.c | 84 +++++++++++++++++-------------- lustre/mds/handler.c | 126 ++++++++++++++++++---------------------------- lustre/mds/mds_lmv.c | 21 ++++---- lustre/mds/mds_open.c | 4 +- lustre/mds/mds_reint.c | 13 ++--- 7 files changed, 128 insertions(+), 144 deletions(-) diff --git a/lustre/lmv/lmv_intent.c b/lustre/lmv/lmv_intent.c index 81e4baa..8d18bbb 100644 --- a/lustre/lmv/lmv_intent.c +++ b/lustre/lmv/lmv_intent.c @@ -131,8 +131,9 @@ int lmv_intent_open(struct obd_export *exp, struct ll_uctxt *uctxt, /* IT_OPEN is intended to open (and create, possible) an object. 
* parent (pfid) may be splitted dir */ - mds = pfid->mds; - obj = lmv_grab_obj(obd, pfid, 0); +repeat: + mds = rpfid.mds; + obj = lmv_grab_obj(obd, &rpfid, 0); if (obj) { /* directory is already splitted, so we have to forward * request to the right MDS */ @@ -141,10 +142,20 @@ int lmv_intent_open(struct obd_export *exp, struct ll_uctxt *uctxt, CDEBUG(D_OTHER, "forward to MDS #%u\n", mds); } - rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name, len, - lmm, lmmsize, cfid, it, flags, reqp, cb_blocking); - + rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name, + len, lmm, lmmsize, cfid, it, flags, reqp, + cb_blocking); lmv_put_obj(obj); + if (rc == -ERESTART) { + /* directory got splitted. time to update local object + * and repeat the request with proper MDS */ + LASSERT(fid_equal(pfid, &rpfid)); + rc = lmv_get_mea_and_update_object(exp, &rpfid); + if (rc == 0) { + ptlrpc_req_finished(*reqp); + goto repeat; + } + } if (rc != 0) RETURN(rc); @@ -498,7 +509,7 @@ repeat: RETURN(rc); } - if (rc == -ESTALE) { + if (rc == -ERESTART) { /* directory got splitted since last update. 
this shouldn't * be becasue splitting causes lock revocation, so revalidate * had to fail and lookup on dir had to return mea */ diff --git a/lustre/lmv/lmv_internal.h b/lustre/lmv/lmv_internal.h index e531834..30688f2 100644 --- a/lustre/lmv/lmv_internal.h +++ b/lustre/lmv/lmv_internal.h @@ -50,6 +50,7 @@ int lmv_revalidate_slaves(struct obd_export *, struct ptlrpc_request **, struct ll_fid *, struct lookup_intent *, int, ldlm_blocking_callback cb_blocking); void lmv_cleanup_objs(struct obd_device *obd); +int lmv_get_mea_and_update_object(struct obd_export *, struct ll_fid *); static inline struct mea * is_body_of_splitted_dir(struct ptlrpc_request *req, int offset) diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index ce78065..0441c14 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -450,36 +450,62 @@ int lmv_close(struct obd_export *exp, struct obdo *obdo, RETURN(rc); } +int lmv_get_mea_and_update_object(struct obd_export *exp, struct ll_fid *fid) +{ + struct obd_device *obd = exp->exp_obd; + struct lmv_obd *lmv = &obd->u.lmv; + struct ptlrpc_request *req = NULL; + struct lustre_md md; + int mealen, rc; + + md.mea = NULL; + mealen = MEA_SIZE_LMV(lmv); + + /* time to update mea of parent fid */ + rc = md_getattr(lmv->tgts[fid->mds].exp, fid, + OBD_MD_FLEASIZE, mealen, &req); + if (rc) + GOTO(cleanup, rc); + rc = mdc_req2lustre_md(req, 0, NULL, exp, &md); + if (rc) + GOTO(cleanup, rc); + if (md.mea == NULL) + GOTO(cleanup, rc = -ENODATA); + rc = lmv_create_obj_from_attrs(exp, fid, md.mea); + obd_free_memmd(exp, (struct lov_stripe_md **) &md.mea); + +cleanup: + if (req) + ptlrpc_req_finished(req); + RETURN(rc); +} + int lmv_create(struct obd_export *exp, struct mdc_op_data *op_data, const void *data, int datalen, int mode, __u32 uid, __u32 gid, __u64 rdev, struct ptlrpc_request **request) { struct obd_device *obd = exp->exp_obd; struct lmv_obd *lmv = &obd->u.lmv; - struct mea *mea = op_data->mea1; struct mds_body *mds_body; - int rc, i, mds, 
free_mea = 0; struct lmv_obj *obj; + int rc, mds; ENTRY; + lmv_connect(obd); - /* TODO: where to create new directories? - * current design don't support directory on a slave MDS, - * but we lookup by name may forward any request in slave - */ repeat: obj = lmv_grab_obj(obd, &op_data->fid1, 0); if (obj) { mds = raw_name2idx(obj->objcount, op_data->name, - op_data->namelen - 1); + op_data->namelen); op_data->fid1 = obj->objs[mds].fid; lmv_put_obj(obj); } - CDEBUG(D_OTHER, "CREATE '%*s' on %lu/%lu/%lu (mea 0x%p)\n", + CDEBUG(D_OTHER, "CREATE '%*s' on %lu/%lu/%lu\n", op_data->namelen, op_data->name, (unsigned long) op_data->fid1.mds, (unsigned long) op_data->fid1.id, - (unsigned long) op_data->fid1.generation, mea); + (unsigned long) op_data->fid1.generation); rc = md_create(lmv->tgts[op_data->fid1.mds].exp, op_data, data, datalen, mode, uid, gid, rdev, request); if (rc == 0) { @@ -494,33 +520,15 @@ repeat: op_data->fid1.mds); LASSERT(mds_body->valid & OBD_MD_MDS || mds_body->mds == op_data->fid1.mds); - } else if (rc == -ESTALE) { - struct ptlrpc_request *req = NULL; - struct lustre_md md; - int mealen; - - LBUG(); /* FIXME ASAP */ - CDEBUG(D_OTHER, "it seems MDS splitted dir\n"); - LASSERT(mea == NULL); - - mealen = sizeof(struct ll_fid)*lmv->count + sizeof(struct mea); - /* time to update mea of parent fid */ - i = op_data->fid1.mds; - rc = md_getattr(lmv->tgts[i].exp, &op_data->fid1, - OBD_MD_FLEASIZE, mealen, &req); - LASSERT(rc == 0); - md.mea = NULL; - rc = mdc_req2lustre_md(req, 0, NULL, exp, &md); - LASSERT(rc == 0); - LASSERT(md.mea != NULL); - mea = md.mea; - ptlrpc_req_finished(req); - free_mea = 1; - - goto repeat; + } else if (rc == -ERESTART) { + /* directory got splitted. 
time to update local object + * and repeat the request with proper MDS */ + rc = lmv_get_mea_and_update_object(exp, &op_data->fid1); + if (rc == 0) { + ptlrpc_req_finished(*request); + goto repeat; + } } - if (free_mea) - obd_free_memmd(exp, (struct lov_stripe_md**) &mea); RETURN(rc); } @@ -582,12 +590,12 @@ int lmv_getattr_name(struct obd_export *exp, struct ll_fid *fid, ENTRY; lmv_connect(obd); CDEBUG(D_OTHER, "getattr_name for %*s on %lu/%lu/%lu\n", - namelen - 1, filename, (unsigned long) fid->mds, + namelen, filename, (unsigned long) fid->mds, (unsigned long) fid->id, (unsigned long) fid->generation); obj = lmv_grab_obj(obd, fid, 0); if (obj) { /* directory is splitted. look for right mds for this name */ - mds = raw_name2idx(obj->objcount, filename, namelen - 1); + mds = raw_name2idx(obj->objcount, filename, namelen); rfid = obj->objs[mds].fid; lmv_put_obj(obj); } @@ -1014,7 +1022,7 @@ int lmv_obd_create(struct obd_export *exp, struct obdo *oa, continue; oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLTYPE | OBD_MD_FLMODE - | OBD_MD_FLUID | OBD_MD_FLGID; + | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLID; rc = obd_create(lmv->tgts[c].exp, oa, &obj_mdp, oti); /* FIXME: error handling here */ diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 9d9e64e..761760d 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -1171,6 +1171,8 @@ static char *reint_names[] = { static int mdt_obj_create(struct ptlrpc_request *req) { + unsigned int tmpname = ll_insecure_random_int(); + struct ldlm_res_id res_id = { .name = {0} }; struct obd_export *exp = req->rq_export; struct obd_device *obd = exp->exp_obd; struct mds_obd *mds = &obd->u.mds; @@ -1178,8 +1180,10 @@ static int mdt_obj_create(struct ptlrpc_request *req) int rc, size = sizeof(*repbody); char fidname[LL_FID_NAMELEN]; struct inode *parent_inode; + struct lustre_handle lockh; struct obd_run_ctxt saved; - int err, namelen, mealen; + ldlm_policy_data_t policy; + int mealen, flags = 0; struct obd_ucred uc; 
struct dentry *new; struct mea *mea; @@ -1203,95 +1207,63 @@ static int mdt_obj_create(struct ptlrpc_request *req) repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody)); - if (!(body->oa.o_valid & OBD_MD_FLID)) { - /* this is request from another MDS to create remove dir inode */ - unsigned int tmpname = ll_insecure_random_int(); + handle = fsfilt_start(obd, parent_inode, FSFILT_OP_MKDIR, NULL); + LASSERT(!IS_ERR(handle)); - handle = fsfilt_start(obd, parent_inode, FSFILT_OP_MKDIR, NULL); + sprintf(fidname, "%u", tmpname); + new = simple_mkdir(mds->mds_objects_dir, fidname, + body->oa.o_mode, 1); + LASSERT(!IS_ERR(new)); + LASSERT(new->d_inode != NULL); + + if (body->oa.o_valid & OBD_MD_FLID) { + /* this is new object for splitted dir. we have to + * prevent recursive splitting on it -bzzz */ + mealen = obd_size_diskmd(mds->mds_lmv_exp, NULL); + OBD_ALLOC(mea, mealen); + LASSERT(mea != NULL); + mea->mea_count = 0; + down(&new->d_inode->i_sem); + handle = fsfilt_start(obd, new->d_inode, FSFILT_OP_SETATTR, NULL); LASSERT(!IS_ERR(handle)); - - sprintf(fidname, "%u", tmpname); - new = simple_mkdir(mds->mds_objects_dir, fidname, - body->oa.o_mode, 1); - LASSERT(!IS_ERR(new)); - LASSERT(new->d_inode != NULL); - - obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS); - repbody->oa.o_id = new->d_inode->i_ino; - repbody->oa.o_generation = new->d_inode->i_generation; - repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER; - - rc = fsfilt_del_dir_entry(obd, new); + rc = fsfilt_set_md(obd, new->d_inode, handle, mea, mealen); LASSERT(rc == 0); - - rc = fsfilt_commit(obd, parent_inode, handle, 0); + fsfilt_commit(obd, new->d_inode, handle, 0); LASSERT(rc == 0); - - CDEBUG(D_OTHER, "created dirobj: %lu/%lu mode %o\n", - (unsigned long) new->d_inode->i_ino, - (unsigned long) new->d_inode->i_generation, - (unsigned) new->d_inode->i_mode); - - l_dput(new); - pop_ctxt(&saved, &obd->obd_ctxt, &uc); - RETURN(0); + up(&new->d_inode->i_sem); + OBD_FREE(mea, mealen); } + 
obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS); + repbody->oa.o_id = new->d_inode->i_ino; + repbody->oa.o_generation = new->d_inode->i_generation; + repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER; - repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody)); - memcpy(&repbody->oa, &body->oa, sizeof(body->oa)); - - namelen = ll_fid2str(fidname, body->oa.o_id, body->oa.o_generation); - down(&parent_inode->i_sem); - new = lookup_one_len(fidname, mds->mds_objects_dir, namelen); - if (new->d_inode != NULL) { - CERROR("impossible non-negative obj dentry " LPU64":%u!\n", - repbody->oa.o_id, repbody->oa.o_generation); - LBUG(); - } - handle = fsfilt_start(exp->exp_obd, mds->mds_objects_dir->d_inode, - FSFILT_OP_MKDIR, NULL); - /* FIXME: error handling here */ - LASSERT(!IS_ERR(handle)); - - rc = vfs_mkdir(parent_inode, new, body->oa.o_mode); + rc = fsfilt_del_dir_entry(obd, new); up(&parent_inode->i_sem); - /* FIXME: error handling here */ - if (rc) - CERROR("vfs_mkdir() returned %d\n", rc); LASSERT(rc == 0); - - /* mark this object non-splittable */ - mealen = obd_size_diskmd(mds->mds_lmv_exp, NULL); - OBD_ALLOC(mea, mealen); - LASSERT(mea != NULL); - mea->mea_count = 0; - down(&new->d_inode->i_sem); - handle = fsfilt_start(obd, new->d_inode, FSFILT_OP_SETATTR, NULL); - LASSERT(!IS_ERR(handle)); - rc = fsfilt_set_md(obd, new->d_inode, handle, mea, mealen); - LASSERT(rc == 0); - fsfilt_commit(obd, new->d_inode, handle, 0); - LASSERT(rc == 0); - up(&new->d_inode->i_sem); - OBD_FREE(mea, mealen); - err = fsfilt_commit(exp->exp_obd, mds->mds_objects_dir->d_inode, - handle, 0); - /* FIXME: error handling here */ - LASSERT(err == 0); + rc = mds_finish_transno(mds, parent_inode, handle, req, rc, 0); + LASSERT(rc == 0); - obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS); - repbody->oa.o_id = new->d_inode->i_ino; - repbody->oa.o_generation = new->d_inode->i_generation; - CDEBUG(D_OTHER, "created dirobj: %lu, %lu mode %o, uid %u, gid %u\n", - 
(unsigned long) repbody->oa.o_id, + res_id.name[0] = new->d_inode->i_ino; + res_id.name[1] = new->d_inode->i_generation; + policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; + rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, + res_id, LDLM_IBITS, &policy, + LCK_EX, &flags, mds_blocking_ast, + ldlm_completion_ast, NULL, NULL, + NULL, 0, NULL, &lockh); + LASSERT(rc == ELDLM_OK); + + CDEBUG(D_OTHER, "created dirobj: %lu/%lu mode %o\n", (unsigned long) new->d_inode->i_ino, - (unsigned) new->d_inode->i_mode, - (unsigned) new->d_inode->i_uid, - (unsigned) new->d_inode->i_gid); - dput(new); + (unsigned long) new->d_inode->i_generation, + (unsigned) new->d_inode->i_mode); + + l_dput(new); pop_ctxt(&saved, &obd->obd_ctxt, &uc); + ptlrpc_save_lock(req, &lockh, LCK_EX); RETURN(0); } diff --git a/lustre/mds/mds_lmv.c b/lustre/mds/mds_lmv.c index 3a5c838..c4ea3af 100644 --- a/lustre/mds/mds_lmv.c +++ b/lustre/mds/mds_lmv.c @@ -169,7 +169,6 @@ int mds_get_lmv_attr(struct obd_device *obd, struct inode *inode, if (rc <= 0) { OBD_FREE(*mea, *mea_size); *mea = NULL; - *mea_size = 0; } if (rc > 0) rc = 0; @@ -358,8 +357,7 @@ int scan_and_distribute(struct obd_device *obd, struct dentry *dentry, OBD_ALLOC(file_name, nlen); if (!file_name) RETURN(-ENOMEM); - i = sprintf(file_name, "__iopen__/%u", - (unsigned) dentry->d_inode->i_ino); + i = sprintf(file_name, "__iopen__/0x%lx", dentry->d_inode->i_ino); file = filp_open(file_name, O_RDONLY, 0); if (IS_ERR(file)) { @@ -421,27 +419,26 @@ int mds_try_to_split_dir(struct obd_device *obd, if (dentry->d_inode->i_ino == mds->mds_rootfid.id) RETURN(0); -#if 1 + /* we want to split only large dirs. 
this may be already + * splitted dir or a slave dir created during splitting */ if (dir->i_size < MAX_DIR_SIZE) RETURN(0); -#endif /* check is directory marked non-splittable */ if (mea && *mea) RETURN(0); - CDEBUG(D_OTHER, "%s: split directory %lu/%lu (mea 0x%p)\n", - obd->obd_name, dir->i_ino, - (unsigned long) dir->i_generation, mea); + CDEBUG(D_OTHER, "%s: split directory %lu/%lu\n", + obd->obd_name, dir->i_ino, (unsigned long) dir->i_generation); if (mea == NULL) mea = &tmea; mea_size = obd_size_diskmd(mds->mds_lmv_exp, NULL); /* FIXME: Actually we may only want to allocate enough space for - necessary amount of stripes, but on the other hand with this approach - of allocating maximal possible amount of MDS slots, it would be - easier to split the dir over more MDSes */ + * necessary amount of stripes, but on the other hand with this + * approach of allocating maximal possible amount of MDS slots, + * it would be easier to split the dir over more MDSes */ rc = obd_alloc_diskmd(mds->mds_lmv_exp, (void *) mea); if (!(*mea)) RETURN(-ENOMEM); @@ -460,7 +457,7 @@ int mds_try_to_split_dir(struct obd_device *obd, OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLUID | OBD_MD_FLGID); oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num; - oa->o_valid |= OBD_MD_FLFLAGS | OBD_MD_FLGROUP; + oa->o_valid |= OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP; oa->o_mode = dir->i_mode; CDEBUG(D_OTHER, "%s: create subdirs with mode %o, uid %u, gid %u\n", obd->obd_name, dir->i_mode, dir->i_uid, dir->i_gid); diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c index 1a95d04..2546cc6 100644 --- a/lustre/mds/mds_open.c +++ b/lustre/mds/mds_open.c @@ -874,7 +874,7 @@ int mds_open(struct mds_update_record *rec, int offset, if (mea->mea_master != i) { CERROR("inapropriate MDS(%d) for %s. 
should be %d\n", mea->mea_master, rec->ur_name, i); - GOTO(cleanup, rc = -ESTALE); + GOTO(cleanup, rc = -ERESTART); } } @@ -939,7 +939,7 @@ got_child: if ((rc = mds_try_to_split_dir(obd, dparent, &mea, 0))) { if (rc > 0) { /* dir got splitted */ - GOTO(cleanup, rc = -ESTALE); + GOTO(cleanup, rc = -ERESTART); } else { /* error happened during spitting */ GOTO(cleanup, rc); diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index d3615d4..bb56334 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -578,7 +578,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, if (mea->mea_master != i) { CERROR("inapropriate MDS(%d) for %s. should be %d\n", mea->mea_master, rec->ur_name, i); - GOTO(cleanup, rc = -ESTALE); + GOTO(cleanup, rc = -ERESTART); } } @@ -597,7 +597,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, if ((rc = mds_try_to_split_dir(obd, dparent, &mea, 0))) { if (rc > 0) { /* dir got splitted */ - GOTO(cleanup, rc = -ESTALE); + GOTO(cleanup, rc = -ERESTART); } else { /* error happened during spitting */ GOTO(cleanup, rc); @@ -644,15 +644,10 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, if (rec->ur_eadata) nstripes = *(u16 *)rec->ur_eadata; -#if 1 - /* this is for current testing yet. after the testing - * directory will split if size reaches some limite -bzzz */ - if (rc == 0) { -#else if (rc == 0 && nstripes) { -#endif /* FIXME: error handling here */ - mds_try_to_split_dir(obd, dchild, NULL, nstripes); + mds_try_to_split_dir(obd, dchild, + NULL, nstripes); } } else if (!DENTRY_VALID(dchild)) { /* inode will be created on another MDS */ -- 1.8.3.1