From: tianying Date: Mon, 13 Oct 2003 11:06:24 +0000 (+0000) Subject: Make two functions for mds_cleanup_orphans, and put mds_log_op_unlink into X-Git-Tag: v1_7_0_51~2^7~410 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=a8f93e8dadb894b3e460172273b057498b910a84;p=fs%2Flustre-release.git Make two functions for mds_cleanup_orphans, and put mds_log_op_unlink into a transaction, remove unused mds_unlink_orphan func. --- diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 03cca52..0d55df4 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -457,8 +457,8 @@ void target_abort_recovery(void *data) OBP(obd, postsetup)(obd); /* when recovery was abort, cleanup orphans for mds */ - if (OBT(obd) && OBP(obd, postcleanup)) { - rc = OBP(obd, postcleanup)(obd); + if (OBT(obd) && OBP(obd, postrecov)) { + rc = OBP(obd, postrecov)(obd); CERROR("Cleanup %d orphans after recovery was abort!\n", rc); } @@ -742,17 +742,17 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc) obd->obd_name); obd->obd_recovering = 0; + if (OBT(obd) && OBP(obd, postsetup)) + OBP(obd, postsetup)(obd); + /* when recovering finished, cleanup orphans for mds */ - /* there should be no orphan cleaned up for this condition */ - if (OBT(obd) && OBP(obd, postcleanup)) { + if (OBT(obd) && OBP(obd, postrecov)) { CERROR("cleanup orphans after all clients recovered\n"); - rc2 = OBP(obd, postcleanup)(obd); - LASSERT(rc2 == 0); + rc2 = OBP(obd, postrecov)(obd); + //LASSERT(rc2 == 0); + CERROR("cleanup %d orphans\n", rc2); } - if (OBT(obd) && OBP(obd, postsetup)) - OBP(obd, postsetup)(obd); - list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) { req = list_entry(tmp, struct ptlrpc_request, rq_list); DEBUG_REQ(D_ERROR, req, "delayed:"); diff --git a/lustre/mds/mds_unlink_open.c b/lustre/mds/mds_unlink_open.c index 351969f..491fda5 100644 --- a/lustre/mds/mds_unlink_open.c +++ b/lustre/mds/mds_unlink_open.c @@ -101,6 +101,129 @@ out_lock: RETURN(rc); } +static int mds_osc_destroy(struct mds_obd *mds, struct ptlrpc_request *request) +{ + struct mds_body *body; + struct lov_mds_md *lmm = NULL; + struct lov_stripe_md *lsm = NULL; + struct obd_trans_info oti = { 0 }; + struct obdo *oa; + int rc; + ENTRY; + + body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body)); + if (!(body->valid & OBD_MD_FLEASIZE)) + RETURN(0); + if (body->eadatasize == 0) { + CERROR("OBD_MD_FLEASIZE set but eadatasize zero\n"); + RETURN(rc = -EPROTO); + } + + lmm = lustre_msg_buf(request->rq_repmsg, 1, body->eadatasize); + LASSERT(lmm != NULL); + + rc = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, body->eadatasize); + if (rc < 0) { + CERROR("Error unpack md %p\n", lmm); + RETURN(rc); + } else { + LASSERT(rc >= sizeof(*lsm)); + rc = 0; + } + + oa = obdo_alloc(); + if (oa == NULL) + GOTO(out_free_memmd, rc = -ENOMEM); + oa->o_id = lsm->lsm_object_id; + oa->o_mode = body->mode & S_IFMT; + oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE; + +#ifdef ENABLE_ORPHANS + if (body->valid & OBD_MD_FLCOOKIE) { + oa->o_valid |= OBD_MD_FLCOOKIE; + oti.oti_logcookies = + lustre_msg_buf(request->rq_repmsg, 2, + sizeof(struct llog_cookie) * + lsm->lsm_stripe_count); + if (oti.oti_logcookies == NULL) + oa->o_valid &= ~OBD_MD_FLCOOKIE; + body->valid &= ~OBD_MD_FLCOOKIE; + } +#endif + + rc = obd_destroy(mds->mds_osc_exp, oa, lsm, &oti); + obdo_free(oa); + if (rc) + CERROR("destroy orphan objid 0x"LPX64" on ost error " + "%d\n", lsm->lsm_object_id, rc); +out_free_memmd: + obd_free_memmd(mds->mds_osc_exp, &lsm); + RETURN(rc); +} + +static int mds_unlink(struct obd_device *obd, struct dentry *dchild, + struct inode *inode, struct inode *pending_dir) +{ + struct mds_obd *mds = &obd->u.mds; + struct mds_body *body; + void *handle = NULL; + struct ptlrpc_request *req; + int lengths[3] = {sizeof(struct mds_body), + mds->mds_max_mdsize, + mds->mds_max_cookiesize}; + int rc; + ENTRY; + + LASSERT(mds->mds_osc_obd != NULL); + OBD_ALLOC(req, sizeof(*req)); + if (!req) { + CERROR("request allocation out of memory\n"); + GOTO(err_alloc_req, rc = -ENOMEM); + } + rc = lustre_pack_msg(3, lengths, NULL, &req->rq_replen, + &req->rq_repmsg); + if (rc) { + CERROR("cannot pack request %d\n", rc); + GOTO(out_free_req, rc); + } + body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body)); + LASSERT(body != NULL); + + mds_pack_inode2body(body, inode); + mds_pack_md(obd, req->rq_repmsg, 1, body, inode); + + handle = fsfilt_start(obd, pending_dir, FSFILT_OP_UNLINK_LOG, NULL); + if (IS_ERR(handle)) { + rc = PTR_ERR(handle); + CERROR("error fsfilt_start: %d\n", rc); + handle = NULL; + GOTO(out_free_req, rc); + } + rc = vfs_unlink(pending_dir, dchild); + if (rc) + CERROR("error unlinking orphan from PENDING directory"); + +#ifdef ENABLE_ORPHANS + if ((body->valid & OBD_MD_FLEASIZE)) { + if (mds_log_op_unlink(obd, inode, req->rq_repmsg, 1) > 0) + body->valid |= OBD_MD_FLCOOKIE; + } +#endif + if (handle) { + int err = fsfilt_commit(obd, pending_dir, handle, 0); + if (err) { + CERROR("error committing orphan unlink: %d\n", + err); + rc = err; + GOTO(out_free_req, rc); + } + } + rc = mds_osc_destroy(mds, req); +out_free_req: + OBD_FREE(req, sizeof(*req)); +err_alloc_req: + RETURN(rc); +} int mds_cleanup_orphans(struct obd_device *obd) { @@ -111,16 +234,6 @@ int mds_cleanup_orphans(struct obd_device *obd) struct inode *child_inode, *pending_dir = mds->mds_pending_dir->d_inode; struct l_linux_dirent *dirent, *ptr; unsigned int count = pending_dir->i_size; - void *handle = NULL; - struct lov_mds_md *lmm = NULL; - struct lov_stripe_md *lsm = NULL; - struct obd_trans_info oti = { 0 }; - struct obdo *oa; - struct ptlrpc_request *req; - struct mds_body *body; - int lengths[3] = {sizeof(struct mds_body), - mds->mds_max_mdsize, - mds->mds_max_cookiesize}; int rc = 0, rc2 = 0, item = 0; ENTRY; @@ -172,105 +285,18 @@ int mds_cleanup_orphans(struct obd_device *obd) GOTO(next, rc2 = 0); } - CDEBUG(D_ERROR, "cleanup orphan %s start on mds and ost:\n", + CDEBUG(D_ERROR, "start to remove %s from mds and ost!\n", ptr->d_name); - - LASSERT(mds->mds_osc_obd != NULL); - - OBD_ALLOC(req, sizeof(*req)); - if (!req) { - CERROR("request allocation out of memory\n"); - GOTO(err_lov_conn, rc2 = -ENOMEM); - } - rc2 = lustre_pack_msg(3, lengths, NULL, &req->rq_replen, - &req->rq_repmsg); - if (rc2) { - CERROR("cannot pack request %d\n", rc2); - OBD_FREE(req, sizeof(*req)); - GOTO(out_free_req, rc2); - } - body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body)); - LASSERT(body != NULL); - - mds_pack_inode2body(body, child_inode); - mds_pack_md(obd, req->rq_repmsg, 1, body, child_inode); - lmm = lustre_msg_buf(req->rq_repmsg, 1, 0); - -#ifdef ENABLE_ORPHANS - if (mds_log_op_unlink(obd, child_inode, req->rq_repmsg, 1) > 0) - oa->o_valid |= OBD_MD_FLCOOKIE; -#endif - - rc2 = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, - body->eadatasize); - if (rc2 < 0) { - CERROR("Error unpack md %p\n", lmm); - GOTO(out_free_req, rc2 = 0); + rc2 = mds_unlink(obd, dchild, child_inode, pending_dir); + if (rc2 == 0) { + item ++; + CDEBUG(D_ERROR, "removed orphan %s object successfully!\n", + ptr->d_name); } else { - LASSERT(rc2 >= sizeof(*lsm)); - rc2 = 0; - } - - oa = obdo_alloc(); - if (oa == NULL) - GOTO(err_alloc_oa, rc2 = -ENOMEM); - - oa->o_id = lsm->lsm_object_id; - oa->o_mode = child_inode->i_mode & S_IFMT; - oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE; - -#ifdef ENABLE_ORPHANS - if (oa->o_valid & OBD_MD_FLCOOKIE) { - oti.oti_logcookies = - lustre_msg_buf(req->rq_repmsg, 2, - sizeof(struct llog_cookie) * - lsm->lsm_stripe_count); - if (oti.oti_logcookies == NULL) - oa->o_valid &= ~OBD_MD_FLCOOKIE; - } -#endif - - rc2 = obd_destroy(mds->mds_osc_exp, oa, lsm, &oti); - obdo_free(oa); - if (rc2) { - CERROR("destroy orphan objid 0x"LPX64" on ost error " - "%d\n", lsm->lsm_object_id, rc2); - GOTO(out_free_memmd, rc2 = 0); - } - item ++; - - CDEBUG(D_ERROR, "removed orphan %s object from ost\n", - ptr->d_name); - - handle = fsfilt_start(obd, pending_dir, FSFILT_OP_UNLINK, NULL); - if (IS_ERR(handle)) { - rc2 = PTR_ERR(handle); - CERROR("error fsfilt_start: %d\n", rc2); - handle = NULL; - GOTO(err_alloc_oa, rc2); - } - rc2 = vfs_unlink(pending_dir, dchild); - if (rc2) { - CERROR("error unlinking orphan from PENDING directory"); - CERROR("%s: rc %d\n", ptr->d_name, rc2); - } - if (handle) { - int err = fsfilt_commit(obd, pending_dir, handle, 0); - if (err) { - CERROR("error committing orphan unlink: %d\n", - err); - rc2 = err; - GOTO(err_alloc_oa, rc2); - } + l_dput(dchild); + up(&pending_dir->i_sem); + GOTO(err_out, rc2); } - - CDEBUG(D_ERROR, "removed orphan %s from mds successfully!\n", - ptr->d_name); - -out_free_memmd: - obd_free_memmd(mds->mds_osc_exp, &lsm); -out_free_req: - OBD_FREE(req, sizeof(*req)); next: l_dput(dchild); up(&pending_dir->i_sem); @@ -288,17 +314,7 @@ err_open: mntput(mds->mds_vfsmnt); l_dput(mds->mds_pending_dir); goto err_pop; - err_alloc_dirent: filp_close(file, 0); goto err_pop; - -err_alloc_oa: - obd_free_memmd(mds->mds_osc_exp, &lsm); - OBD_FREE(req, sizeof(*req)); - -err_lov_conn: - l_dput(dchild); - up(&pending_dir->i_sem); - goto err_out; }