X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmds%2Fmds_fs.c;h=1d798439b0ada88b542d95dc7f2eb90fca63c0a5;hb=09f865aa6470f6b130c5e9679be733f407679cbc;hp=fccf34f80a78c18297cee1d3c9fdc79bb7e8b598;hpb=7088f04601ade93b6bba2c44cc9c98af485c8e86;p=fs%2Flustre-release.git diff --git a/lustre/mds/mds_fs.c b/lustre/mds/mds_fs.c index fccf34f..1d79843 100644 --- a/lustre/mds/mds_fs.c +++ b/lustre/mds/mds_fs.c @@ -167,6 +167,12 @@ int mds_client_free(struct obd_export *exp, int clear_client) LBUG(); } + + /* Make sure the server's last_transno is up to date. Do this + * after the client is freed so we know all the client's + * transactions have been committed. */ + mds_update_server_data(exp->exp_obd, 1); + free_and_out: OBD_FREE(med->med_mcd, sizeof(*med->med_mcd)); @@ -309,9 +315,10 @@ static int mds_read_last_rcvd(struct obd_device *obd, struct file *file) /* These exports are cleaned up by mds_disconnect(), so they * need to be set up like real exports as mds_connect() does. */ - CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64 - " srv lr: "LPU64"\n", mcd->mcd_uuid, cl_idx, - last_transno, le64_to_cpu(msd->msd_last_transno)); + CDEBUG(D_HA|D_WARNING,"RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64 + " srv lr: "LPU64" lx: "LPU64"\n", mcd->mcd_uuid, cl_idx, + last_transno, le64_to_cpu(msd->msd_last_transno), + mcd->mcd_last_xid); exp = class_new_export(obd); if (exp == NULL) @@ -327,6 +334,7 @@ static int mds_read_last_rcvd(struct obd_device *obd, struct file *file) spin_lock_init(&med->med_open_lock); mcd = NULL; + exp->exp_replay_needed = 1; obd->obd_recoverable_clients++; obd->obd_max_recoverable_clients++; class_export_put(exp); @@ -344,7 +352,7 @@ static int mds_read_last_rcvd(struct obd_device *obd, struct file *file) "last_transno "LPU64"\n", obd->obd_name, obd->obd_recoverable_clients, mds->mds_last_transno); obd->obd_next_recovery_transno = obd->obd_last_committed + 1; - obd->obd_recovering = 1; + target_start_recovery_thread(obd, mds_handle); } if (mcd) @@ -371,7 +379,7 @@ static int mds_fs_post_setup(struct obd_device *obd) struct dentry *de = mds_fid2dentry(mds, &mds->mds_rootfid, NULL); int rc = 0; - rc = fsfilt_post_setup(obd); + rc = fsfilt_post_setup(obd, de); if (rc) GOTO(out, rc); @@ -470,6 +478,14 @@ int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt) } mds->mds_fids_dir = dentry; + dentry = simple_mkdir(current->fs->pwd, "UNNAMED", 0777, 1); + if (IS_ERR(dentry)) { + rc = PTR_ERR(dentry); + CERROR("cannot create UNNAMED directory: rc = %d\n", rc); + GOTO(err_unnamed, rc); + } + mds->mds_unnamed_dir = dentry; + /* open and test the last rcvd file */ file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644); if (IS_ERR(file)) { @@ -520,6 +536,8 @@ err_client: err_last_rcvd: if (mds->mds_rcvd_filp && filp_close(mds->mds_rcvd_filp, 0)) CERROR("can't close %s after error\n", LAST_RCVD); +err_unnamed: + dput(mds->mds_unnamed_dir); err_fids: dput(mds->mds_fids_dir); err_objects: @@ -566,6 +584,10 @@ int mds_fs_cleanup(struct obd_device *obd, int flags) if (rc) CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc); } + if (mds->mds_unnamed_dir != NULL) { + l_dput(mds->mds_unnamed_dir); + mds->mds_unnamed_dir = NULL; + } if (mds->mds_fids_dir != NULL) { l_dput(mds->mds_fids_dir); mds->mds_fids_dir = NULL; @@ -599,9 +621,8 @@ int mds_obd_create(struct obd_export *exp, struct obdo *oa, { struct mds_obd *mds = &exp->exp_obd->u.mds; struct inode *parent_inode = mds->mds_objects_dir->d_inode; - unsigned int tmpname = ll_insecure_random_int(); struct file *filp; - struct dentry *new_child; + struct dentry *dchild; struct lvfs_run_ctxt saved; char fidname[LL_FID_NAMELEN]; void *handle; @@ -609,17 +630,53 @@ int mds_obd_create(struct obd_export *exp, struct obdo *oa, ENTRY; push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); - - sprintf(fidname, "OBJECTS/%u", tmpname); + down(&parent_inode->i_sem); + if (oa->o_id) { + namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation); + + dchild = lookup_one_len(fidname, mds->mds_objects_dir, namelen); + if (IS_ERR(dchild)) + GOTO(out_pop, rc = PTR_ERR(dchild)); + + if (dchild->d_inode == NULL) { + struct dentry_params dp; + struct inode *inode; + + CWARN("creating log with ID "LPU64"\n", oa->o_id); + + dchild->d_fsdata = (void *) &dp; + dp.p_ptr = NULL; + dp.p_inum = oa->o_id; + rc = ll_vfs_create(parent_inode, dchild, S_IFREG, NULL); + if (dchild->d_fsdata == (void *)(unsigned long)oa->o_id) + dchild->d_fsdata = NULL; + if (rc) { + CDEBUG(D_INODE, "err during create: %d\n", rc); + dput(dchild); + GOTO(out_pop, rc); + } + inode = dchild->d_inode; + LASSERT(inode->i_ino == oa->o_id); + inode->i_generation = oa->o_generation; + CDEBUG(D_HA, "recreated ino %lu with gen %u\n", + inode->i_ino, inode->i_generation); + mark_inode_dirty(inode); + } else { + CWARN("it should be here!\n"); + } + GOTO(out_pop, rc); + } + + sprintf(fidname, "OBJECTS/%u.%u",ll_insecure_random_int(),current->pid); filp = filp_open(fidname, O_CREAT | O_EXCL, 0644); if (IS_ERR(filp)) { rc = PTR_ERR(filp); if (rc == -EEXIST) { - CERROR("impossible object name collision %u\n", - tmpname); + CERROR("impossible object name collision %s\n", + fidname); LBUG(); } - CERROR("error creating tmp object %u: rc %d\n", tmpname, rc); + CERROR("error creating tmp object %s: rc %d\n", fidname, rc); GOTO(out_pop, rc); } @@ -628,15 +685,15 @@ int mds_obd_create(struct obd_export *exp, struct obdo *oa, oa->o_id = filp->f_dentry->d_inode->i_ino; oa->o_generation = filp->f_dentry->d_inode->i_generation; namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation); + CWARN("created log anonymous "LPU64"/%u\n", oa->o_id, oa->o_generation); - down(&parent_inode->i_sem); - new_child = lookup_one_len(fidname, mds->mds_objects_dir, namelen); + dchild = lookup_one_len(fidname, mds->mds_objects_dir, namelen); - if (IS_ERR(new_child)) { + if (IS_ERR(dchild)) { CERROR("getting neg dentry for obj rename: %d\n", rc); - GOTO(out_close, rc = PTR_ERR(new_child)); + GOTO(out_close, rc = PTR_ERR(dchild)); } - if (new_child->d_inode != NULL) { + if (dchild->d_inode != NULL) { CERROR("impossible non-negative obj dentry " LPU64":%u!\n", oa->o_id, oa->o_generation); LBUG(); @@ -649,7 +706,7 @@ int mds_obd_create(struct obd_export *exp, struct obdo *oa, lock_kernel(); rc = vfs_rename(mds->mds_objects_dir->d_inode, filp->f_dentry, - mds->mds_objects_dir->d_inode, new_child); + mds->mds_objects_dir->d_inode, dchild); unlock_kernel(); if (rc) CERROR("error renaming new object "LPU64":%u: rc %d\n", @@ -660,20 +717,19 @@ int mds_obd_create(struct obd_export *exp, struct obdo *oa, if (!err) { oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num; oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGENER | OBD_MD_FLGROUP; - } - else if (!rc) + } else if (!rc) rc = err; out_dput: - dput(new_child); + dput(dchild); out_close: - up(&parent_inode->i_sem); err = filp_close(filp, 0); if (err) { - CERROR("closing tmpfile %u: rc %d\n", tmpname, rc); + CERROR("closing tmpfile %s: rc %d\n", fidname, rc); if (!rc) rc = err; } out_pop: + up(&parent_inode->i_sem); pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); RETURN(rc); }