Whamcloud - gitweb
Make two functions for mds_cleanup_orphans, and put mds_log_op_unlink into
authortianying <tianying>
Mon, 13 Oct 2003 11:06:24 +0000 (11:06 +0000)
committertianying <tianying>
Mon, 13 Oct 2003 11:06:24 +0000 (11:06 +0000)
a transaction, remove unused mds_unlink_orphan func.

lustre/ldlm/ldlm_lib.c
lustre/mds/mds_unlink_open.c

index 03cca52..0d55df4 100644 (file)
@@ -457,8 +457,8 @@ void target_abort_recovery(void *data)
                 OBP(obd, postsetup)(obd);
 
         /* when recovery was abort, cleanup orphans for mds */
-        if (OBT(obd) && OBP(obd, postcleanup)) {
-                rc = OBP(obd, postcleanup)(obd);
+        if (OBT(obd) && OBP(obd, postrecov)) {
+                rc = OBP(obd, postrecov)(obd);
                 CERROR("Cleanup %d orphans after recovery was abort!\n", rc);
         }
 
@@ -742,17 +742,17 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc)
                        obd->obd_name);
                 obd->obd_recovering = 0;
 
+                if (OBT(obd) && OBP(obd, postsetup))
+                        OBP(obd, postsetup)(obd);
+
                 /* when recovering finished, cleanup orphans for mds       */
-                /* there should be no orphan cleaned up for this condition */
-                if (OBT(obd) && OBP(obd, postcleanup)) {
+                if (OBT(obd) && OBP(obd, postrecov)) {
                         CERROR("cleanup orphans after all clients recovered\n");
-                        rc2 = OBP(obd, postcleanup)(obd);
-                        LASSERT(rc2 == 0);
+                        rc2 = OBP(obd, postrecov)(obd);
+                        //LASSERT(rc2 == 0);
+                        CERROR("cleanup %d orphans\n", rc2);
                 }
 
-                if (OBT(obd) && OBP(obd, postsetup))
-                        OBP(obd, postsetup)(obd);
-
                 list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
                         req = list_entry(tmp, struct ptlrpc_request, rq_list);
                         DEBUG_REQ(D_ERROR, req, "delayed:");
index 351969f..491fda5 100644 (file)
@@ -101,6 +101,129 @@ out_lock:
         RETURN(rc);
 }
 
