struct dentry *pending_child;
char fidname[LL_FID_NAMELEN];
int fidlen = 0, rc;
+ unsigned mode;
ENTRY;
LASSERT(!mds_inode_is_orphan(dchild->d_inode));
fidlen = ll_fid2str(fidname, dchild->d_inode->i_ino,
dchild->d_inode->i_generation);
- CWARN("pending destroy of %dx open file %s = %s\n",
- mds_open_orphan_count(dchild->d_inode),
- rec->ur_name, fidname);
+ CDEBUG(D_HA, "pending destroy of %dx open file %s = %s\n",
+ mds_open_orphan_count(dchild->d_inode), rec->ur_name, fidname);
pending_child = lookup_one_len(fidname, mds->mds_pending_dir, fidlen);
if (IS_ERR(pending_child))
GOTO(out_dput, rc = 0);
}
- *handle = fsfilt_start(obd, pending_dir, FSFILT_OP_RENAME, NULL);
- if (IS_ERR(*handle))
- GOTO(out_dput, rc = PTR_ERR(*handle));
-
- lock_kernel();
- rc = vfs_rename(dparent->d_inode, dchild, pending_dir, pending_child);
- unlock_kernel();
+ /* link() is semanticaly-wrong for S_IFDIR, so we set S_IFREG
+ * for linking and return real mode back then -bzzz */
+ mode = dchild->d_inode->i_mode;
+ dchild->d_inode->i_mode = S_IFREG;
+ rc = vfs_link(dchild, pending_dir, pending_child);
if (rc)
- CERROR("error renaming orphan %lu/%s to PENDING: rc = %d\n",
- dparent->d_inode->i_ino, rec->ur_name, rc);
+ CERROR("error linking orphan %s to PENDING: rc = %d\n",
+ rec->ur_name, rc);
else
mds_inode_set_orphan(dchild->d_inode);
+
+ /* return mode and correct i_nlink if inode is directory */
+ LASSERT(dchild->d_inode->i_nlink == 1);
+ dchild->d_inode->i_mode = mode;
+ if ((mode & S_IFMT) == S_IFDIR) {
+ dchild->d_inode->i_nlink++;
+ pending_dir->i_nlink++;
+ }
+ mark_inode_dirty(dchild->d_inode);
+
out_dput:
dput(pending_child);
out_lock:
RETURN(rc);
}
-static int mds_osc_destroy_orphan(struct mds_obd *mds,
- struct ptlrpc_request *request)
+static int mds_osc_destroy_orphan(struct mds_obd *mds,
+ struct inode *inode,
+ struct lov_mds_md *lmm,
+ int lmm_size,
+ struct llog_cookie *logcookies,
+ int log_unlink)
{
- struct mds_body *body;
- struct lov_mds_md *lmm = NULL;
struct lov_stripe_md *lsm = NULL;
struct obd_trans_info oti = { 0 };
struct obdo *oa;
int rc;
ENTRY;
- body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body));
- if (!(body->valid & OBD_MD_FLEASIZE))
+ if (lmm_size == 0)
RETURN(0);
- if (body->eadatasize == 0) {
- CERROR("OBD_MD_FLEASIZE set but eadatasize zero\n");
- RETURN(rc = -EPROTO);
- }
-
- lmm = lustre_msg_buf(request->rq_repmsg, 1, body->eadatasize);
- LASSERT(lmm != NULL);
- rc = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, body->eadatasize);
+ rc = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, lmm_size);
if (rc < 0) {
CERROR("Error unpack md %p\n", lmm);
RETURN(rc);
if (oa == NULL)
GOTO(out_free_memmd, rc = -ENOMEM);
oa->o_id = lsm->lsm_object_id;
- oa->o_mode = body->mode & S_IFMT;
- oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE;
+ oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
+ oa->o_mode = inode->i_mode & S_IFMT;
+ oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLGROUP;
-#ifdef ENABLE_ORPHANS
- if (body->valid & OBD_MD_FLCOOKIE) {
+ if (log_unlink && logcookies) {
oa->o_valid |= OBD_MD_FLCOOKIE;
- oti.oti_logcookies =
- lustre_msg_buf(request->rq_repmsg, 2,
- sizeof(struct llog_cookie) *
- lsm->lsm_stripe_count);
- if (oti.oti_logcookies == NULL)
- oa->o_valid &= ~OBD_MD_FLCOOKIE;
- body->valid &= ~OBD_MD_FLCOOKIE;
+ oti.oti_logcookies = logcookies;
}
-#endif
rc = obd_destroy(mds->mds_osc_exp, oa, lsm, &oti);
obdo_free(oa);
- if (rc)
- CERROR("destroy orphan objid 0x"LPX64" on ost error "
+ if (rc)
+ CDEBUG(D_INODE, "destroy orphan objid 0x"LPX64" on ost error "
"%d\n", lsm->lsm_object_id, rc);
out_free_memmd:
obd_free_memmd(mds->mds_osc_exp, &lsm);
struct inode *inode, struct inode *pending_dir)
{
struct mds_obd *mds = &obd->u.mds;
- struct mds_body *body;
+ struct lov_mds_md *lmm = NULL;
+ struct llog_cookie *logcookies = NULL;
+ int lmm_size = 0, log_unlink = 0;
void *handle = NULL;
- struct ptlrpc_request *req;
- int lengths[3] = {sizeof(struct mds_body),
- mds->mds_max_mdsize,
- mds->mds_max_cookiesize};
- int rc;
+ int rc, err;
ENTRY;
LASSERT(mds->mds_osc_obd != NULL);
- OBD_ALLOC(req, sizeof(*req));
- if (!req) {
- CERROR("request allocation out of memory\n");
- GOTO(err_alloc_req, rc = -ENOMEM);
- }
- rc = lustre_pack_reply(req, 3, lengths, NULL);
- if (rc) {
- CERROR("cannot pack request %d\n", rc);
- GOTO(out_free_req, rc);
- }
- body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
- LASSERT(body != NULL);
- mds_pack_inode2body(body, inode);
- mds_pack_md(obd, req->rq_repmsg, 1, body, inode, 1);
+ OBD_ALLOC(lmm, mds->mds_max_mdsize);
+ if (lmm == NULL)
+ RETURN(-ENOMEM);
+
+ down(&inode->i_sem);
+ rc = fsfilt_get_md(obd, inode, lmm, mds->mds_max_mdsize);
+ up(&inode->i_sem);
+
+ if (rc < 0) {
+ CERROR("Error %d reading eadata for ino %lu\n",
+ rc, inode->i_ino);
+ GOTO(out_free_lmm, rc);
+ } else if (rc > 0) {
+ lmm_size = rc;
+ rc = mds_convert_lov_ea(obd, inode, lmm, lmm_size);
+ if (rc > 0)
+ lmm_size = rc;
+ rc = 0;
+ }
- handle = fsfilt_start(obd, pending_dir, FSFILT_OP_UNLINK_LOG, NULL);
+ handle = fsfilt_start_log(obd, pending_dir, FSFILT_OP_UNLINK, NULL,
+ le32_to_cpu(lmm->lmm_stripe_count));
if (IS_ERR(handle)) {
rc = PTR_ERR(handle);
CERROR("error fsfilt_start: %d\n", rc);
handle = NULL;
- GOTO(out_free_msg, rc);
+ GOTO(out_free_lmm, rc);
+ }
+
+ down(&inode->i_sem);
+ rc = fsfilt_get_md(obd, inode, lmm, mds->mds_max_mdsize);
+ up(&inode->i_sem);
+
+ if (rc < 0) {
+ CERROR("Error %d reading eadata for ino %lu\n",
+ rc, inode->i_ino);
+ GOTO(out_free_lmm, rc);
+ } else if (rc > 0) {
+ lmm_size = rc;
+ rc = 0;
}
- rc = vfs_unlink(pending_dir, dchild);
- if (rc)
+
+ if (S_ISDIR(inode->i_mode))
+ rc = vfs_rmdir(pending_dir, dchild);
+ else
+ rc = vfs_unlink(pending_dir, dchild);
+
+ if (rc)
CERROR("error %d unlinking orphan %*s from PENDING directory\n",
rc, dchild->d_name.len, dchild->d_name.name);
-#ifdef ENABLE_ORPHANS
- if ((body->valid & OBD_MD_FLEASIZE)) {
- if (mds_log_op_unlink(obd, inode, req->rq_repmsg, 1) > 0)
- body->valid |= OBD_MD_FLCOOKIE;
+ if (!rc && lmm_size) {
+ OBD_ALLOC(logcookies, mds->mds_max_cookiesize);
+ if (logcookies == NULL)
+ rc = -ENOMEM;
+ else if (mds_log_op_unlink(obd, inode, lmm,lmm_size,logcookies,
+ mds->mds_max_cookiesize) > 0)
+ log_unlink = 1;
}
-#endif
- if (handle) {
- int err = fsfilt_commit(obd, pending_dir, handle, 0);
- if (err) {
- CERROR("error committing orphan unlink: %d\n", err);
+ err = fsfilt_commit(obd, mds->mds_sb, pending_dir, handle, 0);
+ if (err) {
+ CERROR("error committing orphan unlink: %d\n", err);
+ if (!rc)
rc = err;
- GOTO(out_free_msg, rc);
- }
}
- rc = mds_osc_destroy_orphan(mds, req);
-out_free_msg:
- OBD_FREE(req->rq_repmsg, req->rq_replen);
- req->rq_repmsg = NULL;
-out_free_req:
- OBD_FREE(req, sizeof(*req));
-err_alloc_req:
+ if (!rc) {
+ rc = mds_osc_destroy_orphan(mds, inode, lmm, lmm_size,
+ logcookies, log_unlink);
+ }
+
+ if (logcookies != NULL)
+ OBD_FREE(logcookies, mds->mds_max_cookiesize);
+out_free_lmm:
+ OBD_FREE(lmm, mds->mds_max_mdsize);
RETURN(rc);
}
int mds_cleanup_orphans(struct obd_device *obd)
{
struct mds_obd *mds = &obd->u.mds;
- struct obd_run_ctxt saved;
+ struct lvfs_run_ctxt saved;
struct file *file;
- struct dentry *dchild;
+ struct dentry *dchild, *dentry;
+ struct vfsmount *mnt;
struct inode *child_inode, *pending_dir = mds->mds_pending_dir->d_inode;
- struct l_linux_dirent *dirent, *ptr;
- unsigned int count = pending_dir->i_size;
- int rc = 0, rc2 = 0, item = 0;
+ struct l_linux_dirent *dirent, *n;
+ struct list_head dentry_list;
+ char d_name[LL_FID_NAMELEN];
+ __u64 i = 0;
+ int rc = 0, item = 0, namlen;
ENTRY;
- push_ctxt(&saved, &obd->obd_ctxt, NULL);
- dget(mds->mds_pending_dir);
- mntget(mds->mds_vfsmnt);
+ push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ dentry = dget(mds->mds_pending_dir);
+ if (IS_ERR(dentry))
+ GOTO(err_pop, rc = PTR_ERR(dentry));
+ mnt = mntget(mds->mds_vfsmnt);
+ if (IS_ERR(mnt))
+ GOTO(err_mntget, rc = PTR_ERR(mnt));
+
file = dentry_open(mds->mds_pending_dir, mds->mds_vfsmnt,
O_RDONLY | O_LARGEFILE);
if (IS_ERR(file))
- GOTO(err_open, rc2 = PTR_ERR(file));
+ GOTO(err_pop, rc = PTR_ERR(file));
- OBD_ALLOC(dirent, count);
- if (dirent == NULL)
- GOTO(err_alloc_dirent, rc2 = -ENOMEM);
-
- rc = l_readdir(file, dirent, count);
+ INIT_LIST_HEAD(&dentry_list);
+ rc = l_readdir(file, &dentry_list);
filp_close(file, 0);
if (rc < 0)
- GOTO(err_out, rc2 = rc);
+ GOTO(err_out, rc);
+
+ list_for_each_entry_safe(dirent, n, &dentry_list, lld_list) {
+ i ++;
+ list_del(&dirent->lld_list);
+
+ namlen = strlen(dirent->lld_name);
+ LASSERT(sizeof(d_name) >= namlen + 1);
+ strcpy(d_name, dirent->lld_name);
+ OBD_FREE(dirent, sizeof(*dirent));
- for (ptr = dirent; (char *)ptr < (char *)dirent + rc;
- (char *)ptr += ptr->d_reclen) {
- int namlen = strlen(ptr->d_name);
+ CDEBUG(D_INODE, "entry "LPU64" of PENDING DIR: %s\n",
+ i, d_name);
- if (((namlen == 1) && !strcmp(ptr->d_name, ".")) ||
- ((namlen == 2) && !strcmp(ptr->d_name, "..")))
+ if (((namlen == 1) && !strcmp(d_name, ".")) ||
+ ((namlen == 2) && !strcmp(d_name, ".."))) {
continue;
+ }
down(&pending_dir->i_sem);
- dchild = lookup_one_len(ptr->d_name, mds->mds_pending_dir,
- namlen);
+ dchild = lookup_one_len(d_name, mds->mds_pending_dir, namlen);
if (IS_ERR(dchild)) {
up(&pending_dir->i_sem);
- GOTO(err_out, rc2 = PTR_ERR(dchild));
+ GOTO(err_out, rc = PTR_ERR(dchild));
}
if (!dchild->d_inode) {
- CDEBUG(D_ERROR, "orphan %s has been removed\n",
- ptr->d_name);
- GOTO(next, rc2 = 0);
+ CERROR("orphan %s has been removed\n", d_name);
+ GOTO(next, rc = 0);
}
child_inode = dchild->d_inode;
if (mds_inode_is_orphan(child_inode) &&
mds_open_orphan_count(child_inode)) {
- CWARN("orphan %s was re-opened during recovery\n",
- ptr->d_name);
- GOTO(next, rc2 = 0);
+ CWARN("orphan %s was re-opened during recovery\n", d_name);
+ GOTO(next, rc = 0);
}
- rc2 = mds_unlink_orphan(obd, dchild, child_inode, pending_dir);
- if (rc2 == 0) {
+ rc = mds_unlink_orphan(obd, dchild, child_inode, pending_dir);
+ if (rc == 0) {
item ++;
- CWARN("removed orphan %s from MDS and OST\n",
- ptr->d_name);
+ CWARN("removed orphan %s from MDS and OST\n", d_name);
} else {
- l_dput(dchild);
- up(&pending_dir->i_sem);
- GOTO(err_out, rc2);
+ CDEBUG(D_INODE, "removed orphan %s from MDS/OST failed,"
+ " rc = %d\n", d_name, rc);
+ rc = 0;
}
next:
l_dput(dchild);
up(&pending_dir->i_sem);
}
err_out:
- OBD_FREE(dirent, count);
+ list_for_each_entry_safe(dirent, n, &dentry_list, lld_list) {
+ list_del(&dirent->lld_list);
+ OBD_FREE(dirent, sizeof(*dirent));
+ }
err_pop:
- pop_ctxt(&saved, &obd->obd_ctxt, NULL);
- if (rc2 == 0)
- rc2 = item;
-
- RETURN(rc2);
+ pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ if (rc == 0)
+ rc = item;
+ RETURN(rc);
-err_open:
- mntput(mds->mds_vfsmnt);
+err_mntget:
l_dput(mds->mds_pending_dir);
goto err_pop;
-err_alloc_dirent:
- filp_close(file, 0);
- goto err_pop;
}