From 36a8c0119fa95894815f55f1908e81db7a9e1178 Mon Sep 17 00:00:00 2001 From: tappro Date: Sat, 20 Sep 2008 12:34:11 +0000 Subject: [PATCH] - vbr orphan handling b:15392,15391 i:adilger, bzzz --- lustre/mds/mds_unlink_open.c | 113 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 110 insertions(+), 3 deletions(-) diff --git a/lustre/mds/mds_unlink_open.c b/lustre/mds/mds_unlink_open.c index cb9361c..ac50192 100644 --- a/lustre/mds/mds_unlink_open.c +++ b/lustre/mds/mds_unlink_open.c @@ -122,7 +122,7 @@ static int mds_unlink_orphan(struct obd_device *obd, struct dentry *dchild, ENTRY; LASSERT(mds->mds_osc_obd != NULL); - + /* We don't need to do any of these other things for orhpan dirs, * especially not mds_get_md (may get a default LOV EA, bug 4554) */ mode = inode->i_mode; @@ -183,6 +183,19 @@ out_free_lmm: RETURN(rc); } +static __u64 mds_orphans_max_version(struct obd_device *obd) +{ + struct obd_export *exp; + __u32 epoch = lr_epoch(obd->u.mds.mds_last_transno); + spin_lock(&obd->obd_dev_lock); + list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain) { + struct lu_export_data *led = &exp->exp_target_data; + epoch = min(epoch, le32_to_cpu(led->led_lcd->lcd_first_epoch)); + } + spin_unlock(&obd->obd_dev_lock); + return (__u64)epoch << LR_EPOCH_BITS; +} + /* Delete inodes which were previously open-unlinked but were not reopened * during MDS recovery for whatever reason (e.g. client also failed, recovery * aborted, etc). */ @@ -198,6 +211,7 @@ int mds_cleanup_pending(struct obd_device *obd) struct list_head dentry_list; char d_name[LL_FID_NAMELEN]; unsigned long inum; + __u64 max_version; int i = 0, rc = 0, item = 0, namlen; ENTRY; @@ -216,13 +230,19 @@ int mds_cleanup_pending(struct obd_device *obd) if (IS_ERR(file)) GOTO(err_pop, rc = PTR_ERR(file)); - INIT_LIST_HEAD(&dentry_list); + CFS_INIT_LIST_HEAD(&dentry_list); rc = l_readdir(file, &dentry_list); filp_close(file, 0); if (rc < 0) GOTO(err_out, rc); + /** Get maximum version for orphans to delete. All other orphans may be + * needed for delayed clients */ + max_version = mds_orphans_max_version(obd); + list_for_each_entry_safe(dirent, n, &dentry_list, lld_list) { + __u64 version; + i++; list_del(&dirent->lld_list); @@ -230,7 +250,7 @@ int mds_cleanup_pending(struct obd_device *obd) LASSERT(sizeof(d_name) >= namlen + 1); strcpy(d_name, dirent->lld_name); inum = dirent->lld_ino; - OBD_FREE(dirent, sizeof(*dirent)); + OBD_FREE_PTR(dirent); CDEBUG(D_INODE, "entry %d of PENDING DIR: %s\n", i, d_name); @@ -266,6 +286,17 @@ int mds_cleanup_pending(struct obd_device *obd) obd->obd_name, d_name); GOTO(next, rc = 0); } + /** Keep orphans for possible use by delayed exports. Remove + * orphans with version lower than minimal one of all exports */ + version = fsfilt_get_version(obd, child_inode); + if ((__s64)version != -EOPNOTSUPP && + version >= max_version) { + MDS_UP_READ_ORPHAN_SEM(child_inode); + CDEBUG(D_INFO, + "%s: orphan %s is needed for delayed exports\n", + obd->obd_name, d_name); + GOTO(next, rc = 0); + } MDS_UP_READ_ORPHAN_SEM(child_inode); rc = mds_unlink_orphan(obd, dchild, child_inode, pending_dir); @@ -296,3 +327,79 @@ err_mntget: l_dput(mds->mds_pending_dir); goto err_pop; } + +/** + * Determine there is no orphan with the same inode number. That may happens + * since unlink replay don't delete inode but keep orphan for delayed clients. + * Therefore replays like 'create, unlink, create' will fail due to inode can't + * be reused. + */ +int mds_check_stale_orphan(struct obd_device *obd, struct ll_fid *fid) +{ + struct mds_obd *mds = &obd->u.mds; + char fidname[32]; + struct dentry *result; + struct inode *inode, *pending_dir = mds->mds_pending_dir->d_inode; + int fidlen = 0, rc = 0; + + /* no need in checks*/ + if (fid->id == 0 || obd->obd_recovering == 0) + RETURN(0); + + /** open by fid like mds_fid2dentry does */ + snprintf(fidname, sizeof(fidname), "0x%lx", (unsigned long)(fid->id)); + fidlen = strlen(fidname); + result = mds_lookup(obd, fidname, mds->mds_fid_de, fidlen); + if (IS_ERR(result)) + RETURN(0); + inode = result->d_inode; + if (!inode) + GOTO(out, rc = 0); + + if (fid->generation && (inode->i_generation == fid->generation)) { + CWARN("The same "LPU64"/%u exists already\n", + fid->id, fid->generation); + GOTO(out, rc = -EFAULT); + } + + LOCK_INODE_MUTEX(pending_dir); + MDS_DOWN_READ_ORPHAN_SEM(inode); + if (mds_inode_is_orphan(inode)) { + struct dentry *orphan; + if (mds_orphan_open_count(inode) > 0) { + CERROR("Orphan "LPU64"/%u is in use!\n", + fid->id, fid->generation); + GOTO(unlock_child, rc = -EFAULT); + } + + /** Found orphan in pending dir and delete it */ + fidlen = ll_fid2str(fidname, fid->id, inode->i_generation); + orphan = lookup_one_len(fidname, mds->mds_pending_dir, fidlen); + if (IS_ERR(orphan)) { + rc = PTR_ERR(orphan); + CERROR("error looking up %s in PENDING: rc = %d\n", + fidname, rc); + GOTO(unlock_child, rc); + } + if (orphan->d_inode != inode) { + l_dput(orphan); + CWARN("%s: Found wrong orphan %s %p/%p\n", + obd->obd_name, fidname, orphan->d_inode, inode); + GOTO(unlock_child, rc = -EFAULT); + } + MDS_UP_READ_ORPHAN_SEM(inode); + + rc = mds_unlink_orphan(obd, orphan, inode, pending_dir); + CDEBUG(D_INODE, "%s: removed orphan %s: rc %d\n", + obd->obd_name, fidname, rc); + l_dput(orphan); + GOTO(unlock, rc); + } +unlock_child: + MDS_UP_READ_ORPHAN_SEM(inode); +unlock: + UNLOCK_INODE_MUTEX(pending_dir); +out: + l_dput(result); + RETURN(0); +} -- 1.8.3.1