+static int mds_osc_destroy(struct mds_obd *mds, struct ptlrpc_request *request)
+{
+        struct mds_body *body;
+        struct lov_mds_md *lmm = NULL;
+        struct lov_stripe_md *lsm = NULL;
+        struct obd_trans_info oti = { 0 };
+        struct obdo *oa;
+        int rc;
+        ENTRY;
+
+        body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body));
+        if (!(body->valid & OBD_MD_FLEASIZE))
+                RETURN(0);
+        if (body->eadatasize == 0) {
+                CERROR("OBD_MD_FLEASIZE set but eadatasize zero\n");
+                RETURN(rc = -EPROTO); 
+        }
+
+        lmm = lustre_msg_buf(request->rq_repmsg, 1, body->eadatasize);
+        LASSERT(lmm != NULL);
+
+        rc = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, body->eadatasize);
+        if (rc < 0) {
+                CERROR("Error unpack md %p\n", lmm);
+                RETURN(rc);
+        } else {
+                LASSERT(rc >= sizeof(*lsm));
+                rc = 0;
+        }
+
+        oa = obdo_alloc();
+        if (oa == NULL)
+                GOTO(out_free_memmd, rc = -ENOMEM);
+        oa->o_id = lsm->lsm_object_id;
+        oa->o_mode = body->mode & S_IFMT;
+        oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE;
+
+#ifdef ENABLE_ORPHANS
+        if (body->valid & OBD_MD_FLCOOKIE) {
+                oa->o_valid |= OBD_MD_FLCOOKIE;
+                oti.oti_logcookies = 
+                        lustre_msg_buf(request->rq_repmsg, 2,
+                                       sizeof(struct llog_cookie) *
+                                       lsm->lsm_stripe_count);
+                if (oti.oti_logcookies == NULL)
+                        oa->o_valid &= ~OBD_MD_FLCOOKIE;
+                        body->valid &= ~OBD_MD_FLCOOKIE;
+        }
+#endif
+
+        rc = obd_destroy(mds->mds_osc_exp, oa, lsm, &oti);
+        obdo_free(oa);
+        if (rc) 
+                CERROR("destroy orphan objid 0x"LPX64" on ost error "
+                       "%d\n", lsm->lsm_object_id, rc);
+out_free_memmd:
+        obd_free_memmd(mds->mds_osc_exp, &lsm);
+        RETURN(rc);
+}
+
+static int mds_unlink(struct obd_device *obd, struct dentry *dchild,
+                      struct inode *inode, struct inode *pending_dir)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct mds_body *body;
+        void *handle = NULL;
+        struct ptlrpc_request *req;
+        int lengths[3] = {sizeof(struct mds_body),
+                          mds->mds_max_mdsize,
+                          mds->mds_max_cookiesize};
+        int rc;
+        ENTRY;
+
+        LASSERT(mds->mds_osc_obd != NULL);
+        OBD_ALLOC(req, sizeof(*req));
+        if (!req) {
+                CERROR("request allocation out of memory\n");
+                GOTO(err_alloc_req, rc = -ENOMEM);
+        }
+        rc = lustre_pack_msg(3, lengths, NULL, &req->rq_replen,
+                                      &req->rq_repmsg);
+        if (rc) {
+                CERROR("cannot pack request %d\n", rc);
+                GOTO(out_free_req, rc);
+        }
+        body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
+        LASSERT(body != NULL);
+
+        mds_pack_inode2body(body, inode);
+        mds_pack_md(obd, req->rq_repmsg, 1, body, inode);
+
+        handle = fsfilt_start(obd, pending_dir, FSFILT_OP_UNLINK_LOG, NULL);
+        if (IS_ERR(handle)) {
+                rc = PTR_ERR(handle);
+                CERROR("error fsfilt_start: %d\n", rc);
+                handle = NULL;
+                GOTO(out_free_req, rc);
+        }
+        rc = vfs_unlink(pending_dir, dchild);
+        if (rc) 
+                CERROR("error unlinking orphan from PENDING directory");
+
+#ifdef ENABLE_ORPHANS
+        if ((body->valid & OBD_MD_FLEASIZE)) {
+                if (mds_log_op_unlink(obd, inode, req->rq_repmsg, 1) > 0)
+                        body->valid |= OBD_MD_FLCOOKIE;
+        }
+#endif
+        if (handle) {
+                int err = fsfilt_commit(obd, pending_dir, handle, 0);
+                if (err) {
+                        CERROR("error committing orphan unlink: %d\n",
+                               err);
+                        rc = err;
+                        GOTO(out_free_req, rc);
+                }
+        }
+        rc = mds_osc_destroy(mds, req);
+out_free_req:
+        OBD_FREE(req, sizeof(*req));
+err_alloc_req:
+        RETURN(rc);
+}
 
 int mds_cleanup_orphans(struct obd_device *obd)
 {
@@ -111,16 +234,6 @@ int mds_cleanup_orphans(struct obd_device *obd)
         struct inode *child_inode, *pending_dir = mds->mds_pending_dir->d_inode;
         struct l_linux_dirent *dirent, *ptr;
         unsigned int count = pending_dir->i_size;
-        void *handle = NULL;
-        struct lov_mds_md *lmm = NULL;
-        struct lov_stripe_md *lsm = NULL;
-        struct obd_trans_info oti = { 0 };
-        struct obdo *oa;
-        struct ptlrpc_request *req;
-        struct mds_body *body;
-        int lengths[3] = {sizeof(struct mds_body),
-                          mds->mds_max_mdsize,
-                          mds->mds_max_cookiesize};
         int rc = 0, rc2 = 0, item = 0;
         ENTRY;
 
@@ -172,105 +285,18 @@ int mds_cleanup_orphans(struct obd_device *obd)
                         GOTO(next, rc2 = 0);
                 }
 
