X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmds%2Fmds_fs.c;h=6c8b7225a832f7800ba837f60ed7d7ef3cc7cdea;hb=926c6309185a25a8ac1541cfa67910325ed8626f;hp=9bbb11ae3318b05229b9bd0b384d83b11eeaaa22;hpb=4721137e38a657ab5fdccb2b75c7a7d0e3957a4d;p=fs%2Flustre-release.git diff --git a/lustre/mds/mds_fs.c b/lustre/mds/mds_fs.c index 9bbb11a..6c8b722 100644 --- a/lustre/mds/mds_fs.c +++ b/lustre/mds/mds_fs.c @@ -41,6 +41,7 @@ #include #include +#include #include "mds_internal.h" /* This limit is arbitrary, but for now we fit it in 1 page (32k clients) */ @@ -95,18 +96,19 @@ int mds_client_add(struct obd_device *obd, struct mds_obd *mds, cl_idx, med->med_mcd->mcd_uuid); med->med_idx = cl_idx; - med->med_off = MDS_LR_CLIENT_START + (cl_idx * MDS_LR_CLIENT_SIZE); + med->med_off = le32_to_cpu(mds->mds_server_data->msd_client_start) + + (cl_idx * le16_to_cpu(mds->mds_server_data->msd_client_size)); if (new_client) { - struct obd_run_ctxt saved; + struct lvfs_run_ctxt saved; loff_t off = med->med_off; struct file *file = mds->mds_rcvd_filp; int rc; - push_ctxt(&saved, &obd->obd_ctxt, NULL); + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); rc = fsfilt_write_record(obd, file, med->med_mcd, sizeof(*med->med_mcd), &off, 1); - pop_ctxt(&saved, &obd->obd_ctxt, NULL); + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); if (rc) return rc; @@ -123,11 +125,10 @@ int mds_client_free(struct obd_export *exp, int clear_client) struct mds_obd *mds = &exp->exp_obd->u.mds; struct obd_device *obd = exp->exp_obd; struct mds_client_data zero_mcd; - struct obd_run_ctxt saved; + struct lvfs_run_ctxt saved; int rc; unsigned long *bitmap = mds->mds_client_bitmap; - LASSERT(bitmap); if (!med->med_mcd) RETURN(0); @@ -138,6 +139,8 @@ int mds_client_free(struct obd_export *exp, int clear_client) CDEBUG(D_INFO, "freeing client at idx %u (%lld)with UUID '%s'\n", med->med_idx, med->med_off, med->med_mcd->mcd_uuid); + LASSERT(bitmap); + /* Clear the bit _after_ zeroing out the client so we don't race with mds_client_add and zero out new clients.*/ if (!test_bit(med->med_idx, bitmap)) { @@ -148,10 +151,10 @@ int mds_client_free(struct obd_export *exp, int clear_client) if (clear_client) { memset(&zero_mcd, 0, sizeof zero_mcd); - push_ctxt(&saved, &obd->obd_ctxt, NULL); + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); rc = fsfilt_write_record(obd, mds->mds_rcvd_filp, &zero_mcd, sizeof(zero_mcd), &med->med_off, 1); - pop_ctxt(&saved, &obd->obd_ctxt, NULL); + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); CDEBUG(rc == 0 ? D_INFO : D_ERROR, "zeroing out client %s idx %u in %s rc %d\n", @@ -164,6 +167,12 @@ int mds_client_free(struct obd_export *exp, int clear_client) LBUG(); } + + /* Make sure the server's last_transno is up to date. Do this + * after the client is freed so we know all the client's + * transactions have been committed. */ + mds_update_server_data(exp->exp_obd, 1); + free_and_out: OBD_FREE(med->med_mcd, sizeof(*med->med_mcd)); @@ -192,10 +201,10 @@ static int mds_read_last_rcvd(struct obd_device *obd, struct file *file) ENTRY; /* ensure padding in the struct is the correct size */ - LASSERT (offsetof(struct mds_server_data, msd_padding) + - sizeof(msd->msd_padding) == MDS_LR_SERVER_SIZE); - LASSERT (offsetof(struct mds_client_data, mcd_padding) + - sizeof(mcd->mcd_padding) == MDS_LR_CLIENT_SIZE); + LASSERT(offsetof(struct mds_server_data, msd_padding) + + sizeof(msd->msd_padding) == MDS_LR_SERVER_SIZE); + LASSERT(offsetof(struct mds_client_data, mcd_padding) + + sizeof(mcd->mcd_padding) == MDS_LR_CLIENT_SIZE); OBD_ALLOC_WAIT(msd, sizeof(*msd)); if (!msd) @@ -215,7 +224,7 @@ static int mds_read_last_rcvd(struct obd_device *obd, struct file *file) memcpy(msd->msd_uuid, obd->obd_uuid.uuid,sizeof(msd->msd_uuid)); msd->msd_last_transno = 0; - mount_count = msd->msd_mount_count = 0; + mount_count = msd->msd_mount_count = 0; msd->msd_server_size = cpu_to_le32(MDS_LR_SERVER_SIZE); msd->msd_client_start = cpu_to_le32(MDS_LR_CLIENT_START); msd->msd_client_size = cpu_to_le16(MDS_LR_CLIENT_SIZE); @@ -264,8 +273,9 @@ static int mds_read_last_rcvd(struct obd_device *obd, struct file *file) CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n", obd->obd_name, last_rcvd_size); CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name, - last_rcvd_size <= MDS_LR_CLIENT_START ? 0 : - (last_rcvd_size - MDS_LR_CLIENT_START) / MDS_LR_CLIENT_SIZE); + last_rcvd_size <= le32_to_cpu(msd->msd_client_start) ? 0 : + (last_rcvd_size - le32_to_cpu(msd->msd_client_start)) / + le16_to_cpu(msd->msd_client_size)); /* When we do a clean MDS shutdown, we save the last_transno into * the header. If we find clients with higher last_transno values @@ -305,9 +315,10 @@ static int mds_read_last_rcvd(struct obd_device *obd, struct file *file) /* These exports are cleaned up by mds_disconnect(), so they * need to be set up like real exports as mds_connect() does. */ - CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64 - " srv lr: "LPU64"\n", mcd->mcd_uuid, cl_idx, - last_transno, le64_to_cpu(msd->msd_last_transno)); + CDEBUG(D_HA|D_WARNING,"RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64 + " srv lr: "LPU64" lx: "LPU64"\n", mcd->mcd_uuid, cl_idx, + last_transno, le64_to_cpu(msd->msd_last_transno), + mcd->mcd_last_xid); exp = class_new_export(obd); if (exp == NULL) @@ -323,6 +334,7 @@ static int mds_read_last_rcvd(struct obd_device *obd, struct file *file) spin_lock_init(&med->med_open_lock); mcd = NULL; + exp->exp_replay_needed = 1; obd->obd_recoverable_clients++; obd->obd_max_recoverable_clients++; class_export_put(exp); @@ -340,7 +352,7 @@ static int mds_read_last_rcvd(struct obd_device *obd, struct file *file) "last_transno "LPU64"\n", obd->obd_name, obd->obd_recoverable_clients, mds->mds_last_transno); obd->obd_next_recovery_transno = obd->obd_last_committed + 1; - obd->obd_recovering = 1; + target_start_recovery_thread(obd, mds_handle); } if (mcd) @@ -361,10 +373,30 @@ err_msd: RETURN(rc); } +static int mds_fs_post_setup(struct obd_device *obd) +{ + struct mds_obd *mds = &obd->u.mds; + struct dentry *de = mds_fid2dentry(mds, &mds->mds_rootfid, NULL); + int rc = 0; + + rc = fsfilt_post_setup(obd, de); + if (rc) + GOTO(out, rc); + + fsfilt_set_fs_flags(obd, de->d_inode, + SM_DO_REC | SM_DO_COW); + fsfilt_set_fs_flags(obd, mds->mds_pending_dir->d_inode, + SM_DO_REC | SM_DO_COW); + fsfilt_set_mds_flags(obd, mds->mds_sb); +out: + l_dput(de); + return rc; +} + int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt) { struct mds_obd *mds = &obd->u.mds; - struct obd_run_ctxt saved; + struct lvfs_run_ctxt saved; struct dentry *dentry; struct file *file; int rc; @@ -379,14 +411,14 @@ int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt) fsfilt_setup(obd, mds->mds_sb); - OBD_SET_CTXT_MAGIC(&obd->obd_ctxt); - obd->obd_ctxt.pwdmnt = mnt; - obd->obd_ctxt.pwd = mnt->mnt_root; - obd->obd_ctxt.fs = get_ds(); - obd->obd_ctxt.cb_ops = mds_lvfs_ops; + OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt); + obd->obd_lvfs_ctxt.pwdmnt = mnt; + obd->obd_lvfs_ctxt.pwd = mnt->mnt_root; + obd->obd_lvfs_ctxt.fs = get_ds(); + obd->obd_lvfs_ctxt.cb_ops = mds_lvfs_ops; /* setup the directory tree */ - push_ctxt(&saved, &obd->obd_ctxt, NULL); + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); dentry = simple_mkdir(current->fs->pwd, "ROOT", 0755, 0); if (IS_ERR(dentry)) { rc = PTR_ERR(dentry); @@ -402,11 +434,16 @@ int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt) dentry = lookup_one_len("__iopen__", current->fs->pwd, strlen("__iopen__")); - if (IS_ERR(dentry) || !dentry->d_inode) { - rc = (IS_ERR(dentry)) ? PTR_ERR(dentry): -ENOENT; - CERROR("cannot open iopen FH directory: rc = %d\n", rc); + if (IS_ERR(dentry)) { + rc = PTR_ERR(dentry); + CERROR("cannot lookup __iopen__ directory: rc = %d\n", rc); GOTO(err_pop, rc); } + if (!dentry->d_inode) { + rc = -ENOENT; + CERROR("__iopen__ directory has no inode? rc = %d\n", rc); + GOTO(err_fid, rc); + } mds->mds_fid_de = dentry; dentry = simple_mkdir(current->fs->pwd, "PENDING", 0777, 1); @@ -416,7 +453,7 @@ int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt) GOTO(err_fid, rc); } mds->mds_pending_dir = dentry; - + dentry = simple_mkdir(current->fs->pwd, "LOGS", 0777, 1); if (IS_ERR(dentry)) { rc = PTR_ERR(dentry); @@ -433,6 +470,22 @@ int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt) } mds->mds_objects_dir = dentry; + dentry = simple_mkdir(current->fs->pwd, "FIDS", 0777, 1); + if (IS_ERR(dentry)) { + rc = PTR_ERR(dentry); + CERROR("cannot create FIDS directory: rc = %d\n", rc); + GOTO(err_fids, rc); + } + mds->mds_fids_dir = dentry; + + dentry = simple_mkdir(current->fs->pwd, "UNNAMED", 0777, 1); + if (IS_ERR(dentry)) { + rc = PTR_ERR(dentry); + CERROR("cannot create UNNAMED directory: rc = %d\n", rc); + GOTO(err_unnamed, rc); + } + mds->mds_unnamed_dir = dentry; + /* open and test the last rcvd file */ file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644); if (IS_ERR(file)) { @@ -467,8 +520,12 @@ int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt) GOTO(err_lov_objid, rc = -ENOENT); } err_pop: - pop_ctxt(&saved, &obd->obd_ctxt, NULL); - + if (!rc) { + rc = mds_fs_post_setup(obd); + if (rc) + CERROR("can not post setup fsfilt\n"); + } + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); return rc; err_lov_objid: @@ -479,6 +536,10 @@ err_client: err_last_rcvd: if (mds->mds_rcvd_filp && filp_close(mds->mds_rcvd_filp, 0)) CERROR("can't close %s after error\n", LAST_RCVD); +err_unnamed: + dput(mds->mds_unnamed_dir); +err_fids: + dput(mds->mds_fids_dir); err_objects: dput(mds->mds_objects_dir); err_logs: @@ -490,11 +551,17 @@ err_fid: goto err_pop; } +static int mds_fs_post_cleanup(struct obd_device *obd) +{ + int rc = 0; + rc = fsfilt_post_cleanup(obd); + return rc; +} int mds_fs_cleanup(struct obd_device *obd, int flags) { struct mds_obd *mds = &obd->u.mds; - struct obd_run_ctxt saved; + struct lvfs_run_ctxt saved; int rc = 0; if (flags & OBD_OPT_FAILOVER) @@ -504,7 +571,7 @@ int mds_fs_cleanup(struct obd_device *obd, int flags) class_disconnect_exports(obd, flags); /* cleans up client info too */ mds_server_free_data(mds); - push_ctxt(&saved, &obd->obd_ctxt, NULL); + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); if (mds->mds_rcvd_filp) { rc = filp_close(mds->mds_rcvd_filp, 0); mds->mds_rcvd_filp = NULL; @@ -517,6 +584,14 @@ int mds_fs_cleanup(struct obd_device *obd, int flags) if (rc) CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc); } + if (mds->mds_unnamed_dir != NULL) { + l_dput(mds->mds_unnamed_dir); + mds->mds_unnamed_dir = NULL; + } + if (mds->mds_fids_dir != NULL) { + l_dput(mds->mds_fids_dir); + mds->mds_fids_dir = NULL; + } if (mds->mds_objects_dir != NULL) { l_dput(mds->mds_objects_dir); mds->mds_objects_dir = NULL; @@ -529,7 +604,9 @@ int mds_fs_cleanup(struct obd_device *obd, int flags) l_dput(mds->mds_pending_dir); mds->mds_pending_dir = NULL; } - pop_ctxt(&saved, &obd->obd_ctxt, NULL); + rc = mds_fs_post_cleanup(obd); + + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); shrink_dcache_parent(mds->mds_fid_de); dput(mds->mds_fid_de); @@ -540,31 +617,64 @@ int mds_fs_cleanup(struct obd_device *obd, int flags) * performance sensitive, it is accomplished by creating a file, checking the * fid, and renaming it. */ int mds_obd_create(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md **ea, struct obd_trans_info *oti) + struct lov_stripe_md **ea, struct obd_trans_info *oti) { struct mds_obd *mds = &exp->exp_obd->u.mds; struct inode *parent_inode = mds->mds_objects_dir->d_inode; - unsigned int tmpname = ll_insecure_random_int(); struct file *filp; - struct dentry *new_child; - struct obd_run_ctxt saved; + struct dentry *dchild; + struct lvfs_run_ctxt saved; char fidname[LL_FID_NAMELEN]; void *handle; int rc = 0, err, namelen; ENTRY; - push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL); - - sprintf(fidname, "OBJECTS/%u", tmpname); + push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); + down(&parent_inode->i_sem); + if (oa->o_id) { + namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation); + + dchild = lookup_one_len(fidname, mds->mds_objects_dir, namelen); + if (IS_ERR(dchild)) + GOTO(out_pop, rc = PTR_ERR(dchild)); + + if (dchild->d_inode == NULL) { + struct dentry_params dp; + struct inode *inode; + + dchild->d_fsdata = (void *) &dp; + dp.p_ptr = NULL; + dp.p_inum = oa->o_id; + rc = ll_vfs_create(parent_inode, dchild, S_IFREG, NULL); + if (dchild->d_fsdata == (void *)(unsigned long)oa->o_id) + dchild->d_fsdata = NULL; + if (rc) { + CDEBUG(D_INODE, "err during create: %d\n", rc); + dput(dchild); + GOTO(out_pop, rc); + } + inode = dchild->d_inode; + LASSERT(inode->i_ino == oa->o_id); + inode->i_generation = oa->o_generation; + CDEBUG(D_HA, "recreated ino %lu with gen %u\n", + inode->i_ino, inode->i_generation); + mark_inode_dirty(inode); + } else { + CWARN("it should be here!\n"); + } + GOTO(out_pop, rc); + } + + sprintf(fidname, "OBJECTS/%u.%u",ll_insecure_random_int(),current->pid); filp = filp_open(fidname, O_CREAT | O_EXCL, 0644); if (IS_ERR(filp)) { rc = PTR_ERR(filp); if (rc == -EEXIST) { - CERROR("impossible object name collision %u\n", - tmpname); + CERROR("impossible object name collision %s\n", + fidname); LBUG(); } - CERROR("error creating tmp object %u: rc %d\n", tmpname, rc); + CERROR("error creating tmp object %s: rc %d\n", fidname, rc); GOTO(out_pop, rc); } @@ -574,14 +684,13 @@ int mds_obd_create(struct obd_export *exp, struct obdo *oa, oa->o_generation = filp->f_dentry->d_inode->i_generation; namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation); - down(&parent_inode->i_sem); - new_child = lookup_one_len(fidname, mds->mds_objects_dir, namelen); + dchild = lookup_one_len(fidname, mds->mds_objects_dir, namelen); - if (IS_ERR(new_child)) { + if (IS_ERR(dchild)) { CERROR("getting neg dentry for obj rename: %d\n", rc); - GOTO(out_close, rc = PTR_ERR(new_child)); + GOTO(out_close, rc = PTR_ERR(dchild)); } - if (new_child->d_inode != NULL) { + if (dchild->d_inode != NULL) { CERROR("impossible non-negative obj dentry " LPU64":%u!\n", oa->o_id, oa->o_generation); LBUG(); @@ -589,35 +698,36 @@ int mds_obd_create(struct obd_export *exp, struct obdo *oa, handle = fsfilt_start(exp->exp_obd, mds->mds_objects_dir->d_inode, FSFILT_OP_RENAME, NULL); - if (IS_ERR(handle)) + if (IS_ERR(handle)) GOTO(out_dput, rc = PTR_ERR(handle)); - + lock_kernel(); rc = vfs_rename(mds->mds_objects_dir->d_inode, filp->f_dentry, - mds->mds_objects_dir->d_inode, new_child); + mds->mds_objects_dir->d_inode, dchild); unlock_kernel(); if (rc) CERROR("error renaming new object "LPU64":%u: rc %d\n", oa->o_id, oa->o_generation, rc); - err = fsfilt_commit(exp->exp_obd, mds->mds_objects_dir->d_inode, - handle, 0); - if (!err) - oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGENER; - else if (!rc) + err = fsfilt_commit(exp->exp_obd, mds->mds_sb, + mds->mds_objects_dir->d_inode, handle, 0); + if (!err) { + oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num; + oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGENER | OBD_MD_FLGROUP; + } else if (!rc) rc = err; out_dput: - dput(new_child); + dput(dchild); out_close: - up(&parent_inode->i_sem); err = filp_close(filp, 0); if (err) { - CERROR("closing tmpfile %u: rc %d\n", tmpname, rc); + CERROR("closing tmpfile %s: rc %d\n", fidname, rc); if (!rc) rc = err; } out_pop: - pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL); + up(&parent_inode->i_sem); + pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); RETURN(rc); } @@ -627,42 +737,46 @@ int mds_obd_destroy(struct obd_export *exp, struct obdo *oa, struct mds_obd *mds = &exp->exp_obd->u.mds; struct inode *parent_inode = mds->mds_objects_dir->d_inode; struct obd_device *obd = exp->exp_obd; - struct obd_run_ctxt saved; + struct lvfs_run_ctxt saved; char fidname[LL_FID_NAMELEN]; struct dentry *de; void *handle; int err, namelen, rc = 0; ENTRY; - push_ctxt(&saved, &obd->obd_ctxt, NULL); + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation); down(&parent_inode->i_sem); de = lookup_one_len(fidname, mds->mds_objects_dir, namelen); if (de == NULL || de->d_inode == NULL) { - CERROR("destroying non-existent object "LPU64"\n", oa->o_id); + CERROR("destroying non-existent object "LPU64" %s\n", + oa->o_id, fidname); GOTO(out_dput, rc = IS_ERR(de) ? PTR_ERR(de) : -ENOENT); } - handle = fsfilt_start(obd, mds->mds_objects_dir->d_inode, - FSFILT_OP_UNLINK_LOG, oti); - if (IS_ERR(handle)) { + /* Stripe count is 1 here since this is some MDS specific stuff + that is unlinked, not spanned across multiple OSTs */ + handle = fsfilt_start_log(obd, mds->mds_objects_dir->d_inode, + FSFILT_OP_UNLINK, oti, 1); + + if (IS_ERR(handle)) GOTO(out_dput, rc = PTR_ERR(handle)); - } rc = vfs_unlink(mds->mds_objects_dir->d_inode, de); if (rc) CERROR("error destroying object "LPU64":%u: rc %d\n", oa->o_id, oa->o_generation, rc); - err = fsfilt_commit(obd, mds->mds_objects_dir->d_inode, handle, 0); + err = fsfilt_commit(obd, mds->mds_sb, mds->mds_objects_dir->d_inode, + handle, 0); if (err && !rc) rc = err; out_dput: if (de != NULL) l_dput(de); up(&parent_inode->i_sem); - pop_ctxt(&saved, &obd->obd_ctxt, NULL); + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); RETURN(rc); }