-                CDEBUG(D_ERROR, "cleanup orphan %s start on mds and ost:\n",
+                CDEBUG(D_ERROR, "start to remove %s from mds and ost!\n",
                        ptr->d_name);
-
-                LASSERT(mds->mds_osc_obd != NULL);
-
-                OBD_ALLOC(req, sizeof(*req));
-                if (!req) {
-                        CERROR("request allocation out of memory\n");
-                        GOTO(err_lov_conn, rc2 = -ENOMEM);
-                }
-                rc2 = lustre_pack_msg(3, lengths, NULL, &req->rq_replen,
-                                      &req->rq_repmsg);
-                if (rc2) {
-                        CERROR("cannot pack request %d\n", rc2);
-                        OBD_FREE(req, sizeof(*req));
-                        GOTO(out_free_req, rc2);
-                }
-                body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
-                LASSERT(body != NULL);
-
-                mds_pack_inode2body(body, child_inode);
-                mds_pack_md(obd, req->rq_repmsg, 1, body, child_inode);
-                lmm = lustre_msg_buf(req->rq_repmsg, 1, 0);
-
-#ifdef ENABLE_ORPHANS
-                if (mds_log_op_unlink(obd, child_inode, req->rq_repmsg, 1) > 0)
-                        oa->o_valid |= OBD_MD_FLCOOKIE;
-#endif
-
-                rc2 = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm,
-                                   body->eadatasize);
-                if (rc2 < 0) {
-                        CERROR("Error unpack md %p\n", lmm);
-                        GOTO(out_free_req, rc2 = 0);
+                rc2 = mds_unlink(obd, dchild, child_inode, pending_dir);
+                if (rc2 == 0) {
+                        item ++;
+                        CDEBUG(D_ERROR, "removed orphan %s object successfully!\n",
+                               ptr->d_name);
                 } else {
-                        LASSERT(rc2 >= sizeof(*lsm));
-                        rc2 = 0;
-                }
-
-                oa = obdo_alloc();
-                if (oa == NULL)
-                        GOTO(err_alloc_oa, rc2 = -ENOMEM);
-
-                oa->o_id = lsm->lsm_object_id;
-                oa->o_mode = child_inode->i_mode & S_IFMT;
-                oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE;
-
-#ifdef ENABLE_ORPHANS
-                if (oa->o_valid & OBD_MD_FLCOOKIE) {
-                        oti.oti_logcookies =
-                                lustre_msg_buf(req->rq_repmsg, 2,
-                                               sizeof(struct llog_cookie) *
-                                               lsm->lsm_stripe_count);
-                        if (oti.oti_logcookies == NULL)
-                                oa->o_valid &= ~OBD_MD_FLCOOKIE;
-                }
-#endif
-
-                rc2 = obd_destroy(mds->mds_osc_exp, oa, lsm, &oti);
-                obdo_free(oa);
-                if (rc2) {
-                        CERROR("destroy orphan objid 0x"LPX64" on ost error "
-                               "%d\n", lsm->lsm_object_id, rc2);
-                        GOTO(out_free_memmd, rc2 = 0);
-                }
-                item ++;
-
-                CDEBUG(D_ERROR, "removed orphan %s object from ost\n",
-                       ptr->d_name);
-
-                handle = fsfilt_start(obd, pending_dir, FSFILT_OP_UNLINK, NULL);
-                if (IS_ERR(handle)) {
-                        rc2 = PTR_ERR(handle);
-                        CERROR("error fsfilt_start: %d\n", rc2);
-                        handle = NULL;
-                        GOTO(err_alloc_oa, rc2);
-                }
-                rc2 = vfs_unlink(pending_dir, dchild);
-                if (rc2) {
-                        CERROR("error unlinking orphan from PENDING directory");
-                        CERROR("%s: rc %d\n", ptr->d_name, rc2);
-                }
-                if (handle) {
-                        int err = fsfilt_commit(obd, pending_dir, handle, 0);
-                        if (err) {
-                                CERROR("error committing orphan unlink: %d\n",
-                                       err);
-                                rc2 = err;
-                                GOTO(err_alloc_oa, rc2);
-                        }
+                        l_dput(dchild); 
+                        up(&pending_dir->i_sem);
+                        GOTO(err_out, rc2);
                 }
-
-                CDEBUG(D_ERROR, "removed orphan %s from mds successfully!\n",
-                       ptr->d_name);
-
-out_free_memmd:
-                obd_free_memmd(mds->mds_osc_exp, &lsm);
-out_free_req:
-                OBD_FREE(req, sizeof(*req));
 next:
                 l_dput(dchild);
                 up(&pending_dir->i_sem);
@@ -288,17 +314,7 @@ err_open:
         mntput(mds->mds_vfsmnt);
         l_dput(mds->mds_pending_dir);
         goto err_pop;
-
 err_alloc_dirent:
         filp_close(file, 0);
         goto err_pop;
-
-err_alloc_oa:
-        obd_free_memmd(mds->mds_osc_exp, &lsm);
-        OBD_FREE(req, sizeof(*req));
-
-err_lov_conn:
-        l_dput(dchild);
-        up(&pending_dir->i_sem);
-        goto err_out;
 }